attachments_directory.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. import asyncio
  2. import pathlib
  3. import aiofiles
  4. from .attachment import attachment
  5. from .attachment import attachment_proxy
  6. from .aleatory_file_name import aleatory_file_name
  7. class attachments_directory:
  8. def __init__(self, directory: pathlib.Path) -> None:
  9. if not directory.exists() or not directory.is_dir():
  10. raise RuntimeError("Directory for attachments not exists.")
  11. self.__directory = directory
  12. async def store(self, content: bytes, extension: str) -> attachment_proxy:
  13. name = await self.__get_new_name(extension)
  14. path = self.directory / pathlib.Path(name)
  15. async with aiofiles.open(path, "wb") as handler:
  16. await handler.write(content)
  17. return attachment_proxy.create(name)
  18. async def change(self, file: attachment, content: bytes) -> bytes:
  19. before = await self.load(file)
  20. path = self.directory / file.resources_path
  21. async with aiofiles(path, "wb") as handler:
  22. handler.write(content)
  23. return before
  24. async def load(self, file: attachment) -> bytes:
  25. path = self.directory / file.resources_path
  26. async with aiofiles.open(path, "rb") as handler:
  27. return await handler.read()
  28. async def remove(self, file: attachment) -> bytes:
  29. path = self.directory / file.resources_path
  30. content = await self.load(file)
  31. path.unlink()
  32. return content
  33. async def __get_new_name(self, extension: str) -> str:
  34. return await asyncio.to_thread(
  35. self.__get_new_name_loop,
  36. extension
  37. )
  38. def __get_new_name_loop(self, extension: str) -> str:
  39. while True:
  40. new_name = aleatory_file_name(extension)
  41. new_path = self.directory / pathlib.Path(new_name)
  42. if not new_path.exists():
  43. return new_name
  44. @property
  45. def directory(self) -> pathlib.Path:
  46. return self.__directory