瀏覽代碼

First commit to git.

Cixo Develop 11 月之前
父節點
當前提交
a7103cc077

+ 4 - 0
.gitignore

@@ -1,3 +1,7 @@
+# ---> Venv
+.venv
+.env
+
 # ---> Vim
 [._]*.s[a-w][a-z]
 [._]s[a-w][a-z]

+ 5 - 0
assets/__init__.py

@@ -0,0 +1,5 @@
+from .user import user
+from .users_manager import users_manager 
+from .user_validator import user_validator
+from .secret_generator import secret_generator
+

+ 88 - 0
assets/secret_generator.py

@@ -0,0 +1,88 @@
+import hashlib
+import os
+
+from .secret_properties import secret_properties
+
+class secret_generator(metaclass = secret_properties):
+    def __init__(self, password: str):
+        self.__password = password
+        self.__secret = None
+
+    @property
+    def password(self) -> str: 
+        return self.__password
+
+    def validate(self, secret: str) -> bool:
+        if not self.__check_secret(secret):
+            return False
+
+        hashed, salt = self.__split_secret(secret)
+        target_hashed, second_salt = self.__generate_hashed(salt)
+
+        return hashed == target_hashed
+
+    def __check_secret(self, secret: str) -> bool:
+        properties = self.__class__
+        
+        hash_length = properties.hash_hex_length
+        salt_length = properties.salt_hex_length
+        separator = properties.salt_separator
+
+        if secret.find(separator) != hash_length:
+            return False
+
+        if len(secret) != hash_length + salt_length + len(separator):
+            return False
+
+        return True
+
+    def __split_secret(self, secret: str) -> [bytes, bytes]:
+        properties = self.__class__
+        separator = properties.salt_separator
+        splited = secret.split(separator)
+
+        hashed = bytes.fromhex(splited[0])
+        salt = bytes.fromhex(splited[-1])
+
+        return hashed, salt
+
+    def __generate_hashed(self, salt: bytes | None = None) -> str:
+        properties = self.__class__
+        
+        rounds = properties.hash_rounds
+        algorithm = properties.hash_algorithm
+        password = self.password.encode("UTF-8")
+        
+        if salt is None:
+            random_size = properties.salt_length
+            salt = os.urandom(random_size)
+
+        hashed = hashlib.pbkdf2_hmac(
+            algorithm, 
+            password, 
+            salt, 
+            rounds
+        )
+
+        return hashed, salt
+
+    @property
+    def secret(self) -> str:
+        if self.__secret is not None:
+            return self.__secret
+
+        hashed, salt = self.__generate_hashed()
+        self.__secret = self.__create_secret(hashed, salt)
+
+        return self.secret
+
+    def __create_secret(self, hashed: bytes, salt: bytes) -> str:
+        properties = self.__class__
+        separator = properties.salt_separator
+
+        result = hashed.hex() + separator + salt.hex()
+
+        if not self.__check_secret(result):
+            raise Exception("Can not create secret. Check secret settings!")
+
+        return result

+ 24 - 0
assets/secret_properties.py

@@ -0,0 +1,24 @@
+class secret_properties(type):
+    @property
+    def hash_hex_length(cls):
+        return 64
+
+    @property
+    def salt_separator(cls):   
+        return ":"
+
+    @property
+    def salt_length(cls):
+        return 16
+
+    @property
+    def salt_hex_length(cls):
+        return cls.salt_length * 2
+   
+    @property
+    def hash_rounds(cls):
+        return 100000
+
+    @property
+    def hash_algorithm(cls):
+        return "sha256"

+ 15 - 0
assets/user.py

@@ -0,0 +1,15 @@
+import sqlmodel
+
+class user(sqlmodel.SQLModel, table = True):
+    id: int | None = sqlmodel.Field(default = None, primary_key = True)
+    nick: str
+    secret: str
+
+    def clone(self) -> super:   
+        return user(
+            id = self.id,
+            nick = self.nick,
+            secret = self.secret
+        )
+
+

+ 67 - 0
assets/user_validator.py

