| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- from .user import user
- from .user import user_builder
- from .user_loader import user_loader
- from .secret import secret
- from .secret import secret_builder
- from .secret_coder import secret_coder
- from .secret_loader import secret_loader
- from .application_part import application_part
- from .apikey import bad_apikey_exception
- from .validators import name_validator
- from .validators import domain_validator
- from .application_secret_response import secret_response
- from .application_secret_response import secret_collection_response
- class application_secret(application_part):
- """
- This class is responsible for managing secrets in the application.
- """
- def get(self, apikey: str, name: str) -> dict:
- """
- This load secret by name. It return full secret, with crypted content,
- or fail response when secret with given name not exists.
- Parameters:
- apikey (str): ApiKey of the secret owner
- name (str): Name of the secret
- Returns:
- (dict): Result to convert into JSON
- """
- with self.__secret_database(apikey) as loader:
- target = loader.load_by_name(name)
- if target is None:
- return self.__not_found_response()
- return secret_response(target).response
- def __validator(self, **kwargs) -> dict | None:
- """
- This validate given parameters. It return None when all given items
- is valid, or dict with fail response dict.
- Kwargs:
- name (str): Name to valid
- new_name (str): Name to valid
- domain (str): Domain to valid
- coded (str): Crypted content to valid
- Returns:
- (dict): Fail to return as JSON
- (None): Validating parameters ara valid
- """
-
- result = None
- if "name" in kwargs:
- name = kwargs["name"]
- result = self._validation("name", name_validator(name))
-
- if "new_name" in kwargs:
- name = kwargs["new_name"]
- result = self._validation("name", name_validator(name))
- if "domain" in kwargs and result is None:
- domain = kwargs["domain"]
- result = self._validation("domain", domain_validator(domain))
- if not "coded" in kwargs:
- return result
-
- coded = kwargs["coded"]
- if not secret_coder.validate(coded):
- return self._fail_response(cause = "Invalid coded secret.")
- return result
- def create(self, apikey: str, name: str, domain: str, coded: str) -> dict:
- """
- This register new secret.
- Parameters:
- apikey (str): ApiKey of the owner
- name (str): Name of the secret
- domain (str): Domain to the secret
- coded (str): Crypted content of the secret
- Returns:
- (dict): Result of the action to convert into JSON
- """
- validation = self.__validator(
- name = name,
- domain = domain,
- coded = coded
- )
- if validation is not None:
- return validation
- with self.__secret_database(apikey) as loader:
- builder = secret_builder()
- builder.name = name
- builder.domain = domain
- builder.coded = coded
- builder.owner = loader.owner
- if loader.append(builder.result):
- return self._success_response()
- return self._fail_response(cause = "Name already in use.")
- def drop(self, apikey: str, name: str) -> dict:
- """
- This remove secret from database.
- Parameters:
- apikey (str): Owner of the secret
- name (str): Name of the secret
- Returns:
- (dict): Result to return as JSON
- """
- with self.__secret_database(apikey) as loader:
- target = loader.load_by_name(name)
- if target is None:
- return self.__not_found_response()
- if loader.drop(target):
- return self._success_response()
- return self._fail_response(cause = "Internal database error.")
- def update(self, apikey: str, name: str, **kwargs) -> dict:
- """
- This update secret data in the database.
- Parameters:
- apikey (str): ApiKey of the secret owner
- name (str): Old name of the secret
- Kwargs:
- new_name (str): New name of the secret
- domain (str): Domain of the secret
- coded (str): Coded content of the secret
- """
- validation = self.__validator(**kwargs)
- if validation is not None:
- return validation
- with self.__secret_database(apikey) as loader:
- target = loader.load_by_name(name)
- if target is None:
- return self.__not_found_response()
- builder = secret_builder(target)
- if "new_name" in kwargs:
- builder.name = kwargs["new_name"]
- if "domain" in kwargs:
- builder.domain = kwargs["domain"]
- if "coded" in kwargs:
- builder.coded = kwargs["coded"]
- if loader.update(builder.result):
- return self._success_response()
- return self._fail_response(cause = "Name already in use.")
- def name_in_use(self, apikey: str, name: str) -> dict:
- """
- This check that name is used by any sectet.
- Parameters:
- apikey (str): ApiKey of the owner
- name (str): Name to check
- Returns:
- (dict): Result of the operation to return as JSON
- """
- validation = self._validation("name", name_validator(name))
- if validation is not None:
- return validation
- with self.__secret_database(apikey) as loader:
- result = loader.name_in_use(name)
- return self._success_response(in_use = result, name = name)
- def domain_search(self, apikey: str, domain: str) -> dict:
- """
- This search in database by domain name.
- Parameters:
- apikey (str): ApiKey of the secrets owner
- domain (str): Domain name to search
- Returns:
- (dict): Result to return as JSON
- """
- with self.__secret_database(apikey) as loader:
- results = loader.search_for_domain(domain)
- return secret_collection_response(results).response
- def name_search(self, apikey: str, name: str) -> dict:
- """
- This search in database by domain name.
- Parameters:
- apikey (str): ApiKey of the secrets owner
- name (str): Name to search
- Returns:
- (dict): Result to return as JSON
- """
- with self.__secret_database(apikey) as loader:
- results = loader.search_for_name(name)
- return secret_collection_response(results).response
- def get_all(self, apikey: str) -> dict:
- """
- This return all secrets own by user.
- Parameters:
- apikey (str): ApiKey of the user
- Returns:
- (dict): Collection of the user secrets
- """
-
- with self.__secret_database(apikey) as loader:
- results = loader.get_all()
- return search_collection_response(results).response
- def __not_found_response(self) -> dict:
- """
- It create response with not found error.
- Returns:
- (dict): Result to return as JSON
- """
- return self._fail_response(cause = "Secret not found.")
- @property
- def __user_database(self) -> user_loader:
- """ It return user database. """
-
- return user_loader(self._connector)
- def __secret_database(self, apikey: str) -> secret_loader:
- """
- It return new secret database for user with given apikey. When
- user is not exists, then raise bad_apikey_exception.
- Parameters:
- apikey (str): ApiKey of the owner
- Returns:
- (secret_loader): Secret loader for the owner
- """
- with self.__user_database as loader:
- target = loader.get_by_apikey(apikey)
- if target is None:
- raise bad_apikey_exception()
- return secret_loader(self._connector, target)
|