(Operators)=
## Operator Development Guide

### Step 0: Basic Concepts of Operator Module

In Trinity-RFT, the operator module is responsible for processing experience data in the buffer module. It natively supports the existing data processing capabilities of [Data-Juicer](https://github.com/datajuicer/data-juicer), and also allows developers to implement their own operators.
By customizing operators, developers can implement various data processing functionalities, such as data augmentation, filtering, and transformation. You can even implement advantage/return calculation as operators, as shown in the {ref}`Algorithms <Algorithms>` section.

- **DataJuicerOperator** ({class}`trinity.buffer.operators.DataJuicerOperator`): The operator that wraps the data processing operators from Data-Juicer. It provides a simple interface for developers to list the Data-Juicer operators they want to use. The full list of Data-Juicer operators can be found [here](https://agentscope-ai.github.io/data-juicer/en/main/docs/Operators.html).
- **ExperienceOperator** ({class}`trinity.buffer.operators.ExperienceOperator`): The base class for all operators used in experience data processing. It defines the interface and common functionalities that all operators should have. Each operator processes a batch of experience data and returns the processed data with metrics for logging.
- **ExperiencePipeline** ({class}`trinity.buffer.pipelines.ExperiencePipeline`): The experience data processing pipeline that manages a sequence of operators. It takes raw experiences from the `Explorer`, passes them through each operator in the pipeline, and writes the final processed experiences into the input buffer of the `Trainer`.

```{note}
In addition to `ExperiencePipeline`, Trinity-RFT also provides `TaskPipeline` for task data processing.
In the current version, the `TaskPipeline` only supports using Data-Juicer operators. Please see this {ref}`section <Data Processing>` for details.
```
---
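The interaction between these components can be summarized as: the pipeline takes a batch of experiences, passes it through each operator in order, and merges the per-operator metrics. The following minimal sketch illustrates this flow with toy stand-ins (all class names here are hypothetical, not the real Trinity-RFT classes):

```python
import asyncio
from typing import Dict, List, Tuple


class ToyExperience:
    """Toy stand-in for trinity.common.experience.Experience."""

    def __init__(self, reward: float) -> None:
        self.reward = reward


class DropNegative:
    """A toy operator: drop experiences with negative reward."""

    async def process(self, exps: List[ToyExperience]) -> Tuple[List[ToyExperience], Dict]:
        kept = [e for e in exps if e.reward >= 0]
        return kept, {"dropped": len(exps) - len(kept)}


class ToyPipeline:
    """Toy stand-in for ExperiencePipeline: chain operators, merge metrics."""

    def __init__(self, operators: List) -> None:
        self.operators = operators

    async def run(self, exps: List[ToyExperience]) -> Tuple[List[ToyExperience], Dict]:
        metrics: Dict = {}
        for op in self.operators:
            # Each operator receives the previous operator's output
            exps, op_metrics = await op.process(exps)
            metrics.update(op_metrics)
        return exps, metrics
```

The key design point is that operators compose: each one only needs to implement `process`, and the pipeline handles sequencing and metric collection.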

Developers can implement and use their own operators by following the steps below.

### Step 1: Implement Operator

The `ExperienceOperatorV1` interface includes only one method, `process`. The `ExperiencePipeline` calls this method with the list of `Experience` objects generated by the `Explorer` in one explore step. The `process` method should return a tuple containing the processed list of `Experience` objects and a dictionary of metrics for logging.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Tuple

from trinity.common.experience import Experience


class ExperienceOperatorV1(ABC):

    @abstractmethod
    async def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        """Process a list of experiences and return a transformed list.

        Args:
            exps (List[Experience]): List of experiences to process, which contains
                all experiences generated by the Explorer in one explore step.
        Returns:
            Tuple[List[Experience], Dict]: A tuple containing the processed list of experiences and a dictionary of metrics.
        """
```

Here is an implementation of a simple operator that filters out experiences with rewards below a certain threshold:

```python
from typing import Dict, List, Tuple

from trinity.buffer.operators import ExperienceOperatorV1
from trinity.common.experience import Experience


class RewardFilter(ExperienceOperatorV1):

    def __init__(self, threshold: float = 0.0) -> None:
        self.threshold = threshold

    async def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        filtered_exps = [exp for exp in exps if exp.reward >= self.threshold]
        metrics = {"filtered_count": len(exps) - len(filtered_exps)}
        return filtered_exps, metrics
```

After implementation, you need to register the operator through {class}`trinity.buffer.operators.EXPERIENCE_OPERATORS`. Once registered, the operator can be configured in the configuration file using its registered name.
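Registration typically follows a registry-decorator pattern. The sketch below uses a toy registry to illustrate the idea; the actual decorator name on `EXPERIENCE_OPERATORS` may differ in Trinity-RFT, so check the registry's API before copying this:

```python
from typing import Dict, Type


class Registry:
    """A minimal registry sketch (toy, not the real EXPERIENCE_OPERATORS)."""

    def __init__(self) -> None:
        self._modules: Dict[str, Type] = {}

    def register_module(self, name: str):
        # Returns a class decorator that records the class under `name`
        def decorator(cls: Type) -> Type:
            self._modules[name] = cls
            return cls
        return decorator

    def get(self, name: str) -> Type:
        return self._modules[name]


EXPERIENCE_OPERATORS = Registry()


@EXPERIENCE_OPERATORS.register_module("reward_filter")
class RewardFilter:
    def __init__(self, threshold: float = 0.0) -> None:
        self.threshold = threshold
```

The registered name (`"reward_filter"` here) is what you later reference in the YAML configuration, and the `args` in the configuration are passed to the operator's `__init__`.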

### Step 2: Use Your Operator

After completing the above steps, you can use the newly registered operator through a YAML configuration file.

```yaml
# some other configs
data_processor:
  experience_pipeline:
    operators:
      - name: "reward_filter"
        args:
          threshold: 0.1
synchronizer:
  sync_method: nccl
  sync_style: explorer_driven
  sync_interval: 2
# some other configs
```

```{tip}
The `RewardFilter` reduces the number of experiences, which may leave the `Trainer` without enough experiences to start a training step. To avoid this issue, you can use the advanced {ref}`Dynamic Synchronization <Synchronizer>` feature provided by Trinity-RFT, as shown in the configuration file above.
With this setting, the `Explorer` syncs with the `Trainer` every 2 steps and keeps running regardless of how many steps the `Trainer` has completed. This ensures that the `Trainer` can always get enough experiences to start a training step, as long as the `Explorer` is running.
```
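As a rough back-of-the-envelope check (the numbers are purely illustrative, not from Trinity-RFT), you can estimate how many explore steps a filter forces the `Trainer` to wait before one training batch is full:

```python
import math


def explore_steps_needed(train_batch_size: int, explore_batch_size: int, keep_rate: float) -> int:
    """How many explore steps fill one training batch when a filter keeps
    only `keep_rate` of each step's experiences (illustrative estimate)."""
    kept_per_step = explore_batch_size * keep_rate
    return math.ceil(train_batch_size / kept_per_step)
```

For example, if the filter keeps about half of the experiences and explore and training batch sizes are equal, the `Trainer` needs roughly two explore steps per training step, which is why an `Explorer` that keeps running independently (as configured above) avoids stalls.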

### Advanced Features

#### Using Auxiliary Models inside Operators

As introduced in the [Workflow Development Guide](develop_workflow.md), Trinity-RFT supports deploying auxiliary models and calling them through the OpenAI API. This feature can also be used in operators, allowing you to leverage powerful models to judge and process experiences. This is particularly useful for implementing operators that require complex reasoning or natural language understanding.

Suppose you have the following auxiliary model configuration in the YAML file:

```yaml
explorer:
  auxiliary_models:
    - model_path: Qwen/Qwen2.5-32B-Instruct
      name: qwen2.5-32B
      engine_num: 1
      tensor_parallel_size: 2
      enable_thinking: false
      max_prompt_tokens: 12288
      max_response_tokens: 12288
      max_model_len: 16384
    - model_path: Qwen/Qwen3-8B
      name: qwen3-8B
      engine_num: 2
      tensor_parallel_size: 1
      enable_thinking: false
      max_prompt_tokens: 12288
      max_response_tokens: 12288
      max_model_len: 16384
```

Trinity-RFT will automatically inject the deployed auxiliary models into the operator as `self.auxiliary_models`, a dictionary mapping model names to lists of model clients (`Dict[str, List[openai.AsyncOpenAI]]`).
The key is the `name` of the model in the configuration file, and the number of clients in each list is determined by `engine_num`.
You can call the model's inference API in the `process` method of the operator to get the model's response based on the experience data. Below is an example of how to use the auxiliary model in an operator:


```python
import asyncio
from typing import Dict, List, Tuple

from trinity.buffer.operators import ExperienceOperatorV1
from trinity.common.experience import Experience


class OperatorWithModel(ExperienceOperatorV1):

    async def judge_experience(self, exp: Experience) -> Experience:
        # Extract necessary information from the experience and prepare input for the model
        # messages = ...
        # Call the model's inference API to get the response
        response = await self.auxiliary_models["qwen2.5-32B"][0].chat.completions.create(
            model=self.auxiliary_models["qwen2.5-32B"][0].model_path,  # Trinity-RFT automatically sets `model_path` on each client for easy calling
            messages=messages,
        )
        # Process the model's response and update the experience in place
        # ...
        return exp

    async def process(self, exps: List[Experience]) -> Tuple[List[Experience], Dict]:
        # Judge all experiences concurrently using the auxiliary model
        await asyncio.gather(*(self.judge_experience(exp) for exp in exps))
        return exps, {}
```
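When a model is deployed with `engine_num > 1` (like `qwen3-8B` above), the list contains multiple clients, and always calling index `[0]` leaves the other engines idle. One simple way to spread concurrent requests is round-robin over the list. The sketch below uses toy clients standing in for `openai.AsyncOpenAI` (the `ask` method and client names are hypothetical, for illustration only):

```python
import asyncio
import itertools
from typing import List


class ToyClient:
    """Toy stand-in for an openai.AsyncOpenAI client instance."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.calls = 0

    async def ask(self, prompt: str) -> str:
        self.calls += 1
        return f"{self.name}: {prompt}"


async def fan_out(clients: List[ToyClient], prompts: List[str]) -> List[str]:
    # itertools.cycle yields clients in round-robin order, so concurrent
    # requests are spread evenly across all engine instances
    pool = itertools.cycle(clients)
    return await asyncio.gather(*(next(pool).ask(p) for p in prompts))
```

In a real operator, the same pattern applies to `self.auxiliary_models["qwen3-8B"]`: cycle over the client list and issue `chat.completions.create` calls concurrently with `asyncio.gather`.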
