fix(youtube): consolidate duplicate handler

Make src.youtube_handler a compatibility wrapper around services.youtube.youtube_handler so transcript state, URL parsing, and timeout behavior no longer diverge.
This commit is contained in:
holden093
2026-06-15 08:03:41 +02:00
committed by GitHub
parent 96052c5e8a
commit 4c41834dc7
3 changed files with 168 additions and 288 deletions
+46 -15
View File
@@ -64,20 +64,40 @@ def is_youtube_url(url: str) -> bool:
return "youtube.com" in url or "youtu.be" in url
# youtube.com-shaped hosts. music.youtube.com serves the same /watch and
# /shorts paths, so links shared from YouTube Music must resolve too.
_YT_HOSTS = ("www.youtube.com", "youtube.com", "m.youtube.com", "music.youtube.com")
# Path prefixes whose first following segment is the video id. Covers the
# /embed/ player, Shorts (/shorts/), live streams (/live/), and the legacy
# /v/ embed — all of which `is_youtube_url` already treats as YouTube, so
# they must be extractable or the link is silently dropped (neither web-fetched
# nor transcript-fetched) by the chat pipeline.
_YT_PATH_PREFIXES = ("/embed/", "/shorts/", "/live/", "/v/")
def extract_youtube_id(url: str) -> Optional[str]:
"""Extract YouTube video ID from various URL formats."""
"""Extract a YouTube video ID from the common URL shapes:
watch?v=, youtu.be/<id>, /embed/<id>, /shorts/<id>, /live/<id>, /v/<id>,
across youtube.com / m.youtube.com / music.youtube.com / youtu.be."""
if not isinstance(url, str):
return None
parsed = urllib.parse.urlparse(url)
if parsed.hostname in ("www.youtube.com", "youtube.com", "m.youtube.com"):
host = (parsed.hostname or "").lower()
if host in _YT_HOSTS:
if parsed.path == "/watch":
params = urllib.parse.parse_qs(parsed.query)
if "v" in params:
if params.get("v"):
return params["v"][0]
elif parsed.path.startswith("/embed/"):
return parsed.path.split("/")[-1]
elif parsed.hostname == "youtu.be":
return parsed.path[1:]
else:
for prefix in _YT_PATH_PREFIXES:
if parsed.path.startswith(prefix):
vid = parsed.path[len(prefix):].split("/")[0]
if vid:
return vid
elif host == "youtu.be":
vid = parsed.path.lstrip("/").split("/")[0]
if vid:
return vid
return None
@@ -170,6 +190,8 @@ def format_transcript_for_context(
if segments:
ctx += "Timestamped Transcript:\n"
for seg in segments:
if not isinstance(seg, dict):
continue
ctx += f"[{seg['timestamp']}] {seg['text']}\n"
# Check length — fall back to plain text if too long
if len(ctx) > 12000:
@@ -202,15 +224,24 @@ async def fetch_youtube_comments(
f"https://www.youtube.com/watch?v={video_id}",
]
proc = await asyncio.wait_for(
asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
),
timeout=timeout,
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
# Bound the wait on the process actually finishing, not on spawning it.
# create_subprocess_exec returns as soon as the child starts, so wrapping
# it in wait_for never enforces the timeout — proc.communicate() is the
# blocking step. Kill and reap the child if it overruns so it does not
# linger after we return.
try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(), timeout=timeout
)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
raise
if proc.returncode != 0:
return {"success": False, "error": f"yt-dlp failed: {stderr.decode()[:200]}", "comments": []}
+18 -273
View File
@@ -1,278 +1,23 @@
"""
YouTube handling — transcript extraction, comment fetching (yt-dlp),
and context formatting for LLM injection. Used by chat_handler.py.
"""Compatibility wrapper for the canonical services.youtube.youtube_handler module.
Odysseus historically carried two independent copies of the YouTube handler
one here under ``src`` and one under ``services.youtube``. They drifted: the
comment-fetch timeout fix landed only in the ``src`` copy, while ``app.py``
calls ``services.youtube.init_youtube()`` at startup. Because the chat flow
imported ``extract_transcript_async`` from ``src.youtube_handler`` (a different
module object), the ``YOUTUBE_AVAILABLE`` / ``YouTubeTranscriptApi`` globals set
by ``init_youtube`` never reached it and transcript extraction always reported
"YouTube transcript API not available".
Keep the old ``src.youtube_handler`` import path working, but make it resolve to
the single source of truth so module state and behavior can't diverge again.
"""
import asyncio
import json
import logging
import shutil
import importlib
import sys
import urllib.parse
from pathlib import Path
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
# Import the canonical module directly (services.youtube.youtube_handler)
# without triggering the heavy services/__init__.py top-level imports.
_youtube_handler = importlib.import_module("services.youtube.youtube_handler")
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
YOUTUBE_INSTRUCTION_PROMPT = """When the user shares a YouTube video, respond with a structured breakdown:
1. **Summary** — Concise overview of the video's content and main thesis (2-4 sentences)
2. **Key Points** — Bullet list of the most important topics, arguments, or moments
3. **Notable Timestamps** — If timestamps are available from the transcript, highlight 3-5 interesting moments with their approximate timestamps (e.g. "03:45 — discusses X")
4. **Audience Reception** — If comments are available, summarize what viewers think: general sentiment, top reactions, any debate or controversy
Keep it conversational and concise. Do NOT web search for this video — use only the transcript and comments provided."""
# ---------------------------------------------------------------------------
# Init / helpers
# ---------------------------------------------------------------------------
# Will be set at startup by init_youtube()
YouTubeTranscriptApi = None
YOUTUBE_AVAILABLE = False
def _find_ytdlp() -> str:
"""Find the yt-dlp binary: venv bin first, then system PATH."""
venv_bin = Path(sys.executable).parent / "yt-dlp"
if venv_bin.exists():
return str(venv_bin)
found = shutil.which("yt-dlp")
return found or "yt-dlp"
def init_youtube():
"""Import and cache the YouTube transcript API."""
global YouTubeTranscriptApi, YOUTUBE_AVAILABLE
try:
from youtube_transcript_api import YouTubeTranscriptApi as _Api
YouTubeTranscriptApi = _Api
YOUTUBE_AVAILABLE = True
logger.info("YouTube transcript API available")
except ImportError as e:
logger.warning(f"youtube-transcript-api not installed: {e}")
YOUTUBE_AVAILABLE = False
def is_youtube_url(url: str) -> bool:
if not isinstance(url, str):
return False
return "youtube.com" in url or "youtu.be" in url
def extract_youtube_id(url: str) -> Optional[str]:
"""Extract YouTube video ID from various URL formats."""
parsed = urllib.parse.urlparse(url)
if parsed.hostname in ("www.youtube.com", "youtube.com", "m.youtube.com"):
if parsed.path == "/watch":
params = urllib.parse.parse_qs(parsed.query)
if "v" in params:
return params["v"][0]
elif parsed.path.startswith("/embed/"):
return parsed.path.split("/")[-1]
elif parsed.hostname == "youtu.be":
return parsed.path[1:]
return None
async def extract_transcript_async(
url: str, video_id: str, max_retries: int = 3
) -> Dict[str, Any]:
"""
Async YouTube transcript extraction with retries.
Args:
url: Full YouTube URL
video_id: Extracted video ID
max_retries: Number of attempts
Returns:
Dict with success/error/transcript keys
"""
if not YOUTUBE_AVAILABLE or YouTubeTranscriptApi is None:
return {"success": False, "error": "YouTube transcript API not available", "transcript": None}
for attempt in range(max_retries):
try:
api = YouTubeTranscriptApi()
transcript = api.fetch(video_id)
transcript_list = list(transcript)
formatted = []
for snippet in transcript_list:
text = snippet.text.strip()
if not text:
continue
start = snippet.start
formatted.append({
"text": text,
"start": start,
"duration": snippet.duration,
"timestamp": f"{int(start // 60):02d}:{int(start % 60):02d}",
})
full_text = " ".join(e["text"] for e in formatted)
max_len = 8000
if len(full_text) > max_len:
full_text = full_text[:max_len] + "... [transcript truncated]"
return {
"success": True,
"transcript": full_text,
"video_id": video_id,
"language": "en",
"is_generated": False,
"segments": formatted,
}
except Exception as e:
logger.warning(f"Transcript attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(1 * (attempt + 1))
return {"success": False, "error": f"Failed after {max_retries} attempts", "transcript": None}
def format_transcript_for_context(
transcript_data: Dict[str, Any], url: str,
title: str = "", channel: str = ""
) -> str:
"""Format transcript data for inclusion in LLM context."""
if not transcript_data.get("success"):
header = ""
if title:
header = f" \"{title}\""
if channel:
header += f" by {channel}"
return f"\n[YouTube Video{header}: Transcript unavailable ({transcript_data.get('error', 'Unknown error')}). Use the comments below if available, do NOT web search for this video.]"
transcript = transcript_data.get("transcript", "")
video_id = transcript_data.get("video_id", "")
language = transcript_data.get("language", "unknown")
is_generated = transcript_data.get("is_generated", False)
segments = transcript_data.get("segments", [])
ctx = "\n[YOUTUBE VIDEO TRANSCRIPT]\n"
if title:
ctx += f"Title: {title}\n"
if channel:
ctx += f"Channel: {channel}\n"
ctx += f"Video ID: {video_id}\n"
ctx += f"Language: {language}\n"
ctx += f"Source: {'Auto-generated' if is_generated else 'Manual'}\n"
ctx += f"URL: {url}\n\n"
# Include timestamped segments for the LLM to reference
if segments:
ctx += "Timestamped Transcript:\n"
for seg in segments:
if not isinstance(seg, dict):
continue
ctx += f"[{seg['timestamp']}] {seg['text']}\n"
# Check length — fall back to plain text if too long
if len(ctx) > 12000:
ctx = ctx[:ctx.index("Timestamped Transcript:\n")]
ctx += "Transcript:\n"
ctx += transcript
else:
ctx += "Transcript:\n"
ctx += transcript
ctx += "\n[END TRANSCRIPT]\n"
return ctx
async def fetch_youtube_comments(
video_id: str, max_comments: int = 25, timeout: int = 30
) -> Dict[str, Any]:
"""Fetch top comments for a YouTube video using yt-dlp.
Returns dict with 'success', 'comments' list, 'error'.
"""
try:
cmd = [
_find_ytdlp(),
"--skip-download",
"--write-comments",
"--extractor-args", f"youtube:max_comments={max_comments},all,100,0",
"--dump-json",
"--js-runtimes", "node",
"--remote-components", "ejs:github",
f"https://www.youtube.com/watch?v={video_id}",
]
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# Bound the wait on the process actually finishing, not on spawning it.
# create_subprocess_exec returns as soon as the child starts, so wrapping
# it in wait_for never enforces the timeout — proc.communicate() is the
# blocking step. Kill and reap the child if it overruns so it does not
# linger after we return.
try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(), timeout=timeout
)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
raise
if proc.returncode != 0:
return {"success": False, "error": f"yt-dlp failed: {stderr.decode()[:200]}", "comments": []}
data = json.loads(stdout.decode())
title = data.get("title", "")
channel = data.get("channel", "") or data.get("uploader", "")
raw_comments = data.get("comments", [])
comments = []
for c in raw_comments[:max_comments]:
text = (c.get("text") or "").strip()
if not text:
continue
comments.append({
"author": c.get("author", "Unknown"),
"text": text,
"likes": c.get("like_count", 0),
})
# Sort by likes descending — most popular comments first
comments.sort(key=lambda x: x.get("likes", 0), reverse=True)
return {"success": True, "comments": comments, "count": len(comments),
"title": title, "channel": channel}
except asyncio.TimeoutError:
logger.warning(f"Comment fetch timed out for {video_id}")
return {"success": False, "error": "Comment fetch timed out", "comments": []}
except FileNotFoundError:
logger.warning("yt-dlp not installed — cannot fetch comments")
return {"success": False, "error": "yt-dlp not installed", "comments": []}
except Exception as e:
logger.warning(f"Failed to fetch comments for {video_id}: {e}")
return {"success": False, "error": str(e), "comments": []}
def format_comments_for_context(comments_data: Dict[str, Any], url: str) -> str:
"""Format YouTube comments for inclusion in LLM context."""
if not comments_data.get("success") or not comments_data.get("comments"):
return ""
comments = comments_data["comments"]
ctx = f"\n[YOUTUBE VIDEO COMMENTS — Top {len(comments)} by popularity]\n"
ctx += f"URL: {url}\n\n"
for i, c in enumerate(comments, 1):
likes = c.get("likes", 0)
likes_str = f" [{likes} likes]" if likes else ""
ctx += f"{i}. @{c['author']}{likes_str}: {c['text']}\n\n"
if len(ctx) > 4000:
ctx = ctx[:4000] + "\n[Comments truncated]\n"
ctx += "[END COMMENTS]\n"
return ctx
sys.modules[__name__] = _youtube_handler
+104
View File
@@ -0,0 +1,104 @@
"""Regression: the YouTube handler must live in a single module.
Odysseus carried two independent copies of the handler — ``src.youtube_handler``
and ``services.youtube.youtube_handler`` — that silently drifted:
* ``app.py`` calls ``services.youtube.init_youtube()`` at startup, but the chat
flow imported ``extract_transcript_async`` from ``src.youtube_handler``. Those
were different module objects, so the ``YOUTUBE_AVAILABLE`` /
``YouTubeTranscriptApi`` globals set by ``init_youtube`` never reached the chat
path and transcript extraction always reported "not available".
* The comment-fetch timeout fix (PR #1002) landed only in the ``src`` copy.
These tests pin the two import paths to one module object and verify the shared
state and the broadened URL parsing.
"""
import sys
import types
import pytest
def test_src_and_service_youtube_are_same_module():
"""Both historical import paths must resolve to one module object so
behavior and module-level state cannot diverge again."""
import src.youtube_handler as src_yt
import services.youtube.youtube_handler as svc_yt
assert src_yt is svc_yt
def test_init_youtube_visible_through_chat_import_path(monkeypatch):
"""init_youtube() is invoked via services.youtube (as app.py does), but the
chat flow reads the API globals through src.youtube_handler. After
consolidation the globals set by init must be visible on both paths."""
import src.youtube_handler as src_yt
from services.youtube import init_youtube
# Pin the globals so monkeypatch restores them after the test, regardless
# of whether youtube_transcript_api is actually installed in this env.
monkeypatch.setattr(src_yt, "YOUTUBE_AVAILABLE", False, raising=False)
monkeypatch.setattr(src_yt, "YouTubeTranscriptApi", None, raising=False)
# Stand in for the real transcript package so init_youtube() succeeds
# without a network/library dependency.
stub = types.ModuleType("youtube_transcript_api")
class _StubApi:
pass
stub.YouTubeTranscriptApi = _StubApi
monkeypatch.setitem(sys.modules, "youtube_transcript_api", stub)
init_youtube() # called exactly the way app.py calls it
assert src_yt.YOUTUBE_AVAILABLE is True
assert src_yt.YouTubeTranscriptApi is _StubApi
@pytest.mark.parametrize(
"url,expected",
[
# Classic watch URLs across the youtube.com hosts.
("https://www.youtube.com/watch?v=dQw4w9WgXcQ", "dQw4w9WgXcQ"),
("https://youtube.com/watch?v=dQw4w9WgXcQ&t=42s", "dQw4w9WgXcQ"),
("https://m.youtube.com/watch?v=dQw4w9WgXcQ", "dQw4w9WgXcQ"),
# YouTube Music shares the same paths and must resolve.
("https://music.youtube.com/watch?v=dQw4w9WgXcQ", "dQw4w9WgXcQ"),
# Short links.
("https://youtu.be/dQw4w9WgXcQ", "dQw4w9WgXcQ"),
("https://youtu.be/dQw4w9WgXcQ?si=ab_cd", "dQw4w9WgXcQ"),
# Player/embed and the legacy /v/ embed.
("https://www.youtube.com/embed/dQw4w9WgXcQ", "dQw4w9WgXcQ"),
("https://www.youtube.com/embed/dQw4w9WgXcQ/", "dQw4w9WgXcQ"),
("https://www.youtube.com/v/dQw4w9WgXcQ", "dQw4w9WgXcQ"),
# Shorts and live — previously unrecognized, so the chat pipeline
# dropped them entirely (excluded from web-fetch as a YouTube URL, but
# no id meant no transcript fetch either).
("https://www.youtube.com/shorts/dQw4w9WgXcQ", "dQw4w9WgXcQ"),
("https://www.youtube.com/shorts/dQw4w9WgXcQ?feature=share", "dQw4w9WgXcQ"),
("https://www.youtube.com/live/dQw4w9WgXcQ", "dQw4w9WgXcQ"),
# Host matching is case-insensitive.
("https://WWW.YouTube.com/watch?v=dQw4w9WgXcQ", "dQw4w9WgXcQ"),
# Non-video paths and non-YouTube hosts yield no id.
("https://www.youtube.com/", None),
("https://www.youtube.com/feed/subscriptions", None),
("https://example.com/watch?v=dQw4w9WgXcQ", None),
("https://vimeo.com/76979871", None),
],
)
def test_extract_youtube_id(url, expected):
from src.youtube_handler import extract_youtube_id
assert extract_youtube_id(url) == expected
def test_shorts_url_is_recognized_and_extractable():
"""A Shorts URL is treated as a YouTube link (so the chat pipeline excludes
it from generic web-fetch). It must therefore yield an id, or the video is
silently dropped — fetched by neither path."""
from src.youtube_handler import is_youtube_url, extract_youtube_id
url = "https://www.youtube.com/shorts/dQw4w9WgXcQ"
assert is_youtube_url(url)
assert extract_youtube_id(url) == "dQw4w9WgXcQ"