# attachments_manager.py
import asyncio
import base64
import pathlib

from .attachment import attachment
from .attachment import attachment_proxy
from .attachment_file import attachment_file
from .exceptions import resources_directory_not_exists


  8. class attachments_manager:
  9. def __init__(
  10. self,
  11. resources: pathlib.Path,
  12. init_directory: bool = False
  13. ) -> None:
  14. if init_directory:
  15. resources.mkdir()
  16. if not resources.is_dir() or not resources.exists():
  17. raise resources_directory_not_exists(resources)
  18. self.__resources = resources
  19. @property
  20. def resources(self) -> pathlib.Path:
  21. return self.__resources
  22. async def __decode(self, content: str) -> bytes:
  23. return await asyncio.to_thread(
  24. base64.b64decode,
  25. content.encode("ascii")
  26. )
  27. async def upload(self, content: str, extension: str) -> attachment_file:
  28. decoded = await self.__decode(content)
  29. file_handler = attachment_file.create(self.resources, extension)
  30. return await file_handler.store(decoded)
  31. def restore(self, file: attachment) -> attachment_file:
  32. return attachment_file(file.resources_path, self.resources)