# Source code for trinity.perf.resource_backends

"""System resource collection backends for performance tooling."""

from __future__ import annotations

import time
from dataclasses import asdict, dataclass
from typing import Iterable

import psutil
from pynvml import (
    NVMLError,
    nvmlDeviceGetCount,
    nvmlDeviceGetHandleByIndex,
    nvmlDeviceGetMemoryInfo,
    nvmlDeviceGetName,
    nvmlDeviceGetUtilizationRates,
    nvmlInit,
    nvmlShutdown,
)


@dataclass
class GPUSample:
    """Point-in-time reading of a single GPU.

    The ``*_mb`` fields hold byte counts divided by ``1024 * 1024`` when
    produced by this module's NVML collector.
    """

    gpu_id: int  # device index as enumerated by the collector
    name: str  # device name string
    gpu_util_percent: float
    gpu_memory_used_mb: float
    gpu_memory_total_mb: float

    def to_dict(self) -> dict:
        """Return every field of this sample as a plain ``dict`` keyed by field name."""
        serialized = asdict(self)
        return serialized
@dataclass
class ResourceSample:
    """Point-in-time reading of process CPU/memory plus per-GPU metrics.

    ``timestamp`` is a ``time.time()`` value when produced by this module's
    backend; ``gpu_metrics`` holds one ``GPUSample`` per device.
    """

    timestamp: float
    cpu_percent: float
    memory_rss_mb: float
    memory_percent: float
    gpu_metrics: list[GPUSample]

    def to_dict(self) -> dict:
        """Return this sample as nested plain dicts.

        Each entry of ``gpu_metrics`` is serialized through its own
        ``to_dict`` method rather than left to ``asdict``'s recursion.
        """
        payload = asdict(self)
        # The key already exists, so update() keeps its original position.
        payload.update(gpu_metrics=[metric.to_dict() for metric in self.gpu_metrics])
        return payload
