feat(diagnostics): add consolidated service health endpoint for degraded-state reporting (#964)

* Add consolidated service health endpoint for degraded-state reporting

ROADMAP (High Priority) asks for "Better degraded-state reporting for
ChromaDB, SearXNG, email, ntfy, and provider probes." Until now there was no
single readout of which subsystems are actually working: /api/health is only a
liveness ping and each subsystem's signal lives in a different module, so a
misconfigured self-host install gives no consolidated picture.

This adds an admin-only GET /api/diagnostics/services endpoint backed by a new
src/service_health.py aggregator. Each subsystem reports a uniform
{name, status, detail, meta} where status is ok | degraded | down | disabled,
and the response rolls up an overall verdict (worst non-disabled status).
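
To illustrate the rollup rule, here is a minimal sketch. The `overall_status` helper and the sample report are hypothetical names used only for this example; the module itself does this in `_rollup`.

```python
from typing import Dict, List

# Severity ranking; "disabled" is intentionally absent so it never counts.
SEVERITY = {"ok": 0, "degraded": 1, "down": 2}


def overall_status(services: List[Dict[str, str]]) -> str:
    """Return the worst status among the services, ignoring 'disabled'."""
    worst = "ok"
    for svc in services:
        rank = SEVERITY.get(svc["status"])
        if rank is not None and rank > SEVERITY[worst]:
            worst = svc["status"]
    return worst


report = [
    {"name": "chromadb", "status": "ok"},
    {"name": "ntfy", "status": "disabled"},
    {"name": "email", "status": "degraded"},
]
print(overall_status(report))  # degraded
```

A report in which every subsystem is disabled rolls up to `ok`, since nothing that is enabled is failing.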

Probes are deliberately non-intrusive and safe to poll:
- ChromaDB: reads the .healthy flags on the RAG and memory vector stores.
- SearXNG: GET /healthz (2xx), falling back to the instance root (<500). No
  search query is run.
- ntfy: GET the server's built-in /v1/health. No test notification is sent.
- email: short IMAP connect+logout per configured account (no credentials in
  meta).
- providers: probe each enabled ModelEndpoint's model list (no api_key in meta).
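
The SearXNG fallback described in the list above could look roughly like this. It is a sketch under assumptions: `probe_reachable` and `fake_get` are hypothetical names, and the HTTP call is passed in as a callable so the example runs without a network.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict


def probe_reachable(base_url: str, http_get: Callable[..., Any]) -> Dict[str, Any]:
    """Try /healthz (2xx), then fall back to the root (any status below 500)."""
    checks = (
        ("/healthz", lambda code: 200 <= code < 300),
        ("/", lambda code: 0 < code < 500),
    )
    for path, accept in checks:
        try:
            code = http_get(base_url + path, timeout=4).status_code
        except Exception:
            continue  # any failure means "try the next path"
        if accept(code):
            return {"status": "ok", "probed": path, "http_status": code}
    return {"status": "down"}


def fake_get(url: str, timeout: float) -> SimpleNamespace:
    # Stand-in for a real HTTP client: /healthz is missing, the root answers.
    return SimpleNamespace(status_code=404 if url.endswith("/healthz") else 200)


result = probe_reachable("http://sx:8080", fake_get)
print(result)  # {'status': 'ok', 'probed': '/', 'http_status': 200}
```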

Probe functions take their inputs as parameters and isolate the network call to
injectable callables, so they unit-test without touching the network (same
pattern as the merged provider-endpoint tests). Network probes run concurrently
off the event loop via asyncio.to_thread with bounded per-probe timeouts.
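
A minimal sketch of running a blocking probe off the event loop under a deadline, assuming Python 3.9+ for `asyncio.to_thread`. The names `run_with_deadline`, `fast_probe`, and `slow_probe` are hypothetical and the timings are illustrative, not the module's real budgets.

```python
import asyncio
import time
from typing import Any, Callable, Dict


async def run_with_deadline(name: str, fn: Callable[[], Dict[str, Any]],
                            deadline: float) -> Dict[str, Any]:
    """Run a blocking probe in a worker thread; stop waiting after `deadline` seconds."""
    try:
        return await asyncio.wait_for(asyncio.to_thread(fn), timeout=deadline)
    except asyncio.TimeoutError:
        # The worker thread keeps running until its own timeout; we only stop waiting.
        return {"name": name, "status": "down", "error": "timeout"}


def fast_probe() -> Dict[str, Any]:
    return {"name": "fast", "status": "ok"}


def slow_probe() -> Dict[str, Any]:
    time.sleep(1.0)  # simulates an unresponsive service
    return {"name": "slow", "status": "ok"}


async def main():
    # Both probes run concurrently, each under its own deadline.
    return await asyncio.gather(
        run_with_deadline("fast", fast_probe, deadline=0.3),
        run_with_deadline("slow", slow_probe, deadline=0.3),
    )


results = asyncio.run(main())
print(results)
```

The slow probe is reported as a controlled `timeout` entry rather than delaying the combined result.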

memory_vector is now passed into setup_diagnostics_routes (new optional param,
backward-compatible) so ChromaDB's vector-memory store can be reported too.

Tests: tests/test_service_health.py — 29 tests covering every status mapping
per subsystem, the overall rollup, and that no secrets leak into meta.

Verification:
  python -m pytest tests/test_service_health.py -q          # 29 passed
  python -m py_compile src/service_health.py routes/diagnostics_routes.py app.py
  python -m pytest tests/test_endpoint_resolver.py tests/test_provider_endpoints.py -q

Backend + tests only; an Admin/Settings UI badge that renders this endpoint is
a natural follow-up.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(diagnostics): bound service-health wall-clock and redact secrets

Addresses review on #964.

Blocker 1 — genuinely bounded wall-clock:
- providers_health and email_health now fan out per-item probes across a
  bounded thread pool (_bounded_map) with a hard total budget (_FANOUT_BUDGET),
  instead of probing endpoints/accounts sequentially. Stragglers are reported
  as a controlled `timeout` and never block; the pool is shut down with
  wait=False so the response returns on time regardless of endpoint/account
  count.
- The IMAP connect path now honors the service-health budget: _imap_connect
  gained a pass-through `timeout` param and the probe calls it with
  _PROBE_TIMEOUT instead of the default 15s.
- collect_service_health runs the four network subsystems concurrently, each
  under a per-subsystem deadline (_SUBSYSTEM_DEADLINE), with an overall
  wait_for ceiling (_AGGREGATE_DEADLINE) as a backstop.
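
The bounded fan-out can be sketched as follows, assuming Python 3.9+ for `cancel_futures`. This is a simplified stand-in for `_bounded_map`, not the actual implementation: the names `bounded_map` and `probe` and the timings are made up for the example.

```python
import concurrent.futures
import time
from typing import Any, Callable, List, Optional


def bounded_map(items: List[Any], worker: Callable[[Any], Any],
                budget: float, concurrency: int = 8) -> List[Optional[Any]]:
    """Run worker(item) in a thread pool; items unfinished after `budget` seconds stay None."""
    out: List[Optional[Any]] = [None] * len(items)
    if not items:
        return out
    pool = concurrent.futures.ThreadPoolExecutor(
        max_workers=max(1, min(concurrency, len(items))))
    futures = {pool.submit(worker, item): i for i, item in enumerate(items)}
    try:
        for fut in concurrent.futures.as_completed(futures, timeout=budget):
            out[futures[fut]] = fut.result()
    except concurrent.futures.TimeoutError:
        pass  # stragglers keep their None placeholder
    finally:
        # Do not wait for stragglers; drop anything still queued.
        pool.shutdown(wait=False, cancel_futures=True)
    return out


def probe(delay: float) -> str:
    time.sleep(delay)  # simulates a slow endpoint
    return "ok"


start = time.monotonic()
results = bounded_map([0.0, 1.0, 0.0], probe, budget=0.3)
elapsed = time.monotonic() - start
statuses = [r if r is not None else "timeout" for r in results]
print(statuses)  # ['ok', 'timeout', 'ok']
```

The call returns after roughly the budget, not after the slowest item, which is the property the review asked for.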

Blocker 2 — no secret/raw-error leakage in the response:
- _safe_url strips userinfo, query, and fragment from every URL surfaced in
  meta (searxng instance, ntfy base, provider name fallback), keeping only
  scheme/host/port/path.
- _classify_error maps every probe failure to a controlled category token
  (timeout, connection_refused, dns_error, tls_error, network_error,
  http_error, auth_or_protocol_error, …) — raw str(exception), which can embed
  credentialed URLs or server text, is never returned.
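
As an illustration of the URL redaction, a minimal sketch using only `urllib.parse`. The function name `safe_url` and the sample URL are assumptions for the example; it mirrors the behaviour described for `_safe_url` but is not a copy of it.

```python
from urllib.parse import urlparse


def safe_url(url: str) -> str:
    """Keep scheme/host/port/path; drop userinfo, query, and fragment."""
    if not url:
        return ""
    raw = url.strip()
    try:
        # Prefix "//" so scheme-less input like "host:8080" still parses as a netloc.
        parsed = urlparse(raw if "://" in raw else "//" + raw)
        host = parsed.hostname or ""
        port = parsed.port  # raises ValueError on a malformed port
    except ValueError:
        return "<redacted>"
    if not host:
        return "<redacted>"
    netloc = f"{host}:{port}" if port else host
    scheme = f"{parsed.scheme}://" if parsed.scheme else ""
    return f"{scheme}{netloc}{parsed.path.rstrip('/')}"


print(safe_url("https://user:pass@ntfy.example.com:8443/base/?token=abc#frag"))
# https://ntfy.example.com:8443/base
```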

Tests (tests/test_service_health.py, +tests/test_diagnostics_service_route.py):
- URL userinfo/query redaction for searxng/ntfy/providers.
- secret-bearing exception strings map to categories and don't leak.
- multiple slow providers/accounts stay bounded (single + 25-endpoint cases).
- subsystems run concurrently; aggregate deadline yields a controlled result.
- route-level unauthenticated (401) / non-admin (403) / admin (200) coverage.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* test(diagnostics): isolate route tests so they don't leak module globals

The new route tests replaced src.service_health.collect_service_health and
routes.diagnostics_routes.require_admin via direct assignment, which persisted
for the rest of the pytest session. In CI's full alphabetical run that fake
collector (returning services=[]) leaked into the later collect_service_health
tests and failed them. Switch to monkeypatch.setattr so both are restored after
each test. No production code change.
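
The leak can be reproduced without pytest. In the sketch below, `unittest.mock.patch.object` stands in for `monkeypatch.setattr` (both restore the original attribute when the scope ends), and `service` is a hypothetical stand-in for a module with a patched global.

```python
import types
from unittest import mock

# Hypothetical stand-in for a module exposing a global collaborator.
service = types.SimpleNamespace(
    collect=lambda: {"services": ["chromadb", "ntfy"]})


def fake_collect():
    return {"services": []}


def leaky_test():
    # Direct assignment: the fake stays in place after the test returns.
    service.collect = fake_collect
    return service.collect()


def isolated_test():
    # Scoped patch: the original attribute is restored on exit.
    with mock.patch.object(service, "collect", fake_collect):
        return service.collect()


original = service.collect
isolated_result = isolated_test()
restored = service.collect is original    # the scoped patch left no trace

leaky_test()
leaked = service.collect is fake_collect  # the fake outlived the test
print(restored, leaked)  # True True
```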

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
Co-authored-by: Alexandre Teixeira <111787685+alteixeira20@users.noreply.github.com>
Sheikh Rahat Mahmud
2026-06-09 21:00:24 +06:00
committed by GitHub
parent c1674fc2aa
commit 9180847c0e
6 changed files with 1062 additions and 3 deletions
+1 -1
@@ -577,7 +577,7 @@ app.include_router(setup_preset_routes(preset_manager))
 
 # Diagnostics
 from routes.diagnostics_routes import setup_diagnostics_routes
-app.include_router(setup_diagnostics_routes(rag_manager, rag_available, research_handler))
+app.include_router(setup_diagnostics_routes(rag_manager, rag_available, research_handler, memory_vector))
 
 # Cleanup
 from routes.cleanup_routes import setup_cleanup_routes
+9
@@ -16,9 +16,18 @@ def setup_diagnostics_routes(
     rag_manager,
     rag_available: bool,
     research_handler,
+    memory_vector=None,
 ) -> APIRouter:
     router = APIRouter(tags=["diagnostics"])
 
+    @router.get("/api/diagnostics/services")
+    async def get_service_health(request: Request) -> Dict[str, Any]:
+        """Consolidated degraded-state report for ChromaDB, SearXNG, email,
+        ntfy, and provider endpoints. Non-intrusive probes — safe to poll."""
+        require_admin(request)
+        from src.service_health import collect_service_health
+        return await collect_service_health(rag_manager, memory_vector)
+
     @router.get("/api/db/stats")
     async def get_database_stats(request: Request) -> Dict[str, Any]:
         require_admin(request)
+6 -2
@@ -762,10 +762,14 @@ def _open_imap_connection(host: str, port: int, *, starttls: bool, timeout: int
     imaplib._MAXLINE = 50_000_000
     return conn
 
-def _imap_connect(account_id: str | None = None, owner: str = ""):
+def _imap_connect(account_id: str | None = None, owner: str = "",
+                  timeout: int = _IMAP_TIMEOUT_SECONDS):
     # SECURITY: passing `owner` scopes the fallback config lookup so a brand
     # new user doesn't get connected against another user's default mailbox
     # when they have no account configured.
+    #
+    # `timeout` is overridable so short-lived callers (e.g. the service-health
+    # probe) can impose a tighter budget than the default IMAP timeout.
     cfg = _get_email_config(account_id, owner=owner)
     # Connection mode:
     # STARTTLS on → plain + upgrade
@@ -778,7 +782,7 @@ def _imap_connect(account_id: str | None = None, owner: str = ""):
         cfg["imap_host"],
         cfg["imap_port"],
         starttls=bool(cfg.get("imap_starttls")),
-        timeout=_IMAP_TIMEOUT_SECONDS,
+        timeout=timeout,
     )
     try:
         conn.login(cfg["imap_user"], cfg["imap_password"])
+506
@@ -0,0 +1,506 @@
"""Consolidated service health / degraded-state reporting.
ROADMAP: "Better degraded-state reporting for ChromaDB, SearXNG, email, ntfy,
and provider probes." There was no single readout of which subsystems are
actually working: `/api/health` is only a liveness ping and each subsystem's
signal lives in a different module. This collects them into one uniform,
*non-intrusive* report (no test push is sent, no real search is run), so the
admin endpoint built on top of it is safe to poll.
Each probe returns:
{"name": str, "status": "ok"|"degraded"|"down"|"disabled",
"detail": str, "meta": dict}
- ok: reachable / working
- degraded: partially working (one of several components down)
- down: configured & enabled but unreachable / erroring
- disabled: not configured or turned off (not counted as a failure)
Design notes (driven by review feedback):
- **Bounded wall-clock.** Per-item probes (providers, email accounts) fan out
across a bounded thread pool with a hard total budget (`_FANOUT_BUDGET`);
stragglers are reported as a controlled `timeout` rather than blocking. The
aggregate adds a per-subsystem deadline (`_SUBSYSTEM_DEADLINE`) and an overall
ceiling (`_AGGREGATE_DEADLINE`), so the endpoint cannot hang regardless of how
many endpoints/accounts are configured or how slowly they respond.
- **No secret leakage.** Even though the endpoint is admin-only, the response
never returns credential-bearing URLs or raw exception text: URLs are passed
through `_safe_url` (userinfo / query / fragment stripped) and failures are
mapped to controlled categories via `_classify_error`.
The probe functions take their inputs as parameters (settings dict, account
list, endpoint list, manager objects) and isolate the network call to
``_http_get`` / injected callables, so they unit-test without touching the
network.
"""
import asyncio
import concurrent.futures
import logging
import socket
import ssl
import time
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
# Status ordering for rolling up an overall verdict. "disabled" is excluded —
# a turned-off feature must never drag the overall status down.
_SEVERITY = {"ok": 0, "degraded": 1, "down": 2}
OK = "ok"
DEGRADED = "degraded"
DOWN = "down"
DISABLED = "disabled"
# Timing budgets (seconds). _PROBE_TIMEOUT bounds a single network op;
# _FANOUT_BUDGET bounds a whole fan-out (providers/email) regardless of count;
# the aggregate layer adds a per-subsystem deadline and an overall ceiling.
_PROBE_TIMEOUT = 4
_PROBE_CONCURRENCY = 8
_FANOUT_BUDGET = 8
_SUBSYSTEM_DEADLINE = 10
_AGGREGATE_DEADLINE = 14
# Controlled, secret-free phrasing for each failure category.
_ERROR_DETAIL = {
"timeout": "probe timed out",
"connection_refused": "connection refused",
"dns_error": "host could not be resolved",
"tls_error": "TLS handshake failed",
"network_error": "network error",
"http_error": "server returned an error response",
"auth_or_protocol_error": "authentication or protocol error",
"no_models": "endpoint returned no models",
"no_host": "no host configured",
"error": "probe failed",
}
def _svc(name: str, status: str, detail: str, **meta: Any) -> Dict[str, Any]:
return {"name": name, "status": status, "detail": detail, "meta": dict(meta)}
def _safe_url(url: Optional[str]) -> str:
"""Strip credentials (userinfo), query, and fragment from a URL.
Keeps scheme / host / port / path so the report is still useful, but never
echoes `user:pass@`, `?api_key=`, or `#…` back to the caller. Returns
"<redacted>" if the URL can't be parsed into at least a host.
"""
if not url:
return ""
raw = url.strip()
try:
p = urlparse(raw if "://" in raw else "//" + raw)
host = p.hostname or ""
if not host:
return "<redacted>"
netloc = f"{host}:{p.port}" if p.port else host
path = (p.path or "").rstrip("/")
scheme = f"{p.scheme}://" if p.scheme else ""
return f"{scheme}{netloc}{path}"
except Exception:
return "<redacted>"
def _classify_error(exc: BaseException) -> str:
"""Map an exception to a controlled, secret-free category token.
Never returns `str(exc)`: httpx/imaplib exception text can embed the target
URL (which may carry credentials) or server-supplied detail.
"""
if isinstance(exc, (asyncio.TimeoutError, concurrent.futures.TimeoutError,
TimeoutError, socket.timeout)):
return "timeout"
name = type(exc).__name__
mod = (type(exc).__module__ or "")
if isinstance(exc, ssl.SSLError) or "SSL" in name or "Certificate" in name:
return "tls_error"
if isinstance(exc, socket.gaierror) or name in ("gaierror", "herror"):
return "dns_error"
if isinstance(exc, ConnectionRefusedError) or "ConnectionRefused" in name \
or name in ("ConnectError",):
return "connection_refused"
if "Timeout" in name:
return "timeout"
if mod.startswith("imaplib") or name in ("error", "abort", "readonly"):
return "auth_or_protocol_error"
if name == "HTTPStatusError":
return "http_error"
if name in ("ConnectTimeout", "ReadTimeout", "ReadError", "WriteError",
"PoolTimeout", "RemoteProtocolError", "NetworkError",
"ProxyError", "ProtocolError"):
return "network_error"
if isinstance(exc, OSError):
return "network_error"
return "error"
def _detail_for(category: str) -> str:
return _ERROR_DETAIL.get(category, _ERROR_DETAIL["error"])
def _http_get(url: str, timeout: float = _PROBE_TIMEOUT):
"""Single network entry point for the HTTP probes (monkeypatched in tests)."""
import httpx
return httpx.get(url, timeout=timeout)
def _bounded_map(items: List[Any], worker: Callable[[int, Any], Dict[str, Any]],
*, budget: float = _FANOUT_BUDGET,
concurrency: int = _PROBE_CONCURRENCY) -> List[Optional[Dict[str, Any]]]:
"""Run ``worker(index, item)`` across a bounded thread pool, in order.
`worker` must catch its own exceptions and return a per-item dict. Any item
not finished within `budget` seconds *in total* is left as ``None`` (the
caller substitutes a controlled `timeout` entry). The pool is shut down with
``wait=False`` so stragglers never block the response; their own per-op
timeout reaps them shortly after.
"""
n = len(items)
out: List[Optional[Dict[str, Any]]] = [None] * n
if n == 0:
return out
ex = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, min(concurrency, n)))
futures = {ex.submit(worker, i, items[i]): i for i in range(n)}
try:
for fut in concurrent.futures.as_completed(futures, timeout=budget):
i = futures[fut]
try:
out[i] = fut.result()
except Exception as e: # worker is expected to handle its own errors
out[i] = {"ok": False, "error": _classify_error(e)}
except concurrent.futures.TimeoutError:
pass # unfinished items stay None → marked timeout by the caller
finally:
ex.shutdown(wait=False, cancel_futures=True)
return out
# ── ChromaDB (vector RAG + vector memory) ──
def chromadb_health(rag_manager: Any, memory_vector: Any) -> Dict[str, Any]:
"""Report on the two ChromaDB-backed stores via their `.healthy` flags.
Both absent → disabled (Chroma/embeddings not installed or off).
Both healthy → ok. One down → degraded. Both present but unhealthy → down.
"""
rag_present = rag_manager is not None
mem_present = memory_vector is not None
if not rag_present and not mem_present:
return _svc("chromadb", DISABLED,
"Vector RAG and vector memory are not initialized.",
rag=None, memory=None)
rag_ok = bool(rag_present and getattr(rag_manager, "healthy", False))
mem_ok = bool(mem_present and getattr(memory_vector, "healthy", False))
meta = {"rag": rag_ok if rag_present else None,
"memory": mem_ok if mem_present else None}
healthy = [ok for ok in (rag_ok if rag_present else None,
mem_ok if mem_present else None) if ok is not None]
if healthy and all(healthy):
return _svc("chromadb", OK, "Vector stores healthy.", **meta)
if any(healthy):
return _svc("chromadb", DEGRADED,
"One vector store is unavailable.", **meta)
return _svc("chromadb", DOWN, "Vector stores are unavailable.", **meta)
# ── SearXNG ──
def _searxng_instance(settings: Dict[str, Any]) -> str:
"""Mirror src/search/providers.py:_get_search_instance precedence."""
url = (settings.get("search_url") or "").strip()
if url:
return url.rstrip("/")
from src.constants import SEARXNG_INSTANCE
return SEARXNG_INSTANCE.rstrip("/")
def searxng_health(settings: Dict[str, Any],
*, http_get: Callable = _http_get) -> Dict[str, Any]:
"""Non-intrusive reachability probe for the configured SearXNG instance.
Tries `/healthz` (2xx), falling back to the instance root (any non-5xx means
the host answered). No search query is run. The configured instance is
probed in full, but only its sanitized form is returned in `meta`.
"""
provider = (settings.get("search_provider") or "searxng")
if provider != "searxng":
return _svc("searxng", DISABLED,
f"Search provider is '{provider}', not SearXNG.",
provider=provider)
instance = _searxng_instance(settings)
if not instance:
return _svc("searxng", DISABLED, "No SearXNG instance configured.")
safe_instance = _safe_url(instance)
last_category = "error"
for path, accept in (("/healthz", lambda c: 200 <= c < 300),
("/", lambda c: 0 < c < 500)):
try:
r = http_get(instance + path, timeout=_PROBE_TIMEOUT)
code = getattr(r, "status_code", 0)
if accept(code):
return _svc("searxng", OK, f"Reachable (HTTP {code}).",
instance=safe_instance, probed=path, http_status=code)
last_category = "http_error"
except Exception as e: # connection refused, DNS, timeout, …
last_category = _classify_error(e)
return _svc("searxng", DOWN, f"Unreachable ({_detail_for(last_category)}).",
instance=safe_instance, error=last_category)
# ── ntfy ──
def _ntfy_integration(integrations: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""First enabled ntfy integration with a base_url (matches note_routes)."""
for i in integrations or []:
if (i.get("preset") == "ntfy" and i.get("enabled", True)
and i.get("base_url")):
return i
return None
def ntfy_health(integrations: List[Dict[str, Any]], settings: Dict[str, Any],
*, http_get: Callable = _http_get) -> Dict[str, Any]:
"""Non-intrusive ntfy probe via the server's built-in `/v1/health` route.
No test notification is POSTed: `/v1/health` returns `{"healthy":true}`
without publishing to a topic. The request keeps whatever credentials the
configured base_url carries, but `meta.base` is sanitized.
"""
channel = settings.get("reminder_channel") or "browser"
intg = _ntfy_integration(integrations)
if not intg:
return _svc("ntfy", DISABLED, "No ntfy integration configured.",
reminder_channel=channel)
raw = (intg.get("base_url") or "").strip()
parsed = urlparse(raw)
probe_base = (f"{parsed.scheme}://{parsed.netloc}"
if parsed.scheme and parsed.netloc else raw.rstrip("/"))
safe_base = _safe_url(raw)
try:
r = http_get(probe_base + "/v1/health", timeout=_PROBE_TIMEOUT)
code = getattr(r, "status_code", 0)
if code and code < 500:
return _svc("ntfy", OK, f"Reachable (HTTP {code}).",
base=safe_base, reminder_channel=channel, http_status=code)
return _svc("ntfy", DOWN, "Server returned an error response.",
base=safe_base, reminder_channel=channel, error="http_error")
except Exception as e:
category = _classify_error(e)
return _svc("ntfy", DOWN, f"Unreachable ({_detail_for(category)}).",
base=safe_base, reminder_channel=channel, error=category)
# ── Email (IMAP) ──
def email_health(accounts: List[Dict[str, Any]],
*, connect: Optional[Callable] = None) -> Dict[str, Any]:
"""Try a short IMAP connect+logout per configured account, concurrently.
All connect → ok. Some fail → degraded. All fail → down. No account
configured → disabled. Bounded by `_FANOUT_BUDGET` regardless of count.
`meta` carries only the account label and a controlled error category,
never credentials or raw exception text.
"""
if not accounts:
return _svc("email", DISABLED, "No email accounts configured.")
if connect is None:
from routes.email_helpers import _imap_connect
# Impose the service-health budget on the IMAP connect itself.
connect = lambda aid: _imap_connect(aid, timeout=_PROBE_TIMEOUT) # noqa: E731
def _label(acc: Dict[str, Any]) -> str:
return acc.get("account_name") or acc.get("account_id") or "account"
def _check(_i: int, acc: Dict[str, Any]) -> Dict[str, Any]:
name = _label(acc)
if not (acc.get("imap_host") or ""):
return {"name": name, "ok": False, "error": "no_host"}
try:
conn = connect(acc.get("account_id"))
try:
conn.logout()
except Exception:
pass
return {"name": name, "ok": True, "error": None}
except Exception as e:
return {"name": name, "ok": False, "error": _classify_error(e)}
raw = _bounded_map(accounts, _check, budget=_FANOUT_BUDGET,
concurrency=_PROBE_CONCURRENCY)
per_account = [r if r is not None
else {"name": _label(accounts[i]), "ok": False, "error": "timeout"}
for i, r in enumerate(raw)]
return _rollup_items("email", "mailbox(es)", per_account)
# ── Provider endpoints ──
def providers_health(endpoints: List[Dict[str, Any]],
*, probe: Optional[Callable] = None) -> Dict[str, Any]:
"""Probe each enabled model endpoint's model list, concurrently.
`endpoints` is a list of plain dicts ({name, base_url, api_key}) so this
stays decoupled from the ORM and trivially testable. Non-empty model list
→ reachable. Bounded by `_FANOUT_BUDGET` regardless of count. `meta` never
contains api_key or raw URLs, only a display name (or a sanitized URL when
no name is set) and a controlled error category.
"""
if not endpoints:
return _svc("providers", DISABLED, "No model endpoints configured.")
if probe is None:
from routes.model_routes import _probe_endpoint as probe
def _label(ep: Dict[str, Any]) -> str:
return ep.get("name") or _safe_url(ep.get("base_url")) or "endpoint"
def _check(_i: int, ep: Dict[str, Any]) -> Dict[str, Any]:
name = _label(ep)
try:
models = probe(ep.get("base_url"), ep.get("api_key"),
timeout=_PROBE_TIMEOUT) or []
except Exception as e:
return {"name": name, "ok": False, "model_count": 0,
"error": _classify_error(e)}
count = len(models)
return {"name": name, "ok": bool(count), "model_count": count,
"error": None if count else "no_models"}
raw = _bounded_map(endpoints, _check, budget=_FANOUT_BUDGET,
concurrency=_PROBE_CONCURRENCY)
per_endpoint = [r if r is not None
else {"name": _label(endpoints[i]), "ok": False,
"model_count": 0, "error": "timeout"}
for i, r in enumerate(raw)]
return _rollup_items("providers", "endpoint(s)", per_endpoint, key="endpoints")
def _rollup_items(name: str, noun: str, items: List[Dict[str, Any]],
key: str = "accounts") -> Dict[str, Any]:
"""Shared ok/degraded/down rollup for a list of per-item probe results."""
total = len(items)
ok_count = sum(1 for it in items if it.get("ok"))
if ok_count == total:
status, detail = OK, f"{ok_count}/{total} {noun} reachable."
elif ok_count == 0:
status, detail = DOWN, f"No {noun} reachable."
else:
status, detail = DEGRADED, f"{ok_count}/{total} {noun} reachable."
return _svc(name, status, detail, **{key: items})
# ── Aggregate ──
def _rollup(services: List[Dict[str, Any]]) -> str:
worst = OK
for s in services:
sev = _SEVERITY.get(s.get("status"))
if sev is not None and sev > _SEVERITY[worst]:
worst = s["status"]
return worst
def _gather_inputs() -> Dict[str, Any]:
"""Pull live config/account/endpoint lists from the app's data sources.
Each lookup fails soft: a broken source yields an empty/neutral value so a
single failure can't take down the whole health report.
"""
settings: Dict[str, Any] = {}
integrations: List[Dict[str, Any]] = []
accounts: List[Dict[str, Any]] = []
endpoints: List[Dict[str, Any]] = []
try:
from src.settings import load_settings
settings = load_settings() or {}
except Exception as e:
logger.debug(f"service_health: settings load failed: {e}")
try:
from src.integrations import load_integrations
integrations = load_integrations() or []
except Exception as e:
logger.debug(f"service_health: integrations load failed: {e}")
try:
from routes.email_helpers import _list_email_accounts
accounts = _list_email_accounts() or []
except Exception as e:
logger.debug(f"service_health: email accounts load failed: {e}")
try:
from core.database import SessionLocal, ModelEndpoint
db = SessionLocal()
try:
rows = db.query(ModelEndpoint).filter(
ModelEndpoint.is_enabled == True).all() # noqa: E712
endpoints = [{"name": r.name, "base_url": r.base_url,
"api_key": r.api_key} for r in rows]
finally:
db.close()
except Exception as e:
logger.debug(f"service_health: endpoint load failed: {e}")
return {"settings": settings, "integrations": integrations,
"accounts": accounts, "endpoints": endpoints}
async def _run_subsystem(name: str, fn: Callable, *args: Any) -> Dict[str, Any]:
"""Run one (sync) subsystem probe in a thread under a hard deadline.
A subsystem that overruns `_SUBSYSTEM_DEADLINE` (or raises) becomes a
controlled `down`/`timeout` entry instead of hanging or leaking the error.
"""
try:
return await asyncio.wait_for(asyncio.to_thread(fn, *args),
timeout=_SUBSYSTEM_DEADLINE)
except asyncio.TimeoutError:
return _svc(name, DOWN, _detail_for("timeout"), error="timeout")
except Exception as e:
category = _classify_error(e)
return _svc(name, DOWN, _detail_for(category), error=category)
async def collect_service_health(rag_manager: Any = None,
memory_vector: Any = None) -> Dict[str, Any]:
"""Run every probe and return {overall, services, timestamp}.
Bounded end-to-end: in-process ChromaDB flags are read synchronously; the
four network subsystems run concurrently, each under `_SUBSYSTEM_DEADLINE`,
with an overall `_AGGREGATE_DEADLINE` backstop. Per-item probes inside
providers/email are themselves bounded by `_FANOUT_BUDGET`.
"""
from datetime import datetime, timezone
inputs = _gather_inputs()
settings = inputs["settings"]
# ChromaDB is in-process and synchronous (just reads flags).
chroma = chromadb_health(rag_manager, memory_vector)
names = ["searxng", "ntfy", "email", "providers"]
coros = [
_run_subsystem("searxng", searxng_health, settings),
_run_subsystem("ntfy", ntfy_health, inputs["integrations"], settings),
_run_subsystem("email", email_health, inputs["accounts"]),
_run_subsystem("providers", providers_health, inputs["endpoints"]),
]
try:
results = await asyncio.wait_for(asyncio.gather(*coros),
timeout=_AGGREGATE_DEADLINE)
except asyncio.TimeoutError:
# Hard backstop — should not normally fire given per-subsystem deadlines.
results = [_svc(n, DOWN, _detail_for("timeout"), error="timeout")
for n in names]
services = [chroma, *results]
return {
"overall": _rollup(services),
"services": services,
# Timezone-aware UTC (…+00:00). Avoids the deprecated naive
# datetime.utcnow() flagged in review (overlaps with #1116).
"timestamp": datetime.now(timezone.utc).isoformat(),
}
+68
@@ -0,0 +1,68 @@
"""Route-level regression tests for GET /api/diagnostics/services.
The reviewer asked for explicit coverage of unauthenticated / non-admin / admin
access to this admin diagnostics route, beyond the unit tests for the collector.
These need a real FastAPI + TestClient (the conftest only stubs FastAPI when it
is *not* installed). When the full app deps aren't present we skip rather than
fail, so the suite stays green in minimal environments; CI installs
requirements, so the tests run there.
"""
import pytest
fastapi = pytest.importorskip("fastapi")
pytest.importorskip("starlette.testclient")
from fastapi import FastAPI, HTTPException, Request
from starlette.testclient import TestClient
# Importing the route module pulls a few app deps; skip cleanly if unavailable.
diag = pytest.importorskip("routes.diagnostics_routes")
def _client_with_admin_gate(monkeypatch, gate):
"""Mount the diagnostics router with `require_admin` and the collector
patched (via monkeypatch so the module globals are restored afterwards),
and return a TestClient. `gate` plays the role of require_admin."""
import src.service_health as sh
async def _fake_collect(_rag, _mem):
return {"overall": "ok", "services": [], "timestamp": "t"}
# monkeypatch.setattr restores these after the test — a plain assignment
# would leak the fakes into every later test in the session.
monkeypatch.setattr(diag, "require_admin", gate)
monkeypatch.setattr(sh, "collect_service_health", _fake_collect)
app = FastAPI()
app.include_router(diag.setup_diagnostics_routes(
rag_manager=None, rag_available=False, research_handler=None,
memory_vector=None))
return TestClient(app, raise_server_exceptions=False)
def test_unauthenticated_is_rejected(monkeypatch):
def gate(_request: Request):
raise HTTPException(401, "Not authenticated")
client = _client_with_admin_gate(monkeypatch, gate)
r = client.get("/api/diagnostics/services")
assert r.status_code == 401
def test_non_admin_is_forbidden(monkeypatch):
def gate(_request: Request):
raise HTTPException(403, "Admin only")
client = _client_with_admin_gate(monkeypatch, gate)
r = client.get("/api/diagnostics/services")
assert r.status_code == 403
def test_admin_gets_report(monkeypatch):
def gate(_request: Request):
return None # admin allowed
client = _client_with_admin_gate(monkeypatch, gate)
r = client.get("/api/diagnostics/services")
assert r.status_code == 200
body = r.json()
assert set(body) == {"overall", "services", "timestamp"}
assert body["overall"] == "ok"
+472
@@ -0,0 +1,472 @@
"""Tests for src.service_health — the consolidated degraded-state report.
Imports the real module (conftest.py stubs the heavy deps). Network is never
touched: HTTP probes take an injected `http_get`, and the email/provider probes
take an injected `connect` / `probe`. Asserts the ok/degraded/down/disabled
mapping per subsystem, the overall rollup, and that no secrets leak into meta.
"""
import types
import pytest
from src import service_health as sh
def _resp(status_code):
return types.SimpleNamespace(status_code=status_code)
def _raise(*_a, **_k):
raise RuntimeError("connection refused")
# ── chromadb_health ──
class _Store:
def __init__(self, healthy):
self.healthy = healthy
def test_chromadb_both_healthy_ok():
s = sh.chromadb_health(_Store(True), _Store(True))
assert s["status"] == sh.OK
assert s["meta"] == {"rag": True, "memory": True}
def test_chromadb_one_down_degraded():
s = sh.chromadb_health(_Store(True), _Store(False))
assert s["status"] == sh.DEGRADED
def test_chromadb_both_unhealthy_down():
s = sh.chromadb_health(_Store(False), _Store(False))
assert s["status"] == sh.DOWN
def test_chromadb_both_absent_disabled():
s = sh.chromadb_health(None, None)
assert s["status"] == sh.DISABLED
def test_chromadb_one_absent_one_healthy_ok():
# An absent store is not a failure; the present one being healthy is ok.
s = sh.chromadb_health(_Store(True), None)
assert s["status"] == sh.OK
assert s["meta"]["memory"] is None
# ── searxng_health ──
def test_searxng_disabled_when_other_provider():
s = sh.searxng_health({"search_provider": "brave"})
assert s["status"] == sh.DISABLED
def test_searxng_ok_on_healthz():
s = sh.searxng_health(
{"search_provider": "searxng", "search_url": "http://sx:8080"},
http_get=lambda url, timeout: _resp(200),
)
assert s["status"] == sh.OK
assert s["meta"]["probed"] == "/healthz"
def test_searxng_ok_on_root_fallback():
def getter(url, timeout):
return _resp(404) if url.endswith("/healthz") else _resp(200)
s = sh.searxng_health(
{"search_provider": "searxng", "search_url": "http://sx:8080"},
http_get=getter,
)
assert s["status"] == sh.OK
assert s["meta"]["probed"] == "/"
def test_searxng_down_on_exception():
s = sh.searxng_health(
{"search_provider": "searxng", "search_url": "http://sx:8080"},
http_get=_raise,
)
assert s["status"] == sh.DOWN
def test_searxng_down_on_5xx():
s = sh.searxng_health(
{"search_provider": "searxng", "search_url": "http://sx:8080"},
http_get=lambda url, timeout: _resp(502),
)
assert s["status"] == sh.DOWN


# ── ntfy_health ──

def _ntfy_intg():
    return [{"preset": "ntfy", "enabled": True, "base_url": "http://ntfy:80"}]


def test_ntfy_disabled_without_integration():
    s = sh.ntfy_health([], {"reminder_channel": "ntfy"})
    assert s["status"] == sh.DISABLED


def test_ntfy_ok():
    s = sh.ntfy_health(_ntfy_intg(), {"reminder_channel": "ntfy"},
                       http_get=lambda url, timeout: _resp(200))
    assert s["status"] == sh.OK
    assert s["meta"]["base"] == "http://ntfy:80"


def test_ntfy_probes_v1_health_not_a_topic():
    seen = {}

    def getter(url, timeout):
        seen["url"] = url
        return _resp(200)

    sh.ntfy_health(_ntfy_intg(), {"reminder_channel": "ntfy"}, http_get=getter)
    # Non-intrusive: hits /v1/health, never publishes to a topic.
    assert seen["url"].endswith("/v1/health")


def test_ntfy_down_on_exception():
    s = sh.ntfy_health(_ntfy_intg(), {"reminder_channel": "ntfy"},
                       http_get=_raise)
    assert s["status"] == sh.DOWN


# ── email_health ──

def _acct(name, host="imap.example.com"):
    return {"account_id": name, "account_name": name, "imap_host": host,
            "imap_password": "hunter2"}


class _Conn:
    def logout(self):
        pass


def test_email_disabled_without_accounts():
    assert sh.email_health([])["status"] == sh.DISABLED


def test_email_ok_all_connect():
    s = sh.email_health([_acct("a"), _acct("b")], connect=lambda _id: _Conn())
    assert s["status"] == sh.OK


def test_email_degraded_some_fail():
    def connect(account_id):
        if account_id == "bad":
            raise RuntimeError("auth failed")
        return _Conn()

    s = sh.email_health([_acct("good"), _acct("bad")], connect=connect)
    assert s["status"] == sh.DEGRADED


def test_email_down_all_fail():
    s = sh.email_health([_acct("a")], connect=_raise)
    assert s["status"] == sh.DOWN


def test_email_account_without_host_marked_failed():
    s = sh.email_health([_acct("a", host="")], connect=lambda _id: _Conn())
    assert s["status"] == sh.DOWN


def test_email_meta_never_leaks_password():
    s = sh.email_health([_acct("a")], connect=lambda _id: _Conn())
    assert "hunter2" not in repr(s)


# ── providers_health ──

def _ep(name):
    return {"name": name, "base_url": f"http://{name}:8000/v1", "api_key": "sk-secret"}


def test_providers_disabled_without_endpoints():
    assert sh.providers_health([])["status"] == sh.DISABLED


def test_providers_ok_all_reachable():
    s = sh.providers_health([_ep("a")],
                            probe=lambda base, key, timeout: ["m1", "m2"])
    assert s["status"] == sh.OK
    assert s["meta"]["endpoints"][0]["model_count"] == 2


def test_providers_degraded_some_empty():
    def probe(base, key, timeout):
        return ["m1"] if "good" in base else []

    s = sh.providers_health([_ep("good"), _ep("bad")], probe=probe)
    assert s["status"] == sh.DEGRADED


def test_providers_down_all_fail():
    s = sh.providers_health([_ep("a")], probe=_raise)
    assert s["status"] == sh.DOWN


def test_providers_meta_never_leaks_api_key():
    s = sh.providers_health([_ep("a")],
                            probe=lambda base, key, timeout: ["m1"])
    assert "sk-secret" not in repr(s)


# ── rollup ──

def test_rollup_picks_worst_non_disabled():
    services = [
        {"status": sh.OK}, {"status": sh.DISABLED},
        {"status": sh.DEGRADED}, {"status": sh.OK},
    ]
    assert sh._rollup(services) == sh.DEGRADED


def test_rollup_down_beats_degraded():
    assert sh._rollup([{"status": sh.DEGRADED}, {"status": sh.DOWN}]) == sh.DOWN


def test_rollup_all_disabled_is_ok():
    assert sh._rollup([{"status": sh.DISABLED}, {"status": sh.DISABLED}]) == sh.OK
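

# Illustrative sketch only, not the code under test. The three rollup tests
# above pin down a "worst non-disabled status" rule; one way such a rule could
# be written is shown here. The name _sketch_rollup, the severity table, and
# the lowercase string values for the statuses are assumptions made for this
# example and may not match what src/service_health.py actually does.
_SKETCH_SEVERITY = {"ok": 0, "degraded": 1, "down": 2}


def _sketch_rollup(services):
    """Return the worst status among services that are not disabled."""
    active = [s["status"] for s in services if s["status"] != "disabled"]
    if not active:
        # Nothing is enabled, so nothing can be failing.
        return "ok"
    return max(active, key=lambda status: _SKETCH_SEVERITY[status])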


# ── collect_service_health (async aggregate) ──

def test_collect_service_health_shape(monkeypatch):
    import asyncio

    # Avoid touching real data sources / network.
    monkeypatch.setattr(sh, "_gather_inputs", lambda: {
        "settings": {"search_provider": "disabled"},
        "integrations": [],
        "accounts": [],
        "endpoints": [],
    })
    out = asyncio.run(sh.collect_service_health(_Store(True), _Store(True)))
    assert set(out) == {"overall", "services", "timestamp"}
    names = {s["name"] for s in out["services"]}
    assert names == {"chromadb", "searxng", "ntfy", "email", "providers"}
    # Chroma healthy, everything else disabled → overall ok.
    assert out["overall"] == sh.OK


# ── _safe_url: strip userinfo / query / fragment ──

@pytest.mark.parametrize("raw,expected", [
    ("http://user:pass@host:8080/path?api_key=secret#frag", "http://host:8080/path"),
    ("https://admin:hunter2@searx.example.com/", "https://searx.example.com"),
    ("http://ntfy.local:80?token=abc", "http://ntfy.local:80"),
    ("host:8080", "host:8080"),
    ("", ""),
    (None, ""),
])
def test_safe_url_strips_secrets(raw, expected):
    out = sh._safe_url(raw)
    assert out == expected
    for bad in ("pass", "secret", "hunter2", "abc", "token", "@"):
        if raw and bad in raw and bad not in expected:
            assert bad not in out
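

# Illustrative sketch only, not the code under test. The parametrized cases
# above expect userinfo, query, fragment and a trailing slash to be dropped,
# and a scheme-less "host:port" to pass through unchanged. A minimal way to get
# that behaviour with the standard library is sketched below; the helper name
# _sketch_safe_url and its exact rules are assumptions for this example. Note
# that urlsplit lowercases the hostname and unwraps IPv6 brackets, which a
# production helper might want to handle differently.
from urllib.parse import urlsplit


def _sketch_safe_url(raw):
    """Return raw reduced to scheme://host[:port]/path, or "" if empty."""
    if not raw:
        return ""
    if "://" not in raw:
        # No scheme: strip fragment and query by hand, then any userinfo
        # that precedes the host part.
        rest = raw.split("#", 1)[0].split("?", 1)[0]
        netloc, sep, path = rest.partition("/")
        netloc = netloc.rsplit("@", 1)[-1]
        return (netloc + sep + path).rstrip("/")
    parts = urlsplit(raw)
    host = parts.hostname or ""
    if parts.port is not None:
        host = f"{host}:{parts.port}"
    return f"{parts.scheme}://{host}{parts.path.rstrip('/')}"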


# ── _classify_error: controlled categories, never raw text ──

def test_classify_error_categories():
    import socket

    assert sh._classify_error(TimeoutError()) == "timeout"
    assert sh._classify_error(socket.timeout()) == "timeout"
    assert sh._classify_error(socket.gaierror()) == "dns_error"
    assert sh._classify_error(ConnectionRefusedError()) == "connection_refused"
    assert sh._classify_error(OSError("boom")) == "network_error"
    assert sh._classify_error(ValueError("x")) == "error"
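

# Illustrative sketch only, not the code under test. The assertions above map
# exceptions to a small fixed vocabulary so that raw exception text (which may
# embed URLs or credentials) never reaches the response. Because TimeoutError,
# socket.gaierror and ConnectionRefusedError are all OSError subclasses, the
# checks have to run from most specific to least specific. The helper name
# _sketch_classify_error is an assumption for this example.
import socket


def _sketch_classify_error(exc):
    """Map an exception instance to a controlled category token."""
    if isinstance(exc, (TimeoutError, socket.timeout)):
        return "timeout"
    if isinstance(exc, socket.gaierror):
        return "dns_error"
    if isinstance(exc, ConnectionRefusedError):
        return "connection_refused"
    if isinstance(exc, OSError):
        # Any other socket/OS-level failure.
        return "network_error"
    return "error"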


# ── Sanitization in subsystem output (blocker #2) ──

def test_searxng_meta_redacts_instance_url():
    s = sh.searxng_health(
        {"search_provider": "searxng",
         "search_url": "http://user:s3cr3t@searx.local:8080/?token=zzz"},
        http_get=lambda url, timeout: _resp(200),
    )
    blob = repr(s)
    assert "s3cr3t" not in blob and "zzz" not in blob and "user:" not in blob
    assert s["meta"]["instance"] == "http://searx.local:8080"


def test_searxng_down_uses_error_category_not_raw_exception():
    def boom(url, timeout):
        raise RuntimeError("failed connecting to http://user:pw@searx.local secret-token")

    s = sh.searxng_health(
        {"search_provider": "searxng", "search_url": "http://searx.local"},
        http_get=boom,
    )
    assert s["status"] == sh.DOWN
    assert s["meta"]["error"] == "error"  # controlled category token
    assert "secret-token" not in repr(s) and "pw@" not in repr(s)


def test_ntfy_meta_redacts_userinfo_in_base():
    intg = [{"preset": "ntfy", "enabled": True,
             "base_url": "https://user:topsecret@ntfy.example.com"}]
    seen = {}

    def getter(url, timeout):
        seen["url"] = url  # the probe itself may keep credentials
        return _resp(200)

    s = sh.ntfy_health(intg, {"reminder_channel": "ntfy"}, http_get=getter)
    assert s["meta"]["base"] == "https://ntfy.example.com"
    assert "topsecret" not in repr(s)


def test_providers_name_fallback_is_sanitized():
    # No display name → falls back to the base_url, which must be sanitized.
    ep = {"base_url": "http://user:k3y@prov.local:9000/v1?api_key=zzz", "api_key": "sk-x"}
    s = sh.providers_health([ep], probe=lambda b, k, t: ["m1"])
    entry = s["meta"]["endpoints"][0]
    assert entry["name"] == "http://prov.local:9000/v1"
    assert "k3y" not in repr(s) and "zzz" not in repr(s) and "sk-x" not in repr(s)


def test_providers_probe_exception_maps_to_category():
    def boom(base, key, timeout):
        raise RuntimeError(f"500 from {base} with key {key}")  # would leak base+key

    s = sh.providers_health([_ep("a")], probe=boom)
    assert s["status"] == sh.DOWN
    assert s["meta"]["endpoints"][0]["error"] == "error"
    assert "sk-secret" not in repr(s) and "http://a" not in repr(s)


def test_email_connect_exception_maps_to_category():
    def boom(account_id):
        raise RuntimeError("login failed for user bob with password hunter2")

    s = sh.email_health([_acct("a")], connect=boom)
    assert s["status"] == sh.DOWN
    assert s["meta"]["accounts"][0]["error"] == "error"
    assert "hunter2" not in repr(s)


# ── Bounded wall-clock (blocker #1) ──

def test_providers_bounded_marks_slow_as_timeout(monkeypatch):
    import time

    monkeypatch.setattr(sh, "_FANOUT_BUDGET", 1)

    def probe(base, key, timeout):
        if "slow" in base:
            time.sleep(10)  # would blow the budget if unbounded
        return ["m1"]

    eps = [{"name": "fast", "base_url": "http://fast", "api_key": "k"},
           {"name": "slow", "base_url": "http://slow", "api_key": "k"}]
    t0 = time.monotonic()
    out = sh.providers_health(eps, probe=probe)
    elapsed = time.monotonic() - t0
    assert elapsed < 4, f"providers_health not bounded: took {elapsed:.1f}s"
    by = {e["name"]: e for e in out["meta"]["endpoints"]}
    assert by["fast"]["ok"] is True
    assert by["slow"]["ok"] is False and by["slow"]["error"] == "timeout"
    assert out["status"] == sh.DEGRADED


def test_providers_bounded_with_many_slow_endpoints(monkeypatch):
    import time

    monkeypatch.setattr(sh, "_FANOUT_BUDGET", 1)

    def probe(base, key, timeout):
        time.sleep(10)
        return ["m1"]

    eps = [{"name": f"ep{i}", "base_url": f"http://ep{i}", "api_key": "k"}
           for i in range(25)]
    t0 = time.monotonic()
    out = sh.providers_health(eps, probe=probe)
    elapsed = time.monotonic() - t0
    # 25 endpoints * sleep would be huge if sequential; bounded keeps it ~budget.
    assert elapsed < 4, f"not bounded with many endpoints: {elapsed:.1f}s"
    assert out["status"] == sh.DOWN
    assert all(e["error"] == "timeout" for e in out["meta"]["endpoints"])


def test_email_bounded_marks_slow_as_timeout(monkeypatch):
    import time

    monkeypatch.setattr(sh, "_FANOUT_BUDGET", 1)

    def connect(account_id):
        if account_id == "slow":
            time.sleep(10)
        return _Conn()

    accts = [_acct("fast"), _acct("slow")]
    accts[1]["account_id"] = "slow"
    t0 = time.monotonic()
    out = sh.email_health(accts, connect=connect)
    elapsed = time.monotonic() - t0
    assert elapsed < 4, f"email_health not bounded: took {elapsed:.1f}s"
    by = {a["name"]: a for a in out["meta"]["accounts"]}
    assert by["slow"]["error"] == "timeout"
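

# Illustrative sketch only, not the code under test. The bounded-fan-out tests
# above expect every item to be probed in parallel under one shared wall-clock
# budget, with stragglers reported as "timeout". One way to get that with the
# standard library is sketched below; _sketch_bounded_fanout and its result
# shape are assumptions for this example. Python cannot kill a running thread,
# so a straggler keeps running in the background after the budget expires; the
# pool is shut down with wait=False so the caller is not blocked on it.
import concurrent.futures


def _sketch_bounded_fanout(items, fn, budget):
    """Run fn(item) for every item in parallel, waiting at most budget seconds."""
    if not items:
        return []
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=len(items))
    futures = [pool.submit(fn, item) for item in items]
    done, _pending = concurrent.futures.wait(futures, timeout=budget)
    results = []
    for item, fut in zip(items, futures):
        if fut not in done:
            results.append({"name": item, "ok": False, "error": "timeout"})
        elif fut.exception() is not None:
            # Report a category token rather than the raw exception text.
            results.append({"name": item, "ok": False, "error": "error"})
        else:
            results.append({"name": item, "ok": True})
    pool.shutdown(wait=False)
    return results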


def test_collect_runs_subsystems_concurrently(monkeypatch):
    # The aggregate is bounded by running the (internally-bounded) subsystems
    # concurrently, so total wall-clock ≈ max(subsystem), not the sum. Each of
    # the four network subsystems here sleeps ~0.6s; sequential would be ~2.4s.
    import asyncio
    import time

    monkeypatch.setattr(sh, "_gather_inputs", lambda: {
        "settings": {}, "integrations": [], "accounts": [], "endpoints": [],
    })

    def slow(name):
        def _fn(*_a, **_k):
            time.sleep(0.6)
            return {"name": name, "status": sh.OK, "detail": "", "meta": {}}
        return _fn

    monkeypatch.setattr(sh, "searxng_health", slow("searxng"))
    monkeypatch.setattr(sh, "ntfy_health", slow("ntfy"))
    monkeypatch.setattr(sh, "email_health", slow("email"))
    monkeypatch.setattr(sh, "providers_health", slow("providers"))
    t0 = time.monotonic()
    out = asyncio.run(sh.collect_service_health(None, None))
    elapsed = time.monotonic() - t0
    assert elapsed < 1.5, f"subsystems not concurrent: took {elapsed:.1f}s"
    assert {s["name"] for s in out["services"]} == {
        "chromadb", "searxng", "ntfy", "email", "providers"}


def test_collect_aggregate_deadline_yields_controlled_result(monkeypatch):
    # If the gather overruns the aggregate ceiling, the response is still a
    # controlled {overall, services, timestamp} with each network subsystem
    # marked down/timeout; it never hangs or raises.
    import asyncio
    import time

    monkeypatch.setattr(sh, "_AGGREGATE_DEADLINE", 0.5)
    monkeypatch.setattr(sh, "_SUBSYSTEM_DEADLINE", 0.4)
    monkeypatch.setattr(sh, "_gather_inputs", lambda: {
        "settings": {}, "integrations": [], "accounts": [], "endpoints": [],
    })

    async def _slow_gather(*coros, **_k):
        for c in coros:  # close unawaited coros to avoid warnings
            close = getattr(c, "close", None)
            if close:
                close()
        await asyncio.sleep(5)

    # Force the outer wait_for to trip by making gather itself slow.
    monkeypatch.setattr(sh.asyncio, "gather", _slow_gather)
    t0 = time.monotonic()
    out = asyncio.run(sh.collect_service_health(None, None))
    elapsed = time.monotonic() - t0
    assert elapsed < 2, f"aggregate deadline did not bound: {elapsed:.1f}s"
    assert set(out) == {"overall", "services", "timestamp"}
    net = [s for s in out["services"] if s["name"] != "chromadb"]
    assert all(s["status"] == sh.DOWN and s["meta"].get("error") == "timeout"
               for s in net)
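

# Illustrative sketch only, not the code under test. The two aggregate tests
# above rely on blocking probes running concurrently off the event loop, with
# an outer deadline that turns an overrun into a controlled "down/timeout"
# result. A minimal version of that shape is sketched below, assuming Python
# 3.9+ for asyncio.to_thread; _sketch_collect and its arguments are
# hypothetical names. Cancelling the gather does not stop worker threads that
# are already running, and asyncio.run() waits for them while shutting down
# the default executor, so each probe still needs its own timeout.
import asyncio


async def _sketch_collect(probes, deadline):
    """probes maps a subsystem name to a blocking, zero-argument callable."""
    names = list(probes)
    gathered = asyncio.gather(*(asyncio.to_thread(probes[n]) for n in names))
    try:
        # Results come back in the same order as names.
        return await asyncio.wait_for(gathered, timeout=deadline)
    except asyncio.TimeoutError:
        return [{"name": n, "status": "down", "detail": "",
                 "meta": {"error": "timeout"}} for n in names]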