130 lines
4.0 KiB
Python
130 lines
4.0 KiB
Python
"""
|
|
Video selection cache for callback handlers
|
|
|
|
Stores mappings between short identifiers and video URLs to work around
|
|
Telegram's 64-byte limit on callback_data.
|
|
"""
|
|
import secrets
|
|
import threading
|
|
import time
|
|
from typing import Optional, Dict
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)

# Cache limits
_SELECTION_TTL = 3600  # Seconds a stored selection stays valid (1 hour)
_MAX_SELECTIONS = 10000  # Entry count at which storing a new selection triggers eviction

# In-memory cache of video selections, keyed by selection id
# Record layout: {'url': str, 'user_id': int, 'created_at': float}
_video_selections: Dict[str, Dict] = {}

# Guards reads and writes of _video_selections
_video_selections_lock = threading.Lock()
|
|
|
|
|
|
def generate_selection_id() -> str:
    """
    Return a fresh random token identifying one cached video selection

    The token is built from 8 random bytes encoded as URL-safe Base64,
    which gives an 11-character string -- well under the 64 bytes that
    callback_data allows.
    """
    random_byte_count = 8
    token = secrets.token_urlsafe(random_byte_count)
    return token
|
|
|
|
|
|
def store_video_selection(url: str, user_id: int) -> str:
    """
    Store a video URL and return a short identifier

    When the cache already holds _MAX_SELECTIONS entries, expired entries
    are purged first; if that does not free any room, the oldest entries
    (about 10%, and always at least one) are evicted before the new entry
    is inserted.

    Args:
        url: Video URL to store
        user_id: User ID who requested the selection

    Returns:
        Short identifier to use in callback_data
    """
    with _video_selections_lock:
        # Make room before inserting so the cache stays within its cap
        if len(_video_selections) >= _MAX_SELECTIONS:
            _cleanup_expired_selections()

            # If still too large, remove oldest entries
            if len(_video_selections) >= _MAX_SELECTIONS:
                sorted_items = sorted(
                    _video_selections.items(),
                    key=lambda x: x[1].get('created_at', 0)
                )
                # Evict 10% of the entries, but never zero: with a small
                # cap, len // 10 would be 0 and the cache would keep growing
                items_to_remove = max(1, len(sorted_items) // 10)
                for item_id, _ in sorted_items[:items_to_remove]:
                    del _video_selections[item_id]

        # Pick the id while holding the lock and retry on the (very
        # unlikely) collision, so an existing entry is never overwritten
        selection_id = generate_selection_id()
        while selection_id in _video_selections:
            selection_id = generate_selection_id()

        _video_selections[selection_id] = {
            'url': url,
            'user_id': user_id,
            'created_at': time.time()
        }

    logger.debug(f"Stored video selection: {selection_id} for user {user_id}")
    return selection_id
|
|
|
|
|
|
def get_video_selection(selection_id: str, user_id: Optional[int] = None) -> Optional[str]:
    """
    Look up the video URL stored under a selection identifier

    An entry older than _SELECTION_TTL seconds is removed from the cache
    during the lookup and treated as missing.

    Args:
        selection_id: Short identifier taken from callback_data
        user_id: When given, the entry must belong to this user

    Returns:
        The stored video URL, or None if the identifier is unknown, the
        entry has expired, or user_id does not match the entry's owner
    """
    with _video_selections_lock:
        selection = _video_selections.get(selection_id)

        if not selection:
            logger.debug(f"Selection not found: {selection_id}")
            return None

        # Drop the entry once it has outlived its time to live
        age_seconds = time.time() - selection['created_at']
        if age_seconds > _SELECTION_TTL:
            del _video_selections[selection_id]
            logger.debug(f"Selection expired: {selection_id}")
            return None

        # No ownership check requested, or the caller is the owner
        owner_ok = user_id is None or selection['user_id'] == user_id
        if owner_ok:
            return selection['url']

        logger.warning(
            f"User {user_id} attempted to access selection {selection_id} "
            f"owned by user {selection['user_id']}"
        )
        return None
|
|
|
|
|
|
def _cleanup_expired_selections():
    """
    Delete every cached selection older than _SELECTION_TTL seconds

    This function does not acquire _video_selections_lock itself; the
    caller visible in this file (store_video_selection) invokes it while
    already holding the lock.
    """
    current_time = time.time()

    # Collect first, delete afterwards: the dict must not change size
    # while it is being iterated
    expired_ids = []
    for sel_id, sel_data in _video_selections.items():
        age_seconds = current_time - sel_data['created_at']
        if age_seconds > _SELECTION_TTL:
            expired_ids.append(sel_id)

    if not expired_ids:
        return

    for sel_id in expired_ids:
        del _video_selections[sel_id]

    logger.debug(f"Cleaned up {len(expired_ids)} expired selections")
|
|
|
|
|
|
def clear_user_selections(user_id: int):
    """
    Remove every cached selection owned by the given user

    Args:
        user_id: User ID whose selections are dropped from the cache
    """
    with _video_selections_lock:
        # Collect the matching ids first so the dict is not modified
        # while it is being iterated
        to_remove = []
        for sel_id, sel_data in _video_selections.items():
            if sel_data['user_id'] == user_id:
                to_remove.append(sel_id)

        if not to_remove:
            return

        for sel_id in to_remove:
            del _video_selections[sel_id]

        logger.debug(f"Cleared {len(to_remove)} selections for user {user_id}")
|
|
|