application_secret.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. from .user import user
  2. from .user import user_builder
  3. from .user_loader import user_loader
  4. from .secret import secret
  5. from .secret import secret_builder
  6. from .secret_coder import secret_coder
  7. from .secret_loader import secret_loader
  8. from .application_part import application_part
  9. from .apikey import bad_apikey_exception
  10. from .validators import name_validator
  11. from .validators import domain_validator
  12. from .application_secret_response import secret_response
  13. from .application_secret_response import secret_collection_response
  14. class application_secret(application_part):
  15. def get(self, apikey: str, name: str) -> dict:
  16. with self.__secret_database(apikey) as loader:
  17. target = loader.load_by_name(name)
  18. if target is None:
  19. return self.__not_found_response()
  20. return secret_response(target).response
  21. def create(self, apikey: str, name: str, domain: str, coded: str) -> dict:
  22. validation = self._validation("name", name_validator(name))
  23. validation = validation or self._validation(
  24. "domain",
  25. domain_validator(domain)
  26. )
  27. if validation is not None:
  28. return validation
  29. if not secret_coder.validate(coded):
  30. return self._fail_response(cause = "Invalid coded secret.")
  31. with self.__secret_database(apikey) as loader:
  32. builder = secret_builder()
  33. builder.name = name
  34. builder.domain = domain
  35. builder.coded = coded
  36. builder.owner = loader.owner
  37. if loader.append(builder.result):
  38. return self._success_response()
  39. return self._fail_response(cause = "Name already in use.")
  40. def drop(self, apikey: str, name: str) -> dict:
  41. with self.__secret_database(apikey) as loader:
  42. target = loader.load_by_name(name)
  43. if target is None:
  44. return self.__not_found_response()
  45. if loader.drop(target):
  46. return self._success_response()
  47. return self._fail_response(cause = "Internal database error.")
  48. def update(
  49. self,
  50. apikey: str,
  51. name: str,
  52. new_name: str | None = None,
  53. domain: str | None = None,
  54. coded: str | None = None
  55. ) -> dict:
  56. validation = None
  57. if new_name is not None:
  58. validation = validation or self._validation(
  59. "name",
  60. name_validator(new_name)
  61. )
  62. if domain is not None:
  63. validation = validation or self._validation(
  64. "domain",
  65. domain_validator(domain)
  66. )
  67. if validation is not None:
  68. return validation
  69. if coded is not None and not secret_coder.validate(coded):
  70. return self._fail_response(cause = "Invalid coded secret.")
  71. with self.__secret_database(apikey) as loader:
  72. target = loader.load_by_name(name)
  73. if target is None:
  74. return self.__not_found_response()
  75. builder = secret_builder(target)
  76. if new_name is not None:
  77. builder.name = new_name
  78. if domain is not None:
  79. builder.domain = domain
  80. if coded is not None:
  81. builder.coded = coded
  82. if loader.update(builder.result):
  83. return self._success_response()
  84. return self._fail_response(cause = "Name already in use.")
  85. def name_in_use(self, apikey: str, name: str) -> bool:
  86. validation = self._validation("name", name_validator(name))
  87. if validation is not None:
  88. return validation
  89. with self.__secret_database(apikey) as loader:
  90. result = loader.name_in_use(name)
  91. return self._success_response(in_use = result, name = name)
  92. def domain_search(self, apikey: str, domain: str) -> dict:
  93. with self.__secret_database(apikey) as loader:
  94. results = loader.search_for_domain(domain)
  95. return secret_collection_response(results).response
  96. def name_search(self, apikey: str, name: str) -> dict:
  97. with self.__secret_database(apikey) as loader:
  98. results = loader.search_for_name(name)
  99. return secret_collection_response(results).response
  100. def __not_found_response(self) -> dict:
  101. return self._fail_response(cause = "Secret not found.")
  102. @property
  103. def __user_database(self) -> user_loader:
  104. return user_loader(self._connector)
  105. def __secret_database(self, apikey: str) -> secret_loader:
  106. with self.__user_database as loader:
  107. target = loader.get_by_apikey(apikey)
  108. if target is None:
  109. raise bad_apikey_exception()
  110. return secret_loader(self._connector, target)