trinity.explorer.rollout_coordinator module#

Rollout coordinator for asynchronous batch submission and finalization.

class trinity.explorer.rollout_coordinator.BatchLifecycleState(*values)[源代码]#

基类:str, Enum

Lifecycle states for one submitted batch.

PENDING = 'pending'#
RUNNING = 'running'#
FINALIZING = 'finalizing'#
FINALIZED = 'finalized'#
ABORTED = 'aborted'#
class trinity.explorer.rollout_coordinator.BatchState(batch_id: int | str, batch_type: ~typing.Literal['train', 'eval'], expected_task_count: int, statuses: ~typing.Dict[int | str, ~typing.Any] = <factory>, min_wait_num: int | None = None, state: ~trinity.explorer.rollout_coordinator.BatchLifecycleState = BatchLifecycleState.PENDING, final_result: dict | None = None, finalize_lock: ~asyncio.locks.Lock = <factory>)[源代码]#

基类:object

In-memory state tracked for one train or eval batch.

batch_id: int | str#
batch_type: Literal['train', 'eval']#
expected_task_count: int#
statuses: Dict[int | str, Any]#
min_wait_num: int | None = None#
state: BatchLifecycleState = 'pending'#
final_result: dict | None = None#
finalize_lock: Lock#
property completed_task_count: int#

Return the number of completed tasks tracked by status.

__init__(batch_id: int | str, batch_type: ~typing.Literal['train', 'eval'], expected_task_count: int, statuses: ~typing.Dict[int | str, ~typing.Any] = <factory>, min_wait_num: int | None = None, state: ~trinity.explorer.rollout_coordinator.BatchLifecycleState = BatchLifecycleState.PENDING, final_result: dict | None = None, finalize_lock: ~asyncio.locks.Lock = <factory>) → None#
class trinity.explorer.rollout_coordinator.RolloutCoordinator(config: Config, rollout_model: List[InferenceModel], auxiliary_models: List[List[InferenceModel]] | None = None)[源代码]#

基类:object

Own scheduler-side batch state and expose batch-level finalization APIs.

__init__(config: Config, rollout_model: List[InferenceModel], auxiliary_models: List[List[InferenceModel]] | None = None)[源代码]#

Create a coordinator with an internally managed scheduler and pipeline.

async prepare() → None[源代码]#

Initialize the owned pipeline and scheduler.

async shutdown() → None[源代码]#

Stop background work and close owned dependencies.

async submit_batch(*, batch_id: int | str, tasks: list[Task], batch_type: Literal['train', 'eval'], min_wait_num: int | None = None) → None[源代码]#

Register a new batch and schedule its tasks.

async finalize_train_batch(batch_id: int, *, timeout: float | None = None) → dict[源代码]#

Finalize one train batch and return aggregated metrics.

async finalize_eval_batch(batch_id: str, *, timeout: float | None = None) → dict[源代码]#

Finalize one eval batch and return aggregated eval metrics.

async abort_batch(batch_id: int | str, *, reason: str, keep_partial_results: bool = False) → None[源代码]#

Abort one batch and clean up its running and staged state.

async process_experiences(payloads: list[bytes]) → dict[源代码]#

Process one batch of experience payloads through the pipeline.

classmethod get_actor(config: Config, models: List, auxiliary_models: List) → ActorHandle[源代码]#

Initialize the rollout coordinator actor for the task-event-completion path.