| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- import asyncio
- import pathlib
- import aiofiles
- from .attachment import attachment
- from .attachment import attachment_proxy
- from .aleatory_file_name import aleatory_file_name
- class attachments_directory:
- def __init__(self, directory: pathlib.Path) -> None:
- if not directory.exists() or not directory.is_dir():
- raise RuntimeError("Directory for attachments not exists.")
- self.__directory = directory
- async def store(self, content: bytes, extension: str) -> attachment_proxy:
- name = await self.__get_new_name(extension)
- path = self.directory / pathlib.Path(name)
- async with aiofiles.open(path, "wb") as handler:
- await handler.write(content)
- return attachment_proxy.create(name)
- async def change(self, file: attachment, content: bytes) -> bytes:
- before = await self.load(file)
- path = self.directory / file.resources_path
-
- async with aiofiles(path, "wb") as handler:
- handler.write(content)
-
- return before
- async def load(self, file: attachment) -> bytes:
- path = self.directory / file.resources_path
- async with aiofiles.open(path, "rb") as handler:
- return await handler.read()
- async def remove(self, file: attachment) -> bytes:
- path = self.directory / file.resources_path
- content = await self.load(file)
- path.unlink()
- return content
- async def __get_new_name(self, extension: str) -> str:
- return await asyncio.to_thread(
- self.__get_new_name_loop,
- extension
- )
- def __get_new_name_loop(self, extension: str) -> str:
- while True:
- new_name = aleatory_file_name(extension)
- new_path = self.directory / pathlib.Path(new_name)
- if not new_path.exists():
- return new_name
- @property
- def directory(self) -> pathlib.Path:
- return self.__directory
|