secret_loader.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. import typing
  2. import sqlmodel
  3. import sqlalchemy.engine.base
  4. from .user import user
  5. from .secret import secret
  6. class secret_loader(sqlmodel.Session):
  7. """
  8. This class is responsible for loading and managing secrets in the
  9. database.
  10. """
  11. def __init__(
  12. self,
  13. connection: sqlalchemy.engine.base.Engine,
  14. owner: user
  15. ) -> None:
  16. """
  17. This create new loader. It require connection to the database, and
  18. user, which own secrets to work on.
  19. Parameters:
  20. connection (sqlalchemy.engine.base.Engine): Connection to database
  21. owner (user): User which own secrets to work on
  22. """
  23. super().__init__(connection)
  24. if not owner.in_database:
  25. raise Exception("User to build loaded for not exists.")
  26. self.__owner = owner
  27. @property
  28. def owner(self) -> user:
  29. """ This return owner of secrets which work on. """
  30. return self.__owner
  31. def append(self, target: secret) -> bool:
  32. """
  33. This append new secret to database. Secret must have same owner, as
  34. set in loader. When owner is not set, user from loader would be used.
  35. When user is not correct, that mean is other in secret and in loader,
  36. TypeError would be raised. When secret name is already in use, then
  37. it return False. When item is already in database, Exception would be
  38. raised. When secret is not ready, that mean
  39. Parameters:
  40. target (secret): New secret to insert in database
  41. Returns:
  42. (bool): True when append successfull, False when failed
  43. """
  44. if target.owner is None:
  45. target.owner = self.owner
  46. elif target.owner != self.owner.id:
  47. raise TypeError("Owner of secret is other than in loader.")
  48. if target.in_database:
  49. raise Exception("Secret is already in database. Use update.")
  50. if not target.ready:
  51. raise Exception("Secret is not ready to append.")
  52. if self.load_by_name(target.name) is not None:
  53. return False
  54. self.add(target)
  55. self.commit()
  56. self.refresh(target)
  57. return True
  58. @property
  59. def __select(self) -> sqlmodel.sql.expression.Select:
  60. """ It create new selecr query with set owner statement. """
  61. return sqlmodel.select(secret).where(secret.owner == self.owner.id)
  62. def search_for_domain(self, target: str) -> typing.Iterable[secret]:
  63. """
  64. This search in secret database for secrets by domain. It return
  65. iterator which items is in secret type.
  66. Parameters:
  67. target (str): Domain name to search
  68. Returns:
  69. (Iterable[secret]): Iterator with found secrets
  70. """
  71. target = "%" + target + "%"
  72. query = self.__select.where(secret.domain.like(target))
  73. result = self.exec(query)
  74. for item in result:
  75. yield item
  76. def search_for_name(self, target: str) -> typing.Iterable[secret]:
  77. """
  78. This search in secrets database for secrets which have names like
  79. given. It returns iterator with found result.
  80. Parameters:
  81. target (str): Name to search in database
  82. Returns:
  83. (Iterable[secret]): Found secrets
  84. """
  85. target = "%" + target.upper() + "%"
  86. query = self.__select.where(secret.name.like(target))
  87. result = self.exec(query)
  88. for item in result:
  89. yield item
  90. def name_in_use(self, target: str) -> bool:
  91. """
  92. This check that name is already in use.
  93. Parameters:
  94. target (str): Name to check
  95. Returns:
  96. (bool): True when name is user, False when not
  97. """
  98. return self.load_by_name(target) is not None
  99. def load_by_name(self, target: str) -> secret | None:
  100. """
  101. It load single secret by name. When secret not exists, return None.
  102. Parameters:
  103. target (str): Name of the secret
  104. Returns:
  105. (secret | None): Loaded secret or None when not exists
  106. """
  107. target = target.upper()
  108. query = self.__select.where(secret.name == target)
  109. result = self.exec(query)
  110. return result.first()
  111. def drop(self, target: secret) -> bool:
  112. """
  113. This remove secret from database. When secret not exists in database
  114. return False. Whem all went good, return True.
  115. Parameters:
  116. target (secret): Secret to drop
  117. Returns:
  118. (bool): True when remove successfull, False when not
  119. """
  120. if not target.in_database:
  121. return False
  122. self.delete(target)
  123. self.commit()
  124. return True
  125. def clear(self) -> None:
  126. """
  127. This drop all secrets of the user. It is useable before user remove.
  128. """
  129. query = self.__select
  130. result = self.exec(query)
  131. for item in result:
  132. self.delete(item)
  133. self.commit()
  134. def update(self, target: secret) -> bool:
  135. """
  136. This update secret in the database. When secret not exists in
  137. database. When owner is not set propertly, raise TypeError. When
  138. target is not in database, or secret is not ready, raise Exception.
  139. When name is already in use by other secret, it return False.
  140. Parameters:
  141. target (secret): Secret to update
  142. Returns:
  143. (bool): True when updated, False when name is in use
  144. """
  145. if target.owner is None:
  146. target.owner = self.owner
  147. elif target.owner != self.owner.id:
  148. raise TypeError("Owner of the secret is other than in loader.")
  149. if not target.in_database:
  150. raise Exception("Target is not in database. Use append.")
  151. if not target.ready:
  152. return Exception("Target is not ready to update.")
  153. check = self.load_by_name(target.name)
  154. if check is not None and check.id != target.id:
  155. return False
  156. self.add(target)
  157. self.commit()
  158. self.refresh(target)
  159. return True