|  | @@ -0,0 +1,181 @@
 | 
	
		
			
				|  |  | +import typer
 | 
	
		
			
				|  |  | +import enum
 | 
	
		
			
				|  |  | +import pathlib
 | 
	
		
			
				|  |  | +import getpass
 | 
	
		
			
				|  |  | +import json
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +from assets import *
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +default_config = pathlib.Path("config.json")
 | 
	
		
			
				|  |  | +default_users_db = pathlib.Path("users.json")
 | 
	
		
			
				|  |  | +default_db = pathlib.Path("database.db")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +config_help = "This is configuration file of the app to use."
 | 
	
		
			
				|  |  | +users_db_help = "This is location of the users json database."
 | 
	
		
			
				|  |  | +db_help = "This is location of SQLite3 database file."
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +description = "This is core reservationer package. It could be used to host "
 | 
	
		
			
				|  |  | +description = description + "app, or manage configuration or database."
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +app = typer.Typer(help = description)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class user_command(str, enum.Enum):
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +    That commands could be used in the user subcommand.
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    add = "register"
 | 
	
		
			
				|  |  | +    delete = "delete"
 | 
	
		
			
				|  |  | +    password = "password-change"
 | 
	
		
			
				|  |  | +    logout = "full-logout"
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def password_prompt() -> str:
 | 
	
		
			
				|  |  | +    while True: 
 | 
	
		
			
				|  |  | +        first = getpass.getpass("Password: ")
 | 
	
		
			
				|  |  | +        second = getpass.getpass("Repeat password: ")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if first == second:
 | 
	
		
			
				|  |  | +            return first
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        print("Passwords do not match.")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | [email protected]()
 | 
	
		
			
				|  |  | +def server(
 | 
	
		
			
				|  |  | +    port: int = typer.Option(8080, help = "Port to listen on."),
 | 
	
		
			
				|  |  | +    address: str = typer.Option("0.0.0.0", help = "Address to listen on."),
 | 
	
		
			
				|  |  | +    config: pathlib.Path = typer.Option(default_config, help = config_help)
 | 
	
		
			
				|  |  | +) -> None:
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +    Start app on selected port and interfaces.
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    print(port)
 | 
	
		
			
				|  |  | +    print(address)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | [email protected]()
 | 
	
		
			
				|  |  | +def user(
 | 
	
		
			
				|  |  | +    command: user_command = typer.Argument(help = "Command to run on users."),
 | 
	
		
			
				|  |  | +    nick: str = typer.Argument(help = "Nick of the user to work on."),
 | 
	
		
			
				|  |  | +    config: pathlib.Path = typer.Option(default_config, help = config_help)
 | 
	
		
			
				|  |  | +) -> None:
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +    Modify user database.
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +    
 | 
	
		
			
				|  |  | +    try:
 | 
	
		
			
				|  |  | +        loader = config_loader(app_config).load(config)
 | 
	
		
			
				|  |  | +        collection = loader.resources.users
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # User adding 
 | 
	
		
			
				|  |  | +        if command == user_command.add:
 | 
	
		
			
				|  |  | +            
 | 
	
		
			
				|  |  | +            # Check that user exists
 | 
	
		
			
				|  |  | +            if collection.exists(nick):
 | 
	
		
			
				|  |  | +                raise Exception("User with that nick already exists.")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            # Create new user
 | 
	
		
			
				|  |  | +            creator = user_factory()
 | 
	
		
			
				|  |  | +            creator.nick = nick
 | 
	
		
			
				|  |  | +            creator.password = password_prompt()
 | 
	
		
			
				|  |  | +            
 | 
	
		
			
				|  |  | +            # Add it
 | 
	
		
			
				|  |  | +            collection.add(creator.result)
 | 
	
		
			
				|  |  | +            
 | 
	
		
			
				|  |  | +            print("Adding \"" + nick + "\" to the database.")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # User remove
 | 
	
		
			
				|  |  | +        elif command == user_command.delete:
 | 
	
		
			
				|  |  | +            
 | 
	
		
			
				|  |  | +            # Load from database
 | 
	
		
			
				|  |  | +            target = collection.get_by_nick(nick)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            # Check that user exists
 | 
	
		
			
				|  |  | +            if target is None:
 | 
	
		
			
				|  |  | +                raise Exception("User with given nick not exists.")
 | 
	
		
			
				|  |  | +            
 | 
	
		
			
				|  |  | +            # When exists remove it
 | 
	
		
			
				|  |  | +            collection.remove(target)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # Change user password
 | 
	
		
			
				|  |  | +        elif command == user_command.password:
 | 
	
		
			
				|  |  | +                
 | 
	
		
			
				|  |  | +            # Load user by nick 
 | 
	
		
			
				|  |  | +            target = collection.get_by_nick(nick)
 | 
	
		
			
				|  |  | +            
 | 
	
		
			
				|  |  | +            # Check that user exists
 | 
	
		
			
				|  |  | +            if target is None:
 | 
	
		
			
				|  |  | +                raise Exception("User not exists, can not change password.")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            # Change password
 | 
	
		
			
				|  |  | +            handler = user_factory(target)
 | 
	
		
			
				|  |  | +            handler.password = password_prompt()
 | 
	
		
			
				|  |  | +            modified = handler.result
 | 
	
		
			
				|  |  | +            
 | 
	
		
			
				|  |  | +            # Store it in collection
 | 
	
		
			
				|  |  | +            collection.remove(target).add(modified)
 | 
	
		
			
				|  |  | +        
 | 
	
		
			
				|  |  | +        # Logout user from all devices
 | 
	
		
			
				|  |  | +        elif command == user_command.logout:
 | 
	
		
			
				|  |  | +            
 | 
	
		
			
				|  |  | +            # Load user from database
 | 
	
		
			
				|  |  | +            target = collection.get_by_nick(nick)
 | 
	
		
			
				|  |  | +            
 | 
	
		
			
				|  |  | +            # Check that exists
 | 
	
		
			
				|  |  | +            if target is None:
 | 
	
		
			
				|  |  | +                raise Exception("User not exists, can not logout.")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            # Refresh apikey
 | 
	
		
			
				|  |  | +            modified = user_factory(target) \
 | 
	
		
			
				|  |  | +            .refresh_apikey() \
 | 
	
		
			
				|  |  | +            .result
 | 
	
		
			
				|  |  | +            
 | 
	
		
			
				|  |  | +            # Store result
 | 
	
		
			
				|  |  | +            collection.remove(target).add(modified)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        # Save collection to file
 | 
	
		
			
				|  |  | +        users_saver(collection) \
 | 
	
		
			
				|  |  | +        .drop(loader.result.users_path) \
 | 
	
		
			
				|  |  | +        .save()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        print("Users database saved success.")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    except validator_exception as error:
 | 
	
		
			
				|  |  | +        print("Password is not correct, too easy to break.")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    except json.JSONDecodeError as error:
 | 
	
		
			
				|  |  | +        print("User JSON has syntax exception.")
 | 
	
		
			
				|  |  | +        print(str(error))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    except Exception as error:
 | 
	
		
			
				|  |  | +        print("Can not done work.")
 | 
	
		
			
				|  |  | +        print(str(error))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | [email protected]()
 | 
	
		
			
				|  |  | +def initialize(
 | 
	
		
			
				|  |  | +    config: pathlib.Path = typer.Option(default_config, help = config_help),
 | 
	
		
			
				|  |  | +    users: pathlib.Path = typer.Option(default_users_db, help = users_db_help),
 | 
	
		
			
				|  |  | +    database: pathlib.Path = typer.Option(default_db, help = db_help)
 | 
	
		
			
				|  |  | +) -> None:
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +    Initialize app configuration.
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    try:
 | 
	
		
			
				|  |  | +        
 | 
	
		
			
				|  |  | +        # Generating config file
 | 
	
		
			
				|  |  | +        config_generator(app_config) \
 | 
	
		
			
				|  |  | +        .modify("users_file", str(users)) \
 | 
	
		
			
				|  |  | +        .modify("database_uri", "sqlite:///" + str(database)) \
 | 
	
		
			
				|  |  | +        .save(config)
 | 
	
		
			
				|  |  | +        
 | 
	
		
			
				|  |  | +        # Generating new blank users database
 | 
	
		
			
				|  |  | +        users_saver(users_collection()).save(users)
 | 
	
		
			
				|  |  | +        
 | 
	
		
			
				|  |  | +        print("Config file is being created.")
 | 
	
		
			
				|  |  | +    
 | 
	
		
			
				|  |  | +    except Exception as error:
 | 
	
		
			
				|  |  | +        print("Config initialization failed.")
 | 
	
		
			
				|  |  | +        print(str(error))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +if __name__ == "__main__":
 | 
	
		
			
				|  |  | +    app()
 |