Files
tg_loader/bot/modules/access_control/user_manager.py
2025-12-04 00:12:56 +03:00

250 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
User and administrator management
"""
from datetime import datetime
from typing import Tuple
from bot.modules.database.session import AsyncSessionLocal
from bot.modules.database.models import User
from bot.modules.access_control.auth import is_admin, is_owner
import logging
logger = logging.getLogger(__name__)
async def add_user(user_id: int, username: str = None, first_name: str = None, last_name: str = None) -> Tuple[bool, str]:
"""
Add user
Args:
user_id: User ID
username: Username (if not specified, will be fetched from Telegram API)
first_name: First name (if not specified, will be fetched from Telegram API)
last_name: Last name (if not specified, will be fetched from Telegram API)
Returns:
Tuple of (success: bool, message: str)
"""
try:
async with AsyncSessionLocal() as session:
# Check existence
existing_user = await session.get(User, user_id)
if existing_user:
# Update user information if missing
updated = False
if not existing_user.username and username:
existing_user.username = username
updated = True
if not existing_user.first_name and first_name:
existing_user.first_name = first_name
updated = True
if not existing_user.last_name and last_name:
existing_user.last_name = last_name
updated = True
# If information is missing, try to get from Telegram API
if not existing_user.username or not existing_user.first_name:
try:
from bot.utils.user_info_updater import update_user_info_from_telegram
if await update_user_info_from_telegram(user_id, db_session=session):
updated = True
except Exception as e:
logger.debug(f"Failed to get user {user_id} information from Telegram: {e}")
if updated:
await session.commit()
logger.info(f"User {user_id} information updated")
return (True, f"Пользователь {user_id} уже существует, информация обновлена")
return (False, f"Пользователь {user_id} уже существует")
# If username/first_name/last_name not specified, get from Telegram API
if not username or not first_name:
try:
from bot.utils.telegram_user import get_user_info
user_info = await get_user_info(user_id)
if user_info:
if not username:
username = user_info.get('username')
if not first_name:
first_name = user_info.get('first_name')
if not last_name:
last_name = user_info.get('last_name')
except Exception as e:
logger.debug(f"Failed to get user {user_id} information from Telegram: {e}")
# Create new user
user = User(
user_id=user_id,
username=username,
first_name=first_name,
last_name=last_name,
is_admin=False,
is_blocked=False
)
session.add(user)
await session.commit()
logger.info(f"User {user_id} added (username: {username})")
return (True, f"Пользователь {user_id} успешно добавлен")
except Exception as e:
logger.error(f"Error adding user: {e}", exc_info=True)
return (False, f"Ошибка базы данных: {str(e)}")
async def remove_user(user_id: int) -> Tuple[bool, str]:
"""
Remove user
Args:
user_id: User ID
Returns:
Tuple of (success: bool, message: str)
"""
try:
async with AsyncSessionLocal() as session:
user = await session.get(User, user_id)
if not user:
return (False, f"Пользователь {user_id} не найден в базе данных")
await session.delete(user)
await session.commit()
logger.info(f"User {user_id} removed")
return (True, f"Пользователь {user_id} успешно удален")
except Exception as e:
logger.error(f"Error removing user: {e}", exc_info=True)
return (False, f"Ошибка базы данных: {str(e)}")
async def block_user(user_id: int) -> Tuple[bool, str]:
"""
Block user
Args:
user_id: User ID
Returns:
Tuple of (success: bool, message: str)
"""
try:
async with AsyncSessionLocal() as session:
user = await session.get(User, user_id)
if not user:
return (False, f"Пользователь {user_id} не найден в базе данных")
if user.is_blocked:
return (False, f"Пользователь {user_id} уже заблокирован")
user.is_blocked = True
user.updated_at = datetime.utcnow()
await session.commit()
logger.info(f"User {user_id} blocked")
return (True, f"Пользователь {user_id} успешно заблокирован")
except Exception as e:
logger.error(f"Error blocking user: {e}", exc_info=True)
return (False, f"Ошибка базы данных: {str(e)}")
async def unblock_user(user_id: int) -> Tuple[bool, str]:
"""
Unblock user
Args:
user_id: User ID
Returns:
Tuple of (success: bool, message: str)
"""
try:
async with AsyncSessionLocal() as session:
user = await session.get(User, user_id)
if not user:
return (False, f"Пользователь {user_id} не найден в базе данных")
if not user.is_blocked:
return (False, f"Пользователь {user_id} не заблокирован")
user.is_blocked = False
user.updated_at = datetime.utcnow()
await session.commit()
logger.info(f"User {user_id} unblocked")
return (True, f"Пользователь {user_id} успешно разблокирован")
except Exception as e:
logger.error(f"Error unblocking user: {e}", exc_info=True)
return (False, f"Ошибка базы данных: {str(e)}")
async def add_admin(user_id: int, requester_id: int) -> Tuple[bool, str]:
"""
Assign administrator
Args:
user_id: User ID to assign as admin
requester_id: ID of user making the request
Returns:
Tuple of (success: bool, message: str)
"""
# Check permissions
if not await is_admin(requester_id):
return (False, "У вас нет прав администратора")
try:
async with AsyncSessionLocal() as session:
user = await session.get(User, user_id)
if not user:
# Create user if doesn't exist
user = User(user_id=user_id, is_admin=True)
session.add(user)
await session.commit()
logger.info(f"User {user_id} created and assigned as administrator")
return (True, f"Пользователь {user_id} создан и назначен администратором")
else:
if user.is_admin:
return (False, f"Пользователь {user_id} уже является администратором")
user.is_admin = True
user.updated_at = datetime.utcnow()
await session.commit()
logger.info(f"User {user_id} assigned as administrator")
return (True, f"Пользователь {user_id} успешно назначен администратором")
except Exception as e:
logger.error(f"Error assigning administrator: {e}", exc_info=True)
return (False, f"Ошибка базы данных: {str(e)}")
async def remove_admin(user_id: int, requester_id: int) -> Tuple[bool, str]:
"""
Remove administrator privileges
Args:
user_id: User ID to remove admin privileges from
requester_id: ID of user making the request
Returns:
Tuple of (success: bool, message: str)
"""
# Check permissions
if not await is_admin(requester_id):
return (False, "У вас нет прав администратора")
# Protection against self-removal
if user_id == requester_id:
return (False, "Вы не можете снять права администратора у самого себя")
try:
async with AsyncSessionLocal() as session:
user = await session.get(User, user_id)
if not user:
return (False, f"Пользователь {user_id} не найден в базе данных")
if not user.is_admin:
return (False, f"Пользователь {user_id} не является администратором")
user.is_admin = False
user.updated_at = datetime.utcnow()
await session.commit()
logger.info(f"Administrator privileges removed from user {user_id}")
return (True, f"Права администратора успешно сняты у пользователя {user_id}")
except Exception as e:
logger.error(f"Error removing administrator privileges: {e}", exc_info=True)
return (False, f"Ошибка базы данных: {str(e)}")