Source code for trinity.perf.resource_sampler
"""Threaded resource sampler for performance tooling."""
from __future__ import annotations
import threading
import time
from typing import Optional
from trinity.perf.resource_backends import ResourceSample, SystemResourceBackend
[docs]
class ResourceSampler:
"""Periodically collect system resource samples in a background thread."""
[docs]
def __init__(
self,
interval_seconds: float,
backend: Optional[SystemResourceBackend] = None,
) -> None:
if interval_seconds <= 0:
raise ValueError("interval_seconds must be greater than 0.")
self.interval_seconds = interval_seconds
self.backend = backend or SystemResourceBackend()
self._samples: list[ResourceSample] = []
self._lock = threading.Lock()
self._stop_event = threading.Event()
self._thread: Optional[threading.Thread] = None
self._started = False
[docs]
def start(self) -> None:
"""Start sampling in the background."""
if self._started:
raise RuntimeError("ResourceSampler has already been started.")
self.backend.open()
self._started = True
self._stop_event.clear()
self._thread = threading.Thread(target=self._run, name="resource-sampler", daemon=True)
self._thread.start()
[docs]
def stop(self) -> list[ResourceSample]:
"""Stop sampling and return the collected samples."""
if not self._started:
return self.samples()
self._stop_event.set()
if self._thread is not None:
self._thread.join()
self._thread = None
self.backend.close()
self._started = False
return self.samples()
[docs]
def samples(self) -> list[ResourceSample]:
"""Return a snapshot of all collected samples."""
with self._lock:
return list(self._samples)
def _run(self) -> None:
next_sample_time = time.monotonic()
while not self._stop_event.is_set():
self._collect_once()
next_sample_time += self.interval_seconds
remaining_time = max(0.0, next_sample_time - time.monotonic())
if self._stop_event.wait(remaining_time):
break
def _collect_once(self) -> None:
sample = self.backend.sample()
with self._lock:
self._samples.append(sample)