@@ -0,0 +1,67 @@
+import enum
+
+from .user import user 
+
+class user_validation_result(enum.Enum):
+    VALID = 0
+    NICK_TOO_SHORT = 1
+    NICK_TOO_LONG = 2
+    NICK_EMPTY = 3
+    SECRET_NOT_SET = 4
+    NICK_ALREADY_TAKEN = 5
+
+class user_validation_exception(BaseException): 
+    def __init__(self, target: user_validation_result):
+        super().__init__("Exception when validate user: " + target.name)
+        self.errors = target
+
+class user_validator:
+    def __init__(self, target: user):
+        self.__target = user.clone()
+
+    @property
+    def target(self) -> user:   
+        return self.__target
+
+    @property
+    def nick_validation_result(self) -> user_validation_result:
+        return user_validator.validate_nick(self.target.nick)
+    
+    def validate_nick(target) -> user_validation_result:
+        nick_length = len(target)
+
+        if nick_length == 0:
+            return user_validation_result.NICK_EMPTY
+
+        if nick_length < 4: 
+            return user_validation_result.NICK_TOO_SHORT
+
+        if nick_length > 20:    
+            return user_validation_result.NICK_TOO_LONG
+
+        return user_validation_result.VALID
+
+    @property
+    def has_valid_nick(self) -> bool:
+        return self.nick_validation_result == user_validation_resut.VALID
+
+    @property
+    def secret_validation_result(self) -> user_validation_result:
+        return user_validator.validate_secret(self.target.secret)
+
+    def validate_secret(target) -> user_validation_result:
+        if len(target) != 64:   
+            return user_validation_result.SECRET_NOT_SET
+
+        return user_validation_result.VALID
+
+    @property
+    def has_valid_secret(self) -> bool:
+        return self.secret_validation_result == user_validation_result.VALID
+
+    @property
+    def has_id(self) -> bool:
+        return self.target.id is not None
+        
+        
+

+ 124 - 0
assets/users_manager.py

@@ -0,0 +1,124 @@
+import sqlmodel
+import sqlalchemy
+
+from .user import user
+from .user_validator import user_validator
+from .user_validator import user_validation_result
+from .user_validator import user_validation_exception
+
+class users_manager:
+    def __init__(self, engine: sqlalchemy.engine.base.Engine):
+        self.__engine = engine
+
+    @property
+    def session(self) -> sqlmodel.Session:
+        return sqlmodel.Session(self.__engine)
+    
+    def register(self, target: user) -> user:
+        validation_result = self.__validate_user(target)
+
+        if validation_result != user_validation_result.VALID:
+            raise user_validation_exception(validation_result)
+
+        if self.nick_taken(target.nick):
+            raise user_validation_exception(
+                user_validation_result.NICK_ALREADY_TAKEN
+            )
+
+        if validator.has_id:
+            raise Exception("Can not register user which already exists.")
+
+        with self.session as session:
+            session.add(target)
+            session.commit()
+            session.refresh(target)
+
+        return target
+
+    def load(self, target: int) -> user | None:
+        query = sqlmodel
+        query = query.select(user)
+        query = query.where(user.id == target)
+
+        result = None
+
+        with self.session as session:
+            result = session.exec(query).first()
+
+        return result
+
+    def who_take_nick(self, target: str) -> int | None:
+        query = sqlmodel
+        query = query.select(user)
+        query = query.where(user.nick == target)
+
+        with self.session as session:
+            result = session.exec(query).first()
+
+            if result is None:
+                return None
+
+            return result.id
+
+    def nick_taken(self, target: str) -> bool:
+        return self.who_take_nick(target) != None
+
+    def __validate_user(self, target: user) -> user_validation_result:
+        validator = user_validator(target)
+
+        if not validator.has_valid_nick:
+            return validator.nick_validation_result
+
+        if not validator.has_valid_secret:
+            return validator.secret_validation_result
+
+        return user_validation_result.VALID
+
+    def save(self, target: user) -> user:
+        validation_result = self.__validate_user(target)
+
+        if validation_result != user_validation_result.VALID:
+            raise user_validation_exception(validation_result)
+        
+        if self.who_take_nick(target.nick) != target.id:
+            raise user_validation_exception(
+                user_validation_result.NICK_ALREADY_TAKEN
+            )
+
+        if not target.has_id:
+            raise Exception("Register user before save it.")
+
+        with self.session as session:
+            session.add(target)
+            session.commit()
+            session.refresh(target)
+
+    def login(self, nick: str, secret: str) -> user | None:
+        nick_validation = user_validator.validate_nick(nick)
+
+        if nick_validation != user_validation_result.VALID:
+            raise user_validation_exception(nick_validation)
+
+        secret_validation = user_validator.validate_secret(secret)
+        
+        if secret_validation != user_validation_result.VALID:
+            raise user_validation_exception(secret_validation)
+        
+        result = None
+
+        with self.session as session:
+            query = sqlmodel
+            query = query.select(user)
+            query = query.where(user.nick == nick)
+            query = query.where(user.secret == secret)
+
+            result = session.exec(query).first()
+
+        return result
+
+    def remove(self, target: user) -> None:
+        with self.session as session:
+            session.delete(target)
+            session.commit()
+
+

