Bladeren bron

Finaly work on secret database.

Cixo Develop 7 maanden geleden
bovenliggende
commit
4a2acbf6c6
6 gewijzigde bestanden met toevoegingen van 295 en 15 verwijderingen
  1. 1 0
      assets/__init__.py
  2. 5 2
      assets/secret.py
  3. 187 10
      assets/secret_loader.py
  4. 2 1
      assets/user_loader.py
  5. 9 0
      tests/002-users.py
  6. 91 2
      tests/004-password_database.py

+ 1 - 0
assets/__init__.py

@@ -6,5 +6,6 @@ from .database import database
 from .apikey import apikey
 from .secret import secret
 from .secret import secret_builder
+from .secret_loader import secret_loader
 from .secret_crypto import secret_crypto
 from .builder import builder

+ 5 - 2
assets/secret.py

@@ -78,7 +78,10 @@ class secret_builder(builder, target_type = secret):
     def owner(self, target: user):
         """ This set new owner of the secret. """
 
-        self._target.owner = user.id
+        if not target.in_database:
+            raise TypeError("Target user for secret is not registered.")
+
+        self._target.owner = target.id
 
     @property
     def name(self) -> str | None:
@@ -119,7 +122,7 @@ class secret_builder(builder, target_type = secret):
         crypter = secret_crypto(key)
 
         if self._target.nonce is not None:
-            crypter.set_iv(self.__target.nonce)
+            crypter = crypter.set_iv(self._target.nonce)
 
         crypted, nonce = crypter.crypted(target)
 

+ 187 - 10
assets/secret_loader.py

@@ -1,10 +1,27 @@
+import typing
 import sqlmodel
+import sqlalchemy.engine.base
 
 from .user import user
 from .secret import secret
 
 class secret_loader(sqlmodel.Session):
-    def __init__(self, owner: user) -> None:
+    def __init__(
+        self, 
+        connection: sqlalchemy.engine.base.Engine, 
+        owner: user
+    ) -> None:
+        """
+        This create new loader. It require connection to the database, and 
+        user, which own secrets to work on.
+        
+        Parameters:
+            connection (sqlalchemy.engine.base.Engine): Connection to database
+            owner (user): User which own secrets to work on
+        """
+
+        super().__init__(connection)
+
         if not owner.in_database:
             raise Exception("User to build loaded for not exists.")
 
@@ -12,25 +29,185 @@ class secret_loader(sqlmodel.Session):
 
     @property
     def owner(self) -> user:
+        """ This return owner of secrets which work on. """
+
         return self.__owner
 
     def append(self, target: secret) -> bool:
-        if target.in_database or not target.is_ready:
-            return False
+        """
+        This append new secret to database. Secret must have same owner, as 
+        set in loader. When owner is not set, user from loader would be used.
+        When user is not correct, that mean is other in secret and in loader, 
+        TypeError would be raised. When secret name is already in use, then
+        it return False. When item is already in database, Exception would be
+        raised. When secret is not ready, that mean 
+
+        Parameters:
+            target (secret): New secret to insert in database
+
+        Returns:
+            (bool): True when append successfull, False when failed
+        """
+        
+        if target.owner is None:
+            target.owner = self.owner    
+        elif target.owner != self.owner.id:
+            raise TypeError("Owner of secret is other than in loader.")
+        
+        if target.in_database: 
+            raise Exception("Secret is already in database. Use update.")
+
+        if not target.ready:
+            raise Exception("Secret is not ready to append.")
 
-        if self.get_by_name(target.name) is not None:
+        if self.load_by_name(target.name) is not None:
             return False
 
         self.add(target)
         self.commit()
         self.refresh(target)
 
