Files
tg_loader/bot/modules/message_handler/video_selection_cache.py

130 lines
4.0 KiB
Python

"""
Video selection cache for callback handlers
Stores mappings between short identifiers and video URLs to work around
Telegram's 64-byte limit on callback_data.
"""
import secrets
import threading
import time
from typing import Optional, Dict
import logging
logger = logging.getLogger(__name__)
# In-memory cache mapping short selection identifiers to the video URL they stand for.
# Format: {selection_id: {'url': str, 'user_id': int, 'created_at': float}}
# 'created_at' is a time.time() timestamp taken when the entry is stored.
_video_selections: Dict[str, Dict] = {}
# Held by store_video_selection, get_video_selection and clear_user_selections
# while they read or modify _video_selections.
_video_selections_lock = threading.Lock()
_MAX_SELECTIONS = 10000 # Cache size at which store_video_selection starts evicting entries
_SELECTION_TTL = 3600 # Time to live: 1 hour in seconds; older entries count as expired
def generate_selection_id() -> str:
    """
    Create a short random token that identifies one video selection

    Returns:
        URL-safe text token built from 8 random bytes (11 characters),
        well under the 64-byte callback_data limit
    """
    random_byte_count = 8
    token = secrets.token_urlsafe(random_byte_count)
    return token
def store_video_selection(url: str, user_id: int) -> str:
    """
    Store a video URL and return a short identifier

    When the cache has reached _MAX_SELECTIONS entries, expired entries are
    dropped first; if that does not free any room, the oldest 10% of the
    entries (at least one) are evicted before the new entry is added.

    Args:
        url: Video URL to store
        user_id: User ID who requested the selection

    Returns:
        Short identifier to use in callback_data
    """
    with _video_selections_lock:
        # Clean up old entries if cache is too large
        if len(_video_selections) >= _MAX_SELECTIONS:
            _cleanup_expired_selections()

            # If still too large, remove oldest entries
            if len(_video_selections) >= _MAX_SELECTIONS:
                oldest_first = sorted(
                    _video_selections,
                    key=lambda sel_id: _video_selections[sel_id].get('created_at', 0)
                )
                # Remove 10% of oldest entries. Always evict at least one:
                # plain integer division yields 0 for fewer than 10 entries,
                # which would leave the size limit unenforced.
                items_to_remove = max(1, len(oldest_first) // 10)
                for stale_id in oldest_first[:items_to_remove]:
                    del _video_selections[stale_id]

        # Pick the identifier while holding the lock so the uniqueness check
        # and the insert happen together; a colliding token would otherwise
        # silently overwrite another live selection.
        selection_id = generate_selection_id()
        while selection_id in _video_selections:
            selection_id = generate_selection_id()

        _video_selections[selection_id] = {
            'url': url,
            'user_id': user_id,
            'created_at': time.time()
        }

    logger.debug(f"Stored video selection: {selection_id} for user {user_id}")
    return selection_id
def get_video_selection(selection_id: str, user_id: Optional[int] = None) -> Optional[str]:
    """
    Look up the video URL stored under a selection identifier

    An entry older than _SELECTION_TTL seconds is removed from the cache and
    treated as missing.

    Args:
        selection_id: Short identifier from callback_data
        user_id: Optional user ID to verify ownership; when None, no
            ownership check is made

    Returns:
        Video URL, or None if the identifier is unknown, the entry has
        expired, or it belongs to a different user
    """
    with _video_selections_lock:
        entry = _video_selections.get(selection_id)
        if not entry:
            logger.debug(f"Selection not found: {selection_id}")
            return None

        # Expiry is evaluated lazily, at lookup time
        age_seconds = time.time() - entry['created_at']
        if age_seconds > _SELECTION_TTL:
            del _video_selections[selection_id]
            logger.debug(f"Selection expired: {selection_id}")
            return None

        # Ownership is only enforced when the caller supplies a user ID
        if user_id is not None:
            owner_id = entry['user_id']
            if owner_id != user_id:
                logger.warning(
                    f"User {user_id} attempted to access selection {selection_id} "
                    f"owned by user {owner_id}"
                )
                return None

        return entry['url']
def _cleanup_expired_selections():
    """
    Drop every cache entry older than _SELECTION_TTL seconds

    This function does not acquire _video_selections_lock itself; the caller
    must already hold it (store_video_selection calls it from inside its
    locked section).
    """
    now = time.time()

    def _is_expired(record):
        return now - record['created_at'] > _SELECTION_TTL

    # Collect the identifiers first so the dict is not resized while iterating
    stale_ids = [
        key for key, record in _video_selections.items()
        if _is_expired(record)
    ]
    for key in stale_ids:
        _video_selections.pop(key)

    removed_count = len(stale_ids)
    if removed_count:
        logger.debug(f"Cleaned up {removed_count} expired selections")
def clear_user_selections(user_id: int):
    """
    Remove every cached selection that belongs to the given user

    Args:
        user_id: User ID whose selections are removed
    """
    with _video_selections_lock:
        # Gather matching identifiers first; deleting while iterating the
        # dict directly would raise a RuntimeError
        owned_ids = []
        for key, record in _video_selections.items():
            if record['user_id'] == user_id:
                owned_ids.append(key)

        for key in owned_ids:
            _video_selections.pop(key)

        if owned_ids:
            logger.debug(f"Cleared {len(owned_ids)} selections for user {user_id}")