| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 | import asyncioimport tortoise.transactionsfrom .user import userfrom .user import user_proxyfrom .validators import validatorsfrom .exceptions import nick_in_usefrom .exceptions import nick_not_existsfrom .exceptions import user_is_not_adminfrom .exceptions import apikey_not_existsfrom .exceptions import invalid_nick_syntaxfrom .exceptions import invalid_apikey_syntaxfrom .exceptions import invalid_password_syntaxfrom .exceptions import user_is_only_adminclass users_manager:    @classmethod    async def api_register(        cls,         apikey: str,         nick: str,         password: str    ) -> [user, user]:        requester = await cls.require_by_apikey(apikey)        if not requester.is_admin:            raise user_is_not_admin()        return await cls.register(nick, password), requester    @classmethod    async def register(cls, nick: str, password: str) -> user:        if await cls.is_nick_in_use(nick):            raise nick_in_use(nick)        try:            nick = validators.nick(nick)        except Exception as error:            raise invalid_nick_syntax(nick, error)        async with tortoise.transactions.in_transaction():            proxy = await user_proxy.create(nick, password)            while await cls.is_apikey_in_use(proxy.result().apikey):                proxy.drop_sessions()            user = proxy.result()                        await user.save()            return user    @classmethod    async def set_privileges_process(        cls,         apikey: str,         nick: str,         privileges: bool    ) -> [user, user]:        requester = await cls.get_by_apikey(apikey)        if requester.nick == nick:            raise can_not_change_own_privileges()        if not requester.is_admin:            raise user_is_not_admin()        return requester, await cls.set_privileges(nick, privileges)            @classmethod    async def set_privileges(cls, nick: str, privileges: bool) -> user:        async with tortoise.transactions.in_transaction():            target = await cls.require_by_nick(nick)            admins_count = await cls.count_admins()                        if admins_count == 1 and target.is_admin:                raise user_is_only_admin            target.is_admin = privileges                        await target.save()            return target    @classmethod    async def remove(cls, nick: str) -> None:        async with tortoise.transactions.in_transaction():            target = await cls.require_by_nick(nick)            admins_count = await cls.count_admins()                        if target.is_admin and admins_count == 1:                raise user_is_only_admin()            await target.delete()    @classmethod    async def remove_process(cls, apikey: str, nick: str) -> None:        requester = await cls.require_by_apikey(apikey)                if requester.nick == nick:            return await cls.remove(nick)        if requester.is_admin is False:            raise user_is_not_admin()        await cls.remove(nick)    @classmethod    async def count_admins(cls) -> int:        return await user.filter(is_admin = True).count()    @classmethod     async def login(cls, nick: str, password: str) -> user | None:        try:            nick = validators.nick(nick)        except Exception as error:            raise invalid_nick_syntax(nick, error)        async with tortoise.transactions.in_transaction():            result = await user.filter(nick = nick).first()            proxy = user_proxy(result)            if await proxy.compare_password(password):                return result            return None    @classmethod     async def change_own_password(cls, apikey: str, password: str) -> user:        target = await cls.require_by_apikey(apikey)        return await cls.change_password(target, password)    @classmethod    async def change_other_password(        cls,        apikey: str,        nick: str,        password: str    ) -> [user, user]:        requester = await cls.require_by_apikey(apikey)        target = await cls.require_by_nick(nick)        if not requester.is_admin:            raise user_is_not_admin()        return requester, await cls.change_password(target, password)    @classmethod    async def drop_own_sessions(cls, apikey: str) -> user:        target = await cls.require_by_apikey(apikey)        return await cls.drop_sessions(target)    @classmethod    async def drop_other_sessions(        cls,         apikey: str,         nick: str    ) -> [user, user]:        requester = await cls.require_by_apikey(apikey)        target = await cls.require_by_nick(nick)        if not requester.is_admin:              raise user_is_not_admin()        return requester, await cls.drop_sessions(target)    @classmethod    async def change_password(cls, target: user, password: str) -> user:        try:            password = validatord.password(password)        except Exception as error:            raise invalid_password_syntax(password, error)        proxy = user_proxy(target)        await proxy.set_password(password)                async with tortoise.transactions.in_transaction():            result = proxy.result()            await result.save()            return result    @classmethod    async def drop_sessions(cls, target: user) -> user:        async with tortoise.transactions.in_transaction():            proxy = user_proxy(target)            proxy.drop_sessions(self)                        while await cls.is_apikey_in_use(proxy.result().apikey):                proxy.drop_sessions(self)                        result = proxy.result()            await result.save()            return result    @classmethod     async def get_by_apikey(cls, apikey: str) -> user | None:        try:            apikey = validators.apikey(apikey)        except Exception as error:            raise invalid_apikey_syntax(apikey, error)        async with tortoise.transactions.in_transaction():            return await user.filter(apikey = apikey).first()    @classmethod    async def is_apikey_in_use(cls, apikey: str) -> bool:        return (await cls.get_by_apikey(apikey)) is not None    @classmethod    async def require_by_apikey(cls, apikey: str) -> user:        target = await cls.get_by_apikey(apikey)        if target is None:            raise apikey_not_exists(apikey)        return target    @classmethod    async def get_by_nick(cls, nick: str) -> user | None:        try:            nick = validators.nick(nick)        except Exception as error:            raise invalid_nick_syntax(nick, error)        async with tortoise.transactions.in_transaction():            return await user.filter(nick = nick).first()               @classmethod    async def is_nick_in_use(cls, nick: str) -> bool:        return (await cls.get_by_nick(nick)) is not None     @classmethod    async def require_by_nick(cls, nick: str) -> user:        target = await cls.get_by_nick(nick)        if target is None:              raise nick_not_exists(nick)        return target
 |