Source code for trinity.buffer.utils

import asyncio
import traceback
from typing import Any, Awaitable, Callable

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from trinity.utils.log import get_logger


[docs] def to_async_url(url: str) -> str: """Convert a synchronous DB URL to its async dialect equivalent.""" if url.startswith("sqlite:///"): return url.replace("sqlite:///", "sqlite+aiosqlite:///", 1) if url.startswith("postgresql://"): return url.replace("postgresql://", "postgresql+asyncpg://", 1) if url.startswith("mysql://"): return url.replace("mysql://", "mysql+aiomysql://", 1) return url
[docs] async def async_run_with_retry_session( session_maker: async_sessionmaker, operation: Callable[[AsyncSession], Awaitable[Any]], max_retry_times: int = 2, max_retry_interval: float = 1.0, ) -> Any: """Run an async database operation with session retry.""" logger = get_logger(__name__) max_retry_times = max(1, max_retry_times) for attempt in range(max_retry_times): async with session_maker() as session: try: async with session.begin(): result = await operation(session) return result except StopAsyncIteration: raise except Exception as exc: logger.warning( "Async attempt %s failed, retrying in %s seconds...", attempt + 1, max_retry_interval, ) logger.warning("trace = %s", traceback.format_exc()) if attempt < max_retry_times - 1: await asyncio.sleep(max_retry_interval) continue logger.error("Max retry attempts reached, raising exception.") raise exc raise RuntimeError("async_run_with_retry_session exhausted without raising")