Operator Development Guide

Step 0: Basic Concepts of the Operator Module

In Trinity-RFT, the operator module is responsible for processing experience data in the buffer module. It natively supports the existing data processing capabilities of Data-Juicer and also allows developers to implement their own operators. By customizing operators, developers can implement various data processing functionalities, such as data augmentation, filtering, and transformation. You can even implement advantage/return calculation as operators, as shown in the Algorithms section.

  • DataJuicerOperator (trinity.buffer.operators.DataJuicerOperator): The operator that wraps the data processing operators from Data-Juicer. It provides a simple interface for developers to list the Data-Juicer operators they want to use. The full list of Data-Juicer operators can be found here.

  • ExperienceOperator (trinity.buffer.operators.ExperienceOperator): The base class for all operators used in experience data processing. It defines the interface and common functionalities that all operators should have. Each operator processes a batch of experience data and returns the processed data with metrics for logging.

  • ExperiencePipeline (trinity.buffer.pipelines.ExperiencePipeline): The experience data processing pipeline that manages a sequence of operators. It takes raw experiences from the Explorer, passes them through each operator in the pipeline, and writes the final processed experiences into the input buffer of the Trainer.
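To make the division of labor between operators and the pipeline concrete, here is a minimal, self-contained sketch of how a pipeline can chain operators: each operator receives the previous operator's output, and the per-operator metrics are merged for logging. The `Experience` dataclass, `Operator`, `DropEmpty`, `ScaleReward` and `run_pipeline` are hypothetical stand-ins, not Trinity-RFT classes; the real ExperiencePipeline also reads from and writes to buffers, which is omitted here.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Experience:
    # Hypothetical stand-in for trinity.common.experience.Experience
    response: str
    reward: float


class Operator(ABC):
    # Hypothetical stand-in mirroring the operator interface described in this guide
    @abstractmethod
    async def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        ...


class DropEmpty(Operator):
    """Filtering operator: removes experiences with an empty response."""

    async def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        kept = [e for e in exps if e.response.strip()]
        return kept, {"dropped_empty": len(exps) - len(kept)}


class ScaleReward(Operator):
    """Transformation operator: multiplies every reward by a constant factor."""

    def __init__(self, factor: float) -> None:
        self.factor = factor

    async def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        for e in exps:
            e.reward *= self.factor
        return exps, {"mean_reward": sum(e.reward for e in exps) / max(len(exps), 1)}


async def run_pipeline(
    operators: List[Operator], exps: List[Experience]
) -> Tuple[List[Experience], Dict]:
    """Feed the output of each operator into the next one and merge their metrics."""
    metrics: Dict = {}
    for op in operators:
        exps, op_metrics = await op.process(exps)
        metrics.update(op_metrics)  # later operators overwrite duplicate metric keys
    return exps, metrics


if __name__ == "__main__":
    batch = [Experience("ok", 1.0), Experience("", 0.5), Experience("fine", 0.0)]
    out, metrics = asyncio.run(run_pipeline([DropEmpty(), ScaleReward(2.0)], batch))
    print(len(out), metrics)  # 2 {'dropped_empty': 1, 'mean_reward': 1.0}
```

Order matters: because `DropEmpty` runs first, `ScaleReward` only sees and reports on the surviving experiences.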

Note

In addition to ExperiencePipeline, Trinity-RFT also provides TaskPipeline for task data processing. In the current version, TaskPipeline only supports Data-Juicer operators. Please see this section for details.


Developers can implement and use their own operators by following the steps below.

Step 1: Implement Operator

The ExperienceOperatorV1 interface defines a single method, process. The ExperiencePipeline calls this method with the list of Experience objects generated by the Explorer in one explore step. The process method should return a tuple containing the processed list of Experience objects and a dictionary of metrics for logging.

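from abc import ABC, abstractmethod
from typing import Dict, List, Tuple

from trinity.common.experience import Experience


class ExperienceOperatorV1(ABC):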

    @abstractmethod
    async def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        """Process a list of experiences and return a transformed list.

        Args:
            exps (List[Experience]): List of experiences to process, which contains
                all experiences generated by the Explorer in one explore step.
        Returns:
            Tuple[List[Experience], Dict]: A tuple containing the processed list of experiences and a dictionary of metrics.
        """

Here is an implementation of a simple operator that filters out experiences with rewards below a certain threshold:

from typing import Dict, List, Tuple

from trinity.buffer.operators import ExperienceOperatorV1
from trinity.common.experience import Experience


class RewardFilter(ExperienceOperatorV1):

    def __init__(self, threshold: float = 0.0) -> None:
        self.threshold = threshold

    async def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        filtered_exps = [exp for exp in exps if exp.reward >= self.threshold]
        metrics = {"filtered_count": len(exps) - len(filtered_exps)}
        return filtered_exps, metrics
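Filtering is only one kind of operator; the same interface also fits transformations that rewrite fields in place. The sketch below standardizes rewards across the batch and stores the result in an `advantage` field. It is a hypothetical illustration, not Trinity-RFT's built-in advantage computation: the `Experience` stand-in and `RewardStandardizer` are assumptions, and the class omits the ExperienceOperatorV1 base so that it runs standalone (in a real project it would subclass it). Group-based algorithms such as GRPO normalize within each group of responses to the same task instead; batch-wide normalization is a simplification.

```python
import asyncio
import statistics
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Experience:
    # Hypothetical stand-in; the real Experience class has many more fields
    reward: float
    advantage: Optional[float] = None


class RewardStandardizer:
    """Hypothetical transformation operator: advantage = (reward - mean) / (std + epsilon)."""

    def __init__(self, epsilon: float = 1e-6) -> None:
        self.epsilon = epsilon  # avoids division by zero when all rewards are equal

    async def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        if not exps:
            return exps, {}
        rewards = [e.reward for e in exps]
        mean = statistics.fmean(rewards)
        std = statistics.pstdev(rewards)  # population standard deviation over the batch
        for e in exps:
            e.advantage = (e.reward - mean) / (std + self.epsilon)
        return exps, {"reward_mean": mean, "reward_std": std}


if __name__ == "__main__":
    batch = [Experience(1.0), Experience(0.0)]
    exps, metrics = asyncio.run(RewardStandardizer().process(batch))
    print([round(e.advantage, 3) for e in exps], metrics)  # [1.0, -1.0] {'reward_mean': 0.5, 'reward_std': 0.5}
```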

After implementing the operator, register it in trinity.buffer.operators.EXPERIENCE_OPERATORS. Once registered, the operator can be referenced in the configuration file by its registered name.
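The exact registration call may differ between Trinity-RFT versions, so check how EXPERIENCE_OPERATORS is defined in trinity.buffer.operators. The sketch below only illustrates the mechanism: a registry maps a name to a class, and a config entry such as `name: "reward_filter"` with `args: {threshold: 0.1}` is presumably turned into an instance by looking up the name and passing `args` as keyword arguments. `Registry`, `register`, `OPERATORS` and `build_operator` are hypothetical and are not Trinity-RFT APIs.

```python
from typing import Any, Callable, Dict, Type


class Registry:
    """Hypothetical minimal name-to-class registry (not Trinity-RFT's implementation)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._modules: Dict[str, Type] = {}

    def register(self, module_name: str) -> Callable[[Type], Type]:
        """Return a class decorator that stores the class under `module_name`."""

        def decorator(cls: Type) -> Type:
            if module_name in self._modules:
                raise KeyError(f"{module_name!r} is already registered in {self.name}")
            self._modules[module_name] = cls
            return cls

        return decorator

    def get(self, module_name: str) -> Type:
        return self._modules[module_name]


OPERATORS = Registry("experience_operators")


@OPERATORS.register("reward_filter")
class RewardFilter:
    # Body trimmed to the constructor; see the full RewardFilter above
    def __init__(self, threshold: float = 0.0) -> None:
        self.threshold = threshold


def build_operator(entry: Dict[str, Any]) -> Any:
    """Instantiate an operator from a config entry such as
    {"name": "reward_filter", "args": {"threshold": 0.1}}."""
    cls = OPERATORS.get(entry["name"])
    return cls(**entry.get("args", {}))


if __name__ == "__main__":
    op = build_operator({"name": "reward_filter", "args": {"threshold": 0.1}})
    print(type(op).__name__, op.threshold)  # RewardFilter 0.1
```

Rejecting duplicate names at registration time surfaces naming collisions immediately instead of letting one operator silently replace another.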

Step 2: Use Your Operator

After completing the above steps, you can use the newly registered operator through a YAML configuration file.

# some other configs
data_processor:
  experience_pipeline:
    operators:
      - name: "reward_filter"
        args:
          threshold: 0.1
synchronizer:
  sync_method: nccl
  sync_style: explorer_driven
  sync_interval: 2
# some other configs

Tip

The RewardFilter reduces the number of experiences, which may leave the Trainer without enough experiences to start a training step. To avoid this issue, you can use the advanced Dynamic Synchronization feature provided by Trinity-RFT, as shown in the configuration file above. With this setting, the Explorer synchronizes with the Trainer every 2 steps and keeps running regardless of how many steps the Trainer has completed. As a result, the Trainer can always collect enough experiences to start a training step as long as the Explorer is running.
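A back-of-the-envelope sketch of why this matters, assuming the Trainer needs a fixed number of experiences (`batch_size`) per training step: when the filter removes part of every explore step's output, one training step can require the output of several explore steps. The helper `explore_steps_to_fill` and the numbers in the example are hypothetical.

```python
from typing import List, Optional


def explore_steps_to_fill(batch_size: int, kept_per_step: List[int]) -> Optional[int]:
    """Count explore steps until at least `batch_size` filtered experiences are buffered.

    `kept_per_step[i]` is how many experiences survive filtering in explore step i + 1.
    Returns None if the listed steps never produce enough experiences.
    """
    buffered = 0
    for step, kept in enumerate(kept_per_step, start=1):
        buffered += kept
        if buffered >= batch_size:
            return step
    return None


if __name__ == "__main__":
    # Hypothetical numbers: the filter keeps 38-41 experiences per explore step
    print(explore_steps_to_fill(64, [40, 38, 41]))  # 2
    print(explore_steps_to_fill(64, [40]))          # None: one explore step alone cannot fill the batch
```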

Advanced Features

Using Auxiliary Models inside Operators

As introduced in the Workflow Development Guide, Trinity-RFT supports deploying auxiliary models and calling them through the OpenAI API. This feature can also be used in operators, allowing you to leverage powerful models to judge and process experiences. This is particularly useful for operators that require complex reasoning or natural language understanding.

Suppose you have the following auxiliary model configuration in the YAML file:

explorer:
  auxiliary_models:
    - model_path: Qwen/Qwen2.5-32B-Instruct
      name: qwen2.5-32B
      engine_num: 1
      tensor_parallel_size: 2
      enable_thinking: false
      max_prompt_tokens: 12288
      max_response_tokens: 12288
      max_model_len: 16384
    - model_path: Qwen/Qwen3-8B
      name: qwen3-8B
      engine_num: 2
      tensor_parallel_size: 1
      enable_thinking: false
      max_prompt_tokens: 12288
      max_response_tokens: 12288
      max_model_len: 16384

Trinity-RFT automatically injects the deployed auxiliary models into the operators as self.auxiliary_models, a dictionary that maps model names to lists of model instances (Dict[str, List[openai.AsyncOpenAI]]). Each key is the name of a model in the configuration file, and the number of instances in its list is determined by that model's engine_num. You can call the model's inference API in the operator's process method to obtain responses based on the experience data. Below is an example of how to use an auxiliary model in an operator:

import asyncio
from typing import Dict, List, Tuple

from trinity.buffer.operators import ExperienceOperatorV1
from trinity.common.experience import Experience


class OperatorWithModel(ExperienceOperatorV1):

    async def judge_experience(self, exp: Experience) -> Experience:
        # Extract the necessary information from the experience and build the model input
        messages = [{"role": "user", "content": "..."}]  # placeholder: fill in from `exp`
        # Call the auxiliary model through the OpenAI-compatible chat completions API
        client = self.auxiliary_models["qwen2.5-32B"][0]
        response = await client.chat.completions.create(
            model=client.model_path,  # Trinity-RFT automatically sets `model_path` for easy calling
            messages=messages,
        )
        # Process the model's response and update the experience accordingly,
        # e.g. by parsing response.choices[0].message.content
        return exp

    async def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        # Judge all experiences concurrently; asyncio.gather preserves the input order
        judged = await asyncio.gather(*(self.judge_experience(exp) for exp in exps))
        return list(judged), {}
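The example above sends every request to the first instance of one model (index [0]) and starts all requests at once. Because each model name maps to a list of engine_num instances, larger batches may benefit from spreading requests across that list and capping the number of in-flight calls. The sketch below shows one way to do this and to turn the model's verdict into a filter. It is self-contained: `judge_and_filter`, `make_fake_client` and the `Experience` stand-in are hypothetical, and the fake clients stand in for callables that would wrap `chat.completions.create` on the AsyncOpenAI instances in `self.auxiliary_models` and parse the reply.

```python
import asyncio
import itertools
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List, Tuple


@dataclass
class Experience:
    # Hypothetical stand-in for trinity.common.experience.Experience
    response_text: str


# A judge client is any async callable that returns True to keep an experience
Judge = Callable[[Experience], Awaitable[bool]]


async def judge_and_filter(
    exps: List[Experience], clients: List[Judge], max_concurrency: int = 8
) -> Tuple[List[Experience], Dict]:
    """Assign experiences to `clients` round-robin (one client per engine instance),
    keep at most `max_concurrency` calls in flight, and drop rejected experiences."""
    if not clients:
        raise ValueError("at least one judge client is required")
    semaphore = asyncio.Semaphore(max_concurrency)
    assigned = zip(exps, itertools.cycle(clients))  # exps[i] -> clients[i % len(clients)]

    async def bounded(exp: Experience, client: Judge) -> bool:
        async with semaphore:
            return await client(exp)

    verdicts = await asyncio.gather(*(bounded(e, c) for e, c in assigned))
    kept = [e for e, ok in zip(exps, verdicts) if ok]
    return kept, {"judge_rejected": len(exps) - len(kept)}


def make_fake_client() -> Judge:
    async def client(exp: Experience) -> bool:
        await asyncio.sleep(0)  # stand-in for awaiting a model response
        return "step" in exp.response_text  # stand-in for parsing the model's verdict

    return client


if __name__ == "__main__":
    batch = [Experience("step 1"), Experience("no reasoning"), Experience("step 2")]
    kept, metrics = asyncio.run(judge_and_filter(batch, [make_fake_client(), make_fake_client()]))
    print(len(kept), metrics)  # 2 {'judge_rejected': 1}
```

Like RewardFilter, a judge-based filter reduces the number of experiences, so the Dynamic Synchronization tip above applies to it as well.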