| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- import asyncio
- import tortoise.transactions
- from .user import user
- from .user import user_proxy
- from .validators import validators
- from .exceptions import nick_in_use
- from .exceptions import nick_not_exists
- from .exceptions import user_is_not_admin
- from .exceptions import apikey_not_exists
- from .exceptions import invalid_nick_syntax
- from .exceptions import invalid_apikey_syntax
- from .exceptions import invalid_password_syntax
- from .exceptions import user_is_only_admin
class users_manager:
    """Account-management operations built on the ``user`` model.

    Every operation is an async classmethod; the class keeps no state and
    serves as a namespace. Methods that take an ``apikey`` first resolve
    the requesting user, enforce permissions, and then delegate to the
    method that performs the actual change.
    """

    @classmethod
    async def api_register(
        cls,
        apikey: str,
        nick: str,
        password: str
    ) -> tuple[user, user]:
        """Register a new user on behalf of an administrator.

        Args:
            apikey: API key identifying the requester.
            nick: Nick for the new account.
            password: Password for the new account.

        Returns:
            ``(created_user, requester)`` - the new user comes first here,
            unlike the other requester-based methods of this class.

        Raises:
            invalid_apikey_syntax: The API key failed validation.
            apikey_not_exists: No user owns the API key.
            user_is_not_admin: The requester is not an administrator.
            nick_in_use: The nick is already taken.
            invalid_nick_syntax: The nick failed validation.
        """
        requester = await cls.require_by_apikey(apikey)
        if not requester.is_admin:
            raise user_is_not_admin()

        created = await cls.register(nick, password)
        return created, requester

    @classmethod
    async def register(cls, nick: str, password: str) -> user:
        """Create and persist a new user.

        Args:
            nick: Requested nick.
            password: Password handed to ``user_proxy.create``.

        Returns:
            The saved user.

        Raises:
            nick_in_use: A user with this nick already exists.
            invalid_nick_syntax: The nick failed validation.
        """
        if await cls.is_nick_in_use(nick):
            raise nick_in_use(nick)

        # Store the value returned by the validator rather than the raw input.
        try:
            nick = validators.nick(nick)
        except Exception as error:
            raise invalid_nick_syntax(nick, error) from error

        async with tortoise.transactions.in_transaction():
            proxy = await user_proxy.create(nick, password)

            # Retry until the API key is unused; presumably drop_sessions()
            # issues a fresh key each time - verify against user_proxy.
            while await cls.is_apikey_in_use(proxy.result().apikey):
                proxy.drop_sessions()

            # Named "created" so the imported ``user`` model is not shadowed.
            created = proxy.result()
            await created.save()
            return created

    @classmethod
    async def set_privileges_process(
        cls,
        apikey: str,
        nick: str,
        privileges: bool
    ) -> tuple[user, user]:
        """Change another user's admin flag on behalf of an administrator.

        Args:
            apikey: API key identifying the requester.
            nick: Nick of the user to modify.
            privileges: New value of the target's ``is_admin`` flag.

        Returns:
            ``(requester, target)``.

        Raises:
            invalid_apikey_syntax: The API key failed validation.
            apikey_not_exists: No user owns the API key.
            can_not_change_own_privileges: The requester targets themselves.
            user_is_not_admin: The requester is not an administrator.
            nick_not_exists: The target does not exist.
            invalid_nick_syntax: The nick failed validation.
            user_is_only_admin: The change would remove the last admin.
        """
        # require_by_apikey (not get_by_apikey): an unknown key must raise
        # apikey_not_exists instead of failing on ``None.nick`` below.
        requester = await cls.require_by_apikey(apikey)

        # NOTE(review): compares the raw nick; if validators.nick()
        # normalizes its input this check can be sidestepped - confirm.
        if requester.nick == nick:
            # NOTE(review): this name is not imported at module level.
            # Assumed to live in .exceptions with the other errors - confirm
            # and move the import to the top of the file.
            from .exceptions import can_not_change_own_privileges
            raise can_not_change_own_privileges()

        if not requester.is_admin:
            raise user_is_not_admin()

        target = await cls.set_privileges(nick, privileges)
        return requester, target

    @classmethod
    async def set_privileges(cls, nick: str, privileges: bool) -> user:
        """Set the admin flag of the user named ``nick``.

        Args:
            nick: Nick of the user to modify.
            privileges: New value of ``is_admin``.

        Returns:
            The saved user.

        Raises:
            nick_not_exists: The user does not exist.
            invalid_nick_syntax: The nick failed validation.
            user_is_only_admin: The user is the last admin and would be
                demoted.
        """
        async with tortoise.transactions.in_transaction():
            target = await cls.require_by_nick(nick)

            # Only a demotion can leave the system without an admin;
            # re-granting admin to the sole admin is a harmless no-op.
            if target.is_admin and not privileges:
                if await cls.count_admins() == 1:
                    raise user_is_only_admin()

            target.is_admin = privileges
            await target.save()
            return target

    @classmethod
    async def remove(cls, nick: str) -> None:
        """Delete the user named ``nick``.

        Raises:
            nick_not_exists: The user does not exist.
            invalid_nick_syntax: The nick failed validation.
            user_is_only_admin: The user is the last remaining admin.
        """
        async with tortoise.transactions.in_transaction():
            target = await cls.require_by_nick(nick)

            if target.is_admin and await cls.count_admins() == 1:
                raise user_is_only_admin()

            await target.delete()

    @classmethod
    async def remove_process(cls, apikey: str, nick: str) -> None:
        """Delete a user on behalf of the requester.

        Any user may delete their own account; deleting someone else
        requires the requester to be an administrator.

        Raises:
            invalid_apikey_syntax: The API key failed validation.
            apikey_not_exists: No user owns the API key.
            user_is_not_admin: A non-admin tried to delete another user.
            nick_not_exists: The target does not exist.
            invalid_nick_syntax: The nick failed validation.
            user_is_only_admin: The target is the last remaining admin.
        """
        requester = await cls.require_by_apikey(apikey)

        # NOTE(review): compares the raw nick; a differently-spelled form
        # of the requester's own nick is treated as "someone else" - confirm
        # whether validators.nick() normalizes.
        if requester.nick != nick and not requester.is_admin:
            raise user_is_not_admin()

        await cls.remove(nick)

    @classmethod
    async def count_admins(cls) -> int:
        """Return the number of users whose ``is_admin`` flag is set."""
        return await user.filter(is_admin = True).count()

    @classmethod
    async def login(cls, nick: str, password: str) -> user | None:
        """Check a nick/password pair.

        Returns:
            The matching user, or ``None`` when the nick is unknown or the
            password does not match.

        Raises:
            invalid_nick_syntax: The nick failed validation.
        """
        try:
            nick = validators.nick(nick)
        except Exception as error:
            raise invalid_nick_syntax(nick, error) from error

        async with tortoise.transactions.in_transaction():
            result = await user.filter(nick = nick).first()

            # first() yields None when nothing matches; bail out before
            # wrapping a missing user in a proxy.
            if result is None:
                return None

            if await user_proxy(result).compare_password(password):
                return result

            return None

    @classmethod
    async def change_own_password(cls, apikey: str, password: str) -> user:
        """Change the password of the user owning ``apikey``.

        Raises:
            invalid_apikey_syntax: The API key failed validation.
            apikey_not_exists: No user owns the API key.
            invalid_password_syntax: The password failed validation.
        """
        target = await cls.require_by_apikey(apikey)
        return await cls.change_password(target, password)

    @classmethod
    async def change_other_password(
        cls,
        apikey: str,
        nick: str,
        password: str
    ) -> tuple[user, user]:
        """Change another user's password on behalf of an administrator.

        Returns:
            ``(requester, target)``.

        Raises:
            invalid_apikey_syntax: The API key failed validation.
            apikey_not_exists: No user owns the API key.
            user_is_not_admin: The requester is not an administrator.
            nick_not_exists: The target does not exist.
            invalid_nick_syntax: The nick failed validation.
            invalid_password_syntax: The password failed validation.
        """
        requester = await cls.require_by_apikey(apikey)

        # Authorize before looking the target up, so a non-admin cannot
        # learn from the error type whether a nick exists.
        if not requester.is_admin:
            raise user_is_not_admin()

        target = await cls.require_by_nick(nick)
        return requester, await cls.change_password(target, password)

    @classmethod
    async def drop_own_sessions(cls, apikey: str) -> user:
        """Drop the sessions of the user owning ``apikey``.

        Raises:
            invalid_apikey_syntax: The API key failed validation.
            apikey_not_exists: No user owns the API key.
        """
        target = await cls.require_by_apikey(apikey)
        return await cls.drop_sessions(target)

    @classmethod
    async def drop_other_sessions(
        cls,
        apikey: str,
        nick: str
    ) -> tuple[user, user]:
        """Drop another user's sessions on behalf of an administrator.

        Returns:
            ``(requester, target)``.

        Raises:
            invalid_apikey_syntax: The API key failed validation.
            apikey_not_exists: No user owns the API key.
            user_is_not_admin: The requester is not an administrator.
            nick_not_exists: The target does not exist.
            invalid_nick_syntax: The nick failed validation.
        """
        requester = await cls.require_by_apikey(apikey)

        # Authorize before looking the target up (see change_other_password
        # for the same ordering).
        if not requester.is_admin:
            raise user_is_not_admin()

        target = await cls.require_by_nick(nick)
        return requester, await cls.drop_sessions(target)

    @classmethod
    async def change_password(cls, target: user, password: str) -> user:
        """Validate ``password`` and store it on ``target``.

        Returns:
            The saved user.

        Raises:
            invalid_password_syntax: The password failed validation.
        """
        # The original spelled this "validatord"; the resulting NameError
        # was swallowed by the except below, so every call was rejected.
        # NOTE(review): assumes validators exposes password() - confirm.
        try:
            password = validators.password(password)
        except Exception as error:
            raise invalid_password_syntax(password, error) from error

        proxy = user_proxy(target)
        await proxy.set_password(password)

        async with tortoise.transactions.in_transaction():
            result = proxy.result()
            await result.save()
            return result

    @classmethod
    async def drop_sessions(cls, target: user) -> user:
        """Drop the sessions of ``target`` and persist the result.

        Returns:
            The saved user.
        """
        async with tortoise.transactions.in_transaction():
            proxy = user_proxy(target)

            # Called without arguments, as in register(); the original
            # passed ``self``, which does not exist inside a classmethod.
            proxy.drop_sessions()

            # Retry until the API key is unused; presumably drop_sessions()
            # issues a fresh key each time - verify against user_proxy.
            while await cls.is_apikey_in_use(proxy.result().apikey):
                proxy.drop_sessions()

            result = proxy.result()
            await result.save()
            return result

    @classmethod
    async def get_by_apikey(cls, apikey: str) -> user | None:
        """Return the user owning ``apikey``, or ``None`` if there is none.

        Raises:
            invalid_apikey_syntax: The API key failed validation.
        """
        try:
            apikey = validators.apikey(apikey)
        except Exception as error:
            raise invalid_apikey_syntax(apikey, error) from error

        async with tortoise.transactions.in_transaction():
            return await user.filter(apikey = apikey).first()

    @classmethod
    async def is_apikey_in_use(cls, apikey: str) -> bool:
        """Return whether some user owns ``apikey``.

        Raises:
            invalid_apikey_syntax: The API key failed validation.
        """
        return (await cls.get_by_apikey(apikey)) is not None

    @classmethod
    async def require_by_apikey(cls, apikey: str) -> user:
        """Return the user owning ``apikey``.

        Raises:
            invalid_apikey_syntax: The API key failed validation.
            apikey_not_exists: No user owns the API key.
        """
        target = await cls.get_by_apikey(apikey)
        if target is None:
            raise apikey_not_exists(apikey)
        return target

    @classmethod
    async def get_by_nick(cls, nick: str) -> user | None:
        """Return the user named ``nick``, or ``None`` if there is none.

        Raises:
            invalid_nick_syntax: The nick failed validation.
        """
        try:
            nick = validators.nick(nick)
        except Exception as error:
            raise invalid_nick_syntax(nick, error) from error

        async with tortoise.transactions.in_transaction():
            return await user.filter(nick = nick).first()

    @classmethod
    async def is_nick_in_use(cls, nick: str) -> bool:
        """Return whether a user named ``nick`` exists.

        Raises:
            invalid_nick_syntax: The nick failed validation.
        """
        return (await cls.get_by_nick(nick)) is not None

    @classmethod
    async def require_by_nick(cls, nick: str) -> user:
        """Return the user named ``nick``.

        Raises:
            invalid_nick_syntax: The nick failed validation.
            nick_not_exists: No user has this nick.
        """
        target = await cls.get_by_nick(nick)
        if target is None:
            raise nick_not_exists(nick)
        return target
|