trinity.buffer.utils module

trinity.buffer.utils.to_async_url(url: str) -> str

Convert a synchronous DB URL to its async dialect equivalent.
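The sketch below illustrates what such a conversion could look like. It is not the Trinity implementation: the function name `to_async_url_sketch` and the driver mapping (`aiosqlite`, `asyncpg`, `aiomysql`) are assumptions chosen for illustration, and the real function may support a different set of dialects.

```python
# Hypothetical mapping from a database backend to an async driver scheme.
# The driver choices here are assumptions, not taken from the Trinity source.
_ASYNC_SCHEMES = {
    "sqlite": "sqlite+aiosqlite",
    "postgresql": "postgresql+asyncpg",
    "mysql": "mysql+aiomysql",
}


def to_async_url_sketch(url: str) -> str:
    """Rewrite the scheme of a DB URL to an async dialect, if one is known."""
    scheme, sep, rest = url.partition("://")
    if not sep:
        # Not a URL with a scheme; leave it untouched.
        return url
    if scheme in _ASYNC_SCHEMES.values():
        # Already one of the async schemes in the mapping.
        return url
    # Drop an explicit sync driver, e.g. "postgresql+psycopg2" -> "postgresql".
    backend = scheme.split("+", 1)[0]
    # Unknown backends keep their original scheme.
    async_scheme = _ASYNC_SCHEMES.get(backend, scheme)
    return f"{async_scheme}://{rest}"


print(to_async_url_sketch("sqlite:///data.db"))
print(to_async_url_sketch("postgresql+psycopg2://user:pw@host/db"))
```

Under these assumptions, a URL whose backend is not in the mapping is returned unchanged, so the helper is safe to call on any URL.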

async trinity.buffer.utils.async_run_with_retry_session(session_maker: async_sessionmaker, operation: Callable[[AsyncSession], Awaitable[Any]], max_retry_times: int = 2, max_retry_interval: float = 1.0) -> Any

Run an async database operation with session retry.
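A minimal sketch of the retry pattern follows, using only the standard library so it runs without a database. It is an illustration, not the Trinity implementation. Several details are assumptions: that `max_retry_times` counts retries after the first attempt, that `max_retry_interval` caps an exponential backoff delay, and that every exception is retried. `run_with_retry_sketch`, `fake_session_maker`, and `flaky_operation` are hypothetical names; the real function takes a SQLAlchemy `async_sessionmaker`.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, Awaitable, Callable


async def run_with_retry_sketch(
    session_maker: Callable[[], Any],
    operation: Callable[[Any], Awaitable[Any]],
    max_retry_times: int = 2,
    max_retry_interval: float = 1.0,
) -> Any:
    """Run `operation` in a fresh session, retrying on failure."""
    for attempt in range(max_retry_times + 1):
        try:
            # Open a new session for every attempt so a broken one is discarded.
            async with session_maker() as session:
                return await operation(session)
        except Exception:
            if attempt == max_retry_times:
                # Retries exhausted; surface the last error to the caller.
                raise
            # Assumed policy: exponential backoff capped at max_retry_interval.
            await asyncio.sleep(min(max_retry_interval, 0.1 * 2**attempt))


@asynccontextmanager
async def fake_session_maker():
    # Stand-in for an async_sessionmaker; yields a dummy session object.
    yield object()


calls = {"count": 0}


async def flaky_operation(session: Any) -> str:
    # Fails on the first call, succeeds on the second.
    calls["count"] += 1
    if calls["count"] < 2:
        raise ConnectionError("transient failure")
    return "ok"


result = asyncio.run(
    run_with_retry_sketch(fake_session_maker, flaky_operation, max_retry_interval=0.01)
)
print(result)  # prints "ok" after one retry
```

Opening a new session per attempt is a design choice of this sketch: a session that failed mid-transaction is not reused, at the cost of re-running the whole operation.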