From 4c41834dc7fab2fae34b3ea65941eeb8dba08696 Mon Sep 17 00:00:00 2001 From: holden093 Date: Mon, 15 Jun 2026 08:03:41 +0200 Subject: [PATCH] fix(youtube): consolidate duplicate handler Make src.youtube_handler a compatibility wrapper around services.youtube.youtube_handler so transcript state, URL parsing, and timeout behavior no longer diverge. --- services/youtube/youtube_handler.py | 61 +++- src/youtube_handler.py | 291 ++------------------ tests/test_youtube_handler_consolidation.py | 104 +++++++ 3 files changed, 168 insertions(+), 288 deletions(-) create mode 100644 tests/test_youtube_handler_consolidation.py diff --git a/services/youtube/youtube_handler.py b/services/youtube/youtube_handler.py index b36989e8d..d1b1e9b91 100644 --- a/services/youtube/youtube_handler.py +++ b/services/youtube/youtube_handler.py @@ -64,20 +64,40 @@ def is_youtube_url(url: str) -> bool: return "youtube.com" in url or "youtu.be" in url +# youtube.com-shaped hosts. music.youtube.com serves the same /watch and +# /shorts paths, so links shared from YouTube Music must resolve too. +_YT_HOSTS = ("www.youtube.com", "youtube.com", "m.youtube.com", "music.youtube.com") +# Path prefixes whose first following segment is the video id. Covers the +# /embed/ player, Shorts (/shorts/), live streams (/live/), and the legacy +# /v/ embed — all of which `is_youtube_url` already treats as YouTube, so +# they must be extractable or the link is silently dropped (neither web-fetched +# nor transcript-fetched) by the chat pipeline. +_YT_PATH_PREFIXES = ("/embed/", "/shorts/", "/live/", "/v/") + + def extract_youtube_id(url: str) -> Optional[str]: - """Extract YouTube video ID from various URL formats.""" + """Extract a YouTube video ID from the common URL shapes: + watch?v=, youtu.be/, /embed/, /shorts/, /live/, /v/, + across youtube.com / m.youtube.com / music.youtube.com / youtu.be.""" if not isinstance(url, str): return None parsed = urllib.parse.urlparse(url) - if parsed.hostname in ("www.youtube.com", "youtube.com", "m.youtube.com"): + host = (parsed.hostname or "").lower() + if host in _YT_HOSTS: if parsed.path == "/watch": params = urllib.parse.parse_qs(parsed.query) - if "v" in params: + if params.get("v"): return params["v"][0] - elif parsed.path.startswith("/embed/"): - return parsed.path.split("/")[-1] - elif parsed.hostname == "youtu.be": - return parsed.path[1:] + else: + for prefix in _YT_PATH_PREFIXES: + if parsed.path.startswith(prefix): + vid = parsed.path[len(prefix):].split("/")[0] + if vid: + return vid + elif host == "youtu.be": + vid = parsed.path.lstrip("/").split("/")[0] + if vid: + return vid return None @@ -170,6 +190,8 @@ def format_transcript_for_context( if segments: ctx += "Timestamped Transcript:\n" for seg in segments: + if not isinstance(seg, dict): + continue ctx += f"[{seg['timestamp']}] {seg['text']}\n" # Check length — fall back to plain text if too long if len(ctx) > 12000: @@ -202,15 +224,24 @@ async def fetch_youtube_comments( f"https://www.youtube.com/watch?v={video_id}", ] - proc = await asyncio.wait_for( - asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ), - timeout=timeout, + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, ) - stdout, stderr = await proc.communicate() + # Bound the wait on the process actually finishing, not on spawning it. + # create_subprocess_exec returns as soon as the child starts, so wrapping + # it in wait_for never enforces the timeout — proc.communicate() is the + # blocking step. Kill and reap the child if it overruns so it does not + # linger after we return. + try: + stdout, stderr = await asyncio.wait_for( + proc.communicate(), timeout=timeout + ) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + raise if proc.returncode != 0: return {"success": False, "error": f"yt-dlp failed: {stderr.decode()[:200]}", "comments": []} diff --git a/src/youtube_handler.py b/src/youtube_handler.py index 001847535..0f9eec263 100644 --- a/src/youtube_handler.py +++ b/src/youtube_handler.py @@ -1,278 +1,23 @@ -""" -YouTube handling — transcript extraction, comment fetching (yt-dlp), -and context formatting for LLM injection. Used by chat_handler.py. +"""Compatibility wrapper for the canonical services.youtube.youtube_handler module. + +Odysseus historically carried two independent copies of the YouTube handler — +one here under ``src`` and one under ``services.youtube``. They drifted: the +comment-fetch timeout fix landed only in the ``src`` copy, while ``app.py`` +calls ``services.youtube.init_youtube()`` at startup. Because the chat flow +imported ``extract_transcript_async`` from ``src.youtube_handler`` (a different +module object), the ``YOUTUBE_AVAILABLE`` / ``YouTubeTranscriptApi`` globals set +by ``init_youtube`` never reached it and transcript extraction always reported +"YouTube transcript API not available". + +Keep the old ``src.youtube_handler`` import path working, but make it resolve to +the single source of truth so module state and behavior can't diverge again. """ -import asyncio -import json -import logging -import shutil +import importlib import sys -import urllib.parse -from pathlib import Path -from typing import Dict, Any, Optional -logger = logging.getLogger(__name__) +# Import the canonical module directly (services.youtube.youtube_handler) +# without triggering the heavy services/__init__.py top-level imports. +_youtube_handler = importlib.import_module("services.youtube.youtube_handler") -# --------------------------------------------------------------------------- -# Constants -# --------------------------------------------------------------------------- - -YOUTUBE_INSTRUCTION_PROMPT = """When the user shares a YouTube video, respond with a structured breakdown: - -1. **Summary** — Concise overview of the video's content and main thesis (2-4 sentences) -2. **Key Points** — Bullet list of the most important topics, arguments, or moments -3. **Notable Timestamps** — If timestamps are available from the transcript, highlight 3-5 interesting moments with their approximate timestamps (e.g. "03:45 — discusses X") -4. **Audience Reception** — If comments are available, summarize what viewers think: general sentiment, top reactions, any debate or controversy - -Keep it conversational and concise. Do NOT web search for this video — use only the transcript and comments provided.""" - -# --------------------------------------------------------------------------- -# Init / helpers -# --------------------------------------------------------------------------- - -# Will be set at startup by init_youtube() -YouTubeTranscriptApi = None -YOUTUBE_AVAILABLE = False - - -def _find_ytdlp() -> str: - """Find the yt-dlp binary: venv bin first, then system PATH.""" - venv_bin = Path(sys.executable).parent / "yt-dlp" - if venv_bin.exists(): - return str(venv_bin) - found = shutil.which("yt-dlp") - return found or "yt-dlp" - - -def init_youtube(): - """Import and cache the YouTube transcript API.""" - global YouTubeTranscriptApi, YOUTUBE_AVAILABLE - try: - from youtube_transcript_api import YouTubeTranscriptApi as _Api - YouTubeTranscriptApi = _Api - YOUTUBE_AVAILABLE = True - logger.info("YouTube transcript API available") - except ImportError as e: - logger.warning(f"youtube-transcript-api not installed: {e}") - YOUTUBE_AVAILABLE = False - - -def is_youtube_url(url: str) -> bool: - if not isinstance(url, str): - return False - return "youtube.com" in url or "youtu.be" in url - - -def extract_youtube_id(url: str) -> Optional[str]: - """Extract YouTube video ID from various URL formats.""" - parsed = urllib.parse.urlparse(url) - if parsed.hostname in ("www.youtube.com", "youtube.com", "m.youtube.com"): - if parsed.path == "/watch": - params = urllib.parse.parse_qs(parsed.query) - if "v" in params: - return params["v"][0] - elif parsed.path.startswith("/embed/"): - return parsed.path.split("/")[-1] - elif parsed.hostname == "youtu.be": - return parsed.path[1:] - return None - - -async def extract_transcript_async( - url: str, video_id: str, max_retries: int = 3 -) -> Dict[str, Any]: - """ - Async YouTube transcript extraction with retries. - - Args: - url: Full YouTube URL - video_id: Extracted video ID - max_retries: Number of attempts - - Returns: - Dict with success/error/transcript keys - """ - if not YOUTUBE_AVAILABLE or YouTubeTranscriptApi is None: - return {"success": False, "error": "YouTube transcript API not available", "transcript": None} - - for attempt in range(max_retries): - try: - api = YouTubeTranscriptApi() - transcript = api.fetch(video_id) - transcript_list = list(transcript) - - formatted = [] - for snippet in transcript_list: - text = snippet.text.strip() - if not text: - continue - start = snippet.start - formatted.append({ - "text": text, - "start": start, - "duration": snippet.duration, - "timestamp": f"{int(start // 60):02d}:{int(start % 60):02d}", - }) - - full_text = " ".join(e["text"] for e in formatted) - max_len = 8000 - if len(full_text) > max_len: - full_text = full_text[:max_len] + "... [transcript truncated]" - - return { - "success": True, - "transcript": full_text, - "video_id": video_id, - "language": "en", - "is_generated": False, - "segments": formatted, - } - except Exception as e: - logger.warning(f"Transcript attempt {attempt + 1} failed: {e}") - if attempt < max_retries - 1: - await asyncio.sleep(1 * (attempt + 1)) - - return {"success": False, "error": f"Failed after {max_retries} attempts", "transcript": None} - - -def format_transcript_for_context( - transcript_data: Dict[str, Any], url: str, - title: str = "", channel: str = "" -) -> str: - """Format transcript data for inclusion in LLM context.""" - if not transcript_data.get("success"): - header = "" - if title: - header = f" \"{title}\"" - if channel: - header += f" by {channel}" - return f"\n[YouTube Video{header}: Transcript unavailable ({transcript_data.get('error', 'Unknown error')}). Use the comments below if available, do NOT web search for this video.]" - - transcript = transcript_data.get("transcript", "") - video_id = transcript_data.get("video_id", "") - language = transcript_data.get("language", "unknown") - is_generated = transcript_data.get("is_generated", False) - segments = transcript_data.get("segments", []) - - ctx = "\n[YOUTUBE VIDEO TRANSCRIPT]\n" - if title: - ctx += f"Title: {title}\n" - if channel: - ctx += f"Channel: {channel}\n" - ctx += f"Video ID: {video_id}\n" - ctx += f"Language: {language}\n" - ctx += f"Source: {'Auto-generated' if is_generated else 'Manual'}\n" - ctx += f"URL: {url}\n\n" - # Include timestamped segments for the LLM to reference - if segments: - ctx += "Timestamped Transcript:\n" - for seg in segments: - if not isinstance(seg, dict): - continue - ctx += f"[{seg['timestamp']}] {seg['text']}\n" - # Check length — fall back to plain text if too long - if len(ctx) > 12000: - ctx = ctx[:ctx.index("Timestamped Transcript:\n")] - ctx += "Transcript:\n" - ctx += transcript - else: - ctx += "Transcript:\n" - ctx += transcript - ctx += "\n[END TRANSCRIPT]\n" - return ctx - - -async def fetch_youtube_comments( - video_id: str, max_comments: int = 25, timeout: int = 30 -) -> Dict[str, Any]: - """Fetch top comments for a YouTube video using yt-dlp. - - Returns dict with 'success', 'comments' list, 'error'. - """ - try: - cmd = [ - _find_ytdlp(), - "--skip-download", - "--write-comments", - "--extractor-args", f"youtube:max_comments={max_comments},all,100,0", - "--dump-json", - "--js-runtimes", "node", - "--remote-components", "ejs:github", - f"https://www.youtube.com/watch?v={video_id}", - ] - - proc = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - # Bound the wait on the process actually finishing, not on spawning it. - # create_subprocess_exec returns as soon as the child starts, so wrapping - # it in wait_for never enforces the timeout — proc.communicate() is the - # blocking step. Kill and reap the child if it overruns so it does not - # linger after we return. - try: - stdout, stderr = await asyncio.wait_for( - proc.communicate(), timeout=timeout - ) - except asyncio.TimeoutError: - proc.kill() - await proc.wait() - raise - - if proc.returncode != 0: - return {"success": False, "error": f"yt-dlp failed: {stderr.decode()[:200]}", "comments": []} - - data = json.loads(stdout.decode()) - title = data.get("title", "") - channel = data.get("channel", "") or data.get("uploader", "") - raw_comments = data.get("comments", []) - - comments = [] - for c in raw_comments[:max_comments]: - text = (c.get("text") or "").strip() - if not text: - continue - comments.append({ - "author": c.get("author", "Unknown"), - "text": text, - "likes": c.get("like_count", 0), - }) - - # Sort by likes descending — most popular comments first - comments.sort(key=lambda x: x.get("likes", 0), reverse=True) - - return {"success": True, "comments": comments, "count": len(comments), - "title": title, "channel": channel} - - except asyncio.TimeoutError: - logger.warning(f"Comment fetch timed out for {video_id}") - return {"success": False, "error": "Comment fetch timed out", "comments": []} - except FileNotFoundError: - logger.warning("yt-dlp not installed — cannot fetch comments") - return {"success": False, "error": "yt-dlp not installed", "comments": []} - except Exception as e: - logger.warning(f"Failed to fetch comments for {video_id}: {e}") - return {"success": False, "error": str(e), "comments": []} - - -def format_comments_for_context(comments_data: Dict[str, Any], url: str) -> str: - """Format YouTube comments for inclusion in LLM context.""" - if not comments_data.get("success") or not comments_data.get("comments"): - return "" - - comments = comments_data["comments"] - ctx = f"\n[YOUTUBE VIDEO COMMENTS — Top {len(comments)} by popularity]\n" - ctx += f"URL: {url}\n\n" - - for i, c in enumerate(comments, 1): - likes = c.get("likes", 0) - likes_str = f" [{likes} likes]" if likes else "" - ctx += f"{i}. @{c['author']}{likes_str}: {c['text']}\n\n" - - if len(ctx) > 4000: - ctx = ctx[:4000] + "\n[Comments truncated]\n" - - ctx += "[END COMMENTS]\n" - return ctx +sys.modules[__name__] = _youtube_handler diff --git a/tests/test_youtube_handler_consolidation.py b/tests/test_youtube_handler_consolidation.py new file mode 100644 index 000000000..dd6543d20 --- /dev/null +++ b/tests/test_youtube_handler_consolidation.py @@ -0,0 +1,104 @@ +"""Regression: the YouTube handler must live in a single module. + +Odysseus carried two independent copies of the handler — ``src.youtube_handler`` +and ``services.youtube.youtube_handler`` — that silently drifted: + +* ``app.py`` calls ``services.youtube.init_youtube()`` at startup, but the chat + flow imported ``extract_transcript_async`` from ``src.youtube_handler``. Those + were different module objects, so the ``YOUTUBE_AVAILABLE`` / + ``YouTubeTranscriptApi`` globals set by ``init_youtube`` never reached the chat + path and transcript extraction always reported "not available". +* The comment-fetch timeout fix (PR #1002) landed only in the ``src`` copy. + +These tests pin the two import paths to one module object and verify the shared +state and the broadened URL parsing. +""" +import sys +import types + +import pytest + + +def test_src_and_service_youtube_are_same_module(): + """Both historical import paths must resolve to one module object so + behavior and module-level state cannot diverge again.""" + import src.youtube_handler as src_yt + import services.youtube.youtube_handler as svc_yt + + assert src_yt is svc_yt + + +def test_init_youtube_visible_through_chat_import_path(monkeypatch): + """init_youtube() is invoked via services.youtube (as app.py does), but the + chat flow reads the API globals through src.youtube_handler. After + consolidation the globals set by init must be visible on both paths.""" + import src.youtube_handler as src_yt + from services.youtube import init_youtube + + # Pin the globals so monkeypatch restores them after the test, regardless + # of whether youtube_transcript_api is actually installed in this env. + monkeypatch.setattr(src_yt, "YOUTUBE_AVAILABLE", False, raising=False) + monkeypatch.setattr(src_yt, "YouTubeTranscriptApi", None, raising=False) + + # Stand in for the real transcript package so init_youtube() succeeds + # without a network/library dependency. + stub = types.ModuleType("youtube_transcript_api") + + class _StubApi: + pass + + stub.YouTubeTranscriptApi = _StubApi + monkeypatch.setitem(sys.modules, "youtube_transcript_api", stub) + + init_youtube() # called exactly the way app.py calls it + + assert src_yt.YOUTUBE_AVAILABLE is True + assert src_yt.YouTubeTranscriptApi is _StubApi + + +@pytest.mark.parametrize( + "url,expected", + [ + # Classic watch URLs across the youtube.com hosts. + ("https://www.youtube.com/watch?v=dQw4w9WgXcQ", "dQw4w9WgXcQ"), + ("https://youtube.com/watch?v=dQw4w9WgXcQ&t=42s", "dQw4w9WgXcQ"), + ("https://m.youtube.com/watch?v=dQw4w9WgXcQ", "dQw4w9WgXcQ"), + # YouTube Music shares the same paths and must resolve. + ("https://music.youtube.com/watch?v=dQw4w9WgXcQ", "dQw4w9WgXcQ"), + # Short links. + ("https://youtu.be/dQw4w9WgXcQ", "dQw4w9WgXcQ"), + ("https://youtu.be/dQw4w9WgXcQ?si=ab_cd", "dQw4w9WgXcQ"), + # Player/embed and the legacy /v/ embed. + ("https://www.youtube.com/embed/dQw4w9WgXcQ", "dQw4w9WgXcQ"), + ("https://www.youtube.com/embed/dQw4w9WgXcQ/", "dQw4w9WgXcQ"), + ("https://www.youtube.com/v/dQw4w9WgXcQ", "dQw4w9WgXcQ"), + # Shorts and live — previously unrecognized, so the chat pipeline + # dropped them entirely (excluded from web-fetch as a YouTube URL, but + # no id meant no transcript fetch either). + ("https://www.youtube.com/shorts/dQw4w9WgXcQ", "dQw4w9WgXcQ"), + ("https://www.youtube.com/shorts/dQw4w9WgXcQ?feature=share", "dQw4w9WgXcQ"), + ("https://www.youtube.com/live/dQw4w9WgXcQ", "dQw4w9WgXcQ"), + # Host matching is case-insensitive. + ("https://WWW.YouTube.com/watch?v=dQw4w9WgXcQ", "dQw4w9WgXcQ"), + # Non-video paths and non-YouTube hosts yield no id. + ("https://www.youtube.com/", None), + ("https://www.youtube.com/feed/subscriptions", None), + ("https://example.com/watch?v=dQw4w9WgXcQ", None), + ("https://vimeo.com/76979871", None), + ], +) +def test_extract_youtube_id(url, expected): + from src.youtube_handler import extract_youtube_id + + assert extract_youtube_id(url) == expected + + +def test_shorts_url_is_recognized_and_extractable(): + """A Shorts URL is treated as a YouTube link (so the chat pipeline excludes + it from generic web-fetch). It must therefore yield an id, or the video is + silently dropped — fetched by neither path.""" + from src.youtube_handler import is_youtube_url, extract_youtube_id + + url = "https://www.youtube.com/shorts/dQw4w9WgXcQ" + assert is_youtube_url(url) + assert extract_youtube_id(url) == "dQw4w9WgXcQ"