trinity.explorer package#

Subpackages#

Submodules#

Module contents#

Explorer package exports.

class trinity.explorer.Explorer(config: Config)[source]#

Bases: object

Responsible for exploring the taskset.

__init__(config: Config)[source]#
async benchmark() → bool[source]#

Benchmark the model checkpoints.

async eval()[source]#

Run evaluation on all evaluation data samples.

async explore() → str[source]#

The timeline of the exploration process:

         | <-------------------------------- one period ---------------------------------> |
explorer | <-- step_1 --> |
         |      | <-- step_2 --> |
         |          ...
         |              | <-- step_n --> |
         |              | <------------------- eval -------------------> | <-- sync --> |
         |---------------------------------------------------------------------------------|
trainer  | <-- idle --> | <-- step_1 --> | <-- step_2 --> | ... | <-- step_n --> | <-- sync --> |
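The period structure above can be sketched as a driver loop over the async methods documented on this class. This is a minimal sketch, not the actual implementation: `run_one_period` and `steps_per_period` are hypothetical names, and eval runs sequentially here rather than overlapping the final steps as in the real timeline.

```python
import asyncio


async def run_one_period(explorer, steps_per_period: int) -> None:
    # Run n exploration steps; each step generates rollout
    # experiences that the trainer consumes concurrently.
    for _ in range(steps_per_period):
        await explorer.explore_step()
    # In the real timeline, eval overlaps the tail of the period;
    # it is shown sequentially here for clarity.
    if explorer.need_eval():
        await explorer.eval()
    # End of the period: synchronize model weights with the trainer.
    if await explorer.need_sync():
        await explorer.sync_weight()
```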

async explore_step() → bool[source]#
async finish_current_steps() → None[source]#
classmethod get_actor(config: Config)[source]#

Get a Ray actor for the explorer.

async get_weight(name: str) → Tensor[source]#

Get a named weight of the loaded model (used for checkpoint weight updates).

async is_alive() → bool[source]#

Check if the explorer is alive.

need_eval() → bool[source]#
async need_sync() → bool[source]#
async prepare() → None[source]#

Preparation before running.

async save_checkpoint() → None[source]#
async serve() → None[source]#

Run the explorer in serving mode.

In serving mode, the explorer starts an OpenAI compatible server to handle requests. Agent applications can be deployed separately and interact with the explorer via the API.

import openai

# Point a standard OpenAI client at the explorer's server.
client = openai.OpenAI(
    base_url=f"{explorer_server_url}/v1",
    api_key="EMPTY",  # placeholder API key
)
response = client.chat.completions.create(
    model=config.model.model_path,
    messages=[{"role": "user", "content": "Hello!"}],
)
async setup_model_level_weight_sync_group()[source]#

Set up a process group for each model; only used in serving mode.

async setup_weight_sync_group(master_address: str, master_port: int, state_dict_meta: List = None)[source]#
async shutdown() → None[source]#
async sync_weight() → None[source]#

Synchronize model weights.

class trinity.explorer.RolloutCoordinator(config: Config, rollout_model: List[InferenceModel], auxiliary_models: List[List[InferenceModel]] | None = None)[source]#

Bases: object

Own scheduler-side batch state and expose batch-level finalize APIs.

__init__(config: Config, rollout_model: List[InferenceModel], auxiliary_models: List[List[InferenceModel]] | None = None)[source]#

Create a coordinator with internally managed scheduler and pipeline.

async abort_batch(batch_id: int | str, *, reason: str, keep_partial_results: bool = False) → None[source]#

Abort one batch and clean up its running and staged state.

async finalize_eval_batch(batch_id: str, *, timeout: float | None = None) → dict[source]#

Finalize one eval batch and return aggregated eval metrics.

async finalize_train_batch(batch_id: int, *, timeout: float | None = None) → dict[source]#

Finalize one train batch and return aggregated metrics.

classmethod get_actor(config: Config, models: List, auxiliary_models: List) → ActorHandle[source]#

Initialize a rollout coordinator for the task-event-completion path.

async prepare() → None[source]#

Initialize the owned pipeline and scheduler.

async process_experiences(payloads: list[bytes]) → dict[source]#

Process one batch of experience payloads through the pipeline.

async shutdown() → None[source]#

Stop background work and close owned dependencies.

async submit_batch(*, batch_id: int | str, tasks: list[Task], batch_type: Literal['train', 'eval'], min_wait_num: int | None = None) → None[source]#

Register a new batch and schedule its tasks.
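The batch lifecycle (submit_batch, then finalize_train_batch, with abort_batch on failure) can be sketched as follows. This is a hedged sketch using only the method signatures documented above: `run_train_batch`, the `coordinator` and `tasks` objects, and the 600-second timeout are hypothetical.

```python
async def run_train_batch(coordinator, batch_id: int, tasks) -> dict:
    # Register the batch and schedule its tasks on the rollout models.
    await coordinator.submit_batch(
        batch_id=batch_id,
        tasks=tasks,
        batch_type="train",
    )
    try:
        # Block until the batch completes and collect aggregated metrics.
        return await coordinator.finalize_train_batch(batch_id, timeout=600.0)
    except Exception as exc:
        # On failure, abort the batch and discard partial results.
        await coordinator.abort_batch(
            batch_id, reason=str(exc), keep_partial_results=False
        )
        raise
```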