trinity.buffer.operators.filters.reward_filter module#

class trinity.buffer.operators.filters.reward_filter.RewardFilter(threshold: float = 0.0)[source]#

Bases: ExperienceOperator

Filter experiences based on the reward value.

Note: This filter assumes that the reward is already calculated and stored in the Experience object.

__init__(threshold: float = 0.0)[source]#
process(exps: List[Experience]) Tuple[List[Experience], dict][source]#

Filter experiences based on reward value.

class trinity.buffer.operators.filters.reward_filter.RewardSTDFilter(threshold: float = 0.0)[source]#

Bases: ExperienceOperator

Filter experiences based on the standard deviation of rewards within each group.

Note: This filter assumes that the reward is already calculated and stored in the Experience object.

__init__(threshold: float = 0.0)[source]#
process(exps: List[Experience]) Tuple[List[Experience], dict][source]#

Filter experiences based on reward std.

class trinity.buffer.operators.filters.reward_filter.DAPODynamicSamplingFilter(metric_key: str = 'accuracy', correct_threshold: float = 0.0)[source]#

Bases: ExperienceOperator

DAPO dynamic sampling (arXiv:2503.14476 Sec. 3.2).

Keeps a task group only when some but not all rollouts are correct: 0 < |{correct}| < G. Uses outcome accuracy from experience metrics, not length-shaped total reward.

__init__(metric_key: str = 'accuracy', correct_threshold: float = 0.0) None[source]#

Initialize the dynamic sampling filter.

Parameters:
  • metric_key – Metric name used to determine rollout correctness.

  • correct_threshold – Minimum score treated as correct.

process(exps: List[Experience]) Tuple[List[Experience], dict][source]#

Keep only mixed-correctness groups for DAPO training.

Parameters:

exps – Experiences grouped by task id during filtering.

Returns:

Filtered experiences and filtering metrics.

Return type:

Tuple[List[Experience], dict]

class trinity.buffer.operators.filters.reward_filter.MaskResponseTruncatedOperator[source]#

Bases: ExperienceOperator

DAPO overlong filtering stage 1 (Sec. 3.4): exclude truncated responses from loss.

Zeros action_mask so truncated rollouts do not contribute to the policy gradient.

process(exps: List[Experience]) Tuple[List[Experience], dict][source]#

Mask action positions for truncated responses.

Parameters:

exps – Experiences to process.

Returns:

Original experiences and masking metrics.

Return type:

Tuple[List[Experience], dict]

class trinity.buffer.operators.filters.reward_filter.InvalidRewardFilter[source]#

Bases: ExperienceOperator

Filters out experiences with invalid reward values.

Note: This operator assumes that rewards are already computed and stored in the Experience object.Any experience with a missing (None) or invalid (NaN) reward is removed to prevent low-quality data from entering the training pipeline.

process(exps: List[Experience]) Tuple[List[Experience], dict][source]#

Process a list of experiences and return a transformed list.

Parameters:

exps (List[Experience]) – List of experiences to process, which contains all experiences generated by the Explorer in one explore step.

Returns:

A tuple containing the processed list of experiences and a dictionary of metrics.

Return type:

Tuple[List[Experience], Dict]