Files
odysseus/services/search/analytics.py
T
Giuseppe Castelluccio 6c9a16a7a8 fix: search analytics FileHandler crashes on startup writing to read-only image layer (#2366)
* fix: move search analytics log to writable /app/logs volume

services/search/analytics.py opened a FileHandler at module import
time pointing to /app/services/search_engine_error.log — inside the
container image's read-only layer. The process runs as non-root so
the open() fails with PermissionError, crashing uvicorn before it
ever binds. ANALYTICS_FILE had the same problem.

Both paths now point to /app/logs (bind-mounted from the host data
directory). The FileHandler creation is wrapped in try/except so a
missing mount doesn't hard-crash on import.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: derive log dir from DATA_DIR instead of hardcoded /app/logs

Fixes reviewer feedback on #2366: /app/logs only exists inside Docker,
so native runs couldn't write the analytics file. DATA_DIR resolves to
the repo's data/ directory on native and /app/data (writable mount) in
Docker, making both the error log handler and ANALYTICS_FILE work on
every platform.

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-07 19:26:22 +02:00

149 lines
5.1 KiB
Python

"""Search analytics, metrics tracking, and exception hierarchy."""
import json
import logging
from collections import Counter
from pathlib import Path
from typing import Dict, Any
from core.constants import DATA_DIR
from .cache import cache_metrics
logger = logging.getLogger(__name__)
# Dedicated error logger — write to the data logs directory (writable on both
# native runs and Docker, where DATA_DIR resolves to the bind-mounted volume).
_log_dir = Path(DATA_DIR) / "logs"
_error_log_path = _log_dir / "search_engine_error.log"
error_logger = logging.getLogger("search_engine_error")
error_logger.propagate = False
try:
_log_dir.mkdir(parents=True, exist_ok=True)
_error_handler = logging.FileHandler(_error_log_path, encoding="utf-8")
_error_handler.setLevel(logging.WARNING)
_error_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s"))
error_logger.addHandler(_error_handler)
except Exception as _e:
logging.getLogger(__name__).warning("search_engine_error log handler unavailable: %s", _e)
# Analytics file — also in the writable logs volume.
ANALYTICS_FILE = _log_dir / "search_analytics.json"
# ----------------------------------------------------------------------
# Custom exception hierarchy
# ----------------------------------------------------------------------
class SearchEngineError(Exception):
"""Base class for all search-engine related errors."""
class NetworkError(SearchEngineError):
"""Raised when a network request fails (e.g., timeout, DNS error)."""
class ParseError(SearchEngineError):
"""Raised when HTML or other content cannot be parsed."""
class RateLimitError(SearchEngineError):
"""Raised when the remote service returns a rate-limit (HTTP 429)."""
# ----------------------------------------------------------------------
# Analytics helpers
# ----------------------------------------------------------------------
def _default_analytics() -> Dict[str, Any]:
return {
"total_queries": 0,
"successful_queries": 0,
"failed_queries": 0,
"cache_hits": 0,
"cache_misses": 0,
"query_patterns": {},
}
def _load_analytics() -> Dict[str, Any]:
"""Load analytics data from the JSON file, creating defaults if missing."""
if not ANALYTICS_FILE.exists():
default = _default_analytics()
_save_analytics(default)
return default
try:
with open(ANALYTICS_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
# Merge over defaults so a file written by an older schema (or a
# partial write) still has every counter — _record_query indexes
# these keys directly and would otherwise raise KeyError.
merged = _default_analytics()
if isinstance(data, dict):
merged.update(data)
return merged
except Exception as e:
logger.warning(f"Failed to load analytics file: {e}")
return _default_analytics()
def _save_analytics(data: Dict[str, Any]) -> None:
"""Persist analytics data to the JSON file."""
try:
with open(ANALYTICS_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
except Exception as e:
logger.warning(f"Failed to write analytics file: {e}")
def _record_query(query: str, success: bool, cache_hit: bool) -> None:
"""Update analytics for a single query execution."""
analytics = _load_analytics()
analytics["total_queries"] += 1
if success:
analytics["successful_queries"] += 1
else:
analytics["failed_queries"] += 1
if cache_hit:
analytics["cache_hits"] += 1
cache_metrics["hits"] += 1
else:
analytics["cache_misses"] += 1
cache_metrics["misses"] += 1
patterns = analytics["query_patterns"]
entry = patterns.get(query, {"count": 0, "successes": 0})
entry["count"] += 1
if success:
entry["successes"] += 1
patterns[query] = entry
_save_analytics(analytics)
def get_search_stats() -> Dict[str, Any]:
"""Return aggregated search analytics."""
analytics = _load_analytics()
total = analytics.get("total_queries", 0) or 1
success_rate = analytics.get("successful_queries", 0) / total
cache_total = analytics.get("cache_hits", 0) + analytics.get("cache_misses", 0) or 1
cache_hit_rate = analytics.get("cache_hits", 0) / cache_total
pattern_counter = Counter({
q: data["count"] for q, data in analytics.get("query_patterns", {}).items()
})
most_common = [q for q, _ in pattern_counter.most_common(5)]
return {
"most_common_queries": most_common,
"success_rate": success_rate,
"cache_hit_rate": cache_hit_rate,
"total_queries": analytics.get("total_queries", 0),
"successful_queries": analytics.get("successful_queries", 0),
"failed_queries": analytics.get("failed_queries", 0),
"cache_hits": analytics.get("cache_hits", 0),
"cache_misses": analytics.get("cache_misses", 0),
"cache_evictions": cache_metrics["evictions"],
"runtime_cache_hits": cache_metrics["hits"],
"runtime_cache_misses": cache_metrics["misses"],
}