secret_loader.py 6.0 KB


  1. import typing
  2. import sqlmodel
  3. import sqlalchemy.engine.base
  4. from .user import user
  5. from .secret import secret
  class secret_loader(sqlmodel.Session):
      """
      Database session limited to the secrets of one user.

      Every query built by this class is filtered by the id of the owner
      given to the constructor, so searching, loading and clearing only
      touch secrets whose owner column equals that id.
      """

      def __init__(
          self,
          connection: sqlalchemy.engine.base.Engine,
          owner: user
      ) -> None:
          """
          This creates a new loader. It requires a connection to the
          database and the user who owns the secrets to work on.

          Parameters:
              connection (sqlalchemy.engine.base.Engine): Database engine
              owner (user): User who owns the secrets to work on

          Raises:
              Exception: When the owner is not stored in the database
          """

          # Validate first, so a rejected owner never leaves a constructed
          # session behind.
          if not owner.in_database:
              raise Exception("User to build loaded for not exists.")

          super().__init__(connection)
          self.__owner = owner

      @property
      def owner(self) -> user:
          """ This returns the owner of the secrets the loader works on. """

          return self.__owner

      def append(self, target: secret) -> bool:
          """
          This appends a new secret to the database. When the secret has no
          owner, the id of the loader owner is assigned to it. When the
          secret name is already used by a secret of the same owner, nothing
          is stored and False is returned.

          Parameters:
              target (secret): New secret to insert into the database

          Returns:
              (bool): True when appended, False when the name is in use

          Raises:
              TypeError: When the secret belongs to another owner
              Exception: When the secret is already in the database, or
                         when it is not ready
          """

          if target.owner is None:
              # The owner field is compared with user ids everywhere in
              # this class, so the id is stored, not the user object.
              target.owner = self.owner.id
          elif target.owner != self.owner.id:
              raise TypeError("Owner of secret is other than in loader.")

          if target.in_database:
              raise Exception("Secret is already in database. Use update.")

          if not target.ready:
              raise Exception("Secret is not ready to append.")

          if self.name_in_use(target.name):
              return False

          self.add(target)
          self.commit()

          # Reload the object, so values generated by the database are
          # visible on target.
          self.refresh(target)

          return True

      @property
      def __select(self) -> sqlmodel.sql.expression.Select:
          """ It creates a new select query limited to the loader owner. """

          return sqlmodel.select(secret).where(secret.owner == self.owner.id)

      def search_for_domain(self, target: str) -> typing.Iterable[secret]:
          """
          This searches the secrets of the owner by domain. The target is
          wrapped in "%" wildcards and matched with LIKE, so it is found
          anywhere inside the domain. Wildcard characters inside the target
          are not escaped. Because this is a generator, the query is
          executed when the iteration starts.

          Parameters:
              target (str): Domain name, or part of it, to search for

          Returns:
              (Iterable[secret]): Iterator over found secrets
          """

          pattern = "%" + target + "%"
          query = self.__select.where(secret.domain.like(pattern))

          yield from self.exec(query)

      def search_for_name(self, target: str) -> typing.Iterable[secret]:
          """
          This searches the secrets of the owner by name. The target is
          converted to upper case, wrapped in "%" wildcards and matched with
          LIKE. Wildcard characters inside the target are not escaped.
          Because this is a generator, the query is executed when the
          iteration starts.

          NOTE(review): presumably names are stored in upper case by the
          secret model - confirm there.

          Parameters:
              target (str): Name, or part of it, to search for

          Returns:
              (Iterable[secret]): Iterator over found secrets
          """

          pattern = "%" + target.upper() + "%"
          query = self.__select.where(secret.name.like(pattern))

          yield from self.exec(query)

      def name_in_use(self, target: str) -> bool:
          """
          This checks that the name is already used by a secret of the owner.

          Parameters:
              target (str): Name to check

          Returns:
              (bool): True when the name is in use, False when not
          """

          return self.load_by_name(target) is not None

      def load_by_name(self, target: str) -> secret | None:
          """
          It loads a single secret of the owner by name. The name is
          converted to upper case before comparing. When the secret does
          not exist, it returns None.

          Parameters:
              target (str): Name of the secret

          Returns:
              (secret | None): Loaded secret, or None when it does not exist
          """

          query = self.__select.where(secret.name == target.upper())

          return self.exec(query).first()

      def drop(self, target: secret) -> bool:
          """
          This removes the secret from the database. When the secret is not
          in the database, nothing is done and False is returned.

          Parameters:
              target (secret): Secret to drop

          Returns:
              (bool): True when removed, False when it was not in database
          """

          if not target.in_database:
              return False

          self.delete(target)
          self.commit()

          return True

      def clear(self) -> None:
          """
          This drops all secrets of the owner in a single commit. It is
          useful before removing the user.
          """

          for item in self.exec(self.__select):
              self.delete(item)

          self.commit()

      def update(self, target: secret) -> bool:
          """
          This updates a secret which already exists in the database. When
          the secret has no owner, the id of the loader owner is assigned
          to it. When the name is already used by another secret of the
          same owner, nothing is stored and False is returned.

          Parameters:
              target (secret): Secret to update

          Returns:
              (bool): True when updated, False when the name is in use

          Raises:
              TypeError: When the secret belongs to another owner
              Exception: When the secret is not in the database, or when
                         it is not ready
          """

          if target.owner is None:
              # The owner field is compared with user ids everywhere in
              # this class, so the id is stored, not the user object.
              target.owner = self.owner.id
          elif target.owner != self.owner.id:
              raise TypeError("Owner of the secret is other than in loader.")

          if not target.in_database:
              raise Exception("Target is not in database. Use append.")

          if not target.ready:
              # This must be raised. A returned exception object is truthy
              # and would look like a successful update to the caller.
              raise Exception("Target is not ready to update.")

          # The same name is fine only when it belongs to the updated secret.
          check = self.load_by_name(target.name)

          if check is not None and check.id != target.id:
              return False

          self.add(target)
          self.commit()
          self.refresh(target)

          return True