-    def get_by_name(self, target: str) -> secret | None:
+        return True
+
+    @property
+    def __select(self) -> sqlmodel.sql.expression.Select:
+        """ It create new selecr query with set owner statement. """
+
+        return sqlmodel.select(secret).where(secret.owner == self.owner.id)
+
+    def search_for_domain(self, target: str) -> typing.Iterable[secret]:
+        """ 
+        This search in secret database for secrets by domain. It return 
+        iterator which items is in secret type.
+
+        Parameters:
+            target (str): Domain name to search
+
+        Returns:
+            (Iterable[secret]): Iterator with found secrets
+        """
+
+        target = "%" + target + "%"
+        query = self.__select.where(secret.domain.like(target))
+        result = self.exec(query)
+
+        for item in result:
+            yield item
+
+    def search_for_name(self, target: str) -> typing.Iterable[secret]:
+        """
+        This search in secrets database for secrets which have names like 
+        given. It returns iterator with found result.
+
+        Parameters:
+            target (str): Name to search in database
+        
+        Returns:
+            (Iterable[secret]): Found secrets
+        """
+
+        target = "%" + target.upper() + "%"
+        query = self.__select.where(secret.name.like(target))
+        result = self.exec(query)
+
+        for item in result:
+            yield item
+
+    def name_in_use(self, target: str) -> bool:
+        """
+        This check that name is already in use.
+
+        Parameters:
+            target (str): Name to check
+
+        Returns:
+            (bool): True when name is user, False when not
+        """
+
+        return self.load_by_name(target) is not None
+
+    def load_by_name(self, target: str) -> secret | None:
+        """
+        It load single secret by name. When secret not exists, return None.
+
+        Parameters:
+            target (str): Name of the secret
+
+        Returns:
+            (secret | None): Loaded secret or None when not exists
+        """
+
         target = target.upper()
+        query = self.__select.where(secret.name == target)
+        result = self.exec(query)
 
-        query = sqlmodel.select(secret)
-        query = query.where(user.id == self.owner.id)
-        query = query.where(secret.name == target)
-        query = query.limit(1)
+        return result.first()
+
+    def drop(self, target: secret) -> bool:
+        """
+        This remove secret from database. When secret not exists in database
+        return False. Whem all went good, return True.
+
+        Parameters:
+            target (secret): Secret to drop
+
+        Returns:
+            (bool): True when remove successfull, False when not
+        """
+
+        if not target.in_database:
+            return False
+
+        self.delete(target)
+        self.commit()
+
+        return True
+
+    def clear(self) -> None:
+        """
+        This drop all secrets of the user. It is useable before user remove.
+        """
+
+        query = self.__select
+        result = self.exec(query)
+
+        for item in result:
+            self.delete(item)
+
+        self.commit()
+
+    def update(self, target: secret) -> bool:
+        """
+        This update secret in the database. When secret not exists in 
+        database. When owner is not set propertly, raise TypeError. When
+        target is not in database, or secret is not ready, raise Exception.
+        When name is already in use by other secret, it return False.
+
+        Parameters:
+            target (secret): Secret to update
+
+        Returns:
+            (bool): True when updated, False when name is in use
+        """
+
+        if target.owner is None:
+            target.owner = self.owner
+        elif target.owner != self.owner.id:
+            raise TypeError("Owner of the secret is other than in loader.")
+
+        if not target.in_database:
+            raise Exception("Target is not in database. Use append.")
+
+        if not target.ready:
+            return Exception("Target is not ready to update.")
+
+        check = self.load_by_name(target.name)
+
+        if check is not None and check.id != target.id:
+            return False
+
+        self.add(target)
+        self.commit()
+        self.refresh(target)
 
-        return self.exec(query).first()
+        return True 

+ 2 - 1
assets/user_loader.py

@@ -16,6 +16,7 @@ class user_loader(sqlmodel.Session):
         return: user | None - Logged user, or None when not exists
         """
 
+        nick = nick.upper()
         result = self.get_by_nick(nick)
 
         if result is None:
@@ -95,7 +96,7 @@ class user_loader(sqlmodel.Session):
         return: bool - True when nick in use, False if not
         """
 
