import asyncio import pathlib import aiofiles from .attachment import attachment from .attachment import attachment_proxy from .aleatory_file_name import aleatory_file_name class attachments_directory: def __init__(self, directory: pathlib.Path) -> None: if not directory.exists() or not directory.is_dir(): raise RuntimeError("Directory for attachments not exists.") self.__directory = directory async def store(self, content: bytes, extension: str) -> attachment_proxy: name = await self.__get_new_name(extension) path = self.directory / pathlib.Path(name) async with aiofiles.open(path, "wb") as handler: await handler.write(content) return attachment_proxy.create(name) async def change(self, file: attachment, content: bytes) -> bytes: before = await self.load(file) path = self.directory / file.resources_path async with aiofiles(path, "wb") as handler: handler.write(content) return before async def load(self, file: attachment) -> bytes: path = self.directory / file.resources_path async with aiofiles.open(path, "rb") as handler: return await handler.read() async def remove(self, file: attachment) -> bytes: path = self.directory / file.resources_path content = await self.load(file) path.unlink() return content async def __get_new_name(self, extension: str) -> str: return await asyncio.to_thread( self.__get_new_name_loop, extension ) def __get_new_name_loop(self, extension: str) -> str: while True: new_name = aleatory_file_name(extension) new_path = self.directory / pathlib.Path(new_name) if not new_path.exists(): return new_name @property def directory(self) -> pathlib.Path: return self.__directory