application_secret.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. from .user import user
  2. from .user import user_builder
  3. from .user_loader import user_loader
  4. from .secret import secret
  5. from .secret import secret_builder
  6. from .secret_coder import secret_coder
  7. from .secret_loader import secret_loader
  8. from .application_part import application_part
  9. from .apikey import bad_apikey_exception
  10. from .validators import name_validator
  11. from .validators import domain_validator
  12. from .application_secret_response import secret_response
  13. from .application_secret_response import secret_collection_response
  14. class application_secret(application_part):
  15. """
  16. This class is responsible for managing secrets in the application.
  17. """
  18. def get(self, apikey: str, name: str) -> dict:
  19. """
  20. This load secret by name. It return full secret, with crypted content,
  21. or fail response when secret with given name not exists.
  22. Parameters:
  23. apikey (str): ApiKey of the secret owner
  24. name (str): Name of the secret
  25. Returns:
  26. (dict): Result to convert into JSON
  27. """
  28. with self.__secret_database(apikey) as loader:
  29. target = loader.load_by_name(name)
  30. if target is None:
  31. return self.__not_found_response()
  32. return secret_response(target).response
  33. def __validator(self, **kwargs) -> dict | None:
  34. """
  35. This validate given parameters. It return None when all given items
  36. is valid, or dict with fail response dict.
  37. Kwargs:
  38. name (str): Name to valid
  39. new_name (str): Name to valid
  40. domain (str): Domain to valid
  41. coded (str): Crypted content to valid
  42. Returns:
  43. (dict): Fail to return as JSON
  44. (None): Validating parameters ara valid
  45. """
  46. result = None
  47. if "name" in kwargs:
  48. name = kwargs["name"]
  49. result = self._validation("name", name_validator(name))
  50. if "new_name" in kwargs:
  51. name = kwargs["new_name"]
  52. result = self._validation("name", name_validator(name))
  53. if "domain" in kwargs and result is None:
  54. domain = kwargs["domain"]
  55. result = self._validation("domain", domain_validator(domain))
  56. if not "coded" in kwargs:
  57. return result
  58. coded = kwargs["coded"]
  59. if not secret_coder.validate(coded):
  60. return self._fail_response(cause = "Invalid coded secret.")
  61. return result
  62. def create(self, apikey: str, name: str, domain: str, coded: str) -> dict:
  63. """
  64. This register new secret.
  65. Parameters:
  66. apikey (str): ApiKey of the owner
  67. name (str): Name of the secret
  68. domain (str): Domain to the secret
  69. coded (str): Crypted content of the secret
  70. Returns:
  71. (dict): Result of the action to convert into JSON
  72. """
  73. validation = self.__validator(
  74. name = name,
  75. domain = domain,
  76. coded = coded
  77. )
  78. if validation is not None:
  79. return validation
  80. with self.__secret_database(apikey) as loader:
  81. builder = secret_builder()
  82. builder.name = name
  83. builder.domain = domain
  84. builder.coded = coded
  85. builder.owner = loader.owner
  86. if loader.append(builder.result):
  87. return self._success_response()
  88. return self._fail_response(cause = "Name already in use.")
  89. def drop(self, apikey: str, name: str) -> dict:
  90. """
  91. This remove secret from database.
  92. Parameters:
  93. apikey (str): Owner of the secret
  94. name (str): Name of the secret
  95. Returns:
  96. (dict): Result to return as JSON
  97. """
  98. with self.__secret_database(apikey) as loader:
  99. target = loader.load_by_name(name)
  100. if target is None:
  101. return self.__not_found_response()
  102. if loader.drop(target):
  103. return self._success_response()
  104. return self._fail_response(cause = "Internal database error.")
  105. def update(self, apikey: str, name: str, **kwargs) -> dict:
  106. """
  107. This update secret data in the database.
  108. Parameters:
  109. apikey (str): ApiKey of the secret owner
  110. name (str): Old name of the secret
  111. Kwargs:
  112. new_name (str): New name of the secret
  113. domain (str): Domain of the secret
  114. coded (str): Coded content of the secret
  115. """
  116. validation = self.__validator(**kwargs)
  117. if validation is not None:
  118. return validation
  119. with self.__secret_database(apikey) as loader:
  120. target = loader.load_by_name(name)
  121. if target is None:
  122. return self.__not_found_response()
  123. builder = secret_builder(target)
  124. if "new_name" in kwargs:
  125. builder.name = kwargs["new_name"]
  126. if "domain" in kwargs:
  127. builder.domain = kwargs["domain"]
  128. if "coded" in kwargs:
  129. builder.coded = kwargs["coded"]
  130. if loader.update(builder.result):
  131. return self._success_response()
  132. return self._fail_response(cause = "Name already in use.")
  133. def name_in_use(self, apikey: str, name: str) -> dict:
  134. """
  135. This check that name is used by any sectet.
  136. Parameters:
  137. apikey (str): ApiKey of the owner
  138. name (str): Name to check
  139. Returns:
  140. (dict): Result of the operation to return as JSON
  141. """
  142. validation = self._validation("name", name_validator(name))
  143. if validation is not None:
  144. return validation
  145. with self.__secret_database(apikey) as loader:
  146. result = loader.name_in_use(name)
  147. return self._success_response(in_use = result, name = name)
  148. def domain_search(self, apikey: str, domain: str) -> dict:
  149. """
  150. This search in database by domain name.
  151. Parameters:
  152. apikey (str): ApiKey of the secrets owner
  153. domain (str): Domain name to search
  154. Returns:
  155. (dict): Result to return as JSON
  156. """
  157. with self.__secret_database(apikey) as loader:
  158. results = loader.search_for_domain(domain)
  159. return secret_collection_response(results).response
  160. def name_search(self, apikey: str, name: str) -> dict:
  161. """
  162. This search in database by domain name.
  163. Parameters:
  164. apikey (str): ApiKey of the secrets owner
  165. name (str): Name to search
  166. Returns:
  167. (dict): Result to return as JSON
  168. """
  169. with self.__secret_database(apikey) as loader:
  170. results = loader.search_for_name(name)
  171. return secret_collection_response(results).response
  172. def get_all(self, apikey: str) -> dict:
  173. """
  174. This return all secrets own by user.
  175. Parameters:
  176. apikey (str): ApiKey of the user
  177. Returns:
  178. (dict): Collection of the user secrets
  179. """
  180. with self.__secret_database(apikey) as loader:
  181. results = loader.get_all()
  182. return search_collection_response(results).response
  183. def __not_found_response(self) -> dict:
  184. """
  185. It create response with not found error.
  186. Returns:
  187. (dict): Result to return as JSON
  188. """
  189. return self._fail_response(cause = "Secret not found.")
  190. @property
  191. def __user_database(self) -> user_loader:
  192. """ It return user database. """
  193. return user_loader(self._connector)
  194. def __secret_database(self, apikey: str) -> secret_loader:
  195. """
  196. It return new secret database for user with given apikey. When
  197. user is not exists, then raise bad_apikey_exception.
  198. Parameters:
  199. apikey (str): ApiKey of the owner
  200. Returns:
  201. (secret_loader): Secret loader for the owner
  202. """
  203. with self.__user_database as loader:
  204. target = loader.get_by_apikey(apikey)
  205. if target is None:
  206. raise bad_apikey_exception()
  207. return secret_loader(self._connector, target)