trinity.buffer.operators.filters.reward_filter module
- class trinity.buffer.operators.filters.reward_filter.RewardFilter(threshold: float = 0.0)
Bases: ExperienceOperator
Filter experiences based on the reward value.
Note: This filter assumes that the reward is already calculated and stored in the Experience object.
- process(exps: List[Experience]) → Tuple[List[Experience], dict]
Filter experiences based on reward value.
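The thresholding idea can be sketched as follows. This is only an illustration, not the library's implementation: `ToyExperience`, `filter_by_reward`, and the `filtered_count` metric key are hypothetical names, and the `>=` comparison is an assumption, since the reference above does not say whether the threshold is inclusive.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class ToyExperience:
    """Hypothetical stand-in for Experience; only the reward is modeled."""
    reward: float


def filter_by_reward(
    exps: List[ToyExperience], threshold: float = 0.0
) -> Tuple[List[ToyExperience], dict]:
    # Assumption: experiences whose reward is at or above the threshold are kept.
    kept = [e for e in exps if e.reward >= threshold]
    # Hypothetical metric name: how many experiences were dropped.
    metrics = {"filtered_count": len(exps) - len(kept)}
    return kept, metrics
```

The return value mirrors the documented `process` signature: the surviving experiences plus a dictionary of filtering metrics.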
- class trinity.buffer.operators.filters.reward_filter.RewardSTDFilter(threshold: float = 0.0)
Bases: ExperienceOperator
Filter experiences based on the standard deviation of rewards within each group.
Note: This filter assumes that the reward is already calculated and stored in the Experience object.
- process(exps: List[Experience]) → Tuple[List[Experience], dict]
Filter experiences based on reward std.
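A minimal sketch of group-wise standard-deviation filtering, under stated assumptions: `ToyExperience`, its `group_id` field, `filter_by_reward_std`, and the `filtered_count` key are hypothetical, and the sketch assumes that groups whose reward standard deviation does not exceed the threshold are dropped (population standard deviation is used here; the actual class may compute it differently).

```python
from collections import defaultdict
from dataclasses import dataclass
from statistics import pstdev
from typing import Dict, List, Tuple


@dataclass
class ToyExperience:
    """Hypothetical stand-in for Experience."""
    group_id: str  # hypothetical grouping key
    reward: float


def filter_by_reward_std(
    exps: List[ToyExperience], threshold: float = 0.0
) -> Tuple[List[ToyExperience], dict]:
    # Bucket experiences by their group.
    groups: Dict[str, List[ToyExperience]] = defaultdict(list)
    for e in exps:
        groups[e.group_id].append(e)

    kept: List[ToyExperience] = []
    for members in groups.values():
        # Assumption: a group survives only if its rewards vary by more
        # than the threshold; identical rewards give a std of 0.0.
        if pstdev([e.reward for e in members]) > threshold:
            kept.extend(members)
    return kept, {"filtered_count": len(exps) - len(kept)}
```

With the default threshold of 0.0, this drops exactly the groups in which every rollout received the same reward, which carry no relative signal for group-based advantage estimates.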
- class trinity.buffer.operators.filters.reward_filter.DAPODynamicSamplingFilter(metric_key: str = 'accuracy', correct_threshold: float = 0.0)
Bases: ExperienceOperator
DAPO dynamic sampling (arXiv:2503.14476 Sec. 3.2).
Keeps a task group only when some but not all rollouts are correct: 0 < |{correct}| < G, where G is the number of rollouts in the group. Correctness is read from the outcome accuracy in the experience metrics, not from the length-shaped total reward.
- __init__(metric_key: str = 'accuracy', correct_threshold: float = 0.0) → None
Initialize the dynamic sampling filter.
- Parameters:
metric_key – Metric name used to determine rollout correctness.
correct_threshold – Minimum score treated as correct.
- process(exps: List[Experience]) → Tuple[List[Experience], dict]
Keep only mixed-correctness groups for DAPO training.
- Parameters:
exps – Experiences grouped by task id during filtering.
- Returns:
Filtered experiences and filtering metrics.
- Return type:
Tuple[List[Experience], dict]
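The mixed-correctness rule 0 < |{correct}| < G can be sketched as below. The names `ToyExperience`, `task_id`, `dynamic_sampling_filter`, and the returned metric keys are hypothetical. The sketch also assumes a rollout counts as correct when its metric strictly exceeds `correct_threshold`; that reading is a guess, chosen because with the default of 0.0 it separates an accuracy of 0 from an accuracy of 1.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class ToyExperience:
    """Hypothetical stand-in for Experience."""
    task_id: str  # hypothetical grouping key
    metrics: Dict[str, float] = field(default_factory=dict)


def dynamic_sampling_filter(
    exps: List[ToyExperience],
    metric_key: str = "accuracy",
    correct_threshold: float = 0.0,
) -> Tuple[List[ToyExperience], dict]:
    # Group rollouts by the task they belong to.
    groups: Dict[str, List[ToyExperience]] = defaultdict(list)
    for e in exps:
        groups[e.task_id].append(e)

    kept: List[ToyExperience] = []
    kept_groups = 0
    for members in groups.values():
        # Assumption: "correct" means the metric is strictly above the threshold.
        n_correct = sum(
            1 for e in members if e.metrics.get(metric_key, 0.0) > correct_threshold
        )
        # Keep the group only if some, but not all, rollouts are correct.
        if 0 < n_correct < len(members):
            kept.extend(members)
            kept_groups += 1
    return kept, {"kept_groups": kept_groups, "total_groups": len(groups)}
```

Groups that are entirely correct or entirely incorrect are dropped as a whole, so every surviving group contains both outcomes.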
- class trinity.buffer.operators.filters.reward_filter.MaskResponseTruncatedOperator
Bases: ExperienceOperator
DAPO overlong filtering stage 1 (Sec. 3.4): exclude truncated responses from loss.
Zeros action_mask so truncated rollouts do not contribute to the policy gradient.
- process(exps: List[Experience]) → Tuple[List[Experience], dict]
Mask action positions for truncated responses.
- Parameters:
exps – Experiences to process.
- Returns:
Original experiences and masking metrics.
- Return type:
Tuple[List[Experience], dict]
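As an illustration of the masking step, the sketch below zeros the action mask of truncated rollouts and returns the original list. It is not the library's code: `ToyExperience`, the boolean `truncated` flag, `mask_truncated`, and the `masked_count` key are hypothetical, and the real Experience object may mark truncation differently and store `action_mask` as a tensor rather than a list.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class ToyExperience:
    """Hypothetical stand-in for Experience."""
    action_mask: List[int]
    truncated: bool  # hypothetical flag; the real truncation marker may differ


def mask_truncated(
    exps: List[ToyExperience],
) -> Tuple[List[ToyExperience], dict]:
    masked = 0
    for e in exps:
        if e.truncated:
            # An all-zero mask removes every action position from the loss.
            e.action_mask = [0] * len(e.action_mask)
            masked += 1
    # No experience is removed; only the masks are modified in place.
    return exps, {"masked_count": masked}
```

Unlike the filters above, this operator keeps the batch size unchanged, which matches the documented return value of "original experiences and masking metrics".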
- class trinity.buffer.operators.filters.reward_filter.InvalidRewardFilter
Bases: ExperienceOperator
Filters out experiences with invalid reward values.
Note: This operator assumes that rewards are already computed and stored in the Experience object. Any experience with a missing (None) or invalid (NaN) reward is removed to prevent low-quality data from entering the training pipeline.
- process(exps: List[Experience]) → Tuple[List[Experience], dict]
Process a list of experiences and return a transformed list.
- Parameters:
exps (List[Experience]) – List of experiences to process, which contains all experiences generated by the Explorer in one explore step.
- Returns:
A tuple containing the processed list of experiences and a dictionary of metrics.
- Return type:
Tuple[List[Experience], Dict]
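The validity check described for InvalidRewardFilter (drop rewards that are None or NaN) might look like the following sketch. `ToyExperience`, `drop_invalid_rewards`, and the `removed_count` key are hypothetical names used only for illustration.

```python
import math
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class ToyExperience:
    """Hypothetical stand-in for Experience; only the reward is modeled."""
    reward: Optional[float]


def drop_invalid_rewards(
    exps: List[ToyExperience],
) -> Tuple[List[ToyExperience], dict]:
    # A reward is valid when it is present and is not NaN.
    kept = [
        e for e in exps
        if e.reward is not None and not math.isnan(e.reward)
    ]
    return kept, {"removed_count": len(exps) - len(kept)}
```

The None check comes first so that `math.isnan` is never called on a missing reward, which would raise a TypeError.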