| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- from .user import user
- from .user import user_builder
- from .user_loader import user_loader
- from .secret import secret
- from .secret import secret_builder
- from .secret_coder import secret_coder
- from .secret_loader import secret_loader
- from .application_part import application_part
- from .apikey import bad_apikey_exception
- from .validators import name_validator
- from .validators import domain_validator
- from .application_secret_response import secret_response
- from .application_secret_response import secret_collection_response
class application_secret(application_part):
    """Secret management operations authenticated by an API key.

    Every public method first resolves ``apikey`` to a user and opens a
    ``secret_loader`` bound to that user; ``bad_apikey_exception`` is raised
    when no user matches the key. Results are reported as response dicts
    built by ``_success_response`` / ``_fail_response`` / ``_validation``
    (not defined in this class - presumably inherited from
    ``application_part``) or by the secret response wrappers.
    """

    def get(self, apikey: str, name: str) -> dict:
        """Return the response describing the secret called ``name``.

        Returns the "Secret not found." failure response when the loader
        has no secret with that name.

        Raises:
            bad_apikey_exception: when ``apikey`` matches no user.
        """
        with self.__secret_database(apikey) as loader:
            target = loader.load_by_name(name)

            if target is None:
                return self.__not_found_response()

            return secret_response(target).response

    def create(self, apikey: str, name: str, domain: str, coded: str) -> dict:
        """Create a new secret owned by the user behind ``apikey``.

        ``name`` and ``domain`` are validated first, then ``coded`` is
        checked with ``secret_coder.validate``; the first failure is
        returned as the response. A refused ``loader.append`` is reported
        as "Name already in use.".

        Raises:
            bad_apikey_exception: when ``apikey`` matches no user.
        """
        # `or` keeps the first validation failure and skips the domain
        # check once the name has already been rejected.
        validation = self._validation("name", name_validator(name))
        validation = validation or self._validation(
            "domain",
            domain_validator(domain),
        )

        if validation is not None:
            return validation

        if not secret_coder.validate(coded):
            return self._fail_response(cause="Invalid coded secret.")

        with self.__secret_database(apikey) as loader:
            builder = secret_builder()
            builder.name = name
            builder.domain = domain
            builder.coded = coded
            builder.owner = loader.owner

            if loader.append(builder.result):
                return self._success_response()

            return self._fail_response(cause="Name already in use.")

    def drop(self, apikey: str, name: str) -> dict:
        """Delete the secret called ``name``.

        Returns the "Secret not found." failure response when the secret
        does not exist and "Internal database error." when ``loader.drop``
        reports failure.

        Raises:
            bad_apikey_exception: when ``apikey`` matches no user.
        """
        with self.__secret_database(apikey) as loader:
            target = loader.load_by_name(name)

            if target is None:
                return self.__not_found_response()

            if loader.drop(target):
                return self._success_response()

            return self._fail_response(cause="Internal database error.")

    def update(
        self,
        apikey: str,
        name: str,
        new_name: str | None = None,
        domain: str | None = None,
        coded: str | None = None
    ) -> dict:
        """Change selected fields of the secret called ``name``.

        Only the arguments that are not ``None`` are validated and applied;
        the remaining fields keep the values of the stored secret.

        Raises:
            bad_apikey_exception: when ``apikey`` matches no user.
        """
        validation = None

        if new_name is not None:
            validation = validation or self._validation(
                "name",
                name_validator(new_name),
            )

        if domain is not None:
            validation = validation or self._validation(
                "domain",
                domain_validator(domain),
            )

        if validation is not None:
            return validation

        if coded is not None and not secret_coder.validate(coded):
            return self._fail_response(cause="Invalid coded secret.")

        with self.__secret_database(apikey) as loader:
            target = loader.load_by_name(name)

            if target is None:
                return self.__not_found_response()

            # Start from the stored secret so untouched fields survive.
            builder = secret_builder(target)

            if new_name is not None:
                builder.name = new_name

            if domain is not None:
                builder.domain = domain

            if coded is not None:
                builder.coded = coded

            if loader.update(builder.result):
                return self._success_response()

            # NOTE(review): every update failure is reported as a name
            # clash, even when only domain/coded changed - confirm that
            # loader.update can fail for no other reason.
            return self._fail_response(cause="Name already in use.")

    def name_in_use(self, apikey: str, name: str) -> dict:
        """Report whether ``name`` is already taken.

        Returns a response dict, not a bare bool: either the validation
        failure for ``name`` or a success response carrying ``in_use`` and
        ``name``.

        Raises:
            bad_apikey_exception: when ``apikey`` matches no user.
        """
        validation = self._validation("name", name_validator(name))

        if validation is not None:
            return validation

        with self.__secret_database(apikey) as loader:
            result = loader.name_in_use(name)

            return self._success_response(in_use=result, name=name)

    def domain_search(self, apikey: str, domain: str) -> dict:
        """Return the collection response for secrets matching ``domain``.

        Raises:
            bad_apikey_exception: when ``apikey`` matches no user.
        """
        with self.__secret_database(apikey) as loader:
            results = loader.search_for_domain(domain)

            return secret_collection_response(results).response

    def name_search(self, apikey: str, name: str) -> dict:
        """Return the collection response for secrets matching ``name``.

        Raises:
            bad_apikey_exception: when ``apikey`` matches no user.
        """
        with self.__secret_database(apikey) as loader:
            results = loader.search_for_name(name)

            return secret_collection_response(results).response

    def __not_found_response(self) -> dict:
        """Build the failure response used when a secret does not exist."""
        return self._fail_response(cause="Secret not found.")

    @property
    def __user_database(self) -> user_loader:
        """Create a new ``user_loader`` on this object's connector."""
        return user_loader(self._connector)

    def __secret_database(self, apikey: str) -> secret_loader:
        """Create a ``secret_loader`` for the user identified by ``apikey``.

        Raises:
            bad_apikey_exception: when no user is found for ``apikey``.
        """
        with self.__user_database as loader:
            target = loader.get_by_apikey(apikey)

            if target is None:
                raise bad_apikey_exception()

            return secret_loader(self._connector, target)
|