Add source
This commit is contained in:
146
bot/utils/helpers.py
Normal file
146
bot/utils/helpers.py
Normal file
@@ -0,0 +1,146 @@
|
||||
"""
|
||||
Utility functions
|
||||
"""
|
||||
import re
|
||||
import uuid
|
||||
from typing import Optional, Tuple
|
||||
|
||||
|
||||
def is_valid_url(url: str) -> bool:
|
||||
"""
|
||||
Validate URL with protection against dangerous schemes
|
||||
|
||||
Args:
|
||||
url: URL to validate
|
||||
|
||||
Returns:
|
||||
True if URL is valid and safe
|
||||
"""
|
||||
if not url or not isinstance(url, str):
|
||||
return False
|
||||
|
||||
# Check URL length (maximum 2048 characters)
|
||||
if len(url) > 2048:
|
||||
return False
|
||||
|
||||
# Block dangerous schemes
|
||||
dangerous_schemes = ['file://', 'javascript:', 'data:', 'vbscript:', 'about:']
|
||||
url_lower = url.lower().strip()
|
||||
for scheme in dangerous_schemes:
|
||||
if url_lower.startswith(scheme):
|
||||
return False
|
||||
|
||||
# Check URL format
|
||||
url_pattern = re.compile(
|
||||
r'^https?://' # http:// or https://
|
||||
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain...
|
||||
r'localhost|' # localhost...
|
||||
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
|
||||
r'(?::\d+)?' # optional port
|
||||
r'(?:/?|[/?]\S+)$', re.IGNORECASE)
|
||||
return url_pattern.match(url) is not None
|
||||
|
||||
|
||||
def parse_user_id(text: str) -> Optional[int]:
|
||||
"""
|
||||
Parse user_id from text (numeric only)
|
||||
|
||||
Args:
|
||||
text: Text that may contain user_id
|
||||
|
||||
Returns:
|
||||
User ID as integer or None if not a valid number
|
||||
"""
|
||||
try:
|
||||
# Remove @ if present
|
||||
text = text.replace("@", "").strip()
|
||||
return int(text)
|
||||
except (ValueError, AttributeError):
|
||||
return None
|
||||
|
||||
|
||||
async def resolve_user_identifier(identifier: str) -> Tuple[Optional[int], Optional[str]]:
|
||||
"""
|
||||
Resolve user identifier (user_id or username) to user_id
|
||||
|
||||
Args:
|
||||
identifier: User ID (number) or username (with or without @)
|
||||
|
||||
Returns:
|
||||
Tuple of (user_id: Optional[int], error_message: Optional[str])
|
||||
If user_id is None, error_message contains the reason
|
||||
"""
|
||||
# First, try to parse as user_id
|
||||
user_id = parse_user_id(identifier)
|
||||
if user_id:
|
||||
return (user_id, None)
|
||||
|
||||
# If not a number, try to resolve username via Telegram API
|
||||
username = identifier.lstrip('@').strip()
|
||||
if not username:
|
||||
return (None, "Идентификатор не может быть пустым")
|
||||
|
||||
try:
|
||||
from bot.modules.task_scheduler.executor import get_app_client
|
||||
app_client = get_app_client()
|
||||
|
||||
if not app_client:
|
||||
return (None, "Telegram клиент не инициализирован. Попробуйте использовать User ID.")
|
||||
|
||||
# Try to get user by username via get_chat
|
||||
# Note: This only works if bot has already interacted with the user
|
||||
chat = await app_client.get_chat(username)
|
||||
if chat and hasattr(chat, 'id'):
|
||||
return (chat.id, None)
|
||||
else:
|
||||
return (None, f"Пользователь @{username} не найден через Telegram API")
|
||||
|
||||
except Exception as e:
|
||||
# Log the error but return user-friendly message
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.debug(f"Failed to resolve username {username}: {e}")
|
||||
return (None, f"Не удалось найти пользователя @{username}. Убедитесь, что бот взаимодействовал с этим пользователем, или используйте User ID.")
|
||||
|
||||
|
||||
def format_file_size(size_bytes: int) -> str:
|
||||
"""Format file size"""
|
||||
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
|
||||
if size_bytes < 1024.0:
|
||||
return f"{size_bytes:.2f} {unit}"
|
||||
size_bytes /= 1024.0
|
||||
return f"{size_bytes:.2f} PB"
|
||||
|
||||
|
||||
def format_duration(seconds) -> str:
|
||||
"""
|
||||
Format duration
|
||||
|
||||
Args:
|
||||
seconds: Duration in seconds (int or float)
|
||||
|
||||
Returns:
|
||||
Formatted string in "HH:MM:SS" or "MM:SS" format
|
||||
"""
|
||||
# Convert to int as we don't need fractional seconds for display
|
||||
seconds = int(seconds) if seconds else 0
|
||||
hours = seconds // 3600
|
||||
minutes = (seconds % 3600) // 60
|
||||
secs = seconds % 60
|
||||
|
||||
if hours > 0:
|
||||
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
|
||||
return f"{minutes:02d}:{secs:02d}"
|
||||
|
||||
|
||||
def generate_unique_task_id() -> int:
|
||||
"""
|
||||
Generate unique task ID
|
||||
Uses UUID to guarantee uniqueness
|
||||
|
||||
Returns:
|
||||
Unique 63-bit integer ID
|
||||
"""
|
||||
# Use UUID and take first 63 bits (to fit in int)
|
||||
return uuid.uuid4().int & ((1 << 63) - 1)
|
||||
|
||||
Reference in New Issue
Block a user