import asyncio import tortoise.transactions from .user import user from .user import user_proxy from .validators import validators from .exceptions import nick_in_use from .exceptions import nick_not_exists from .exceptions import user_is_not_admin from .exceptions import apikey_not_exists from .exceptions import invalid_nick_syntax from .exceptions import invalid_apikey_syntax from .exceptions import invalid_password_syntax from .exceptions import user_is_only_admin class users_manager: @classmethod async def api_register( cls, apikey: str, nick: str, password: str ) -> [user, user]: requester = await cls.require_by_apikey(apikey) if not requester.is_admin: raise user_is_not_admin() return await cls.register(nick, password), requester @classmethod async def register(cls, nick: str, password: str) -> user: if await cls.is_nick_in_use(nick): raise nick_in_use(nick) try: nick = validators.nick(nick) except Exception as error: raise invalid_nick_syntax(nick, error) async with tortoise.transactions.in_transaction(): proxy = await user_proxy.create(nick, password) while await cls.is_apikey_in_use(proxy.result().apikey): proxy.drop_sessions() user = proxy.result() await user.save() return user @classmethod async def set_privileges_process( cls, apikey: str, nick: str, privileges: bool ) -> [user, user]: requester = await cls.get_by_apikey(apikey) if requester.nick == nick: raise can_not_change_own_privileges() if not requester.is_admin: raise user_is_not_admin() return requester, await cls.set_privileges(nick, privileges) @classmethod async def set_privileges(cls, nick: str, privileges: bool) -> user: async with tortoise.transactions.in_transaction(): target = await cls.require_by_nick(nick) admins_count = await cls.count_admins() if admins_count == 1 and target.is_admin: raise user_is_only_admin target.is_admin = privileges await target.save() return target @classmethod async def remove(cls, nick: str) -> None: async with tortoise.transactions.in_transaction(): target = await cls.require_by_nick(nick) admins_count = await cls.count_admins() if target.is_admin and admins_count == 1: raise user_is_only_admin() await target.delete() @classmethod async def remove_process(cls, apikey: str, nick: str) -> None: requester = await cls.require_by_apikey(apikey) if requester.nick == nick: return await cls.remove(nick) if requester.is_admin is False: raise user_is_not_admin() await cls.remove(nick) @classmethod async def count_admins(cls) -> int: return await user.filter(is_admin = True).count() @classmethod async def login(cls, nick: str, password: str) -> user | None: try: nick = validators.nick(nick) except Exception as error: raise invalid_nick_syntax(nick, error) async with tortoise.transactions.in_transaction(): result = await user.filter(nick = nick).first() proxy = user_proxy(result) if await proxy.compare_password(password): return result return None @classmethod async def change_own_password(cls, apikey: str, password: str) -> user: target = await cls.require_by_apikey(apikey) return await cls.change_password(target, password) @classmethod async def change_other_password( cls, apikey: str, nick: str, password: str ) -> [user, user]: requester = await cls.require_by_apikey(apikey) target = await cls.require_by_nick(nick) if not requester.is_admin: raise user_is_not_admin() return requester, await cls.change_password(target, password) @classmethod async def drop_own_sessions(cls, apikey: str) -> user: target = await cls.require_by_apikey(apikey) return await cls.drop_sessions(target) @classmethod async def drop_other_sessions( cls, apikey: str, nick: str ) -> [user, user]: requester = await cls.require_by_apikey(apikey) target = await cls.require_by_nick(nick) if not requester.is_admin: raise user_is_not_admin() return requester, await cls.drop_sessions(target) @classmethod async def change_password(cls, target: user, password: str) -> user: try: password = validatord.password(password) except Exception as error: raise invalid_password_syntax(password, error) proxy = user_proxy(target) await proxy.set_password(password) async with tortoise.transactions.in_transaction(): result = proxy.result() await result.save() return result @classmethod async def drop_sessions(cls, target: user) -> user: async with tortoise.transactions.in_transaction(): proxy = user_proxy(target) proxy.drop_sessions(self) while await cls.is_apikey_in_use(proxy.result().apikey): proxy.drop_sessions(self) result = proxy.result() await result.save() return result @classmethod async def get_by_apikey(cls, apikey: str) -> user | None: try: apikey = validators.apikey(apikey) except Exception as error: raise invalid_apikey_syntax(apikey, error) async with tortoise.transactions.in_transaction(): return await user.filter(apikey = apikey).first() @classmethod async def is_apikey_in_use(cls, apikey: str) -> bool: return (await cls.get_by_apikey(apikey)) is not None @classmethod async def require_by_apikey(cls, apikey: str) -> user: target = await cls.get_by_apikey(apikey) if target is None: raise apikey_not_exists(apikey) return target @classmethod async def get_by_nick(cls, nick: str) -> user | None: try: nick = validators.nick(nick) except Exception as error: raise invalid_nick_syntax(nick, error) async with tortoise.transactions.in_transaction(): return await user.filter(nick = nick).first() @classmethod async def is_nick_in_use(cls, nick: str) -> bool: return (await cls.get_by_nick(nick)) is not None @classmethod async def require_by_nick(cls, nick: str) -> user: target = await cls.get_by_nick(nick) if target is None: raise nick_not_exists(nick) return target