trinity.buffer.pipelines.experience_pipeline module#

trinity.buffer.pipelines.experience_pipeline.get_input_buffers(pipeline_config: ExperiencePipelineConfig) Dict[源代码]#

Get input buffers for the experience pipeline.

trinity.buffer.pipelines.experience_pipeline.default_input_save_path(config: Config) str[源代码]#

Compute the default input_save_path for the experience pipeline.

This is the single source of truth for where the pipeline writes its input database when save_input is enabled but no path is provided: a SQLite database at <checkpoint_job_dir>/buffer/explorer_output.db.

It is derived purely from the config components (no Ray/GPU side effects), so callers that load a config without a full validation pass — e.g. trinity view — resolve the exact same path the pipeline writes to at run time. The abspath mirrors GlobalConfigValidator making a relative checkpoint_root_dir absolute, keeping the two contexts consistent.

class trinity.buffer.pipelines.experience_pipeline.ExperiencePipeline(config: Config)[源代码]#

基类:object

A class to process experiences.

__init__(config: Config)[源代码]#
async prepare() None[源代码]#
async process(exp_bytes: bytes) Dict[源代码]#

Process a batch of experiences.

参数:

exp_bytes (bytes) -- Serialized experiences to process. These experiences are typically generated by an explorer in one step.

返回:

A dictionary containing metrics collected during the processing of experiences.

返回类型:

Dict

async process_serialized_chunks(exp_chunks: list[bytes]) Dict[源代码]#

Process a batch assembled from multiple serialized task payloads.

async close() None[源代码]#