Source code for trinity.buffer.writer.sql_writer
"""Writer of the SQL buffer."""
import asyncio

from trinity.buffer.buffer_writer import BufferWriter
from trinity.buffer.storage.sql import SQLExperienceStorage, SQLStorage, SQLTaskStorage
from trinity.common.config import StorageConfig
from trinity.common.constants import StorageType
[docs]
class SQLWriter(BufferWriter):
    """Writer of the SQL buffer.

    Depending on ``config.wrap_in_ray`` the writer either forwards each call
    to the wrapper returned by ``SQLStorage.get_wrapper`` (invoked through
    ``.remote()``) or to a locally constructed ``SQLTaskStorage`` /
    ``SQLExperienceStorage``. Both backends are created lazily on first use.
    """

    def __init__(self, config: StorageConfig) -> None:
        """Record the configuration; no storage backend is created here.

        Args:
            config: Storage configuration. Its ``storage_type`` must equal
                ``StorageType.SQL.value``.

        Raises:
            AssertionError: If ``config.storage_type`` is not the SQL type.
        """
        assert config.storage_type == StorageType.SQL.value, (
            f"SQLWriter requires storage_type {StorageType.SQL.value!r}, "
            f"got {config.storage_type!r}"
        )
        self.wrap_in_ray = config.wrap_in_ray
        self._storage = None
        self._async_storage = None
        # Created on first use inside a coroutine (see _get_async_storage) so
        # that constructing the writer never needs an event loop.
        self._async_storage_lock = None
        self._config = config

    @property
    def storage(self):
        """Wrapper from ``SQLStorage.get_wrapper``, created on first access."""
        if self._storage is None:
            self._storage = SQLStorage.get_wrapper(self._config)
        return self._storage

    async def _get_async_storage(self):
        """Return the local storage, creating and preparing it on first use.

        ``SQLTaskStorage`` is used when ``config.schema_type`` is ``None``,
        ``SQLExperienceStorage`` otherwise. The instance is cached only after
        ``prepare()`` has completed, so a failed ``prepare()`` is retried on
        the next call and concurrent callers never receive an instance whose
        ``prepare()`` is still in flight.
        """
        if self._async_storage is not None:
            return self._async_storage

        # No await between the check and the assignment, so exactly one lock
        # object is created even when several coroutines arrive together.
        if self._async_storage_lock is None:
            self._async_storage_lock = asyncio.Lock()

        async with self._async_storage_lock:
            # Re-check: another coroutine may have finished initialization
            # while this one was waiting for the lock.
            if self._async_storage is None:
                if self._config.schema_type is None:
                    storage = SQLTaskStorage(self._config)
                else:
                    storage = SQLExperienceStorage(self._config)
                await storage.prepare()
                # Publish only once the storage is fully prepared.
                self._async_storage = storage
        return self._async_storage

    async def write(self, data):
        """Write ``data`` through the configured storage backend.

        Args:
            data: Payload passed unchanged to the backend's ``write``.
        """
        if self.wrap_in_ray:
            await self.storage.write.remote(data)
        else:
            storage = await self._get_async_storage()
            await storage.write(data)

    async def acquire(self) -> int:
        """Call ``acquire`` on the configured backend and return its result."""
        if self.wrap_in_ray:
            return await self.storage.acquire.remote()
        storage = await self._get_async_storage()
        # NOTE(review): the result is returned without awaiting; this assumes
        # the local storage's acquire() is synchronous - confirm.
        return storage.acquire()

    async def release(self) -> int:
        """Call ``release`` on the configured backend and return its result."""
        if self.wrap_in_ray:
            return await self.storage.release.remote()
        storage = await self._get_async_storage()
        # NOTE(review): the result is returned without awaiting; this assumes
        # the local storage's release() is synchronous - confirm.
        return storage.release()