Reapply "Merge branch 'main' of github.com:pewdiepie-archdaemon/odysseus"

This reverts commit cc8fe2f6e3.
This commit is contained in:
pewdiepie-archdaemon
2026-06-03 22:47:00 +09:00
parent cc8fe2f6e3
commit 6861c41580
16 changed files with 1647 additions and 225 deletions
+10 -148
View File
@@ -1,151 +1,13 @@
"""Search result ranking based on relevance, source quality, and recency."""
"""Compatibility re-export shim for the live ranking module.
import re
import logging
from datetime import datetime, timezone
from typing import List, Optional
from urllib.parse import urlparse
The real implementation lives in :mod:`services.search.ranking`, which is what
the search runtime (services/search/core.py) imports. This module used to hold a
parallel copy; it now re-exports so the two cannot drift out of sync again.
"""
logger = logging.getLogger(__name__)
_AGE_FORMATS = ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S")
def _utcnow_naive() -> datetime:
"""Naive UTC 'now'. Matches the naive, UTC-style published dates parsed below,
and is safe on Python 3.14 where ``datetime.utcnow()`` is removed (#1116)."""
return datetime.now(timezone.utc).replace(tzinfo=None)
def recency_score(age_str: Optional[str], now: Optional[datetime] = None) -> float:
"""Score how recent a result is: 1.0 for <=7 days old, 0.0 for >=30 days.
The age is measured against UTC, not local time. The previous code used
``datetime.now()`` (local) against UTC-style published dates, so the age was
skewed by the host's UTC offset; it was also a latent crash once neighbouring
code moves to timezone-aware datetimes (#1116). ``now`` is injectable for tests.
"""
if not age_str:
return 0.0
dt = None
for fmt in _AGE_FORMATS:
try:
dt = datetime.strptime(age_str, fmt)
break
except Exception:
dt = None
if not dt:
return 0.0
now = now or _utcnow_naive()
days_old = (now - dt).days
if days_old <= 7:
return 1.0
if days_old >= 30:
return 0.0
return (30 - days_old) / 23
_NEWS_HINTS = {"news", "nyheter", "headlines", "breaking", "latest", "today", "idag"}
_SPORTS_HINTS = {
"sport", "sports", "soccer", "football", "hockey", "nba", "nfl", "mlb",
"fifa", "world cup", "championship", "quarterfinal", "eliminates",
}
# Word-boundary match so "sport" does not fire inside "transport"/"passport"
# and a domain like "transport.gov" is not mistaken for a sports site.
_SPORTS_HINT_RE = re.compile(
r"\b(?:" + "|".join(re.escape(h) for h in _SPORTS_HINTS) + r")\b"
from services.search.ranking import ( # noqa: F401
_AGE_FORMATS,
_utcnow_naive,
rank_search_results,
recency_score,
)
_LOW_VALUE_NEWS_DOMAINS = {
"facebook.com", "www.facebook.com", "sports.yahoo.com", "yahoo.com",
"www.yahoo.com", "msn.com", "www.msn.com",
}
_TRUSTED_NEWS_DOMAINS = {
"apnews.com", "www.apnews.com", "reuters.com", "www.reuters.com",
"bbc.com", "www.bbc.com", "cbc.ca", "www.cbc.ca",
"ctvnews.ca", "www.ctvnews.ca", "globalnews.ca", "www.globalnews.ca",
"theguardian.com",
"www.theguardian.com", "euronews.com", "www.euronews.com",
"dw.com", "www.dw.com", "government.se", "www.government.se",
}
def _domain(url: str) -> str:
try:
return urlparse(url).netloc.lower()
except Exception:
return ""
def rank_search_results(query: str, results: List[dict]) -> List[dict]:
"""Rank search results by title relevance, snippet quality, domain authority, and recency."""
query_terms = [t.lower() for t in re.findall(r"\b\w+\b", query)]
query_lc = query.lower()
is_news_query = any(term in _NEWS_HINTS for term in query_terms)
is_sports_query = bool(_SPORTS_HINT_RE.search(query_lc))
def title_score(title: str) -> float:
if not title:
return 0.0
title_lc = title.lower()
matches = sum(1 for term in query_terms if re.search(rf"\b{re.escape(term)}\b", title_lc))
return matches / len(query_terms) if query_terms else 0.0
def snippet_score(snippet: str) -> float:
if not snippet:
return 0.0
length_factor = min(len(snippet), 200) / 200
term_hits = sum(1 for term in query_terms if term in snippet.lower())
term_factor = term_hits / len(query_terms) if query_terms else 0.0
return (length_factor + term_factor) / 2
def domain_score(url: str) -> float:
netloc = _domain(url)
if not netloc:
return 0.0
if netloc in _TRUSTED_NEWS_DOMAINS:
return 1.0
if netloc.endswith(".edu") or netloc.endswith(".gov"):
return 1.0
if netloc.endswith(".org"):
return 0.7
return 0.4
def news_quality_adjustment(title: str, snippet: str, url: str) -> float:
if not is_news_query:
return 0.0
text = f"{title} {snippet}".lower()
netloc = _domain(url)
adjustment = 0.0
if netloc in _TRUSTED_NEWS_DOMAINS:
adjustment += 1.2
if any(term in text for term in ("latest news", "breaking news", "daily coverage", "news from")):
adjustment += 0.4
if netloc in _LOW_VALUE_NEWS_DOMAINS:
adjustment -= 0.8
if not is_sports_query and (_SPORTS_HINT_RE.search(text) or _SPORTS_HINT_RE.search(netloc)):
adjustment -= 1.5
# A country/news query should not rank a page whose title/snippet barely
# mentions the country above actual news pages for that country.
subject_terms = [t for t in query_terms if t not in _NEWS_HINTS]
if subject_terms and not any(t in text or t in netloc for t in subject_terms):
adjustment -= 1.0
return adjustment
ranked = []
for result in results:
title = result.get("title", "")
snippet = result.get("snippet", "")
url = result.get("url", "")
age = result.get("age", None)
score = (
2.0 * title_score(title)
+ 1.0 * snippet_score(snippet)
+ 1.5 * domain_score(url)
+ 1.0 * recency_score(age)
+ news_quality_adjustment(title, snippet, url)
)
ranked.append((score, result))
ranked.sort(key=lambda x: x[0], reverse=True)
return [r for _, r in ranked]
+94
View File
@@ -0,0 +1,94 @@
"""URL validation helpers for server-side outbound requests."""
from __future__ import annotations
import ipaddress
import socket
from urllib.parse import urlparse
_INTERNAL_HOSTNAMES = {
"localhost",
"metadata",
"metadata.google.internal",
}
_INTERNAL_SUFFIXES = (
".localhost",
".local",
".internal",
".lan",
".intranet",
)
_BLOCKED_NETWORKS = (
ipaddress.ip_network("0.0.0.0/8"),
ipaddress.ip_network("10.0.0.0/8"),
ipaddress.ip_network("100.64.0.0/10"),
ipaddress.ip_network("127.0.0.0/8"),
ipaddress.ip_network("169.254.0.0/16"),
ipaddress.ip_network("172.16.0.0/12"),
ipaddress.ip_network("192.168.0.0/16"),
ipaddress.ip_network("::/128"),
ipaddress.ip_network("::1/128"),
ipaddress.ip_network("fc00::/7"),
ipaddress.ip_network("fe80::/10"),
)
def _resolve_hostname_ips(hostname: str) -> list[ipaddress._BaseAddress]:
ips: list[ipaddress._BaseAddress] = []
for family, _, _, _, sockaddr in socket.getaddrinfo(hostname, None):
if family in (socket.AF_INET, socket.AF_INET6):
ips.append(ipaddress.ip_address(sockaddr[0]))
return ips
def _blocked_ip(addr: ipaddress._BaseAddress) -> bool:
return (
any(addr in net for net in _BLOCKED_NETWORKS)
or addr.is_private
or addr.is_loopback
or addr.is_link_local
or addr.is_multicast
or addr.is_unspecified
or addr.is_reserved
)
def _host_resolves_publicly(hostname: str) -> bool:
host = hostname.strip().lower()
if host in _INTERNAL_HOSTNAMES or host.endswith(_INTERNAL_SUFFIXES):
return False
try:
return not _blocked_ip(ipaddress.ip_address(host))
except ValueError:
pass
try:
addrs = _resolve_hostname_ips(host)
except OSError:
return False
return bool(addrs) and all(not _blocked_ip(addr) for addr in addrs)
def is_public_http_url(url: str) -> bool:
parsed = urlparse((url or "").strip())
if parsed.scheme not in ("http", "https") or not parsed.hostname:
return False
return _host_resolves_publicly(parsed.hostname)
def validate_public_http_url(url: str, *, max_length: int = 2048) -> str:
"""Validate a user/API-token supplied server-side HTTP(S) endpoint.
This is for untrusted outbound URLs, not admin-created model endpoints
that are intentionally allowed to point at private model providers. DNS
failures fail closed, and DNS checks reduce obvious private-network
targets but do not eliminate every DNS rebinding race by themselves.
"""
cleaned = (url or "").strip()
if len(cleaned) > max_length:
raise ValueError("URL is too long")
if not is_public_http_url(cleaned):
raise ValueError("URL must point to a public HTTP(S) endpoint")
return cleaned