diff --git a/app.py b/app.py index f9512f36e..abd49e26b 100644 --- a/app.py +++ b/app.py @@ -577,7 +577,7 @@ app.include_router(setup_preset_routes(preset_manager)) # Diagnostics from routes.diagnostics_routes import setup_diagnostics_routes -app.include_router(setup_diagnostics_routes(rag_manager, rag_available, research_handler)) +app.include_router(setup_diagnostics_routes(rag_manager, rag_available, research_handler, memory_vector)) # Cleanup from routes.cleanup_routes import setup_cleanup_routes diff --git a/routes/diagnostics_routes.py b/routes/diagnostics_routes.py index daebef8d2..d6763798d 100644 --- a/routes/diagnostics_routes.py +++ b/routes/diagnostics_routes.py @@ -16,9 +16,18 @@ def setup_diagnostics_routes( rag_manager, rag_available: bool, research_handler, + memory_vector=None, ) -> APIRouter: router = APIRouter(tags=["diagnostics"]) + @router.get("/api/diagnostics/services") + async def get_service_health(request: Request) -> Dict[str, Any]: + """Consolidated degraded-state report for ChromaDB, SearXNG, email, + ntfy, and provider endpoints. Non-intrusive probes — safe to poll.""" + require_admin(request) + from src.service_health import collect_service_health + return await collect_service_health(rag_manager, memory_vector) + @router.get("/api/db/stats") async def get_database_stats(request: Request) -> Dict[str, Any]: require_admin(request) diff --git a/routes/email_helpers.py b/routes/email_helpers.py index 890680a87..7626b58c2 100644 --- a/routes/email_helpers.py +++ b/routes/email_helpers.py @@ -762,10 +762,14 @@ def _open_imap_connection(host: str, port: int, *, starttls: bool, timeout: int imaplib._MAXLINE = 50_000_000 return conn -def _imap_connect(account_id: str | None = None, owner: str = ""): +def _imap_connect(account_id: str | None = None, owner: str = "", + timeout: int = _IMAP_TIMEOUT_SECONDS): # SECURITY: passing `owner` scopes the fallback config lookup so a brand # new user doesn't get connected against another user's default mailbox # when they have no account configured. + # + # `timeout` is overridable so short-lived callers (e.g. the service-health + # probe) can impose a tighter budget than the default IMAP timeout. cfg = _get_email_config(account_id, owner=owner) # Connection mode: # STARTTLS on → plain + upgrade @@ -778,7 +782,7 @@ def _imap_connect(account_id: str | None = None, owner: str = ""): cfg["imap_host"], cfg["imap_port"], starttls=bool(cfg.get("imap_starttls")), - timeout=_IMAP_TIMEOUT_SECONDS, + timeout=timeout, ) try: conn.login(cfg["imap_user"], cfg["imap_password"]) diff --git a/src/service_health.py b/src/service_health.py new file mode 100644 index 000000000..4b24bc9ed --- /dev/null +++ b/src/service_health.py @@ -0,0 +1,506 @@ +"""Consolidated service health / degraded-state reporting. + +ROADMAP: "Better degraded-state reporting for ChromaDB, SearXNG, email, ntfy, +and provider probes." There was no single readout of which subsystems are +actually working — `/api/health` is only a liveness ping and each subsystem's +signal lives in a different module. This collects them into one uniform, +*non-intrusive* report (no test push is sent, no real search is run), so the +admin endpoint built on top of it is safe to poll. + +Each probe returns: + + {"name": str, "status": "ok"|"degraded"|"down"|"disabled", + "detail": str, "meta": dict} + +- ok — reachable / working +- degraded — partially working (one of several components down) +- down — configured & enabled but unreachable / erroring +- disabled — not configured or turned off (not counted as a failure) + +Design notes (driven by review feedback): + +- **Bounded wall-clock.** Per-item probes (providers, email accounts) fan out + across a bounded thread pool with a hard total budget (`_FANOUT_BUDGET`); + stragglers are reported as a controlled `timeout` rather than blocking. The + aggregate adds a per-subsystem deadline (`_SUBSYSTEM_DEADLINE`) and an overall + ceiling (`_AGGREGATE_DEADLINE`), so the endpoint cannot hang regardless of how + many endpoints/accounts are configured or how slowly they respond. +- **No secret leakage.** Even though the endpoint is admin-only, the response + never returns credential-bearing URLs or raw exception text: URLs are passed + through `_safe_url` (userinfo / query / fragment stripped) and failures are + mapped to controlled categories via `_classify_error`. + +The probe functions take their inputs as parameters (settings dict, account +list, endpoint list, manager objects) and isolate the network call to +``_http_get`` / injected callables, so they unit-test without touching the +network. +""" + +import asyncio +import concurrent.futures +import logging +import socket +import ssl +import time +from typing import Any, Callable, Dict, List, Optional +from urllib.parse import urlparse + +logger = logging.getLogger(__name__) + +# Status ordering for rolling up an overall verdict. "disabled" is excluded — +# a turned-off feature must never drag the overall status down. +_SEVERITY = {"ok": 0, "degraded": 1, "down": 2} + +OK = "ok" +DEGRADED = "degraded" +DOWN = "down" +DISABLED = "disabled" + +# Timing budgets (seconds). _PROBE_TIMEOUT bounds a single network op; +# _FANOUT_BUDGET bounds a whole fan-out (providers/email) regardless of count; +# the aggregate layer adds a per-subsystem deadline and an overall ceiling. +_PROBE_TIMEOUT = 4 +_PROBE_CONCURRENCY = 8 +_FANOUT_BUDGET = 8 +_SUBSYSTEM_DEADLINE = 10 +_AGGREGATE_DEADLINE = 14 + +# Controlled, secret-free phrasing for each failure category. +_ERROR_DETAIL = { + "timeout": "probe timed out", + "connection_refused": "connection refused", + "dns_error": "host could not be resolved", + "tls_error": "TLS handshake failed", + "network_error": "network error", + "http_error": "server returned an error response", + "auth_or_protocol_error": "authentication or protocol error", + "no_models": "endpoint returned no models", + "no_host": "no host configured", + "error": "probe failed", +} + + +def _svc(name: str, status: str, detail: str, **meta: Any) -> Dict[str, Any]: + return {"name": name, "status": status, "detail": detail, "meta": dict(meta)} + + +def _safe_url(url: Optional[str]) -> str: + """Strip credentials (userinfo), query, and fragment from a URL. + + Keeps scheme / host / port / path so the report is still useful, but never + echoes `user:pass@`, `?api_key=…`, or `#…` back to the caller. Returns + "" if the URL can't be parsed into at least a host. + """ + if not url: + return "" + raw = url.strip() + try: + p = urlparse(raw if "://" in raw else "//" + raw) + host = p.hostname or "" + if not host: + return "" + netloc = f"{host}:{p.port}" if p.port else host + path = (p.path or "").rstrip("/") + scheme = f"{p.scheme}://" if p.scheme else "" + return f"{scheme}{netloc}{path}" + except Exception: + return "" + + +def _classify_error(exc: BaseException) -> str: + """Map an exception to a controlled, secret-free category token. + + Never returns `str(exc)` — httpx/imaplib exception text can embed the target + URL (which may carry credentials) or server-supplied detail. + """ + if isinstance(exc, (asyncio.TimeoutError, concurrent.futures.TimeoutError, + TimeoutError, socket.timeout)): + return "timeout" + name = type(exc).__name__ + mod = (type(exc).__module__ or "") + if isinstance(exc, ssl.SSLError) or "SSL" in name or "Certificate" in name: + return "tls_error" + if isinstance(exc, socket.gaierror) or name in ("gaierror", "herror"): + return "dns_error" + if isinstance(exc, ConnectionRefusedError) or "ConnectionRefused" in name \ + or name in ("ConnectError",): + return "connection_refused" + if "Timeout" in name: + return "timeout" + if mod.startswith("imaplib") or name in ("error", "abort", "readonly"): + return "auth_or_protocol_error" + if name == "HTTPStatusError": + return "http_error" + if name in ("ConnectTimeout", "ReadTimeout", "ReadError", "WriteError", + "PoolTimeout", "RemoteProtocolError", "NetworkError", + "ProxyError", "ProtocolError"): + return "network_error" + if isinstance(exc, OSError): + return "network_error" + return "error" + + +def _detail_for(category: str) -> str: + return _ERROR_DETAIL.get(category, _ERROR_DETAIL["error"]) + + +def _http_get(url: str, timeout: float = _PROBE_TIMEOUT): + """Single network entry point for the HTTP probes (monkeypatched in tests).""" + import httpx + return httpx.get(url, timeout=timeout) + + +def _bounded_map(items: List[Any], worker: Callable[[int, Any], Dict[str, Any]], + *, budget: float = _FANOUT_BUDGET, + concurrency: int = _PROBE_CONCURRENCY) -> List[Optional[Dict[str, Any]]]: + """Run ``worker(index, item)`` across a bounded thread pool, in order. + + `worker` must catch its own exceptions and return a per-item dict. Any item + not finished within `budget` seconds *in total* is left as ``None`` (the + caller substitutes a controlled `timeout` entry). The pool is shut down with + ``wait=False`` so stragglers never block the response — their own per-op + timeout reaps them shortly after. + """ + n = len(items) + out: List[Optional[Dict[str, Any]]] = [None] * n + if n == 0: + return out + ex = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, min(concurrency, n))) + futures = {ex.submit(worker, i, items[i]): i for i in range(n)} + try: + for fut in concurrent.futures.as_completed(futures, timeout=budget): + i = futures[fut] + try: + out[i] = fut.result() + except Exception as e: # worker is expected to handle its own errors + out[i] = {"ok": False, "error": _classify_error(e)} + except concurrent.futures.TimeoutError: + pass # unfinished items stay None → marked timeout by the caller + finally: + ex.shutdown(wait=False, cancel_futures=True) + return out + + +# ── ChromaDB (vector RAG + vector memory) ── + +def chromadb_health(rag_manager: Any, memory_vector: Any) -> Dict[str, Any]: + """Report on the two ChromaDB-backed stores via their `.healthy` flags. + + Both absent → disabled (Chroma/embeddings not installed or off). + Both healthy → ok. One down → degraded. Both present but unhealthy → down. + """ + rag_present = rag_manager is not None + mem_present = memory_vector is not None + if not rag_present and not mem_present: + return _svc("chromadb", DISABLED, + "Vector RAG and vector memory are not initialized.", + rag=None, memory=None) + + rag_ok = bool(rag_present and getattr(rag_manager, "healthy", False)) + mem_ok = bool(mem_present and getattr(memory_vector, "healthy", False)) + meta = {"rag": rag_ok if rag_present else None, + "memory": mem_ok if mem_present else None} + + healthy = [ok for ok in (rag_ok if rag_present else None, + mem_ok if mem_present else None) if ok is not None] + if healthy and all(healthy): + return _svc("chromadb", OK, "Vector stores healthy.", **meta) + if any(healthy): + return _svc("chromadb", DEGRADED, + "One vector store is unavailable.", **meta) + return _svc("chromadb", DOWN, "Vector stores are unavailable.", **meta) + + +# ── SearXNG ── + +def _searxng_instance(settings: Dict[str, Any]) -> str: + """Mirror src/search/providers.py:_get_search_instance precedence.""" + url = (settings.get("search_url") or "").strip() + if url: + return url.rstrip("/") + from src.constants import SEARXNG_INSTANCE + return SEARXNG_INSTANCE.rstrip("/") + + +def searxng_health(settings: Dict[str, Any], + *, http_get: Callable = _http_get) -> Dict[str, Any]: + """Non-intrusive reachability probe for the configured SearXNG instance. + + Tries `/healthz` (2xx), falling back to the instance root (any non-5xx means + the host answered). No search query is run. The configured instance is + probed in full, but only its sanitized form is returned in `meta`. + """ + provider = (settings.get("search_provider") or "searxng") + if provider != "searxng": + return _svc("searxng", DISABLED, + f"Search provider is '{provider}', not SearXNG.", + provider=provider) + instance = _searxng_instance(settings) + if not instance: + return _svc("searxng", DISABLED, "No SearXNG instance configured.") + safe_instance = _safe_url(instance) + last_category = "error" + for path, accept in (("/healthz", lambda c: 200 <= c < 300), + ("/", lambda c: 0 < c < 500)): + try: + r = http_get(instance + path, timeout=_PROBE_TIMEOUT) + code = getattr(r, "status_code", 0) + if accept(code): + return _svc("searxng", OK, f"Reachable (HTTP {code}).", + instance=safe_instance, probed=path, http_status=code) + last_category = "http_error" + except Exception as e: # connection refused, DNS, timeout, … + last_category = _classify_error(e) + return _svc("searxng", DOWN, f"Unreachable ({_detail_for(last_category)}).", + instance=safe_instance, error=last_category) + + +# ── ntfy ── + +def _ntfy_integration(integrations: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]: + """First enabled ntfy integration with a base_url (matches note_routes).""" + for i in integrations or []: + if (i.get("preset") == "ntfy" and i.get("enabled", True) + and i.get("base_url")): + return i + return None + + +def ntfy_health(integrations: List[Dict[str, Any]], settings: Dict[str, Any], + *, http_get: Callable = _http_get) -> Dict[str, Any]: + """Non-intrusive ntfy probe via the server's built-in `/v1/health` route. + + No test notification is POSTed — `/v1/health` returns `{"healthy":true}` + without publishing to a topic. The request keeps whatever credentials the + configured base_url carries, but `meta.base` is sanitized. + """ + channel = settings.get("reminder_channel") or "browser" + intg = _ntfy_integration(integrations) + if not intg: + return _svc("ntfy", DISABLED, "No ntfy integration configured.", + reminder_channel=channel) + raw = (intg.get("base_url") or "").strip() + parsed = urlparse(raw) + probe_base = (f"{parsed.scheme}://{parsed.netloc}" + if parsed.scheme and parsed.netloc else raw.rstrip("/")) + safe_base = _safe_url(raw) + try: + r = http_get(probe_base + "/v1/health", timeout=_PROBE_TIMEOUT) + code = getattr(r, "status_code", 0) + if code and code < 500: + return _svc("ntfy", OK, f"Reachable (HTTP {code}).", + base=safe_base, reminder_channel=channel, http_status=code) + return _svc("ntfy", DOWN, "Server returned an error response.", + base=safe_base, reminder_channel=channel, error="http_error") + except Exception as e: + category = _classify_error(e) + return _svc("ntfy", DOWN, f"Unreachable ({_detail_for(category)}).", + base=safe_base, reminder_channel=channel, error=category) + + +# ── Email (IMAP) ── + +def email_health(accounts: List[Dict[str, Any]], + *, connect: Optional[Callable] = None) -> Dict[str, Any]: + """Try a short IMAP connect+logout per configured account, concurrently. + + All connect → ok. Some fail → degraded. All fail → down. No account + configured → disabled. Bounded by `_FANOUT_BUDGET` regardless of count. + `meta` carries only the account label and a controlled error category — + never credentials or raw exception text. + """ + if not accounts: + return _svc("email", DISABLED, "No email accounts configured.") + if connect is None: + from routes.email_helpers import _imap_connect + # Impose the service-health budget on the IMAP connect itself. + connect = lambda aid: _imap_connect(aid, timeout=_PROBE_TIMEOUT) # noqa: E731 + + def _label(acc: Dict[str, Any]) -> str: + return acc.get("account_name") or acc.get("account_id") or "account" + + def _check(_i: int, acc: Dict[str, Any]) -> Dict[str, Any]: + name = _label(acc) + if not (acc.get("imap_host") or ""): + return {"name": name, "ok": False, "error": "no_host"} + try: + conn = connect(acc.get("account_id")) + try: + conn.logout() + except Exception: + pass + return {"name": name, "ok": True, "error": None} + except Exception as e: + return {"name": name, "ok": False, "error": _classify_error(e)} + + raw = _bounded_map(accounts, _check, budget=_FANOUT_BUDGET, + concurrency=_PROBE_CONCURRENCY) + per_account = [r if r is not None + else {"name": _label(accounts[i]), "ok": False, "error": "timeout"} + for i, r in enumerate(raw)] + return _rollup_items("email", "mailbox(es)", per_account) + + +# ── Provider endpoints ── + +def providers_health(endpoints: List[Dict[str, Any]], + *, probe: Optional[Callable] = None) -> Dict[str, Any]: + """Probe each enabled model endpoint's model list, concurrently. + + `endpoints` is a list of plain dicts ({name, base_url, api_key}) so this + stays decoupled from the ORM and trivially testable. Non-empty model list + → reachable. Bounded by `_FANOUT_BUDGET` regardless of count. `meta` never + contains api_key or raw URLs — only a display name (or a sanitized URL when + no name is set) and a controlled error category. + """ + if not endpoints: + return _svc("providers", DISABLED, "No model endpoints configured.") + if probe is None: + from routes.model_routes import _probe_endpoint as probe + + def _label(ep: Dict[str, Any]) -> str: + return ep.get("name") or _safe_url(ep.get("base_url")) or "endpoint" + + def _check(_i: int, ep: Dict[str, Any]) -> Dict[str, Any]: + name = _label(ep) + try: + models = probe(ep.get("base_url"), ep.get("api_key"), + timeout=_PROBE_TIMEOUT) or [] + except Exception as e: + return {"name": name, "ok": False, "model_count": 0, + "error": _classify_error(e)} + count = len(models) + return {"name": name, "ok": bool(count), "model_count": count, + "error": None if count else "no_models"} + + raw = _bounded_map(endpoints, _check, budget=_FANOUT_BUDGET, + concurrency=_PROBE_CONCURRENCY) + per_endpoint = [r if r is not None + else {"name": _label(endpoints[i]), "ok": False, + "model_count": 0, "error": "timeout"} + for i, r in enumerate(raw)] + return _rollup_items("providers", "endpoint(s)", per_endpoint, key="endpoints") + + +def _rollup_items(name: str, noun: str, items: List[Dict[str, Any]], + key: str = "accounts") -> Dict[str, Any]: + """Shared ok/degraded/down rollup for a list of per-item probe results.""" + total = len(items) + ok_count = sum(1 for it in items if it.get("ok")) + if ok_count == total: + status, detail = OK, f"{ok_count}/{total} {noun} reachable." + elif ok_count == 0: + status, detail = DOWN, f"No {noun} reachable." + else: + status, detail = DEGRADED, f"{ok_count}/{total} {noun} reachable." + return _svc(name, status, detail, **{key: items}) + + +# ── Aggregate ── + +def _rollup(services: List[Dict[str, Any]]) -> str: + worst = OK + for s in services: + sev = _SEVERITY.get(s.get("status")) + if sev is not None and sev > _SEVERITY[worst]: + worst = s["status"] + return worst + + +def _gather_inputs() -> Dict[str, Any]: + """Pull live config/account/endpoint lists from the app's data sources. + + Each lookup fails soft: a broken source yields an empty/neutral value so a + single failure can't take down the whole health report. + """ + settings: Dict[str, Any] = {} + integrations: List[Dict[str, Any]] = [] + accounts: List[Dict[str, Any]] = [] + endpoints: List[Dict[str, Any]] = [] + try: + from src.settings import load_settings + settings = load_settings() or {} + except Exception as e: + logger.debug(f"service_health: settings load failed: {e}") + try: + from src.integrations import load_integrations + integrations = load_integrations() or [] + except Exception as e: + logger.debug(f"service_health: integrations load failed: {e}") + try: + from routes.email_helpers import _list_email_accounts + accounts = _list_email_accounts() or [] + except Exception as e: + logger.debug(f"service_health: email accounts load failed: {e}") + try: + from core.database import SessionLocal, ModelEndpoint + db = SessionLocal() + try: + rows = db.query(ModelEndpoint).filter( + ModelEndpoint.is_enabled == True).all() # noqa: E712 + endpoints = [{"name": r.name, "base_url": r.base_url, + "api_key": r.api_key} for r in rows] + finally: + db.close() + except Exception as e: + logger.debug(f"service_health: endpoint load failed: {e}") + return {"settings": settings, "integrations": integrations, + "accounts": accounts, "endpoints": endpoints} + + +async def _run_subsystem(name: str, fn: Callable, *args: Any) -> Dict[str, Any]: + """Run one (sync) subsystem probe in a thread under a hard deadline. + + A subsystem that overruns `_SUBSYSTEM_DEADLINE` (or raises) becomes a + controlled `down`/`timeout` entry instead of hanging or leaking the error. + """ + try: + return await asyncio.wait_for(asyncio.to_thread(fn, *args), + timeout=_SUBSYSTEM_DEADLINE) + except asyncio.TimeoutError: + return _svc(name, DOWN, _detail_for("timeout"), error="timeout") + except Exception as e: + category = _classify_error(e) + return _svc(name, DOWN, _detail_for(category), error=category) + + +async def collect_service_health(rag_manager: Any = None, + memory_vector: Any = None) -> Dict[str, Any]: + """Run every probe and return {overall, services, timestamp}. + + Bounded end-to-end: in-process ChromaDB flags are read synchronously; the + four network subsystems run concurrently, each under `_SUBSYSTEM_DEADLINE`, + with an overall `_AGGREGATE_DEADLINE` backstop. Per-item probes inside + providers/email are themselves bounded by `_FANOUT_BUDGET`. + """ + from datetime import datetime, timezone + + inputs = _gather_inputs() + settings = inputs["settings"] + + # ChromaDB is in-process and synchronous (just reads flags). + chroma = chromadb_health(rag_manager, memory_vector) + + names = ["searxng", "ntfy", "email", "providers"] + coros = [ + _run_subsystem("searxng", searxng_health, settings), + _run_subsystem("ntfy", ntfy_health, inputs["integrations"], settings), + _run_subsystem("email", email_health, inputs["accounts"]), + _run_subsystem("providers", providers_health, inputs["endpoints"]), + ] + try: + results = await asyncio.wait_for(asyncio.gather(*coros), + timeout=_AGGREGATE_DEADLINE) + except asyncio.TimeoutError: + # Hard backstop — should not normally fire given per-subsystem deadlines. + results = [_svc(n, DOWN, _detail_for("timeout"), error="timeout") + for n in names] + + services = [chroma, *results] + return { + "overall": _rollup(services), + "services": services, + # Timezone-aware UTC (…+00:00). Avoids the deprecated naive + # datetime.utcnow() flagged in review (overlaps with #1116). + "timestamp": datetime.now(timezone.utc).isoformat(), + } diff --git a/tests/test_diagnostics_service_route.py b/tests/test_diagnostics_service_route.py new file mode 100644 index 000000000..c375a0e64 --- /dev/null +++ b/tests/test_diagnostics_service_route.py @@ -0,0 +1,68 @@ +"""Route-level regression tests for GET /api/diagnostics/services. + +The reviewer asked for explicit coverage of unauthenticated / non-admin / admin +access to this admin diagnostics route, beyond the unit tests for the collector. + +These need a real FastAPI + TestClient (the conftest only stubs FastAPI when it +is *not* installed). When the full app deps aren't present we skip rather than +fail, so the suite stays green in minimal environments; CI installs +requirements, so the tests run there. +""" +import pytest + +fastapi = pytest.importorskip("fastapi") +pytest.importorskip("starlette.testclient") + +from fastapi import FastAPI, HTTPException, Request +from starlette.testclient import TestClient + +# Importing the route module pulls a few app deps; skip cleanly if unavailable. +diag = pytest.importorskip("routes.diagnostics_routes") + + +def _client_with_admin_gate(monkeypatch, gate): + """Mount the diagnostics router with `require_admin` and the collector + patched (via monkeypatch so the module globals are restored afterwards), + and return a TestClient. `gate` plays the role of require_admin.""" + import src.service_health as sh + + async def _fake_collect(_rag, _mem): + return {"overall": "ok", "services": [], "timestamp": "t"} + + # monkeypatch.setattr restores these after the test — a plain assignment + # would leak the fakes into every later test in the session. + monkeypatch.setattr(diag, "require_admin", gate) + monkeypatch.setattr(sh, "collect_service_health", _fake_collect) + + app = FastAPI() + app.include_router(diag.setup_diagnostics_routes( + rag_manager=None, rag_available=False, research_handler=None, + memory_vector=None)) + return TestClient(app, raise_server_exceptions=False) + + +def test_unauthenticated_is_rejected(monkeypatch): + def gate(_request: Request): + raise HTTPException(401, "Not authenticated") + client = _client_with_admin_gate(monkeypatch, gate) + r = client.get("/api/diagnostics/services") + assert r.status_code == 401 + + +def test_non_admin_is_forbidden(monkeypatch): + def gate(_request: Request): + raise HTTPException(403, "Admin only") + client = _client_with_admin_gate(monkeypatch, gate) + r = client.get("/api/diagnostics/services") + assert r.status_code == 403 + + +def test_admin_gets_report(monkeypatch): + def gate(_request: Request): + return None # admin allowed + client = _client_with_admin_gate(monkeypatch, gate) + r = client.get("/api/diagnostics/services") + assert r.status_code == 200 + body = r.json() + assert set(body) == {"overall", "services", "timestamp"} + assert body["overall"] == "ok" diff --git a/tests/test_service_health.py b/tests/test_service_health.py new file mode 100644 index 000000000..56283cef8 --- /dev/null +++ b/tests/test_service_health.py @@ -0,0 +1,472 @@ +"""Tests for src.service_health — the consolidated degraded-state report. + +Imports the real module (conftest.py stubs the heavy deps). Network is never +touched: HTTP probes take an injected `http_get`, and the email/provider probes +take an injected `connect` / `probe`. Asserts the ok/degraded/down/disabled +mapping per subsystem, the overall rollup, and that no secrets leak into meta. +""" +import types + +import pytest + +from src import service_health as sh + + +def _resp(status_code): + return types.SimpleNamespace(status_code=status_code) + + +def _raise(*_a, **_k): + raise RuntimeError("connection refused") + + +# ── chromadb_health ── + +class _Store: + def __init__(self, healthy): + self.healthy = healthy + + +def test_chromadb_both_healthy_ok(): + s = sh.chromadb_health(_Store(True), _Store(True)) + assert s["status"] == sh.OK + assert s["meta"] == {"rag": True, "memory": True} + + +def test_chromadb_one_down_degraded(): + s = sh.chromadb_health(_Store(True), _Store(False)) + assert s["status"] == sh.DEGRADED + + +def test_chromadb_both_unhealthy_down(): + s = sh.chromadb_health(_Store(False), _Store(False)) + assert s["status"] == sh.DOWN + + +def test_chromadb_both_absent_disabled(): + s = sh.chromadb_health(None, None) + assert s["status"] == sh.DISABLED + + +def test_chromadb_one_absent_one_healthy_ok(): + # An absent store is not a failure; the present one being healthy is ok. + s = sh.chromadb_health(_Store(True), None) + assert s["status"] == sh.OK + assert s["meta"]["memory"] is None + + +# ── searxng_health ── + +def test_searxng_disabled_when_other_provider(): + s = sh.searxng_health({"search_provider": "brave"}) + assert s["status"] == sh.DISABLED + + +def test_searxng_ok_on_healthz(): + s = sh.searxng_health( + {"search_provider": "searxng", "search_url": "http://sx:8080"}, + http_get=lambda url, timeout: _resp(200), + ) + assert s["status"] == sh.OK + assert s["meta"]["probed"] == "/healthz" + + +def test_searxng_ok_on_root_fallback(): + def getter(url, timeout): + return _resp(404) if url.endswith("/healthz") else _resp(200) + + s = sh.searxng_health( + {"search_provider": "searxng", "search_url": "http://sx:8080"}, + http_get=getter, + ) + assert s["status"] == sh.OK + assert s["meta"]["probed"] == "/" + + +def test_searxng_down_on_exception(): + s = sh.searxng_health( + {"search_provider": "searxng", "search_url": "http://sx:8080"}, + http_get=_raise, + ) + assert s["status"] == sh.DOWN + + +def test_searxng_down_on_5xx(): + s = sh.searxng_health( + {"search_provider": "searxng", "search_url": "http://sx:8080"}, + http_get=lambda url, timeout: _resp(502), + ) + assert s["status"] == sh.DOWN + + +# ── ntfy_health ── + +def _ntfy_intg(): + return [{"preset": "ntfy", "enabled": True, "base_url": "http://ntfy:80"}] + + +def test_ntfy_disabled_without_integration(): + s = sh.ntfy_health([], {"reminder_channel": "ntfy"}) + assert s["status"] == sh.DISABLED + + +def test_ntfy_ok(): + s = sh.ntfy_health(_ntfy_intg(), {"reminder_channel": "ntfy"}, + http_get=lambda url, timeout: _resp(200)) + assert s["status"] == sh.OK + assert s["meta"]["base"] == "http://ntfy:80" + + +def test_ntfy_probes_v1_health_not_a_topic(): + seen = {} + + def getter(url, timeout): + seen["url"] = url + return _resp(200) + + sh.ntfy_health(_ntfy_intg(), {"reminder_channel": "ntfy"}, http_get=getter) + # Non-intrusive: hits /v1/health, never publishes to a topic. + assert seen["url"].endswith("/v1/health") + + +def test_ntfy_down_on_exception(): + s = sh.ntfy_health(_ntfy_intg(), {"reminder_channel": "ntfy"}, + http_get=_raise) + assert s["status"] == sh.DOWN + + +# ── email_health ── + +def _acct(name, host="imap.example.com"): + return {"account_id": name, "account_name": name, "imap_host": host, + "imap_password": "hunter2"} + + +class _Conn: + def logout(self): + pass + + +def test_email_disabled_without_accounts(): + assert sh.email_health([])["status"] == sh.DISABLED + + +def test_email_ok_all_connect(): + s = sh.email_health([_acct("a"), _acct("b")], connect=lambda _id: _Conn()) + assert s["status"] == sh.OK + + +def test_email_degraded_some_fail(): + def connect(account_id): + if account_id == "bad": + raise RuntimeError("auth failed") + return _Conn() + + s = sh.email_health([_acct("good"), _acct("bad")], connect=connect) + assert s["status"] == sh.DEGRADED + + +def test_email_down_all_fail(): + s = sh.email_health([_acct("a")], connect=_raise) + assert s["status"] == sh.DOWN + + +def test_email_account_without_host_marked_failed(): + s = sh.email_health([_acct("a", host="")], connect=lambda _id: _Conn()) + assert s["status"] == sh.DOWN + + +def test_email_meta_never_leaks_password(): + s = sh.email_health([_acct("a")], connect=lambda _id: _Conn()) + assert "hunter2" not in repr(s) + + +# ── providers_health ── + +def _ep(name): + return {"name": name, "base_url": f"http://{name}:8000/v1", "api_key": "sk-secret"} + + +def test_providers_disabled_without_endpoints(): + assert sh.providers_health([])["status"] == sh.DISABLED + + +def test_providers_ok_all_reachable(): + s = sh.providers_health([_ep("a")], + probe=lambda base, key, timeout: ["m1", "m2"]) + assert s["status"] == sh.OK + assert s["meta"]["endpoints"][0]["model_count"] == 2 + + +def test_providers_degraded_some_empty(): + def probe(base, key, timeout): + return ["m1"] if "good" in base else [] + + s = sh.providers_health([_ep("good"), _ep("bad")], probe=probe) + assert s["status"] == sh.DEGRADED + + +def test_providers_down_all_fail(): + s = sh.providers_health([_ep("a")], probe=_raise) + assert s["status"] == sh.DOWN + + +def test_providers_meta_never_leaks_api_key(): + s = sh.providers_health([_ep("a")], + probe=lambda base, key, timeout: ["m1"]) + assert "sk-secret" not in repr(s) + + +# ── rollup ── + +def test_rollup_picks_worst_non_disabled(): + services = [ + {"status": sh.OK}, {"status": sh.DISABLED}, + {"status": sh.DEGRADED}, {"status": sh.OK}, + ] + assert sh._rollup(services) == sh.DEGRADED + + +def test_rollup_down_beats_degraded(): + assert sh._rollup([{"status": sh.DEGRADED}, {"status": sh.DOWN}]) == sh.DOWN + + +def test_rollup_all_disabled_is_ok(): + assert sh._rollup([{"status": sh.DISABLED}, {"status": sh.DISABLED}]) == sh.OK + + +# ── collect_service_health (async aggregate) ── + +def test_collect_service_health_shape(monkeypatch): + import asyncio + + # Avoid touching real data sources / network. + monkeypatch.setattr(sh, "_gather_inputs", lambda: { + "settings": {"search_provider": "disabled"}, + "integrations": [], + "accounts": [], + "endpoints": [], + }) + out = asyncio.run(sh.collect_service_health(_Store(True), _Store(True))) + assert set(out) == {"overall", "services", "timestamp"} + names = {s["name"] for s in out["services"]} + assert names == {"chromadb", "searxng", "ntfy", "email", "providers"} + # Chroma healthy, everything else disabled → overall ok. + assert out["overall"] == sh.OK + + +# ── _safe_url: strip userinfo / query / fragment ── + +@pytest.mark.parametrize("raw,expected", [ + ("http://user:pass@host:8080/path?api_key=secret#frag", "http://host:8080/path"), + ("https://admin:hunter2@searx.example.com/", "https://searx.example.com"), + ("http://ntfy.local:80?token=abc", "http://ntfy.local:80"), + ("host:8080", "host:8080"), + ("", ""), + (None, ""), +]) +def test_safe_url_strips_secrets(raw, expected): + out = sh._safe_url(raw) + assert out == expected + for bad in ("pass", "secret", "hunter2", "abc", "token", "@"): + if raw and bad in raw and bad not in expected: + assert bad not in out + + +# ── _classify_error: controlled categories, never raw text ── + +def test_classify_error_categories(): + import socket + assert sh._classify_error(TimeoutError()) == "timeout" + assert sh._classify_error(socket.timeout()) == "timeout" + assert sh._classify_error(socket.gaierror()) == "dns_error" + assert sh._classify_error(ConnectionRefusedError()) == "connection_refused" + assert sh._classify_error(OSError("boom")) == "network_error" + assert sh._classify_error(ValueError("x")) == "error" + + +# ── Sanitization in subsystem output (blocker #2) ── + +def test_searxng_meta_redacts_instance_url(): + s = sh.searxng_health( + {"search_provider": "searxng", + "search_url": "http://user:s3cr3t@searx.local:8080/?token=zzz"}, + http_get=lambda url, timeout: _resp(200), + ) + blob = repr(s) + assert "s3cr3t" not in blob and "zzz" not in blob and "user:" not in blob + assert s["meta"]["instance"] == "http://searx.local:8080" + + +def test_searxng_down_uses_error_category_not_raw_exception(): + def boom(url, timeout): + raise RuntimeError("failed connecting to http://user:pw@searx.local secret-token") + s = sh.searxng_health( + {"search_provider": "searxng", "search_url": "http://searx.local"}, + http_get=boom, + ) + assert s["status"] == sh.DOWN + assert s["meta"]["error"] == "error" # controlled category token + assert "secret-token" not in repr(s) and "pw@" not in repr(s) + + +def test_ntfy_meta_redacts_userinfo_in_base(): + intg = [{"preset": "ntfy", "enabled": True, + "base_url": "https://user:topsecret@ntfy.example.com"}] + seen = {} + + def getter(url, timeout): + seen["url"] = url # the probe itself may keep credentials + return _resp(200) + + s = sh.ntfy_health(intg, {"reminder_channel": "ntfy"}, http_get=getter) + assert s["meta"]["base"] == "https://ntfy.example.com" + assert "topsecret" not in repr(s) + + +def test_providers_name_fallback_is_sanitized(): + # No display name → falls back to the base_url, which must be sanitized. + ep = {"base_url": "http://user:k3y@prov.local:9000/v1?api_key=zzz", "api_key": "sk-x"} + s = sh.providers_health([ep], probe=lambda b, k, t: ["m1"]) + entry = s["meta"]["endpoints"][0] + assert entry["name"] == "http://prov.local:9000/v1" + assert "k3y" not in repr(s) and "zzz" not in repr(s) and "sk-x" not in repr(s) + + +def test_providers_probe_exception_maps_to_category(): + def boom(base, key, timeout): + raise RuntimeError(f"500 from {base} with key {key}") # would leak base+key + s = sh.providers_health([_ep("a")], probe=boom) + assert s["status"] == sh.DOWN + assert s["meta"]["endpoints"][0]["error"] == "error" + assert "sk-secret" not in repr(s) and "http://a" not in repr(s) + + +def test_email_connect_exception_maps_to_category(): + def boom(account_id): + raise RuntimeError("login failed for user bob with password hunter2") + s = sh.email_health([_acct("a")], connect=boom) + assert s["status"] == sh.DOWN + assert s["meta"]["accounts"][0]["error"] == "error" + assert "hunter2" not in repr(s) + + +# ── Bounded wall-clock (blocker #1) ── + +def test_providers_bounded_marks_slow_as_timeout(monkeypatch): + import time + monkeypatch.setattr(sh, "_FANOUT_BUDGET", 1) + + def probe(base, key, timeout): + if "slow" in base: + time.sleep(10) # would blow the budget if unbounded + return ["m1"] + + eps = [{"name": "fast", "base_url": "http://fast", "api_key": "k"}, + {"name": "slow", "base_url": "http://slow", "api_key": "k"}] + t0 = time.monotonic() + out = sh.providers_health(eps, probe=probe) + elapsed = time.monotonic() - t0 + assert elapsed < 4, f"providers_health not bounded: took {elapsed:.1f}s" + by = {e["name"]: e for e in out["meta"]["endpoints"]} + assert by["fast"]["ok"] is True + assert by["slow"]["ok"] is False and by["slow"]["error"] == "timeout" + assert out["status"] == sh.DEGRADED + + +def test_providers_bounded_with_many_slow_endpoints(monkeypatch): + import time + monkeypatch.setattr(sh, "_FANOUT_BUDGET", 1) + + def probe(base, key, timeout): + time.sleep(10) + return ["m1"] + + eps = [{"name": f"ep{i}", "base_url": f"http://ep{i}", "api_key": "k"} + for i in range(25)] + t0 = time.monotonic() + out = sh.providers_health(eps, probe=probe) + elapsed = time.monotonic() - t0 + # 25 endpoints * sleep would be huge if sequential; bounded keeps it ~budget. + assert elapsed < 4, f"not bounded with many endpoints: {elapsed:.1f}s" + assert out["status"] == sh.DOWN + assert all(e["error"] == "timeout" for e in out["meta"]["endpoints"]) + + +def test_email_bounded_marks_slow_as_timeout(monkeypatch): + import time + monkeypatch.setattr(sh, "_FANOUT_BUDGET", 1) + + def connect(account_id): + if account_id == "slow": + time.sleep(10) + return _Conn() + + accts = [_acct("fast"), _acct("slow")] + accts[1]["account_id"] = "slow" + t0 = time.monotonic() + out = sh.email_health(accts, connect=connect) + elapsed = time.monotonic() - t0 + assert elapsed < 4, f"email_health not bounded: took {elapsed:.1f}s" + by = {a["name"]: a for a in out["meta"]["accounts"]} + assert by["slow"]["error"] == "timeout" + + +def test_collect_runs_subsystems_concurrently(monkeypatch): + # The aggregate is bounded by running the (internally-bounded) subsystems + # concurrently, so total wall-clock ≈ max(subsystem), not the sum. Each of + # the four network subsystems here sleeps ~0.6s; sequential would be ~2.4s. + import asyncio + import time + monkeypatch.setattr(sh, "_gather_inputs", lambda: { + "settings": {}, "integrations": [], "accounts": [], "endpoints": [], + }) + + def slow(name): + def _fn(*_a, **_k): + time.sleep(0.6) + return {"name": name, "status": sh.OK, "detail": "", "meta": {}} + return _fn + + monkeypatch.setattr(sh, "searxng_health", slow("searxng")) + monkeypatch.setattr(sh, "ntfy_health", slow("ntfy")) + monkeypatch.setattr(sh, "email_health", slow("email")) + monkeypatch.setattr(sh, "providers_health", slow("providers")) + + t0 = time.monotonic() + out = asyncio.run(sh.collect_service_health(None, None)) + elapsed = time.monotonic() - t0 + assert elapsed < 1.5, f"subsystems not concurrent: took {elapsed:.1f}s" + assert {s["name"] for s in out["services"]} == { + "chromadb", "searxng", "ntfy", "email", "providers"} + + +def test_collect_aggregate_deadline_yields_controlled_result(monkeypatch): + # If the gather overruns the aggregate ceiling, the response is still a + # controlled {overall, services, timestamp} with each network subsystem + # marked down/timeout — never a hang or a raised exception. + import asyncio + import time + monkeypatch.setattr(sh, "_AGGREGATE_DEADLINE", 0.5) + monkeypatch.setattr(sh, "_SUBSYSTEM_DEADLINE", 0.4) + monkeypatch.setattr(sh, "_gather_inputs", lambda: { + "settings": {}, "integrations": [], "accounts": [], "endpoints": [], + }) + + async def _slow_gather(*coros, **_k): + for c in coros: # close unawaited coros to avoid warnings + close = getattr(c, "close", None) + if close: + close() + await asyncio.sleep(5) + + # Force the outer wait_for to trip by making gather itself slow. + monkeypatch.setattr(sh.asyncio, "gather", _slow_gather) + t0 = time.monotonic() + out = asyncio.run(sh.collect_service_health(None, None)) + elapsed = time.monotonic() - t0 + assert elapsed < 2, f"aggregate deadline did not bound: {elapsed:.1f}s" + assert set(out) == {"overall", "services", "timestamp"} + net = [s for s in out["services"] if s["name"] != "chromadb"] + assert all(s["status"] == sh.DOWN and s["meta"].get("error") == "timeout" + for s in net)