class SystemResourceBackend:
    """Collect system-level CPU, memory and per-GPU metrics.

    CPU and memory figures describe the current Python process (via
    ``psutil.Process()``); GPU figures come from NVML for every device index
    it reports. Call :meth:`open` before :meth:`sample` and :meth:`close`
    afterwards, or use the instance as a context manager.
    """

    def __init__(
        self,
        gpu_subsample_count: int = 5,
        gpu_subsample_interval_seconds: float = 0.2,
    ) -> None:
        """Configure the backend; NVML is not touched until :meth:`open`.

        Args:
            gpu_subsample_count: Number of NVML snapshots taken per
                :meth:`sample` call; the per-GPU peak across them is
                reported. Must be greater than 0.
            gpu_subsample_interval_seconds: Sleep between consecutive GPU
                snapshots. Must be non-negative; 0 disables sleeping.

        Raises:
            ValueError: If either argument is out of range.
        """
        if gpu_subsample_count <= 0:
            raise ValueError("gpu_subsample_count must be greater than 0.")
        if gpu_subsample_interval_seconds < 0:
            raise ValueError("gpu_subsample_interval_seconds must be non-negative.")
        self._process = psutil.Process()
        self._initialized = False
        self._gpu_count = 0
        self._gpu_subsample_count = gpu_subsample_count
        self._gpu_subsample_interval_seconds = gpu_subsample_interval_seconds

    def __enter__(self) -> SystemResourceBackend:
        """Open the backend and return it for use in a ``with`` block."""
        self.open()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: object,
    ) -> None:
        """Close the backend; exceptions raised in the ``with`` body propagate."""
        self.close()

    def open(self) -> None:
        """Initialize the GPU management library and validate the environment.

        Calling ``open`` on an already-open backend is a no-op.

        Raises:
            RuntimeError: If NVML cannot be initialized, the device count
                cannot be read, or NVML reports no devices. Whenever
                ``nvmlInit`` had succeeded, NVML is shut down again before
                raising, so a failed ``open`` leaves nothing to clean up.
        """
        if self._initialized:
            return
        try:
            nvmlInit()
        except NVMLError as error:
            raise RuntimeError(f"Failed to initialize NVML: {error}") from error
        try:
            gpu_count = nvmlDeviceGetCount()
        except NVMLError as error:
            # nvmlInit() succeeded, so balance it even though the backend never
            # became usable (close() would skip this: _initialized is False).
            self._shutdown_nvml()
            raise RuntimeError(f"Failed to initialize NVML: {error}") from error
        if gpu_count <= 0:
            # Same reasoning: close() is a no-op before _initialized is set, so
            # release NVML directly instead of relying on it.
            self._shutdown_nvml()
            raise RuntimeError("No GPU devices detected by NVML.")
        self._gpu_count = gpu_count
        # Establish psutil's CPU-time baseline so the first sample()'s
        # cpu_percent covers the interval since open().
        self._process.cpu_percent(interval=None)
        self._initialized = True

    def close(self) -> None:
        """Release NVML resources; a no-op if the backend is not open."""
        if not self._initialized:
            return
        self._shutdown_nvml()
        self._initialized = False
        self._gpu_count = 0

    @staticmethod
    def _shutdown_nvml() -> None:
        """Call ``nvmlShutdown``, ignoring any ``NVMLError`` it raises."""
        try:
            nvmlShutdown()
        except NVMLError:
            # Deliberate best-effort teardown: a failed shutdown is not
            # actionable, and close() should not propagate NVML errors.
            pass

    def sample(self) -> ResourceSample:
        """Collect one resource sample.

        ``timestamp``, ``memory_rss_mb`` and ``memory_percent`` are read at the
        start of the call. GPU metrics are the per-device peak over
        ``gpu_subsample_count`` NVML snapshots spaced
        ``gpu_subsample_interval_seconds`` apart, so the call blocks for about
        ``(gpu_subsample_count - 1) * gpu_subsample_interval_seconds`` seconds.
        ``cpu_percent`` is read last, using psutil's non-blocking mode, which
        measures usage since the previous such call (the previous sample, or
        ``open``).

        Raises:
            RuntimeError: If the backend has not been opened.
            NVMLError: Propagated unchanged if an NVML device query fails.
        """
        if not self._initialized:
            raise RuntimeError("SystemResourceBackend must be opened before sampling.")
        timestamp = time.time()
        memory_info = self._process.memory_info()
        # Read the percentage right after memory_info so both memory fields
        # describe (nearly) the same instant instead of straddling the GPU
        # subsampling window below.
        memory_percent = float(self._process.memory_percent())
        gpu_metrics = self._collect_gpu_metrics()
        for _ in range(1, self._gpu_subsample_count):
            if self._gpu_subsample_interval_seconds > 0:
                time.sleep(self._gpu_subsample_interval_seconds)
            gpu_metrics = self._merge_gpu_metrics(gpu_metrics, self._collect_gpu_metrics())
        return ResourceSample(
            timestamp=timestamp,
            cpu_percent=float(self._process.cpu_percent(interval=None)),
            memory_rss_mb=float(memory_info.rss) / (1024 * 1024),
            memory_percent=memory_percent,
            gpu_metrics=gpu_metrics,
        )

    def _collect_gpu_metrics(self) -> list[GPUSample]:
        """Collect one instantaneous snapshot of device indices ``0 .. gpu_count - 1``.

        Memory values are converted from bytes to MB (``/ (1024 * 1024)``).
        """
        gpu_metrics: list[GPUSample] = []
        for gpu_index in range(self._gpu_count):
            gpu_handle = nvmlDeviceGetHandleByIndex(gpu_index)
            utilization = nvmlDeviceGetUtilizationRates(gpu_handle)
            gpu_memory = nvmlDeviceGetMemoryInfo(gpu_handle)
            gpu_name = nvmlDeviceGetName(gpu_handle)
            # The name may arrive as bytes (depends on the pynvml version);
            # normalize to str.
            if isinstance(gpu_name, bytes):
                gpu_name = gpu_name.decode("utf-8")
            gpu_metrics.append(
                GPUSample(
                    gpu_id=gpu_index,
                    name=str(gpu_name),
                    gpu_util_percent=float(utilization.gpu),
                    gpu_memory_used_mb=float(gpu_memory.used) / (1024 * 1024),
                    gpu_memory_total_mb=float(gpu_memory.total) / (1024 * 1024),
                )
            )
        return gpu_metrics

    def _merge_gpu_metrics(
        self,
        base_metrics: Iterable[GPUSample],
        next_metrics: Iterable[GPUSample],
    ) -> list[GPUSample]:
        """Merge two GPU snapshots per ``gpu_id``, keeping peak values.

        Merge rules for each ``gpu_id``:

        - Utilization and used memory take the maximum of the two readings.
        - ``name`` and total memory come from ``next_metrics``.
        - A GPU present in only one input is kept unchanged.

        The result is sorted by ``gpu_id``.
        """
        merged_metrics = {gpu_metric.gpu_id: gpu_metric for gpu_metric in base_metrics}
        for gpu_metric in next_metrics:
            prior_metric = merged_metrics.get(gpu_metric.gpu_id)
            if prior_metric is None:
                merged_metrics[gpu_metric.gpu_id] = gpu_metric
                continue
            merged_metrics[gpu_metric.gpu_id] = GPUSample(
                gpu_id=gpu_metric.gpu_id,
                name=gpu_metric.name,
                gpu_util_percent=max(prior_metric.gpu_util_percent, gpu_metric.gpu_util_percent),
                gpu_memory_used_mb=max(
                    prior_metric.gpu_memory_used_mb, gpu_metric.gpu_memory_used_mb
                ),
                gpu_memory_total_mb=gpu_metric.gpu_memory_total_mb,
            )
        return [merged_metrics[gpu_index] for gpu_index in sorted(merged_metrics)]