""" User and administrator management """ from datetime import datetime from typing import Tuple from bot.modules.database.session import AsyncSessionLocal from bot.modules.database.models import User from bot.modules.access_control.auth import is_admin, is_owner import logging logger = logging.getLogger(__name__) async def add_user(user_id: int, username: str = None, first_name: str = None, last_name: str = None) -> Tuple[bool, str]: """ Add user Args: user_id: User ID username: Username (if not specified, will be fetched from Telegram API) first_name: First name (if not specified, will be fetched from Telegram API) last_name: Last name (if not specified, will be fetched from Telegram API) Returns: Tuple of (success: bool, message: str) """ try: async with AsyncSessionLocal() as session: # Check existence existing_user = await session.get(User, user_id) if existing_user: # Update user information if missing updated = False if not existing_user.username and username: existing_user.username = username updated = True if not existing_user.first_name and first_name: existing_user.first_name = first_name updated = True if not existing_user.last_name and last_name: existing_user.last_name = last_name updated = True # If information is missing, try to get from Telegram API if not existing_user.username or not existing_user.first_name: try: from bot.utils.user_info_updater import update_user_info_from_telegram if await update_user_info_from_telegram(user_id, db_session=session): updated = True except Exception as e: logger.debug(f"Failed to get user {user_id} information from Telegram: {e}") if updated: await session.commit() logger.info(f"User {user_id} information updated") return (True, f"Пользователь {user_id} уже существует, информация обновлена") return (False, f"Пользователь {user_id} уже существует") # If username/first_name/last_name not specified, get from Telegram API if not username or not first_name: try: from bot.utils.telegram_user import get_user_info user_info = await get_user_info(user_id) if user_info: if not username: username = user_info.get('username') if not first_name: first_name = user_info.get('first_name') if not last_name: last_name = user_info.get('last_name') except Exception as e: logger.debug(f"Failed to get user {user_id} information from Telegram: {e}") # Create new user user = User( user_id=user_id, username=username, first_name=first_name, last_name=last_name, is_admin=False, is_blocked=False ) session.add(user) await session.commit() logger.info(f"User {user_id} added (username: {username})") return (True, f"Пользователь {user_id} успешно добавлен") except Exception as e: logger.error(f"Error adding user: {e}", exc_info=True) return (False, f"Ошибка базы данных: {str(e)}") async def remove_user(user_id: int) -> Tuple[bool, str]: """ Remove user Args: user_id: User ID Returns: Tuple of (success: bool, message: str) """ try: async with AsyncSessionLocal() as session: user = await session.get(User, user_id) if not user: return (False, f"Пользователь {user_id} не найден в базе данных") await session.delete(user) await session.commit() logger.info(f"User {user_id} removed") return (True, f"Пользователь {user_id} успешно удален") except Exception as e: logger.error(f"Error removing user: {e}", exc_info=True) return (False, f"Ошибка базы данных: {str(e)}") async def block_user(user_id: int) -> Tuple[bool, str]: """ Block user Args: user_id: User ID Returns: Tuple of (success: bool, message: str) """ try: async with AsyncSessionLocal() as session: user = await session.get(User, user_id) if not user: return (False, f"Пользователь {user_id} не найден в базе данных") if user.is_blocked: return (False, f"Пользователь {user_id} уже заблокирован") user.is_blocked = True user.updated_at = datetime.utcnow() await session.commit() logger.info(f"User {user_id} blocked") return (True, f"Пользователь {user_id} успешно заблокирован") except Exception as e: logger.error(f"Error blocking user: {e}", exc_info=True) return (False, f"Ошибка базы данных: {str(e)}") async def unblock_user(user_id: int) -> Tuple[bool, str]: """ Unblock user Args: user_id: User ID Returns: Tuple of (success: bool, message: str) """ try: async with AsyncSessionLocal() as session: user = await session.get(User, user_id) if not user: return (False, f"Пользователь {user_id} не найден в базе данных") if not user.is_blocked: return (False, f"Пользователь {user_id} не заблокирован") user.is_blocked = False user.updated_at = datetime.utcnow() await session.commit() logger.info(f"User {user_id} unblocked") return (True, f"Пользователь {user_id} успешно разблокирован") except Exception as e: logger.error(f"Error unblocking user: {e}", exc_info=True) return (False, f"Ошибка базы данных: {str(e)}") async def add_admin(user_id: int, requester_id: int) -> Tuple[bool, str]: """ Assign administrator Args: user_id: User ID to assign as admin requester_id: ID of user making the request Returns: Tuple of (success: bool, message: str) """ # Check permissions if not await is_admin(requester_id): return (False, "У вас нет прав администратора") try: async with AsyncSessionLocal() as session: user = await session.get(User, user_id) if not user: # Create user if doesn't exist user = User(user_id=user_id, is_admin=True) session.add(user) await session.commit() logger.info(f"User {user_id} created and assigned as administrator") return (True, f"Пользователь {user_id} создан и назначен администратором") else: if user.is_admin: return (False, f"Пользователь {user_id} уже является администратором") user.is_admin = True user.updated_at = datetime.utcnow() await session.commit() logger.info(f"User {user_id} assigned as administrator") return (True, f"Пользователь {user_id} успешно назначен администратором") except Exception as e: logger.error(f"Error assigning administrator: {e}", exc_info=True) return (False, f"Ошибка базы данных: {str(e)}") async def remove_admin(user_id: int, requester_id: int) -> Tuple[bool, str]: """ Remove administrator privileges Args: user_id: User ID to remove admin privileges from requester_id: ID of user making the request Returns: Tuple of (success: bool, message: str) """ # Check permissions if not await is_admin(requester_id): return (False, "У вас нет прав администратора") # Protection against self-removal if user_id == requester_id: return (False, "Вы не можете снять права администратора у самого себя") try: async with AsyncSessionLocal() as session: user = await session.get(User, user_id) if not user: return (False, f"Пользователь {user_id} не найден в базе данных") if not user.is_admin: return (False, f"Пользователь {user_id} не является администратором") user.is_admin = False user.updated_at = datetime.utcnow() await session.commit() logger.info(f"Administrator privileges removed from user {user_id}") return (True, f"Права администратора успешно сняты у пользователя {user_id}") except Exception as e: logger.error(f"Error removing administrator privileges: {e}", exc_info=True) return (False, f"Ошибка базы данных: {str(e)}")