-        return self.get_by_nick(nick) is not None
+        return self.get_by_nick(nick.upper()) is not None
 
     def get_by_apikey(self, apikey: str) -> user | None:
         """

+ 9 - 0
tests/002-users.py

@@ -15,6 +15,14 @@ builder.password = "qwerty"
 
 test1 = builder.result
 
+def drop_database() -> None:
+    db = pathlib.Path("./002-test.db")
+
+    if db.is_file():
+        db.unlink()
+
+drop_database()
+
 connection = sqlmodel.create_engine("sqlite:///002-test.db")
 sqlmodel.SQLModel.metadata.create_all(connection)
 
@@ -31,3 +39,4 @@ with assets.user_loader(connection) as loader:
     loader.save(logged)
     print("Logged user apikey after change: " + str(logged.apikey))
     
+drop_database()

+ 91 - 2
tests/004-password_database.py

@@ -9,7 +9,16 @@ sys.path.append(str(root))
 import assets
 import sqlmodel
 
-connection = sqlmodel.create_engine("sqlite:///04-test.db")
+def drop_database() -> None:
+    db = pathlib.Path("./004-test.db")
+
+    if db.is_file():
+        db.unlink()
+
+drop_database()
+
+connection = sqlmodel.create_engine("sqlite:///004-test.db")
+sqlmodel.SQLModel.metadata.create_all(connection)
 
 user_builder = assets.user_builder()
 user_builder.nick = "test"
@@ -17,6 +26,9 @@ user_builder.password = "qwerty"
 
 sample_user = user_builder.result
 
+with assets.user_loader(connection) as loader:
+    loader.register(sample_user)
+
 secret_builder = assets.secret_builder()
 
 secret_builder.clear()
@@ -37,4 +49,81 @@ secret_2 = secret_builder.result
 
 print(sample_user)
 print(secret_1)
-print(secret_2)
+print(secret_2)
+
+print("Inserting into database")
+
+with assets.secret_loader(connection, sample_user) as loader:
+    loader.append(secret_1)
+    loader.append(secret_2)
+
+print()
+print("Loading secrets by name:")
+
+with assets.secret_loader(connection, sample_user) as loader:
+    load_1 = loader.load_by_name("secret1")
+    
+    print("Loaded secret 1 from database:")
+    print(load_1)
+
+    load_2 = loader.load_by_name("secret2")
+
+    print("Loaded secret 2 from database:")
+    print(load_2)
+
+print()
+print("Loading secrets by domain:")
+
+with assets.secret_loader(connection, sample_user) as loader:
+    print("Results for domain \"secret\": ")
+
+    for item in loader.search_for_domain("secret"):
+        print(item)
+
+    print("Results for domain \"not_exists\":")
+
+    for item in loader.search_for_domain("not_exists"):
+        print(item)
+
+print()
+print("Modify secrets:")
+
+with assets.secret_loader(connection, sample_user) as loader:
+    loaded = loader.load_by_name("secret1")
+
+    print("Before:")
+    print(loaded)
+
+    builder = assets.secret_builder(loaded)
+
+    builder.domain = "https://other.com"
+    builder.crypt("qwerty", "sample_example")
+
+    loader.update(builder.result)
+
+    print("After:")
+    print(loader.load_by_name("secret1"))
+
+print()
+print("Delete secrets:")
+
+with assets.secret_loader(connection, sample_user) as loader:
+    loaded = loader.load_by_name("secret1")
+    loader.drop(loaded)
+
+    print("After:")
+    print(loader.load_by_name("secret1"))
+
+print()
+print("Clear:")
+
+with assets.secret_loader(connection, sample_user) as loader:
+    print("Before:")
+    print(loader.load_by_name("secret2"))
+
+    loader.clear()
+
+    print("After:")
+    print(loader.load_by_name("secret2"))
+
+drop_database()