""" Downloads via yt-dlp """ import yt_dlp from pathlib import Path from typing import Optional, Dict, Callable import asyncio import threading import logging import time import shutil import json logger = logging.getLogger(__name__) async def fix_video_aspect_ratio(video_path: str) -> Optional[str]: """ Fix video aspect ratio metadata for mobile compatibility This function ensures that video has correct aspect ratio metadata so it displays correctly on mobile devices (not as square) Args: video_path: Path to video file Returns: Path to fixed video file (same file if fixed in place, or new file) """ try: # Check if ffmpeg is available if not shutil.which('ffmpeg'): logger.warning("ffmpeg not found, skipping aspect ratio fix") return None # Check if ffprobe is available if not shutil.which('ffprobe'): logger.warning("ffprobe not found, skipping aspect ratio fix") return None video_file = Path(video_path) if not video_file.exists(): logger.warning(f"Video file not found: {video_path}") return None # Get video information to check aspect ratio probe_cmd = [ 'ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height,display_aspect_ratio,sample_aspect_ratio', '-of', 'json', str(video_path) ] proc = await asyncio.create_subprocess_exec( *probe_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode != 0: logger.warning(f"Failed to probe video: {stderr.decode()}") return None try: probe_data = json.loads(stdout.decode()) streams = probe_data.get('streams', []) if not streams: logger.warning("No video streams found") return None stream = streams[0] width = stream.get('width') height = stream.get('height') dar = stream.get('display_aspect_ratio') # Display Aspect Ratio sar = stream.get('sample_aspect_ratio') # Sample Aspect Ratio if not width or not height: logger.warning("Could not get video dimensions") return None # Calculate expected aspect ratio expected_dar = f"{width}:{height}" # If aspect ratio metadata is missing or incorrect, fix it needs_fix = False if not dar or dar == 'N/A': needs_fix = True logger.info(f"Video missing aspect ratio metadata (DAR), will fix to {expected_dar}") elif dar != expected_dar: # Check if the difference is significant try: dar_parts = dar.split(':') if len(dar_parts) == 2: dar_ratio = float(dar_parts[0]) / float(dar_parts[1]) expected_ratio = width / height if abs(dar_ratio - expected_ratio) > 0.01: # More than 1% difference needs_fix = True logger.info(f"Video has incorrect aspect ratio {dar}, will fix to {expected_dar}") except (ValueError, ZeroDivisionError): needs_fix = True if not needs_fix: logger.debug(f"Video aspect ratio is correct: {dar} (dimensions: {width}x{height})") return None # Fix aspect ratio by remuxing with correct metadata # Use temp file to avoid corruption if process fails temp_file = video_file.with_suffix('.tmp' + video_file.suffix) fix_cmd = [ 'ffmpeg', '-i', str(video_path), '-c', 'copy', # Copy streams without re-encoding '-aspect', expected_dar, # Set correct aspect ratio '-movflags', '+faststart', # Optimize for streaming '-y', # Overwrite str(temp_file) ] proc = await asyncio.create_subprocess_exec( *fix_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) await proc.communicate() if proc.returncode == 0 and temp_file.exists(): # Replace original file with fixed one video_file.unlink() temp_file.rename(video_file) logger.info(f"Video aspect ratio fixed: {video_path} (DAR: {expected_dar})") return 


def create_progress_hook(progress_callback: Optional[Callable] = None,
                         event_loop=None,
                         cancel_event: Optional[threading.Event] = None,
                         last_update_time: Optional[list] = None):
    """
    Create progress hook for tracking download progress

    Args:
        progress_callback: Async callback function for updating progress
        event_loop: Event loop from main thread (for calling from executor)
        cancel_event: Event for checking download cancellation
        last_update_time: List to store last update time (for rate limiting)

    Returns:
        Hook function for yt-dlp
    """
    if last_update_time is None:
        last_update_time = [0]

    def progress_hook(d: dict):
        # Check for cancellation
        if cancel_event and cancel_event.is_set():
            raise KeyboardInterrupt("Download cancelled")

        if d.get('status') == 'downloading':
            percent = 0
            if 'total_bytes' in d and d['total_bytes']:
                percent = (d.get('downloaded_bytes', 0) / d['total_bytes']) * 100
            elif 'total_bytes_estimate' in d and d['total_bytes_estimate']:
                percent = (d.get('downloaded_bytes', 0) / d['total_bytes_estimate']) * 100

            # Limit update frequency (no more than once per second)
            current_time = time.time()
            if progress_callback and percent > 0 and event_loop and (current_time - last_update_time[0] >= 1.0):
                try:
                    last_update_time[0] = current_time
                    # Use provided event loop for safe call from another thread.
                    # run_coroutine_threadsafe doesn't block the current thread
                    # and doesn't block the event loop
                    future = asyncio.run_coroutine_threadsafe(
                        progress_callback(int(percent)),
                        event_loop
                    )
                    # Don't wait for completion (future.result()) to avoid blocking download
                except Exception as e:
                    logger.debug(f"Error updating progress: {e}")

    return progress_hook
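

# Wiring sketch (illustrative, not called anywhere in this module): the hook is
# built on the event loop thread, then invoked by yt-dlp from a worker thread.
# The sample dict below mimics the keys yt-dlp passes to progress hooks.
async def _example_progress_hook_wiring() -> None:
    async def on_progress(percent: int) -> None:
        logger.info(f"progress: {percent}%")

    hook = create_progress_hook(
        progress_callback=on_progress,
        event_loop=asyncio.get_running_loop(),
        cancel_event=threading.Event(),
    )
    # yt-dlp calls the hook from its download thread; simulate one call here
    sample = {'status': 'downloading', 'downloaded_bytes': 512, 'total_bytes': 1024}
    await asyncio.get_running_loop().run_in_executor(None, hook, sample)
    await asyncio.sleep(0.1)  # let the scheduled progress coroutine run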


async def download_media(
    url: str,
    output_dir: str = "downloads",
    quality: str = "best",
    progress_callback: Optional[Callable] = None,
    cookies_file: Optional[str] = None,
    cancel_event: Optional[threading.Event] = None,
    task_id: Optional[int] = None
) -> Optional[Dict]:
    """
    Download media via yt-dlp

    Args:
        url: Video/media URL
        output_dir: Directory for saving
        quality: Video quality (best, worst, 720p, etc.)
        progress_callback: Function for updating progress (accepts int 0-100)
        cookies_file: Path to cookies file (optional)
        cancel_event: Event for cancellation check (optional)
        task_id: Task ID for unique file naming (optional)

    Returns:
        Dictionary with downloaded file information or None
    """
    try:
        # Log cookies file configuration
        if cookies_file:
            logger.info(f"Cookies file configured: {cookies_file}")
        else:
            logger.debug("No cookies file configured")

        # URL validation
        from bot.utils.helpers import is_valid_url
        if not is_valid_url(url):
            logger.error(f"Invalid or unsafe URL: {url}")
            return None

        # Create directory
        Path(output_dir).mkdir(parents=True, exist_ok=True)

        # Check free disk space (minimum 1GB)
        try:
            disk_usage = shutil.disk_usage(output_dir)
            free_space_gb = disk_usage.free / (1024 ** 3)
            min_free_space_gb = 1.0  # Minimum 1GB free space
            if free_space_gb < min_free_space_gb:
                logger.error(
                    f"Insufficient free disk space: {free_space_gb:.2f} GB "
                    f"(minimum {min_free_space_gb} GB required)"
                )
                return None
        except Exception as e:
            logger.warning(f"Failed to check free disk space: {e}")

        # Get event loop BEFORE starting executor to pass it to progress hook
        # Use get_running_loop() for explicit check that we're in async context
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # If no running loop, try to get current one (for backward compatibility)
            loop = asyncio.get_event_loop()

        # List to store last progress update time
        last_update_time = [0]

        # Configure yt-dlp with progress hook that uses correct event loop
        progress_hook_func = create_progress_hook(
            progress_callback,
            event_loop=loop,
            cancel_event=cancel_event,
            last_update_time=last_update_time
        )

        # Form unique filename with task_id to prevent conflicts
        if task_id:
            outtmpl = str(Path(output_dir) / f'%(title)s_[task_{task_id}].%(ext)s')
        else:
            outtmpl = str(Path(output_dir) / '%(title)s.%(ext)s')

        # Configure format selector for maximum quality with Telegram/mobile compatibility
        # Priority: Prefer already merged formats in mp4 container
        # This ensures compatibility with Telegram and mobile devices
        if quality == "best":
            # Format selector for maximum quality with compatibility:
            # 1. Prefer already merged mp4 files (best compatibility, no re-encoding needed)
            # 2. bestvideo[ext=mp4]+bestaudio[ext=m4a] (mp4 container, compatible codecs)
            # 3. bestvideo+bestaudio (fallback, will be merged to mp4)
            # 4. best (best combined format if separate streams not available)
            format_selector = (
                'best[ext=mp4]/'
                'bestvideo[ext=mp4]+bestaudio[ext=m4a]/'
                'bestvideo[ext=mp4]+bestaudio/'
                'bestvideo+bestaudio/best'
            )
        else:
            # Use custom quality if specified
            format_selector = quality
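
        # Note on yt-dlp selector syntax (for maintainers): '/' separates
        # fallback alternatives tried left to right, and '+' requests a merge
        # of a separate video and audio stream, e.g.
        # 'bestvideo[ext=mp4]+bestaudio[ext=m4a]' downloads both streams and
        # merges them (here into mp4 via merge_output_format below).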

        ydl_opts = {
            'format': format_selector,
            'outtmpl': outtmpl,
            'quiet': False,
            'no_warnings': False,
            'progress_hooks': [progress_hook_func],
            # Merge video and audio into single file (if separate streams)
            # Use mp4 container for maximum compatibility
            'merge_output_format': 'mp4',
            # FFmpeg options for merging to ensure compatibility:
            # copy the video stream (no re-encoding); re-encode audio to AAC
            'postprocessor_args': {
                'ffmpeg': [
                    '-c:v', 'copy',
                    '-c:a', 'aac',
                    '-movflags', '+faststart',
                ]
            },
            # Don't prefer free formats (they may be lower quality)
            'prefer_free_formats': False,
            # Additional options for better quality
            'writesubtitles': False,
            'writeautomaticsub': False,
            'ignoreerrors': False,
            # Network settings for better reliability
            'socket_timeout': 60,      # Increase socket timeout to 60 seconds
            'retries': 3,              # Retry failed downloads up to 3 times
            'fragment_retries': 3,     # Retry failed fragments
            'file_access_retries': 3,  # Retry file access errors
        }

        # Check if Node.js is available for JS extraction (required for Instagram, TikTok, YouTube, etc.)
        nodejs_path = shutil.which('node')
        if nodejs_path:
            logger.info(f"Node.js found at: {nodejs_path}. JS extraction will be available.")
        else:
            logger.warning(
                "Node.js not found. Some sites (Instagram, TikTok, YouTube, etc.) may require JS extraction. "
                "Install Node.js for full functionality."
            )

        # Configure extractor args for specific sites
        ydl_opts['extractor_args'] = {}

        # YouTube settings
        if 'youtube.com' in url or 'youtu.be' in url:
            ydl_opts['extractor_args']['youtube'] = {
                'player_client': ['android', 'web'],  # Use clients that don't require JS
            }

        # Instagram settings - improve reliability
        if 'instagram.com' in url:
            ydl_opts['extractor_args']['instagram'] = {
                # Use mobile API for better reliability
                'api': ['mobile'],
            }
            # Increase timeout specifically for Instagram
            ydl_opts['socket_timeout'] = 120  # 2 minutes for Instagram
            ydl_opts['retries'] = 5           # More retries for Instagram
            logger.info("Instagram URL detected, using optimized settings with increased timeouts and retries")

        # Add cookies if specified (for Instagram and other sites)
        if cookies_file:
            # Resolve cookies file path (support relative and absolute paths)
            cookies_path = None
            original_path = Path(cookies_file)

            # Try multiple locations for cookies file
            search_paths = []

            # 1. If absolute path, use it directly
            if original_path.is_absolute():
                search_paths.append(original_path)
            else:
                # 2. Try relative to project root (where this file is located)
                project_root = Path(__file__).parent.parent.parent.parent
                search_paths.append(project_root / cookies_file)

                # 3. Try relative to current working directory
                cwd = Path.cwd()
                search_paths.append(cwd / cookies_file)

                # 4. Fall back to resolving the path as given
                search_paths.append(Path(cookies_file).resolve())

            # Find first existing path
            for path in search_paths:
                if path.exists() and path.is_file():
                    cookies_path = path
                    break

            if cookies_path and cookies_path.exists():
                ydl_opts['cookiefile'] = str(cookies_path)
                logger.info(f"Using cookies from file: {cookies_path} (resolved from: {cookies_file})")
            else:
                searched = chr(10).join(f"  - {p}" for p in search_paths)
                logger.warning(
                    f"Cookies file not found. Searched in:\n{searched}\n"
                    f"Original path: {cookies_file}. Continuing without cookies."
                )
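
        # Note (maintainers): this cookies-path resolution also appears in
        # get_media_info and get_videos_list below; a shared helper could
        # remove the duplication (see the _resolve_cookies_path sketch at the
        # end of this module).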

        def run_download():
            """Synchronous function to execute in separate thread"""
            # This function runs in a separate thread (ThreadPoolExecutor).
            # The progress hook will be called from this thread and use
            # run_coroutine_threadsafe for a safe call into the main event loop
            try:
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    # Check for cancellation before start
                    if cancel_event and cancel_event.is_set():
                        raise KeyboardInterrupt("Download cancelled")

                    # Get video information
                    info = ydl.extract_info(url, download=False)

                    # Check for cancellation after getting info
                    if cancel_event and cancel_event.is_set():
                        raise KeyboardInterrupt("Download cancelled")

                    # Download (progress hook will be called from this thread)
                    # Note: Some postprocessors may show errors (like FixupM3u8 with aspect ratio),
                    # but the video file is still downloaded correctly
                    try:
                        ydl.download([url])
                    except Exception as download_error:
                        error_msg = str(download_error)
                        error_lower = error_msg.lower()

                        # Check if it's just a postprocessing error (video is already downloaded)
                        if "Postprocessing" in error_msg or "aspect ratio" in error_lower:
                            logger.warning(
                                f"Postprocessing error (non-critical): {error_msg}. "
                                f"Video file should still be available. Will check file existence."
                            )
                            # Don't raise - video is likely already downloaded
                        # Check for Instagram-specific errors
                        elif 'instagram.com' in url:
                            if 'timeout' in error_lower or 'timed out' in error_lower:
                                logger.error(f"Instagram download timeout: {error_msg}")
                                raise Exception(
                                    "Instagram download timeout. This may be due to:\n"
                                    "- Network issues\n"
                                    "- Instagram rate limiting\n"
                                    "- Missing or expired cookies\n"
                                    "Please try again later or check your cookies file."
                                )
                            elif 'incompleteread' in error_lower or 'incomplete read' in error_lower:
                                logger.error(f"Instagram incomplete read: {error_msg}")
                                raise Exception(
                                    "Instagram download incomplete. This may be due to:\n"
                                    "- Network instability\n"
                                    "- Instagram server issues\n"
                                    "Please try again."
                                )
                            elif 'unable to download webpage' in error_lower:
                                logger.error(f"Instagram webpage download failed: {error_msg}")
                                raise Exception(
                                    "Failed to access Instagram content. This may be due to:\n"
                                    "- Private or deleted post\n"
                                    "- Missing or expired cookies\n"
                                    "- Instagram blocking requests\n"
                                    "Please check the URL and cookies file."
                                )
                            else:
                                # Other Instagram errors
                                logger.error(f"Instagram download error: {error_msg}")
                                raise Exception(f"Instagram download failed: {error_msg}")
                        else:
                            # Real error for other sites, re-raise
                            raise

                    return info
            except KeyboardInterrupt:
                # Interrupt download on cancellation
                logger.info("Download interrupted")
                raise

        # Execute in executor for non-blocking download
        # None uses ThreadPoolExecutor by default
        # This ensures download doesn't block message processing
        # Event loop continues processing messages in parallel with download
        info = await loop.run_in_executor(None, run_download)

        # Search for downloaded file
        title = info.get('title', 'video')
        # Clean title from invalid characters
        title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_')).strip()
        ext = info.get('ext', 'mp4')

        logger.info(f"Searching for downloaded file. Title: {title}, ext: {ext}, task_id: {task_id}")

        # Form filename with task_id
        if task_id:
            filename = f"{title}_[task_{task_id}].{ext}"
        else:
            filename = f"{title}.{ext}"

        file_path = Path(output_dir) / filename
        logger.debug(f"Expected file path: {file_path}")

        # If file not found at expected path, search in directory
        if not file_path.exists():
            logger.info(f"File not found at expected path {file_path}, starting search...")

            # If task_id exists, search for file with this task_id
            if task_id:
                # Pattern 1: exact match with task_id. Note: '[' starts a glob
                # character class, so escape it as '[[]' to match the literal
                # '[task_N]' suffix
                pattern = f"*[[]task_{task_id}].{ext}"
                files = list(Path(output_dir).glob(pattern))
                logger.debug(f"Search by pattern '{pattern}': found {len(files)} files")

                if not files:
                    # Pattern 2: search files containing task_id (in case format differs slightly)
                    pattern2 = f"*task_{task_id}*.{ext}"
                    files = list(Path(output_dir).glob(pattern2))
                    logger.debug(f"Search by pattern '{pattern2}': found {len(files)} files")

                if files:
                    # Take newest file from found ones
                    file_path = max(files, key=lambda p: p.stat().st_mtime)
                    logger.info(f"Found file by task_id: {file_path}")
                else:
                    # If not found by task_id, search newest file with this extension
                    logger.info(f"File with task_id {task_id} not found, searching newest .{ext} file")
                    files = list(Path(output_dir).glob(f"*.{ext}"))
                    if files:
                        # Filter files created recently (last 5 minutes)
                        current_time = time.time()
                        recent_files = [
                            f for f in files
                            if (current_time - f.stat().st_mtime) < 300  # 5 minutes
                        ]
                        if recent_files:
                            file_path = max(recent_files, key=lambda p: p.stat().st_mtime)
                            logger.info(f"Found recently created file: {file_path}")
                        else:
                            file_path = max(files, key=lambda p: p.stat().st_mtime)
                            logger.warning(f"No recent files found, taking newest: {file_path}")
            else:
                # Search file by extension
                files = list(Path(output_dir).glob(f"*.{ext}"))
                if files:
                    # Take newest file
                    file_path = max(files, key=lambda p: p.stat().st_mtime)
                    logger.info(f"Found file by time: {file_path}")

        if file_path.exists():
            file_size = file_path.stat().st_size
            logger.info(f"File found: {file_path}, size: {file_size / (1024*1024):.2f} MB")

            # Post-process video to fix aspect ratio for mobile compatibility
            if ext.lower() in ['mp4', 'mov', 'avi', 'mkv', 'webm']:
                fixed_file_path = await fix_video_aspect_ratio(str(file_path))
                if fixed_file_path:
                    file_path = Path(fixed_file_path)
                    logger.info(f"Video aspect ratio fixed: {file_path}")

            return {
                'file_path': str(file_path),
                'title': title,
                'duration': info.get('duration'),
                'thumbnail': info.get('thumbnail'),
                'size': file_path.stat().st_size
            }
        else:
            # Output list of all files in directory for debugging
            all_files = list(Path(output_dir).glob("*"))
            logger.error(
                f"File not found after download: {file_path}\n"
                f"Files in downloads directory: {[str(f.name) for f in all_files[:10]]}"
            )
            return None

    except Exception as e:
        logger.error(f"Error downloading via yt-dlp: {e}", exc_info=True)
        return None
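

# Usage sketch (illustrative, not called anywhere in this module): a typical
# call to download_media with a progress callback and a cancellation event.
# The URL and task id below are hypothetical.
async def _example_download() -> None:
    cancel = threading.Event()

    async def on_progress(percent: int) -> None:
        logger.info(f"download progress: {percent}%")

    result = await download_media(
        url="https://example.com/watch?v=abc123",  # hypothetical URL
        output_dir="downloads",
        quality="best",
        progress_callback=on_progress,
        cancel_event=cancel,
        task_id=42,  # hypothetical task id
    )
    if result:
        logger.info(f"Saved {result['title']} to {result['file_path']} ({result['size']} bytes)")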


async def get_media_info(url: str, cookies_file: Optional[str] = None) -> Optional[Dict]:
    """
    Get media information without downloading

    Args:
        url: Media URL
        cookies_file: Path to cookies file (optional)

    Returns:
        Dictionary with information or None
    """
    try:
        loop = asyncio.get_running_loop()

        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
        }

        # Add cookies if specified
        if cookies_file:
            # Resolve cookies file path (support relative and absolute paths)
            cookies_path = None
            original_path = Path(cookies_file)

            # Try multiple locations for cookies file
            search_paths = []

            # 1. If absolute path, use it directly
            if original_path.is_absolute():
                search_paths.append(original_path)
            else:
                # 2. Try relative to project root (where this file is located)
                project_root = Path(__file__).parent.parent.parent.parent
                search_paths.append(project_root / cookies_file)

                # 3. Try relative to current working directory
                cwd = Path.cwd()
                search_paths.append(cwd / cookies_file)

                # 4. Fall back to resolving the path as given
                search_paths.append(Path(cookies_file).resolve())

            # Find first existing path
            for path in search_paths:
                if path.exists() and path.is_file():
                    cookies_path = path
                    break

            if cookies_path and cookies_path.exists():
                ydl_opts['cookiefile'] = str(cookies_path)
                logger.debug(f"Using cookies to get info: {cookies_path} (resolved from: {cookies_file})")
            else:
                searched = chr(10).join(f"  - {p}" for p in search_paths)
                logger.warning(
                    f"Cookies file not found for get_media_info. Searched in:\n{searched}\n"
                    f"Original path: {cookies_file}"
                )

        def extract_info_sync():
            """Synchronous function for extracting information"""
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                return ydl.extract_info(url, download=False)

        # Run synchronous yt-dlp in executor to avoid blocking event loop
        info = await loop.run_in_executor(None, extract_info_sync)

        return {
            'title': info.get('title'),
            'duration': info.get('duration'),
            'thumbnail': info.get('thumbnail'),
            'uploader': info.get('uploader'),
            'view_count': info.get('view_count'),
        }
    except Exception as e:
        logger.error(f"Error getting media info: {e}", exc_info=True)
        return None
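

# Usage sketch (illustrative, not called anywhere in this module): probing
# metadata before committing to a download. The URL is hypothetical.
async def _example_get_media_info() -> None:
    info = await get_media_info("https://example.com/watch?v=abc123")  # hypothetical URL
    if info:
        logger.info(f"{info['title']} ({info['duration']}s) by {info['uploader']}")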


async def get_videos_list(url: str, cookies_file: Optional[str] = None) -> Optional[Dict]:
    """
    Get list of videos from webpage

    Args:
        url: Webpage URL
        cookies_file: Path to cookies file (optional)

    Returns:
        Dictionary with:
        - 'type': 'playlist' or 'video'
        - 'videos': List of video dictionaries with 'id', 'url', 'title', 'duration', 'thumbnail'
        - 'playlist_title': Title of playlist/page (if playlist)
        or None on error
    """
    try:
        loop = asyncio.get_running_loop()

        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': 'in_playlist',  # Extract flat for playlist entries, full for single videos
        }

        # Add cookies if specified
        if cookies_file:
            cookies_path = None
            original_path = Path(cookies_file)

            search_paths = []
            if original_path.is_absolute():
                search_paths.append(original_path)
            else:
                project_root = Path(__file__).parent.parent.parent.parent
                search_paths.append(project_root / cookies_file)
                cwd = Path.cwd()
                search_paths.append(cwd / cookies_file)
                search_paths.append(Path(cookies_file).resolve())

            for path in search_paths:
                if path.exists() and path.is_file():
                    cookies_path = path
                    break

            if cookies_path and cookies_path.exists():
                ydl_opts['cookiefile'] = str(cookies_path)

        def extract_info_sync():
            """Synchronous function for extracting information"""
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                return ydl.extract_info(url, download=False)

        # Extract info without downloading
        info = await loop.run_in_executor(None, extract_info_sync)

        if not info:
            return None

        # Check if it's a playlist or single video
        _type = info.get('_type', 'video')
        entries = info.get('entries', [])

        if _type == 'playlist' and entries:
            # It's a playlist - extract entries
            videos = []
            for entry in entries[:20]:  # Limit to 20 videos to avoid timeout
                if entry:
                    entry_url = entry.get('url') or entry.get('webpage_url')
                    if not entry_url:
                        continue
                    videos.append({
                        'id': entry.get('id'),
                        'url': entry_url,
                        'title': entry.get('title', 'Unknown'),
                        'duration': entry.get('duration'),
                        'thumbnail': entry.get('thumbnail'),
                    })

            if videos:
                return {
                    'type': 'playlist',
                    'videos': videos,
                    'playlist_title': info.get('title', 'Playlist'),
                }
            else:
                # No valid entries found, treat as single video
                return {
                    'type': 'video',
                    'videos': [{
                        'id': info.get('id'),
                        'url': url,
                        'title': info.get('title', 'Video'),
                        'duration': info.get('duration'),
                        'thumbnail': info.get('thumbnail'),
                    }],
                    'playlist_title': None,
                }
        else:
            # Single video
            return {
                'type': 'video',
                'videos': [{
                    'id': info.get('id'),
                    'url': url,
                    'title': info.get('title', 'Video'),
                    'duration': info.get('duration'),
                    'thumbnail': info.get('thumbnail'),
                }],
                'playlist_title': None,
            }

    except Exception as e:
        logger.error(f"Error getting videos list: {e}", exc_info=True)
        return None
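

# Refactor sketch (illustrative, not used by the functions above): the cookies
# path resolution duplicated in download_media, get_media_info and
# get_videos_list could be factored into a single helper like this. The
# project-root depth mirrors the Path(__file__).parent... chain used above.
def _resolve_cookies_path(cookies_file: str) -> Optional[Path]:
    original_path = Path(cookies_file)
    if original_path.is_absolute():
        search_paths = [original_path]
    else:
        project_root = Path(__file__).parent.parent.parent.parent
        search_paths = [
            project_root / cookies_file,   # relative to project root
            Path.cwd() / cookies_file,     # relative to working directory
            Path(cookies_file).resolve(),  # the path as given, resolved
        ]
    for path in search_paths:
        if path.exists() and path.is_file():
            return path
    return None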