+ 30 - 0
core.py

@@ -0,0 +1,30 @@
+import os
+import dotenv
+import fastapi
+import fastapi.staticfiles
+import fastapi.responses
+import assets
+
+dotenv.load_dotenv()
+
+print("Loading cx-notes...")
+print("Note name:", os.getenv("notes_name"))
+print("Database:", os.getenv("database"))
+
+api = fastapi.FastAPI()
+static = fastapi.staticfiles.StaticFiles(directory = "./static")
+
+database = assets.storage(os.getenv("database"))
+app = assets.notes(os.getenv("notes_name"), database)
+
+api.mount("/static/", static)
+
[email protected]("/", response_class = fastapi.responses.HTMLResponse)
+async def index():
+    try:
+        with open("./static/core.html") as index:
+            return index.read()
+    except:
+        return "Error 404. Can not open file." 
+
+

+ 14 - 0
static/core.html

@@ -0,0 +1,14 @@
+<!DOCTYPE html>
+
+<html>
+    <head>
+        <meta charset="UTF-8">        
+        <meta name="viewport" content="width=device-width, initial-scale=1.0">
+
+        <script src="./static/script/core.js" type="module"></script>
+        <link rel="stylesheet" type="text/css" href="./static/theme/core.css">
+    </head>
+    
+    <body>
+    </body>
+</html>

+ 0 - 0
static/script/core.js


+ 0 - 0
static/theme/core.css


+ 9 - 0
tests/00-template.py

@@ -0,0 +1,9 @@
+import sys
+import pathlib
+
+file = pathlib.Path(__file__)
+directory = file.parent
+import_root = directory.parent
+
+sys.path.append(str(import_root))
+

+ 27 - 0
tests/01-secret-generator.py

@@ -0,0 +1,27 @@
+import sys
+import pathlib
+
+file = pathlib.Path(__file__)
+directory = file.parent
+import_root = directory.parent
+
+sys.path.append(str(import_root))
+
+from assets import secret_generator
+
+password_a = secret_generator("Password_a")
+password_b = secret_generator("Password_b")
+verify_password_a = secret_generator("Password_a")
+
+print("Check secret generation: ")
+print("Secret for 'Password_a': " + password_a.secret)
+print("Check that secret is generated once in the single instance: ")
+print("Secret for 'Password_a': " + password_a.secret)
+print("Secret for 'Password_b': " + password_b.secret)
+
+validation_result = "False"
+
+if verify_password_a.validate(password_a.secret):
+    validation_result = "True"
+
+print("Check validation for 'Password_a' by secret: " + validation_result)

+ 9 - 0
tests/02-database.py

@@ -0,0 +1,9 @@
+import sys
+import pathlib
+
+file = pathlib.Path(__file__)
+directory = file.parent
+import_root = directory.parent
+
+sys.path.append(str(import_root))
+

二進制
tests/test.db