gfn.containers.replay_buffer ============================ .. py:module:: gfn.containers.replay_buffer Attributes ---------- .. autoapisummary:: gfn.containers.replay_buffer.ContainerUnion Classes ------- .. autoapisummary:: gfn.containers.replay_buffer.Container gfn.containers.replay_buffer.NormBasedDiversePrioritizedReplayBuffer gfn.containers.replay_buffer.ReplayBuffer gfn.containers.replay_buffer.TerminatingStateBuffer Module Contents --------------- .. py:class:: Container Bases: :py:obj:`Protocol` Base class for protocol classes. Protocol classes are defined as:: class Proto(Protocol): def meth(self) -> int: ... Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing). For example:: class C: def meth(self) -> int: return 0 def func(x: Proto) -> int: return x.meth() func(C()) # Passes static type check See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as:: class GenProto(Protocol[T]): def meth(self) -> T: ... .. py:method:: __getitem__(idx) .. py:method:: __len__() .. py:method:: extend(other) .. py:property:: log_rewards :type: torch.Tensor | None .. py:property:: terminating_states .. py:data:: ContainerUnion .. py:class:: NormBasedDiversePrioritizedReplayBuffer(env, capacity = 1000, cutoff_distance = 0.0, p_norm_distance = 1.0, remote_manager_rank = None, remote_buffer_freq = 1, communication_backend = 'mpi', timing = False, async_score = False, async_comm = False, lazy_sort = False, baseline_filtering = False, scoring_only = False, baseline_refresh_after = 10) Bases: :py:obj:`ReplayBuffer` A replay buffer with diversity-based prioritization. .. attribute:: env The environment associated with the containers. .. attribute:: capacity The maximum number of items the buffer can hold. .. attribute:: training_container The buffer contents (Trajectories, Transitions, or StatesContainer). This is dynamically set based on the type of the first added object. .. attribute:: prioritized_capacity Whether to use prioritized capacity (keep highest-reward items). This is set to True by default. .. attribute:: prioritized_sampling Whether to sample items with probability proportional to their reward. .. attribute:: cutoff_distance Threshold used to determine whether a new terminating state is different enough from those already in the buffer. .. attribute:: p_norm_distance p-norm value for distance calculation (used in torch.cdist). .. py:method:: _diversity_repr(container) :staticmethod: Returns the tensor used for pairwise distance in diversity filtering. For conditional GFNs, concatenates conditions with the state tensor so that identical states under different conditions are treated as distinct. .. py:method:: _local_add(training_container) Adds with diversity-based prioritization to the local buffer. Overrides the base class hook so that ``add()`` (which handles remote communication) delegates local insertion here. .. py:attribute:: cutoff_distance :value: 0.0 .. py:attribute:: p_norm_distance :value: 1.0 .. py:class:: ReplayBuffer(env, capacity = 1000, prioritized_capacity = False, prioritized_sampling = False, remote_manager_rank = None, remote_buffer_freq = 1, communication_backend = 'mpi', timing = False, async_score = False, async_comm = False, lazy_sort = False, baseline_filtering = False, scoring_only = False, baseline_refresh_after = 10) A replay buffer for storing training containers. Supports local-only operation and distributed remote buffer communication. Features: - **Local buffering**: Stores Trajectories, Transitions, or StatesContainers up to a fixed capacity. - **Prioritized capacity**: Optionally keeps only the highest-reward items when the buffer is full. - **Prioritized sampling**: Optionally samples with probability proportional to reward (softmax over log-rewards). - **Remote buffer communication**: When ``remote_manager_rank`` is set, periodically sends batched containers to a remote ``ReplayBufferManager`` and receives score dictionaries back. - **Communication backends**: The ``communication_backend`` parameter selects between ``"torch"`` (PyTorch distributed / Gloo) and ``"mpi"`` (MPI4PY, ~8-12 GB/s vs ~100 MB/s with Gloo). - **Async scoring**: When ``async_score`` is enabled, trajectory sends are fire-and-forget; scores are collected lazily on the next ``add()`` call (1-iteration stale), decoupling training throughput from buffer scoring latency. - **Timing instrumentation**: When ``timing`` is enabled, serialization, send, and receive durations are recorded for profiling. .. attribute:: env The environment associated with the containers. .. attribute:: capacity The maximum number of items the buffer can hold. .. attribute:: training_container The buffer contents (Trajectories, Transitions, or StatesContainer). Dynamically set based on the type of the first added object. .. py:method:: __len__() Returns the number of items in the buffer. :returns: The number of items in the buffer (including pending batches). .. py:method:: __repr__() Returns a string representation of the ReplayBuffer. :returns: A string summary of the buffer. .. py:attribute:: _add_counter :value: 0 .. py:attribute:: _baseline_kept :type: int :value: 0 .. py:attribute:: _baseline_log_reward :type: float .. py:attribute:: _baseline_skipped_sends :type: int :value: 0 .. py:attribute:: _baseline_total :type: int :value: 0 .. py:method:: _collect_pending_score() Collect a pending score response from a previous async send. Returns None if no score is pending (e.g., first iteration). .. py:attribute:: _consecutive_filtered_empty :type: int :value: 0 .. py:method:: _filter_and_send(container, send_fn) Filter by baseline, prepare for remote, and send. Returns whatever ``send_fn`` returns (a score dict for sync sends, None for async sends), or None if baseline filtering drops everything. .. py:method:: _filter_by_baseline(container) Filter a container to keep only items with log_reward >= baseline. Returns the (possibly subset) container, or None if every item is below the baseline. After ``baseline_refresh_after`` consecutive fully-filtered batches, the next batch bypasses the filter so the worker can receive a fresh baseline. ``Transitions`` is not supported (its log_rewards is per-transition with ``-inf`` for non-terminating rows, so per-row filtering would break DB/SubTB). .. py:method:: _flush_pending() Concatenate all pending batches into ``training_container``. Called lazily when the accumulated size reaches 2 * capacity, or eagerly by callers that need a consistent view. Merges all pending batches into a single combined batch first, then extends ``training_container`` once to avoid extra copy cost. .. py:attribute:: _is_full :value: False .. py:method:: _isend_and_defer_score(training_container) Non-blocking send (isend), deferred score: fire-and-forget data, collect score on next add(). The send handle is kept alive in ``_send_handle`` until the next call to ``_wait_previous_send``. .. py:method:: _local_add(training_container) Adds a training object to the local buffer, handling capacity. Subclasses override this to customize local insertion logic (e.g., diversity filtering). The base class ``add()`` calls this method, then handles remote buffer communication separately. :param training_container: The Trajectories, Transitions, or StatesContainer object to add. .. py:attribute:: _pending_batches :type: list[ContainerUnion] :value: [] .. py:attribute:: _pending_len :type: int :value: 0 .. py:attribute:: _pending_score :type: bool :value: False .. py:method:: _prepare_for_remote(container) Convert a container to a lightweight form for remote scoring. When ``scoring_only`` is True, extracts terminating states and log-rewards into a ``StatesContainer``. ``Transitions`` is rejected because its ``log_rewards`` shape does not match ``terminating_states`` (it is per-transition, not per-trajectory). When ``scoring_only`` is False, returns the container unchanged. .. py:method:: _recv_score() Receive a score dictionary from the remote manager. .. py:method:: _send_data(training_container) Send a training container to the remote manager. .. py:attribute:: _send_handle :type: gfn.utils.distributed.AsyncSendHandle | None :value: None .. py:method:: _send_objs(training_container) Sends a training container to the remote manager (synchronous). .. py:method:: _send_objs_async(training_container) Sends a training container without waiting for the score response. The score will be collected on the next call to ``_collect_pending_score``. .. py:method:: _sort_and_truncate(training_container) Sort by log-reward (if prioritized) and truncate to capacity. .. py:method:: _update_baseline(score_dict) Extract and store the baseline log-reward from a score response. Called after receiving a score dict from the buffer manager. Only updates if baseline_filtering is enabled and the score dict contains a ``baseline_log_reward`` key. .. py:method:: _wait_previous_send() Block until the previous non-blocking send has completed. This is typically near-instantaneous because MPI internally buffers the data, but guarantees the send buffer can be safely reused. .. py:method:: add(training_container) Adds a training container to the buffer. The type of the training container is dynamically set based on the type of the first added container. When ``async_score`` is enabled, scores are collected lazily: the first call returns None (no pending score yet), and subsequent calls return the score from the *previous* submission. This decouples training throughput from buffer scoring latency. When ``baseline_filtering`` is enabled, only trajectories with log-reward above the remote buffer's baseline are sent. If all trajectories in the pending batch are below the baseline, the send is skipped entirely. :param training_container: The Trajectories, Transitions, or StatesContainer object to add. .. py:attribute:: async_comm :value: False .. py:attribute:: async_score :value: False .. py:attribute:: baseline_filtering :value: False .. py:attribute:: baseline_refresh_after :value: 10 .. py:attribute:: capacity :value: 1000 .. py:attribute:: communication_backend :value: 'mpi' .. py:property:: device :type: torch.device The device on which the buffer's data is stored. :returns: The device object of the buffer's contents. .. py:method:: drain_pending_score(timeout_sec = 30.0) Drain any outstanding async score before shutdown. Should be called before sending the EXIT signal when ``async_score`` or ``async_comm`` is enabled, to avoid leaving the buffer manager with an undelivered response. For ``async_comm`` mode this also waits for the outstanding non-blocking send to complete. Uses a timeout to avoid hanging indefinitely if the buffer manager has crashed. Returns None on timeout (score is lost). .. py:attribute:: env .. py:method:: initialize(training_container) Initializes the buffer with the type of the first added object. :param training_container: The initial Trajectories, Transitions, or StatesContainer object to set the buffer type. .. py:attribute:: lazy_sort :value: False .. py:method:: load(path) Loads buffer contents from a ``.pt`` file saved by :meth:`save`. :param path: File path to the saved buffer. .. py:attribute:: pending_container :type: ContainerUnion | None :value: None .. py:attribute:: prioritized_capacity :value: False .. py:attribute:: prioritized_sampling :value: False .. py:attribute:: remote_buffer_freq :value: 1 .. py:attribute:: remote_manager_rank :value: None .. py:method:: sample(n_samples) Samples training objects from the buffer. :param n_samples: The number of items to sample. :returns: A sampled Trajectories, Transitions, or StatesContainer. .. py:method:: save(path) Saves the buffer to a single ``.pt`` file. :param path: File path (e.g. ``"replay_buffer.pt"``). .. py:attribute:: scoring_only :value: False .. py:attribute:: timing :value: False .. py:attribute:: timing_data :type: dict[str, list[float]] .. py:method:: timing_log() Returns a formatted string of the timing information for the replay buffer. .. py:attribute:: training_container :type: ContainerUnion | None :value: None .. py:class:: TerminatingStateBuffer(env, capacity = 1000, communication_backend = 'mpi', timing = False, **kwargs) Bases: :py:obj:`ReplayBuffer` A replay buffer for storing terminating states. .. attribute:: env The environment associated with the containers. .. attribute:: capacity The maximum number of items the buffer can hold. .. attribute:: training_container The buffer contents (StatesContainer). .. py:method:: _local_add(training_container) Extracts terminating states and adds them to the local buffer. Overrides the base class hook so that ``add()`` (which handles remote communication) delegates local insertion here. .. py:attribute:: training_container