gfn.containers.replay_buffer_manager
====================================

.. py:module:: gfn.containers.replay_buffer_manager


Attributes
----------

.. autoapisummary::

   gfn.containers.replay_buffer_manager.DATA_TAG
   gfn.containers.replay_buffer_manager.METADATA_TAG


Classes
-------

.. autoapisummary::

   gfn.containers.replay_buffer_manager.ReplayBufferManager


Module Contents
---------------

.. py:data:: DATA_TAG
   :value: 0


.. py:data:: METADATA_TAG
   :value: 1


.. py:class:: ReplayBufferManager(env, rank, num_training_ranks, scoring_function = None, diverse_replay_buffer = False, capacity = 10000, remote_manager_rank = None, communication_backend = 'mpi', timing = False, store_locally = True, baseline_strategy = 'min', baseline_percentile = 0.1, baseline_ema_alpha = 0.1)

   Receives training containers on the manager rank and replies with scores.

   The manager optionally stores incoming data in a local replay buffer
   (``store_locally=True``) and injects a ``baseline_log_reward`` into every
   score response so workers with ``baseline_filtering`` can skip sending
   payloads that would be immediately evicted.

   The baseline source is controlled by ``baseline_strategy``:

   - ``"min"`` / ``"percentile"`` read from the local buffer once it reaches
     capacity.
   - ``"ema"`` (and any strategy when the buffer is unavailable, e.g.
     ``store_locally=False``) reads from a running EMA of incoming batch
     minima.


   .. py:attribute:: _baseline_ema
      :type:  float | None
      :value: None


   .. py:attribute:: _comm_stats
      :type:  dict[int, dict]


   .. py:method:: _compute_metadata()
      :abstractmethod:


   .. py:method:: _handle_message_async(sender_rank, msg, msg_data_len = 0)

      Dispatch a message using non-blocking ``isend`` for responses.


   .. py:method:: _handle_message_sync(sender_rank, msg, msg_data_len = 0)

      Dispatch a message using blocking ``send`` for responses.

      Simpler than the async variant and uses zero CPU while the send is in
      flight, making it preferable when all ranks share a CPU.


   .. py:method:: _inject_baseline_log_reward(score_dict, incoming)

      Inject ``baseline_log_reward`` into *score_dict* per ``baseline_strategy``.

      Updates the EMA tracker from ``incoming.log_rewards`` (used as the
      source under ``"ema"`` and as a fallback when the local buffer is
      unavailable), then picks a baseline from the buffer (``"min"`` /
      ``"percentile"`` once at capacity) or the EMA. Non-finite rewards are
      excluded so containers with ``-inf`` (e.g. Transitions) do not poison
      the statistics.


   .. py:attribute:: _pending_sends
      :type:  list[gfn.utils.distributed.AsyncSendHandle]
      :value: []


   .. py:method:: _print_timing_summary()

      Print communication and timing stats at shutdown.


   .. py:method:: _prune_completed_sends()

      Remove completed non-blocking sends from the pending list.


   .. py:method:: _recv_object()


   .. py:attribute:: _timing_data
      :type:  dict[str, list[float]]


   .. py:attribute:: baseline_ema_alpha
      :value: 0.1


   .. py:attribute:: baseline_percentile
      :value: 0.1


   .. py:attribute:: baseline_strategy
      :value: 'min'


   .. py:attribute:: capacity
      :value: 10000


   .. py:attribute:: communication_backend
      :value: 'mpi'


   .. py:method:: default_scoring_function(obj, sender_rank = -1)

      Default score function if none provided, placeholder.


   .. py:attribute:: diverse_replay_buffer
      :value: False


   .. py:attribute:: exit_counter
      :value: 0


   .. py:method:: get_metadata(manager_rank, backend)
      :staticmethod:

      Sends a get metadata signal to the replay buffer manager.

      Uses ``METADATA_TAG`` so the response is never confused with pending
      data/score messages on the default tag.


   .. py:attribute:: is_running
      :value: True


   .. py:attribute:: num_training_ranks


   .. py:attribute:: rank


   .. py:attribute:: remote_manager_rank
      :value: None


   .. py:method:: run(async_send = True)

      Runs on remote buffer manager ranks. Waits for training data, computes
      reward, sends back scores.

      :param async_send: If True (default), use non-blocking ``isend`` for
                         responses. If False, use blocking ``send`` for
                         responses.


   .. py:attribute:: scoring_function


   .. py:method:: send_termination_signal(manager_rank, backend)
      :staticmethod:

      Sends a termination signal to the replay buffer manager.


   .. py:attribute:: store_locally
      :value: True


   .. py:attribute:: timing
      :value: False
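
The baseline selection described for ``ReplayBufferManager`` and ``_inject_baseline_log_reward`` can be illustrated with a small standalone sketch. It is not the library's implementation: ``update_ema`` and ``pick_baseline`` are hypothetical helper names, plain Python lists stand in for reward tensors, and both the EMA update rule (``alpha`` weighting the newest batch minimum) and the percentile indexing are assumptions made for illustration.

```python
import math


def update_ema(ema, incoming_log_rewards, alpha=0.1):
    """Fold the minimum finite log-reward of one batch into a running EMA.

    Hypothetical helper. The update rule is an assumption, not taken
    from the library.
    """
    # Drop -inf / nan so they cannot dominate the minimum.
    finite = [r for r in incoming_log_rewards if math.isfinite(r)]
    if not finite:
        return ema  # nothing usable in this batch; keep the previous value
    batch_min = min(finite)
    if ema is None:
        return batch_min  # first observation seeds the tracker
    return alpha * batch_min + (1.0 - alpha) * ema


def pick_baseline(strategy, buffer_log_rewards, capacity, ema, percentile=0.1):
    """Choose a baseline from the buffer when it is full, else from the EMA.

    Hypothetical helper. An empty list models an unavailable buffer
    (for example, store_locally=False).
    """
    finite = sorted(r for r in buffer_log_rewards if math.isfinite(r))
    buffer_ready = len(buffer_log_rewards) >= capacity and bool(finite)
    if strategy == "min" and buffer_ready:
        return finite[0]
    if strategy == "percentile" and buffer_ready:
        # Assumed indexing: floor(percentile * n), clamped to the last item.
        idx = min(int(percentile * len(finite)), len(finite) - 1)
        return finite[idx]
    # "ema", or any strategy while the buffer cannot supply a value.
    return ema


ema = update_ema(None, [-3.0, float("-inf"), -1.0])
ema = update_ema(ema, [-1.0], alpha=0.5)
full_buffer_baseline = pick_baseline("min", [-5.0, -2.0], capacity=2, ema=ema)
warmup_baseline = pick_baseline("min", [-5.0], capacity=2, ema=ema)
```

In this sketch a worker with ``baseline_filtering`` enabled would compare its best log-reward against the returned value and skip the send when nothing exceeds it. How the real worker performs that comparison is not specified here.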