trinity.explorer.rollout_coordinator module
Rollout coordinator for asynchronous batch submission and finalization.
- class trinity.explorer.rollout_coordinator.BatchLifecycleState(*values)
Bases: str, Enum
Lifecycle states for one submitted batch.
- PENDING = 'pending'
- RUNNING = 'running'
- FINALIZING = 'finalizing'
- FINALIZED = 'finalized'
- ABORTED = 'aborted'
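Because `BatchLifecycleState` mixes in `str`, each member compares equal to its string value, and calling the enum with a value (for example `BatchLifecycleState("finalized")`) looks the member up, which keeps states easy to log or serialize. The sketch below re-creates the enum locally so it runs without Trinity installed. The `ALLOWED_TRANSITIONS` table and `can_transition` helper are hypothetical, inferred only from the state names rather than from the module's source.

```python
from enum import Enum


class BatchLifecycleState(str, Enum):
    # Local re-creation of the documented members and values.
    PENDING = "pending"
    RUNNING = "running"
    FINALIZING = "finalizing"
    FINALIZED = "finalized"
    ABORTED = "aborted"


# Hypothetical transition table inferred from the state names only;
# the real coordinator may permit different transitions.
ALLOWED_TRANSITIONS = {
    BatchLifecycleState.PENDING: {BatchLifecycleState.RUNNING, BatchLifecycleState.ABORTED},
    BatchLifecycleState.RUNNING: {BatchLifecycleState.FINALIZING, BatchLifecycleState.ABORTED},
    BatchLifecycleState.FINALIZING: {BatchLifecycleState.FINALIZED, BatchLifecycleState.ABORTED},
    # Terminal states: no outgoing transitions.
    BatchLifecycleState.FINALIZED: set(),
    BatchLifecycleState.ABORTED: set(),
}


def can_transition(current: BatchLifecycleState, target: BatchLifecycleState) -> bool:
    """Return True if the hypothetical table permits current -> target."""
    return target in ALLOWED_TRANSITIONS[current]


# The str mixin makes members interchangeable with their raw values.
assert BatchLifecycleState.PENDING == "pending"
assert BatchLifecycleState("finalized") is BatchLifecycleState.FINALIZED
```

Use `.value` when a plain string is required, since `str()` on a mixed-in enum member returns the qualified member name rather than the value.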
- class trinity.explorer.rollout_coordinator.BatchState(batch_id: int | str, batch_type: Literal['train', 'eval'], expected_task_count: int, statuses: Dict[int | str, Any] = <factory>, min_wait_num: int | None = None, state: BatchLifecycleState = BatchLifecycleState.PENDING, final_result: dict | None = None, finalize_lock: Lock = <factory>)
Bases: object
In-memory state tracked for one train or eval batch.
- batch_id: int | str
- batch_type: Literal['train', 'eval']
- expected_task_count: int
- statuses: Dict[int | str, Any]
- min_wait_num: int | None = None
- state: BatchLifecycleState = 'pending'
- final_result: dict | None = None
- finalize_lock: Lock
- property completed_task_count: int
Return the number of completed tasks tracked by status.
- __init__(batch_id: int | str, batch_type: Literal['train', 'eval'], expected_task_count: int, statuses: Dict[int | str, Any] = <factory>, min_wait_num: int | None = None, state: BatchLifecycleState = BatchLifecycleState.PENDING, final_result: dict | None = None, finalize_lock: Lock = <factory>) → None
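The `<factory>` defaults in the signature indicate `dataclasses.field(default_factory=...)`, so every `BatchState` gets its own `statuses` dict and its own `asyncio.Lock` instead of sharing one across batches. The following is a minimal local sketch of such a dataclass, assuming that `completed_task_count` simply counts the entries recorded in `statuses`; the real property may apply a stricter filter.

```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Literal, Optional, Union


class BatchLifecycleState(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    FINALIZING = "finalizing"
    FINALIZED = "finalized"
    ABORTED = "aborted"


@dataclass
class BatchState:
    batch_id: Union[int, str]
    batch_type: Literal["train", "eval"]
    expected_task_count: int
    # default_factory builds a fresh dict per instance.
    statuses: Dict[Union[int, str], Any] = field(default_factory=dict)
    min_wait_num: Optional[int] = None
    state: BatchLifecycleState = BatchLifecycleState.PENDING
    final_result: Optional[dict] = None
    # Each batch gets its own lock, so finalizing one batch never blocks another.
    finalize_lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    @property
    def completed_task_count(self) -> int:
        # Assumption: one status entry is recorded per completed task.
        return len(self.statuses)
```

A plain mutable default such as `statuses: dict = {}` would be rejected by `dataclasses` with a `ValueError`, which is why a factory is needed for the dict.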
- class trinity.explorer.rollout_coordinator.RolloutCoordinator(config: Config, rollout_model: List[InferenceModel], auxiliary_models: List[List[InferenceModel]] | None = None)
Bases: object
Owns scheduler-side batch state and exposes batch-level finalization APIs.
- __init__(config: Config, rollout_model: List[InferenceModel], auxiliary_models: List[List[InferenceModel]] | None = None)
Create a coordinator with an internally managed scheduler and pipeline.
- async submit_batch(*, batch_id: int | str, tasks: list[Task], batch_type: Literal['train', 'eval'], min_wait_num: int | None = None) → None
Register a new batch and schedule its tasks.
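A minimal sketch of registering and scheduling a batch with plain `asyncio`, assuming that `submit_batch` returns as soon as the tasks are scheduled rather than when they finish. `ToySubmitter`, its `run_task` callback, the record layout, and the rejection of duplicate batch IDs are all hypothetical and are not taken from Trinity's implementation.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional, Union

BatchId = Union[int, str]


class ToySubmitter:
    """Hypothetical stand-in illustrating submit_batch; not Trinity's class."""

    def __init__(self, run_task: Callable[[Any], Awaitable[Any]]) -> None:
        self._run_task = run_task  # hypothetical coroutine that runs one task
        self.batches: Dict[BatchId, Dict[str, Any]] = {}

    async def submit_batch(
        self,
        *,
        batch_id: BatchId,
        tasks: List[Any],
        batch_type: str,
        min_wait_num: Optional[int] = None,
    ) -> None:
        # Assumption: re-submitting a known batch id is an error.
        if batch_id in self.batches:
            raise ValueError(f"batch {batch_id!r} already submitted")
        record: Dict[str, Any] = {
            "batch_type": batch_type,
            "expected": len(tasks),
            "min_wait_num": min_wait_num,
            "statuses": {},
        }
        self.batches[batch_id] = record
        # Schedule every task without awaiting it, so submission returns immediately.
        record["handles"] = [
            asyncio.create_task(self._record(record, i, t)) for i, t in enumerate(tasks)
        ]

    async def _record(self, record: Dict[str, Any], index: int, task: Any) -> None:
        # Store each finished task's result under its index.
        record["statuses"][index] = await self._run_task(task)
```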
- async finalize_train_batch(batch_id: int, *, timeout: float | None = None) → dict
Finalize one train batch and return aggregated metrics.
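The public methods compose into a simple per-step routine. The sketch below types the coordinator with a `typing.Protocol` that mirrors the signatures documented here, so any object providing those methods can be passed in. The `run_train_step` helper, and the assumption that an expired `timeout` surfaces as `asyncio.TimeoutError`, are hypothetical; check the real coordinator's behavior before relying on either.

```python
import asyncio
from typing import Any, List, Literal, Optional, Protocol, Union


class CoordinatorLike(Protocol):
    # Mirrors the documented RolloutCoordinator method signatures.
    async def submit_batch(self, *, batch_id: Union[int, str], tasks: List[Any],
                           batch_type: Literal["train", "eval"],
                           min_wait_num: Optional[int] = None) -> None: ...

    async def finalize_train_batch(self, batch_id: int, *,
                                   timeout: Optional[float] = None) -> dict: ...

    async def abort_batch(self, batch_id: Union[int, str], *, reason: str,
                          keep_partial_results: bool = False) -> None: ...


async def run_train_step(coordinator: CoordinatorLike, step: int, tasks: List[Any],
                         timeout: float) -> Optional[dict]:
    """Hypothetical driver: submit one train batch, finalize it, abort on timeout."""
    await coordinator.submit_batch(batch_id=step, tasks=tasks, batch_type="train")
    try:
        return await coordinator.finalize_train_batch(step, timeout=timeout)
    except asyncio.TimeoutError:
        # Assumption: finalize reports an expired timeout via TimeoutError.
        await coordinator.abort_batch(step, reason="finalize timed out")
        return None
```

Using the explorer step as `batch_id` matches the `int` type that `finalize_train_batch` expects; eval batches use `str` IDs and go through `finalize_eval_batch` instead.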
- async finalize_eval_batch(batch_id: str, *, timeout: float | None = None) → dict
Finalize one eval batch and return aggregated eval metrics.
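One plausible shape for the finalize path, sketched under stated assumptions: the per-batch `finalize_lock` serializes concurrent finalize calls, the first caller waits (bounded by `timeout`) until enough tasks have reported, and later callers receive the cached `final_result`. Treating `min_wait_num` as a lower completion threshold, the polling loop, and the `reward/mean` metric are hypothetical choices for illustration; `ToyBatch` and `finalize` are not Trinity names.

```python
import asyncio
from dataclasses import dataclass, field
from statistics import mean
from typing import Dict, Optional


@dataclass
class ToyBatch:
    expected_task_count: int
    min_wait_num: Optional[int] = None
    statuses: Dict[int, float] = field(default_factory=dict)  # task index -> reward
    final_result: Optional[dict] = None
    finalize_lock: asyncio.Lock = field(default_factory=asyncio.Lock)


async def finalize(batch: ToyBatch, *, timeout: Optional[float] = None) -> dict:
    """Hypothetical finalize: wait for enough tasks, aggregate once, cache."""
    async with batch.finalize_lock:
        if batch.final_result is not None:
            return batch.final_result  # repeated calls reuse the first result
        # Assumption: min_wait_num, when set, lowers the completion threshold.
        needed = (batch.min_wait_num if batch.min_wait_num is not None
                  else batch.expected_task_count)

        async def wait_enough() -> None:
            # Simple polling; a real implementation might use an Event or Condition.
            while len(batch.statuses) < needed:
                await asyncio.sleep(0.01)

        # timeout=None waits indefinitely; otherwise raises asyncio.TimeoutError.
        await asyncio.wait_for(wait_enough(), timeout)
        rewards = list(batch.statuses.values())
        batch.final_result = {
            "completed": len(rewards),
            "expected": batch.expected_task_count,
            "reward/mean": mean(rewards) if rewards else 0.0,
        }
        return batch.final_result
```

Holding the lock while waiting means a second concurrent call blocks until the first finishes and then returns the same dict instead of aggregating twice.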
- async abort_batch(batch_id: int | str, *, reason: str, keep_partial_results: bool = False) → None
Abort one batch and clean up its running and staged state.
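A sketch of the cleanup that `abort_batch` describes, written against bare `asyncio` tasks: cancel whatever is still running, wait for the cancellations to settle, then either keep or discard the partial results. The function name `abort_batch_sketch`, its arguments, and its returned dict are hypothetical; the documented method takes a `batch_id` and returns `None`.

```python
import asyncio
from typing import Any, Dict, List


async def abort_batch_sketch(handles: List["asyncio.Task[Any]"], statuses: Dict[int, Any], *,
                             reason: str, keep_partial_results: bool = False) -> Dict[str, Any]:
    """Hypothetical abort helper: cancel unfinished tasks, then drop staged state."""
    for handle in handles:
        if not handle.done():
            handle.cancel()
    # Let cancellations settle; return_exceptions collects CancelledError instead of raising.
    await asyncio.gather(*handles, return_exceptions=True)
    kept = dict(statuses) if keep_partial_results else {}
    statuses.clear()  # discard staged results for the aborted batch
    return {"reason": reason, "partial_results": kept}
```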