# tg_loader/bot/modules/media_loader/ytdlp.py
"""
Downloads via yt-dlp
"""
import yt_dlp
from pathlib import Path
from typing import Optional, Dict, Callable
import asyncio
import sys
import threading
import logging
import time
import shutil
import json

logger = logging.getLogger(__name__)


class YtDlpErrorFilter:
    """Filter to suppress non-critical yt-dlp errors from stderr"""

    def __init__(self, original_stderr):
        self.original_stderr = original_stderr
        self.buffer = []

    def write(self, text):
        """Filter stderr output from yt-dlp"""
        text_lower = text.lower()
        # Suppress "Unable to extract title" errors - they're not critical
        if "Unable to extract title" in text:
            # Log as debug instead of error
            logger.debug(f"yt-dlp: {text.strip()}")
            return
        # Suppress other non-critical extraction errors
        if "Unable to extract" in text and ("title" in text_lower or "metadata" in text_lower):
            logger.debug(f"yt-dlp: {text.strip()}")
            return
        # Suppress "Unable to download webpage" errors that are often non-critical:
        # they can occur due to network issues, but yt-dlp may still succeed on retry
        if "Unable to download webpage" in text:
            # A partial read is often recoverable
            if "bytes read" in text_lower or "incompleteread" in text_lower:
                logger.debug(f"yt-dlp: {text.strip()} (may retry)")
                return
            # For other cases, log as a warning rather than an error
            logger.warning(f"yt-dlp: {text.strip()}")
            return
        # Write everything else to the original stderr
        self.original_stderr.write(text)
        self.original_stderr.flush()

    def flush(self):
        self.original_stderr.flush()
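
# Usage sketch (illustrative, not from the original code): route yt-dlp's stderr
# through the filter and restore it afterwards, mirroring what run_download()
# does further below.
#
#     original_stderr = sys.stderr
#     sys.stderr = YtDlpErrorFilter(original_stderr)
#     try:
#         ...  # run yt-dlp work here
#     finally:
#         sys.stderr = original_stderr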


async def fix_video_aspect_ratio(video_path: str) -> Optional[str]:
    """
    Fix video aspect ratio metadata for mobile compatibility.

    Ensures the video carries correct aspect ratio metadata so it displays
    correctly on mobile devices (not as a square).

    Args:
        video_path: Path to video file

    Returns:
        Path to the fixed video file (the same path, fixed in place),
        or None if no fix was needed or possible
    """
    try:
        # Check that ffmpeg and ffprobe are available
        if not shutil.which('ffmpeg'):
            logger.warning("ffmpeg not found, skipping aspect ratio fix")
            return None
        if not shutil.which('ffprobe'):
            logger.warning("ffprobe not found, skipping aspect ratio fix")
            return None
        video_file = Path(video_path)
        if not video_file.exists():
            logger.warning(f"Video file not found: {video_path}")
            return None
        # Get video information to check the aspect ratio
        probe_cmd = [
            'ffprobe', '-v', 'error',
            '-select_streams', 'v:0',
            '-show_entries', 'stream=width,height,display_aspect_ratio,sample_aspect_ratio',
            '-of', 'json',
            str(video_path)
        ]
        proc = await asyncio.create_subprocess_exec(
            *probe_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            logger.warning(f"Failed to probe video: {stderr.decode()}")
            return None
        try:
            probe_data = json.loads(stdout.decode())
            streams = probe_data.get('streams', [])
            if not streams:
                logger.warning("No video streams found")
                return None
            stream = streams[0]
            width = stream.get('width')
            height = stream.get('height')
            dar = stream.get('display_aspect_ratio')  # Display Aspect Ratio
            sar = stream.get('sample_aspect_ratio')   # Sample Aspect Ratio
            if not width or not height:
                logger.warning("Could not get video dimensions")
                return None
            # Calculate the expected aspect ratio
            expected_dar = f"{width}:{height}"
            # If aspect ratio metadata is missing or incorrect, fix it
            needs_fix = False
            if not dar or dar == 'N/A':
                needs_fix = True
                logger.info(f"Video missing aspect ratio metadata (DAR), will fix to {expected_dar}")
            elif dar != expected_dar:
                # Check if the difference is significant
                try:
                    dar_parts = dar.split(':')
                    if len(dar_parts) == 2:
                        dar_ratio = float(dar_parts[0]) / float(dar_parts[1])
                        expected_ratio = width / height
                        if abs(dar_ratio - expected_ratio) > 0.01:  # Absolute ratio difference above 0.01
                            needs_fix = True
                            logger.info(f"Video has incorrect aspect ratio {dar}, will fix to {expected_dar}")
                except (ValueError, ZeroDivisionError):
                    needs_fix = True
            if not needs_fix:
                logger.debug(f"Video aspect ratio is correct: {dar} (dimensions: {width}x{height})")
                return None
            # Fix the aspect ratio by remuxing with correct metadata.
            # Use a temp file to avoid corrupting the original if the process fails.
            temp_file = video_file.with_suffix('.tmp' + video_file.suffix)
            fix_cmd = [
                'ffmpeg', '-i', str(video_path),
                '-c', 'copy',               # Copy streams without re-encoding
                '-aspect', expected_dar,    # Set correct aspect ratio
                '-movflags', '+faststart',  # Optimize for streaming
                '-y',                       # Overwrite
                str(temp_file)
            ]
            proc = await asyncio.create_subprocess_exec(
                *fix_cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            _, fix_stderr = await proc.communicate()
            if proc.returncode == 0 and temp_file.exists():
                # Replace the original file with the fixed one
                video_file.unlink()
                temp_file.rename(video_file)
                logger.info(f"Video aspect ratio fixed: {video_path} (DAR: {expected_dar})")
                return str(video_file)
            else:
                logger.warning(
                    f"Failed to fix aspect ratio, keeping original file: "
                    f"{fix_stderr.decode(errors='ignore')}"
                )
                if temp_file.exists():
                    temp_file.unlink()
                return None
        except json.JSONDecodeError as e:
            logger.warning(f"Failed to parse ffprobe output: {e}")
            return None
    except Exception as e:
        logger.error(f"Error fixing video aspect ratio: {e}", exc_info=True)
        return None
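
# Usage sketch (illustrative; the path is a placeholder):
#
#     fixed = await fix_video_aspect_ratio("downloads/clip.mp4")
#     # `fixed` is the in-place fixed path, or None if no fix was needed/possible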


def create_progress_hook(
    progress_callback: Optional[Callable] = None,
    event_loop=None,
    cancel_event: Optional[threading.Event] = None,
    last_update_time: Optional[list] = None,
):
    """
    Create a progress hook for tracking download progress.

    Args:
        progress_callback: Async callback function for updating progress
        event_loop: Event loop from the main thread (for calling from the executor)
        cancel_event: Event for checking download cancellation
        last_update_time: Single-element list storing the last update time (for rate limiting)

    Returns:
        Hook function for yt-dlp
    """
    if last_update_time is None:
        last_update_time = [0]

    def progress_hook(d: dict):
        # Check for cancellation
        if cancel_event and cancel_event.is_set():
            raise KeyboardInterrupt("Download cancelled")
        if d.get('status') == 'downloading':
            percent = 0
            if 'total_bytes' in d and d['total_bytes']:
                percent = (d.get('downloaded_bytes', 0) / d['total_bytes']) * 100
            elif 'total_bytes_estimate' in d and d['total_bytes_estimate']:
                percent = (d.get('downloaded_bytes', 0) / d['total_bytes_estimate']) * 100
            # Limit update frequency (no more than once per second)
            current_time = time.time()
            if progress_callback and percent > 0 and event_loop and (current_time - last_update_time[0] >= 1.0):
                try:
                    last_update_time[0] = current_time
                    # Use the provided event loop for a safe call from another thread;
                    # run_coroutine_threadsafe blocks neither this thread nor the event loop
                    asyncio.run_coroutine_threadsafe(
                        progress_callback(int(percent)),
                        event_loop
                    )
                    # Don't wait for completion (future.result()) to avoid blocking the download
                except Exception as e:
                    logger.debug(f"Error updating progress: {e}")

    return progress_hook
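
# Usage sketch (illustrative; `report_progress` is a placeholder async callback):
#
#     loop = asyncio.get_running_loop()
#     cancel = threading.Event()
#     hook = create_progress_hook(report_progress, event_loop=loop, cancel_event=cancel)
#     ydl_opts = {'progress_hooks': [hook]}  # the hook runs in yt-dlp's worker thread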


def _resolve_cookies_path(cookies_file: str):
    """
    Resolve a cookies file path (supports relative and absolute paths).

    Tries, in order: the absolute path itself; the project root (four levels
    above this file); the current working directory; the path resolved against
    the current directory.

    Returns:
        Tuple of (resolved Path or None, list of searched paths)
    """
    original_path = Path(cookies_file)
    search_paths = []
    if original_path.is_absolute():
        search_paths.append(original_path)
    else:
        project_root = Path(__file__).parent.parent.parent.parent
        search_paths.append(project_root / cookies_file)
        search_paths.append(Path.cwd() / cookies_file)
        search_paths.append(Path(cookies_file).resolve())
    for path in search_paths:
        if path.exists() and path.is_file():
            return path, search_paths
    return None, search_paths


async def download_media(
    url: str,
    output_dir: str = "downloads",
    quality: str = "best",
    progress_callback: Optional[Callable] = None,
    cookies_file: Optional[str] = None,
    cancel_event: Optional[threading.Event] = None,
    task_id: Optional[int] = None
) -> Optional[Dict]:
    """
    Download media via yt-dlp.

    Args:
        url: Video/media URL
        output_dir: Directory for saving
        quality: Video quality (best, worst, 720p, etc.)
        progress_callback: Function for updating progress (accepts int 0-100)
        cookies_file: Path to cookies file (optional)
        cancel_event: Event for cancellation check (optional)
        task_id: Task ID for unique file naming (optional)

    Returns:
        Dictionary with downloaded file information or None
    """
    try:
        # Log cookies file configuration
        if cookies_file:
            logger.info(f"Cookies file configured: {cookies_file}")
        else:
            logger.debug("No cookies file configured")
        # URL validation
        from bot.utils.helpers import is_valid_url
        if not is_valid_url(url):
            logger.error(f"Invalid or unsafe URL: {url}")
            return None
        # Create the output directory
        Path(output_dir).mkdir(parents=True, exist_ok=True)
        # Check free disk space (minimum 1GB)
        try:
            disk_usage = shutil.disk_usage(output_dir)
            free_space_gb = disk_usage.free / (1024 ** 3)
            min_free_space_gb = 1.0  # Minimum 1GB free space
            if free_space_gb < min_free_space_gb:
                logger.error(
                    f"Insufficient free disk space: {free_space_gb:.2f} GB "
                    f"(minimum {min_free_space_gb} GB required)"
                )
                return None
        except Exception as e:
            logger.warning(f"Failed to check free disk space: {e}")
        # Get the event loop BEFORE starting the executor so the progress hook
        # can schedule callbacks onto it. We are inside a coroutine, so
        # get_running_loop() is guaranteed to succeed.
        loop = asyncio.get_running_loop()
        # Single-element list storing the last progress update time
        last_update_time = [0]
        # Configure yt-dlp with a progress hook that uses the correct event loop
        progress_hook_func = create_progress_hook(
            progress_callback,
            event_loop=loop,
            cancel_event=cancel_event,
            last_update_time=last_update_time
        )
        # Form a unique filename with task_id to prevent conflicts
        if task_id:
            outtmpl = str(Path(output_dir) / f'%(title)s_[task_{task_id}].%(ext)s')
        else:
            outtmpl = str(Path(output_dir) / '%(title)s.%(ext)s')
        # Configure the format selector for maximum quality with Telegram/mobile
        # compatibility. Priority: prefer already merged formats in an mp4 container.
        if quality == "best":
            # Format selector for maximum quality with compatibility:
            # 1. Already merged mp4 files (best compatibility, no re-encoding needed)
            # 2. bestvideo[ext=mp4]+bestaudio[ext=m4a] (mp4 container, compatible codecs)
            # 3. bestvideo+bestaudio (fallback, will be merged to mp4)
            # 4. best (best combined format if separate streams are not available)
            format_selector = (
                'best[ext=mp4]/'
                'bestvideo[ext=mp4]+bestaudio[ext=m4a]/'
                'bestvideo[ext=mp4]+bestaudio/'
                'bestvideo+bestaudio/best'
            )
        else:
            # Use the custom quality if specified
            format_selector = quality
        ydl_opts = {
            'format': format_selector,
            'outtmpl': outtmpl,
            'quiet': False,
            'no_warnings': False,
            'progress_hooks': [progress_hook_func],
            # Merge video and audio into a single file (if separate streams).
            # Use an mp4 container for maximum compatibility.
            'merge_output_format': 'mp4',
            # FFmpeg options for merging to ensure compatibility:
            # copy the video stream (no re-encoding), encode audio to AAC
            'postprocessor_args': {
                'ffmpeg': [
                    '-c:v', 'copy',
                    '-c:a', 'aac',
                    '-movflags', '+faststart',
                ]
            },
            # Don't prefer free formats (they may be lower quality)
            'prefer_free_formats': False,
            # Additional options for better quality
            'writesubtitles': False,
            'writeautomaticsub': False,
            'ignoreerrors': True,  # Continue on extraction errors (e.g., missing title)
            # Network settings for better reliability
            'socket_timeout': 60,         # Socket timeout in seconds
            'retries': 5,                 # Retry failed downloads up to 5 times
            'fragment_retries': 5,        # Retry failed fragments
            'file_access_retries': 3,     # Retry file access errors
            'http_chunk_size': 10485760,  # 10MB chunks for better stability
        }
        # Check if Node.js is available for JS extraction
        # (required by some extractors, e.g. Instagram, TikTok, YouTube)
        nodejs_path = shutil.which('node')
        if nodejs_path:
            logger.info(f"Node.js found at: {nodejs_path}. JS extraction will be available.")
        else:
            logger.warning(
                "Node.js not found. Some sites (Instagram, TikTok, YouTube, etc.) may require "
                "JS extraction. Install Node.js for full functionality."
            )
        # Configure extractor args for specific sites
        ydl_opts['extractor_args'] = {}
        # YouTube settings
        if 'youtube.com' in url or 'youtu.be' in url:
            ydl_opts['extractor_args']['youtube'] = {
                'player_client': ['android', 'web'],  # Use clients that don't require JS
            }
        # Instagram settings - improve reliability
        if 'instagram.com' in url:
            ydl_opts['extractor_args']['instagram'] = {
                # Use the mobile API for better reliability
                'api': ['mobile'],
            }
            # Increase the socket timeout specifically for Instagram
            ydl_opts['socket_timeout'] = 120  # 2 minutes for Instagram
            ydl_opts['retries'] = 5  # Keep a high retry count for Instagram
            logger.info("Instagram URL detected, using optimized settings with increased timeouts and retries")
        # Add cookies if specified (for Instagram and other sites)
        if cookies_file:
            cookies_path, search_paths = _resolve_cookies_path(cookies_file)
            if cookies_path:
                ydl_opts['cookiefile'] = str(cookies_path)
                logger.info(f"Using cookies from file: {cookies_path} (resolved from: {cookies_file})")
            else:
                searched = "\n".join(f"  - {p}" for p in search_paths)
                logger.warning(
                    f"Cookies file not found. Searched in:\n{searched}\n"
                    f"Original path: {cookies_file}. Continuing without cookies."
                )

        def run_download():
            """Synchronous download body executed in a separate thread.

            This runs in a ThreadPoolExecutor worker; the progress hook is
            called from this thread and uses run_coroutine_threadsafe to
            schedule callbacks safely on the main event loop.
            """
            original_stderr = sys.stderr
            error_filter = None
            logger.info(f"Starting yt-dlp download for URL: {url}")
            try:
                # Redirect stderr to filter non-critical errors
                error_filter = YtDlpErrorFilter(original_stderr)
                sys.stderr = error_filter
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    # Check for cancellation before starting
                    if cancel_event and cancel_event.is_set():
                        raise KeyboardInterrupt("Download cancelled")
                    # Get video information
                    logger.info(f"Extracting video info for: {url}")
                    info = ydl.extract_info(url, download=False)
                    if not info:
                        # With ignoreerrors=True, extract_info returns None on failure
                        raise Exception(f"Failed to extract info for URL: {url}")
                    logger.info(
                        f"Video info extracted: title={str(info.get('title') or 'N/A')[:50]}, "
                        f"duration={info.get('duration', 'N/A')}"
                    )
                    # Check for cancellation after getting info
                    if cancel_event and cancel_event.is_set():
                        raise KeyboardInterrupt("Download cancelled")
                    # Download (the progress hook is called from this thread).
                    # Note: some postprocessors may report errors (like FixupM3u8 with
                    # aspect ratio), but the video file is still downloaded correctly.
                    logger.info(f"Starting download for: {url}")
                    try:
                        ydl.download([url])
                    except Exception as download_error:
                        error_msg = str(download_error)
                        error_lower = error_msg.lower()
                        # Non-critical extraction errors (e.g., missing title) don't
                        # prevent the download, only metadata extraction
                        if "Unable to extract" in error_msg and ("title" in error_lower or "metadata" in error_lower):
                            logger.debug(
                                f"Non-critical extraction error (metadata may be missing): {error_msg}. "
                                f"Video file should still be available. Will check file existence."
                            )
                            # Don't raise - the video is likely already downloaded
                        # Incomplete read errors (often recoverable with retries)
                        elif "incompleteread" in error_lower or ("unable to download webpage" in error_lower and "bytes read" in error_lower):
                            logger.warning(
                                f"Incomplete read error (may retry or file may still be available): {error_msg}. "
                                f"Will check if file was downloaded."
                            )
                            # Don't raise immediately - the file might still have been
                            # downloaded; yt-dlp retries automatically if configured
                        # Postprocessing errors (the video is already downloaded)
                        elif "Postprocessing" in error_msg or "aspect ratio" in error_lower:
                            logger.warning(
                                f"Postprocessing error (non-critical): {error_msg}. "
                                f"Video file should still be available. Will check file existence."
                            )
                            # Don't raise - the video is likely already downloaded
                        # Instagram-specific errors
                        elif 'instagram.com' in url:
                            if 'timeout' in error_lower or 'timed out' in error_lower:
                                logger.error(f"Instagram download timeout: {error_msg}")
                                raise Exception(
                                    "Instagram download timeout. This may be due to:\n"
                                    "- Network issues\n"
                                    "- Instagram rate limiting\n"
                                    "- Missing or expired cookies\n"
                                    "Please try again later or check your cookies file."
                                )
                            elif 'incompleteread' in error_lower or 'incomplete read' in error_lower:
                                logger.error(f"Instagram incomplete read: {error_msg}")
                                raise Exception(
                                    "Instagram download incomplete. This may be due to:\n"
                                    "- Network instability\n"
                                    "- Instagram server issues\n"
                                    "Please try again."
                                )
                            elif 'unable to download webpage' in error_lower:
                                logger.error(f"Instagram webpage download failed: {error_msg}")
                                raise Exception(
                                    "Failed to access Instagram content. This may be due to:\n"
                                    "- Private or deleted post\n"
                                    "- Missing or expired cookies\n"
                                    "- Instagram blocking requests\n"
                                    "Please check the URL and cookies file."
                                )
                            else:
                                # Other Instagram errors
                                logger.error(f"Instagram download error: {error_msg}")
                                raise Exception(f"Instagram download failed: {error_msg}")
                        else:
                            # Real error for other sites, re-raise
                            raise
                    return info
            except KeyboardInterrupt:
                # Interrupt the download on cancellation
                logger.info("Download interrupted")
                raise
            finally:
                # Restore the original stderr
                if error_filter:
                    sys.stderr = original_stderr

        # Execute in an executor so the download doesn't block message processing
        # (None selects the default ThreadPoolExecutor); the event loop keeps
        # handling messages in parallel with the download
        info = await loop.run_in_executor(None, run_download)
        # Search for the downloaded file
        title = info.get('title', 'video')
        # Handle cases where title extraction failed (e.g., PornHub)
        if not title or title == 'NA' or title.strip() == '':
            # Try to generate a title from the URL, or use a default
            try:
                from urllib.parse import urlparse
                parsed = urlparse(url)
                if parsed.netloc:
                    # Use the domain name as part of the title
                    domain = parsed.netloc.replace('www.', '').split('.')[0]
                    title = f"video_from_{domain}"
                else:
                    title = 'video'
            except Exception:
                title = 'video'
        # Clean invalid characters from the title
        title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_')).strip()
        if not title:  # If cleaning removed everything, use the default
            title = 'video'
        ext = info.get('ext', 'mp4')
        logger.info(f"Searching for downloaded file. Title: {title}, ext: {ext}, task_id: {task_id}")
        # Form the filename with task_id
        if task_id:
            filename = f"{title}_[task_{task_id}].{ext}"
        else:
            filename = f"{title}.{ext}"
        file_path = Path(output_dir) / filename
        logger.debug(f"Expected file path: {file_path}")
        # If the file is not at the expected path, search the directory
        if not file_path.exists():
            logger.info(f"File not found at expected path {file_path}, starting search...")
            if task_id:
                # "[" and "]" are glob metacharacters (character classes), so a
                # pattern like "*[task_1].mp4" would not match the literal brackets;
                # filter candidates by substring instead
                candidates = list(Path(output_dir).glob(f"*.{ext}"))
                marker = f"[task_{task_id}]"
                # Pass 1: exact bracketed marker
                files = [f for f in candidates if marker in f.name]
                logger.debug(f"Search for marker '{marker}': found {len(files)} files")
                if not files:
                    # Pass 2: any file mentioning the task_id (in case the format differs slightly)
                    files = [f for f in candidates if f"task_{task_id}" in f.name]
                    logger.debug(f"Search for 'task_{task_id}' substring: found {len(files)} files")
                if files:
                    # Take the newest of the found files
                    file_path = max(files, key=lambda p: p.stat().st_mtime)
                    logger.info(f"Found file by task_id: {file_path}")
                else:
                    # Not found by task_id - fall back to the newest file with this extension
                    logger.info(f"File with task_id {task_id} not found, searching newest .{ext} file")
                    if candidates:
                        # Prefer files created recently (within the last 5 minutes)
                        current_time = time.time()
                        recent_files = [
                            f for f in candidates
                            if (current_time - f.stat().st_mtime) < 300  # 5 minutes
                        ]
                        if recent_files:
                            file_path = max(recent_files, key=lambda p: p.stat().st_mtime)
                            logger.info(f"Found recently created file: {file_path}")
                        else:
                            file_path = max(candidates, key=lambda p: p.stat().st_mtime)
                            logger.warning(f"No recent files found, taking newest: {file_path}")
            else:
                # Search by extension only
                files = list(Path(output_dir).glob(f"*.{ext}"))
                if files:
                    # Take the newest file
                    file_path = max(files, key=lambda p: p.stat().st_mtime)
                    logger.info(f"Found file by time: {file_path}")
        if file_path.exists():
            file_size = file_path.stat().st_size
            logger.info(f"File found: {file_path}, size: {file_size / (1024 * 1024):.2f} MB")
            # Post-process the video to fix the aspect ratio for mobile compatibility
            if ext.lower() in ['mp4', 'mov', 'avi', 'mkv', 'webm']:
                fixed_file_path = await fix_video_aspect_ratio(str(file_path))
                if fixed_file_path:
                    file_path = Path(fixed_file_path)
                    logger.info(f"Video aspect ratio fixed: {file_path}")
            return {
                'file_path': str(file_path),
                'title': title,
                'duration': info.get('duration'),
                'thumbnail': info.get('thumbnail'),
                'size': file_path.stat().st_size
            }
        else:
            # Log the directory contents for debugging
            all_files = list(Path(output_dir).glob("*"))
            logger.error(
                f"File not found after download: {file_path}\n"
                f"Files in downloads directory: {[str(f.name) for f in all_files[:10]]}"
            )
            return None
    except Exception as e:
        logger.error(f"Error downloading via yt-dlp: {e}", exc_info=True)
        return None
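
# Usage sketch for download_media (illustrative; the URL, callback, and task id
# below are placeholders, not values from the original code):
#
#     async def on_progress(percent: int):
#         print(f"download: {percent}%")
#
#     result = await download_media(
#         "https://example.com/watch?v=abc123",
#         output_dir="downloads",
#         progress_callback=on_progress,
#         task_id=42,
#     )
#     if result:
#         print(result['file_path'], result['size'])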


async def get_media_info(url: str, cookies_file: Optional[str] = None) -> Optional[Dict]:
    """
    Get media information without downloading.

    Args:
        url: Media URL
        cookies_file: Path to cookies file (optional)

    Returns:
        Dictionary with information or None
    """
    try:
        loop = asyncio.get_running_loop()
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
        }
        # Add cookies if specified
        if cookies_file:
            cookies_path, search_paths = _resolve_cookies_path(cookies_file)
            if cookies_path:
                ydl_opts['cookiefile'] = str(cookies_path)
                logger.debug(f"Using cookies to get info: {cookies_path} (resolved from: {cookies_file})")
            else:
                searched = "\n".join(f"  - {p}" for p in search_paths)
                logger.warning(
                    f"Cookies file not found for get_media_info. Searched in:\n{searched}\n"
                    f"Original path: {cookies_file}"
                )

        def extract_info_sync():
            """Synchronous info extraction executed in a worker thread"""
            original_stderr = sys.stderr
            error_filter = None
            try:
                # Redirect stderr to filter non-critical errors
                error_filter = YtDlpErrorFilter(original_stderr)
                sys.stderr = error_filter
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    return ydl.extract_info(url, download=False)
            finally:
                # Restore the original stderr
                if error_filter:
                    sys.stderr = original_stderr

        # Run synchronous yt-dlp in an executor to avoid blocking the event loop
        info = await loop.run_in_executor(None, extract_info_sync)
        if not info:
            return None
        return {
            'title': info.get('title'),
            'duration': info.get('duration'),
            'thumbnail': info.get('thumbnail'),
            'uploader': info.get('uploader'),
            'view_count': info.get('view_count'),
        }
    except Exception as e:
        logger.error(f"Error getting media info: {e}", exc_info=True)
        return None
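
# Usage sketch (illustrative; the URL and cookies path are placeholders):
#
#     info = await get_media_info("https://example.com/watch?v=abc123",
#                                 cookies_file="cookies.txt")
#     if info:
#         print(info['title'], info['duration'])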


async def get_videos_list(url: str, cookies_file: Optional[str] = None) -> Optional[Dict]:
    """
    Get the list of videos from a webpage.

    Args:
        url: Webpage URL
        cookies_file: Path to cookies file (optional)

    Returns:
        Dictionary with:
        - 'type': 'playlist' or 'video'
        - 'videos': List of video dictionaries with 'id', 'url', 'title', 'duration', 'thumbnail'
        - 'playlist_title': Title of the playlist/page (if playlist)
        or None on error
    """
    try:
        loop = asyncio.get_running_loop()
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': 'in_playlist',  # Flat extraction for playlist entries, full for single videos
            # Note: ignoreerrors is NOT set here - it is enabled in the exception handler if needed
        }
        # Add cookies if specified
        if cookies_file:
            cookies_path, _ = _resolve_cookies_path(cookies_file)
            if cookies_path:
                ydl_opts['cookiefile'] = str(cookies_path)

        def extract_info_sync():
            """Synchronous info extraction executed in a worker thread"""
            original_stderr = sys.stderr
            error_filter = None
            try:
                # Redirect stderr to filter non-critical errors
                error_filter = YtDlpErrorFilter(original_stderr)
                sys.stderr = error_filter
                try:
                    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                        return ydl.extract_info(url, download=False)
                except Exception as e:
                    # Log but don't fail completely - some metadata might still be available
                    logger.warning(f"Error extracting info (some metadata may be missing): {e}")
                    # Retry with ignoreerrors to get partial info
                    ydl_opts['ignoreerrors'] = True
                    try:
                        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                            return ydl.extract_info(url, download=False)
                    except Exception as e2:
                        logger.error(f"Failed to extract info even with ignoreerrors: {e2}")
                        raise
            finally:
                # Restore the original stderr
                if error_filter:
                    sys.stderr = original_stderr

        # Extract info without downloading
        info = await loop.run_in_executor(None, extract_info_sync)
        if not info:
            return None
        # Check whether it's a playlist or a single video
        _type = info.get('_type', 'video')
        entries = info.get('entries', [])
        if _type == 'playlist' and entries:
            # It's a playlist - extract the entries
            videos = []
            for entry in entries[:20]:  # Limit to 20 videos to avoid timeout
                if not entry:
                    continue
                entry_url = entry.get('url') or entry.get('webpage_url')
                if not entry_url:
                    continue
                # Handle a missing title gracefully
                title = entry.get('title')
                if not title or title == 'NA':
                    # Try to generate a title from the URL, or use a default
                    try:
                        from urllib.parse import urlparse
                        parsed = urlparse(entry_url)
                        title = f"Video from {parsed.netloc}" if parsed.netloc else "Video"
                    except Exception:
                        title = "Video"
                videos.append({
                    'id': entry.get('id'),
                    'url': entry_url,
                    'title': title,
                    'duration': entry.get('duration'),
                    'thumbnail': entry.get('thumbnail'),
                })
            if videos:
                return {
                    'type': 'playlist',
                    'videos': videos,
                    'playlist_title': info.get('title', 'Playlist'),
                }
        # Single video, or a playlist with no usable entries
        return {
            'type': 'video',
            'videos': [{
                'id': info.get('id'),
                'url': url,
                'title': info.get('title', 'Video'),
                'duration': info.get('duration'),
                'thumbnail': info.get('thumbnail'),
            }],
            'playlist_title': None,
        }
    except Exception as e:
        logger.error(f"Error getting videos list: {e}", exc_info=True)
        return None
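

# Minimal manual test (a sketch, not part of the original module): fetch metadata
# and the video list for a URL passed on the command line. The fallback URL is a
# placeholder.
if __name__ == "__main__":
    async def _demo():
        target = sys.argv[1] if len(sys.argv) > 1 else "https://example.com/video"
        print(await get_media_info(target))
        print(await get_videos_list(target))

    asyncio.run(_demo())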