|
@@ -13,7 +13,23 @@ from .application_secret_response import secret_response
|
|
|
from .application_secret_response import secret_collection_response
|
|
from .application_secret_response import secret_collection_response
|
|
|
|
|
|
|
|
class application_secret(application_part):
|
|
class application_secret(application_part):
|
|
|
|
|
+ """
|
|
|
|
|
+ This class is responsible for managing secrets in the application.
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
def get(self, apikey: str, name: str) -> dict:
|
|
def get(self, apikey: str, name: str) -> dict:
|
|
|
|
|
+ """
|
|
|
|
|
+ This load secret by name. It return full secret, with crypted content,
|
|
|
|
|
+ or fail response when secret with given name not exists.
|
|
|
|
|
+
|
|
|
|
|
+ Parameters:
|
|
|
|
|
+ apikey (str): ApiKey of the secret owner
|
|
|
|
|
+ name (str): Name of the secret
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ (dict): Result to convert into JSON
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
with self.__secret_database(apikey) as loader:
|
|
with self.__secret_database(apikey) as loader:
|
|
|
target = loader.load_by_name(name)
|
|
target = loader.load_by_name(name)
|
|
|
|
|
|
|
@@ -22,19 +38,69 @@ class application_secret(application_part):
|
|
|
|
|
|
|
|
return secret_response(target).response
|
|
return secret_response(target).response
|
|
|
|
|
|
|
|
|
|
+ def __validator(self, **kwargs) -> dict | None:
|
|
|
|
|
+ """
|
|
|
|
|
+ This validate given parameters. It return None when all given items
|
|
|
|
|
+ is valid, or dict with fail response dict.
|
|
|
|
|
+
|
|
|
|
|
+ Kwargs:
|
|
|
|
|
+ name (str): Name to valid
|
|
|
|
|
+ new_name (str): Name to valid
|
|
|
|
|
+ domain (str): Domain to valid
|
|
|
|
|
+ coded (str): Crypted content to valid
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ (dict): Fail to return as JSON
|
|
|
|
|
+ (None): Validating parameters ara valid
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ result = None
|
|
|
|
|
+
|
|
|
|
|
+ if "name" in kwargs:
|
|
|
|
|
+ name = kwargs["name"]
|
|
|
|
|
+ result = self._validation("name", name_validator(name))
|
|
|
|
|
+
|
|
|
|
|
+ if "new_name" in kwargs:
|
|
|
|
|
+ name = kwargs["new_name"]
|
|
|
|
|
+ result = self._validation("name", name_validator(name))
|
|
|
|
|
+
|
|
|
|
|
+ if "domain" in kwargs and result is None:
|
|
|
|
|
+ domain = kwargs["domain"]
|
|
|
|
|
+ result = self._validation("domain", domain_validator(domain))
|
|
|
|
|
+
|
|
|
|
|
+ if not "coded" in kwargs:
|
|
|
|
|
+ return result
|
|
|
|
|
+
|
|
|
|
|
+ coded = kwargs["coded"]
|
|
|
|
|
+
|
|
|
|
|
+ if not secret_coder.validate(coded):
|
|
|
|
|
+ return self._fail_response(cause = "Invalid coded secret.")
|
|
|
|
|
+
|
|
|
|
|
+ return result
|
|
|
|
|
+
|
|
|
def create(self, apikey: str, name: str, domain: str, coded: str) -> dict:
|
|
def create(self, apikey: str, name: str, domain: str, coded: str) -> dict:
|
|
|
- validation = self._validation("name", name_validator(name))
|
|
|
|
|
- validation = validation or self._validation(
|
|
|
|
|
- "domain",
|
|
|
|
|
- domain_validator(domain)
|
|
|
|
|
|
|
+ """
|
|
|
|
|
+ This register new secret.
|
|
|
|
|
+
|
|
|
|
|
+ Parameters:
|
|
|
|
|
+ apikey (str): ApiKey of the owner
|
|
|
|
|
+ name (str): Name of the secret
|
|
|
|
|
+ domain (str): Domain to the secret
|
|
|
|
|
+ coded (str): Crypted content of the secret
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ (dict): Result of the action to convert into JSON
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ validation = self.__validator(
|
|
|
|
|
+ name = name,
|
|
|
|
|
+ domain = domain,
|
|
|
|
|
+ coded = coded
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
if validation is not None:
|
|
if validation is not None:
|
|
|
return validation
|
|
return validation
|
|
|
|
|
|
|
|
- if not secret_coder.validate(coded):
|
|
|
|
|
- return self._fail_response(cause = "Invalid coded secret.")
|
|
|
|
|
-
|
|
|
|
|
with self.__secret_database(apikey) as loader:
|
|
with self.__secret_database(apikey) as loader:
|
|
|
builder = secret_builder()
|
|
builder = secret_builder()
|
|
|
builder.name = name
|
|
builder.name = name
|
|
@@ -48,6 +114,17 @@ class application_secret(application_part):
|
|
|
return self._fail_response(cause = "Name already in use.")
|
|
return self._fail_response(cause = "Name already in use.")
|
|
|
|
|
|
|
|
def drop(self, apikey: str, name: str) -> dict:
|
|
def drop(self, apikey: str, name: str) -> dict:
|
|
|
|
|
+ """
|
|
|
|
|
+ This remove secret from database.
|
|
|
|
|
+
|
|
|
|
|
+ Parameters:
|
|
|
|
|
+ apikey (str): Owner of the secret
|
|
|
|
|
+ name (str): Name of the secret
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ (dict): Result to return as JSON
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
with self.__secret_database(apikey) as loader:
|
|
with self.__secret_database(apikey) as loader:
|
|
|
target = loader.load_by_name(name)
|
|
target = loader.load_by_name(name)
|
|
|
|
|
|
|
@@ -59,34 +136,25 @@ class application_secret(application_part):
|
|
|
|
|
|
|
|
return self._fail_response(cause = "Internal database error.")
|
|
return self._fail_response(cause = "Internal database error.")
|
|
|
|
|
|
|
|
- def update(
|
|
|
|
|
- self,
|
|
|
|
|
- apikey: str,
|
|
|
|
|
- name: str,
|
|
|
|
|
- new_name: str | None = None,
|
|
|
|
|
- domain: str | None = None,
|
|
|
|
|
- coded: str | None = None
|
|
|
|
|
- ) -> dict:
|
|
|
|
|
- validation = None
|
|
|
|
|
-
|
|
|
|
|
- if new_name is not None:
|
|
|
|
|
- validation = validation or self._validation(
|
|
|
|
|
- "name",
|
|
|
|
|
- name_validator(new_name)
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- if domain is not None:
|
|
|
|
|
- validation = validation or self._validation(
|
|
|
|
|
- "domain",
|
|
|
|
|
- domain_validator(domain)
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ def update(self, apikey: str, name: str, **kwargs) -> dict:
|
|
|
|
|
+ """
|
|
|
|
|
+ This update secret data in the database.
|
|
|
|
|
+
|
|
|
|
|
+ Parameters:
|
|
|
|
|
+ apikey (str): ApiKey of the secret owner
|
|
|
|
|
+ name (str): Old name of the secret
|
|
|
|
|
+
|
|
|
|
|
+ Kwargs:
|
|
|
|
|
+ new_name (str): New name of the secret
|
|
|
|
|
+ domain (str): Domain of the secret
|
|
|
|
|
+ coded (str): Coded content of the secret
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ validation = self.__validator(**kwargs)
|
|
|
|
|
|
|
|
if validation is not None:
|
|
if validation is not None:
|
|
|
return validation
|
|
return validation
|
|
|
|
|
|
|
|
- if coded is not None and not secret_coder.validate(coded):
|
|
|
|
|
- return self._fail_response(cause = "Invalid coded secret.")
|
|
|
|
|
-
|
|
|
|
|
with self.__secret_database(apikey) as loader:
|
|
with self.__secret_database(apikey) as loader:
|
|
|
target = loader.load_by_name(name)
|
|
target = loader.load_by_name(name)
|
|
|
|
|
|
|
@@ -95,21 +163,32 @@ class application_secret(application_part):
|
|
|
|
|
|
|
|
builder = secret_builder(target)
|
|
builder = secret_builder(target)
|
|
|
|
|
|
|
|
- if new_name is not None:
|
|
|
|
|
- builder.name = new_name
|
|
|
|
|
|
|
+ if "new_name" in kwargs:
|
|
|
|
|
+ builder.name = kwargs["new_name"]
|
|
|
|
|
|
|
|
- if domain is not None:
|
|
|
|
|
- builder.domain = domain
|
|
|
|
|
|
|
+ if "domain" in kwargs:
|
|
|
|
|
+ builder.domain = kwargs["domain"]
|
|
|
|
|
|
|
|
- if coded is not None:
|
|
|
|
|
- builder.coded = coded
|
|
|
|
|
|
|
+ if "coded" in kwargs:
|
|
|
|
|
+ builder.coded = kwargs["coded"]
|
|
|
|
|
|
|
|
if loader.update(builder.result):
|
|
if loader.update(builder.result):
|
|
|
return self._success_response()
|
|
return self._success_response()
|
|
|
|
|
|
|
|
return self._fail_response(cause = "Name already in use.")
|
|
return self._fail_response(cause = "Name already in use.")
|
|
|
|
|
|
|
|
- def name_in_use(self, apikey: str, name: str) -> bool:
|
|
|
|
|
|
|
+ def name_in_use(self, apikey: str, name: str) -> dict:
|
|
|
|
|
+ """
|
|
|
|
|
+ This check that name is used by any sectet.
|
|
|
|
|
+
|
|
|
|
|
+ Parameters:
|
|
|
|
|
+ apikey (str): ApiKey of the owner
|
|
|
|
|
+ name (str): Name to check
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ (dict): Result of the operation to return as JSON
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
validation = self._validation("name", name_validator(name))
|
|
validation = self._validation("name", name_validator(name))
|
|
|
|
|
|
|
|
if validation is not None:
|
|
if validation is not None:
|
|
@@ -121,25 +200,83 @@ class application_secret(application_part):
|
|
|
return self._success_response(in_use = result, name = name)
|
|
return self._success_response(in_use = result, name = name)
|
|
|
|
|
|
|
|
def domain_search(self, apikey: str, domain: str) -> dict:
|
|
def domain_search(self, apikey: str, domain: str) -> dict:
|
|
|
|
|
+ """
|
|
|
|
|
+ This search in database by domain name.
|
|
|
|
|
+
|
|
|
|
|
+ Parameters:
|
|
|
|
|
+ apikey (str): ApiKey of the secrets owner
|
|
|
|
|
+ domain (str): Domain name to search
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ (dict): Result to return as JSON
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
with self.__secret_database(apikey) as loader:
|
|
with self.__secret_database(apikey) as loader:
|
|
|
results = loader.search_for_domain(domain)
|
|
results = loader.search_for_domain(domain)
|
|
|
|
|
|
|
|
return secret_collection_response(results).response
|
|
return secret_collection_response(results).response
|
|
|
|
|
|
|
|
def name_search(self, apikey: str, name: str) -> dict:
|
|
def name_search(self, apikey: str, name: str) -> dict:
|
|
|
|
|
+ """
|
|
|
|
|
+ This search in database by domain name.
|
|
|
|
|
+
|
|
|
|
|
+ Parameters:
|
|
|
|
|
+ apikey (str): ApiKey of the secrets owner
|
|
|
|
|
+ name (str): Name to search
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ (dict): Result to return as JSON
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
with self.__secret_database(apikey) as loader:
|
|
with self.__secret_database(apikey) as loader:
|
|
|
results = loader.search_for_name(name)
|
|
results = loader.search_for_name(name)
|
|
|
|
|
|
|
|
return secret_collection_response(results).response
|
|
return secret_collection_response(results).response
|
|
|
|
|
|
|
|
|
|
+ def get_all(self, apikey: str) -> dict:
|
|
|
|
|
+ """
|
|
|
|
|
+ This return all secrets own by user.
|
|
|
|
|
+
|
|
|
|
|
+ Parameters:
|
|
|
|
|
+ apikey (str): ApiKey of the user
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ (dict): Collection of the user secrets
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ with self.__secret_database(apikey) as loader:
|
|
|
|
|
+ results = loader.get_all()
|
|
|
|
|
+
|
|
|
|
|
+ return search_collection_response(results).response
|
|
|
|
|
+
|
|
|
def __not_found_response(self) -> dict:
|
|
def __not_found_response(self) -> dict:
|
|
|
|
|
+ """
|
|
|
|
|
+ It create response with not found error.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ (dict): Result to return as JSON
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
return self._fail_response(cause = "Secret not found.")
|
|
return self._fail_response(cause = "Secret not found.")
|
|
|
|
|
|
|
|
@property
|
|
@property
|
|
|
def __user_database(self) -> user_loader:
|
|
def __user_database(self) -> user_loader:
|
|
|
|
|
+ """ It return user database. """
|
|
|
|
|
+
|
|
|
return user_loader(self._connector)
|
|
return user_loader(self._connector)
|
|
|
|
|
|
|
|
def __secret_database(self, apikey: str) -> secret_loader:
|
|
def __secret_database(self, apikey: str) -> secret_loader:
|
|
|
|
|
+ """
|
|
|
|
|
+ It return new secret database for user with given apikey. When
|
|
|
|
|
+ user is not exists, then raise bad_apikey_exception.
|
|
|
|
|
+
|
|
|
|
|
+ Parameters:
|
|
|
|
|
+ apikey (str): ApiKey of the owner
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ (secret_loader): Secret loader for the owner
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
with self.__user_database as loader:
|
|
with self.__user_database as loader:
|
|
|
target = loader.get_by_apikey(apikey)
|
|
target = loader.get_by_apikey(apikey)
|
|
|
|
|
|