Files
odysseus/tests/test_service_health.py
Sheikh Rahat Mahmud 9180847c0e feat(diagnostics): add consolidated service health endpoint for degraded-state reporting (#964)
* Add consolidated service health endpoint for degraded-state reporting

ROADMAP (High Priority) asks for "Better degraded-state reporting for
ChromaDB, SearXNG, email, ntfy, and provider probes." Until now there was no
single readout of which subsystems are actually working: /api/health is only a
liveness ping and each subsystem's signal lives in a different module, so a
misconfigured self-host install gives no consolidated picture.

This adds an admin-only GET /api/diagnostics/services endpoint backed by a new
src/service_health.py aggregator. Each subsystem reports a uniform
{name, status, detail, meta} where status is ok | degraded | down | disabled,
and the response rolls up an overall verdict (worst non-disabled status).

Probes are deliberately non-intrusive and safe to poll:
- ChromaDB: reads the .healthy flags on the RAG and memory vector stores.
- SearXNG: GET /healthz (2xx), falling back to the instance root (<500). No
  search query is run.
- ntfy: GET the server's built-in /v1/health. No test notification is sent.
- email: short IMAP connect+logout per configured account (no credentials in
  meta).
- providers: probe each enabled ModelEndpoint's model list (no api_key in meta).

Probe functions take their inputs as parameters and isolate the network call to
injectable callables, so they unit-test without touching the network (same
pattern as the merged provider-endpoint tests). Network probes run concurrently
off the event loop via asyncio.to_thread with bounded per-probe timeouts.

memory_vector is now passed into setup_diagnostics_routes (new optional param,
backward-compatible) so ChromaDB's vector-memory store can be reported too.

Tests: tests/test_service_health.py — 29 tests covering every status mapping
per subsystem, the overall rollup, and that no secrets leak into meta.

Verification:
  python -m pytest tests/test_service_health.py -q          # 29 passed
  python -m py_compile src/service_health.py routes/diagnostics_routes.py app.py
  python -m pytest tests/test_endpoint_resolver.py tests/test_provider_endpoints.py -q

Backend + tests only; an Admin/Settings UI badge that renders this endpoint is
a natural follow-up.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(diagnostics): bound service-health wall-clock and redact secrets

Addresses review on #964.

Blocker 1 — genuinely bounded wall-clock:
- providers_health and email_health now fan out per-item probes across a
  bounded thread pool (_bounded_map) with a hard total budget (_FANOUT_BUDGET),
  instead of probing endpoints/accounts sequentially. Stragglers are reported
  as a controlled `timeout` and never block; the pool is shut down with
  wait=False so the response returns on time regardless of endpoint/account
  count.
- The IMAP connect path now honors the service-health budget: _imap_connect
  gained a pass-through `timeout` param and the probe calls it with
  _PROBE_TIMEOUT instead of the default 15s.
- collect_service_health runs the four network subsystems concurrently, each
  under a per-subsystem deadline (_SUBSYSTEM_DEADLINE), with an overall
  wait_for ceiling (_AGGREGATE_DEADLINE) as a backstop.

Blocker 2 — no secret/raw-error leakage in the response:
- _safe_url strips userinfo, query, and fragment from every URL surfaced in
  meta (searxng instance, ntfy base, provider name fallback), keeping only
  scheme/host/port/path.
- _classify_error maps every probe failure to a controlled category token
  (timeout, connection_refused, dns_error, tls_error, network_error,
  http_error, auth_or_protocol_error, …) — raw str(exception), which can embed
  credentialed URLs or server text, is never returned.

Tests (tests/test_service_health.py, +tests/test_diagnostics_service_route.py):
- URL userinfo/query redaction for searxng/ntfy/providers.
- secret-bearing exception strings map to categories and don't leak.
- multiple slow providers/accounts stay bounded (single + 25-endpoint cases).
- subsystems run concurrently; aggregate deadline yields a controlled result.
- route-level unauthenticated (401) / non-admin (403) / admin (200) coverage.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* test(diagnostics): isolate route tests so they don't leak module globals

The new route tests replaced src.service_health.collect_service_health and
routes.diagnostics_routes.require_admin via direct assignment, which persisted
for the rest of the pytest session. In CI's full alphabetical run that fake
collector (returning services=[]) leaked into the later collect_service_health
tests and failed them. Switch to monkeypatch.setattr so both are restored after
each test. No production code change.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
Co-authored-by: Alexandre Teixeira <111787685+alteixeira20@users.noreply.github.com>
2026-06-09 16:00:24 +01:00

473 lines
16 KiB
Python

"""Tests for src.service_health — the consolidated degraded-state report.
Imports the real module (conftest.py stubs the heavy deps). Network is never
touched: HTTP probes take an injected `http_get`, and the email/provider probes
take an injected `connect` / `probe`. Asserts the ok/degraded/down/disabled
mapping per subsystem, the overall rollup, and that no secrets leak into meta.
"""
import types
import pytest
from src import service_health as sh
def _resp(status_code):
return types.SimpleNamespace(status_code=status_code)
def _raise(*_a, **_k):
raise RuntimeError("connection refused")
# ── chromadb_health ──
class _Store:
def __init__(self, healthy):
self.healthy = healthy
def test_chromadb_both_healthy_ok():
s = sh.chromadb_health(_Store(True), _Store(True))
assert s["status"] == sh.OK
assert s["meta"] == {"rag": True, "memory": True}
def test_chromadb_one_down_degraded():
s = sh.chromadb_health(_Store(True), _Store(False))
assert s["status"] == sh.DEGRADED
def test_chromadb_both_unhealthy_down():
s = sh.chromadb_health(_Store(False), _Store(False))
assert s["status"] == sh.DOWN
def test_chromadb_both_absent_disabled():
s = sh.chromadb_health(None, None)
assert s["status"] == sh.DISABLED
def test_chromadb_one_absent_one_healthy_ok():
# An absent store is not a failure; the present one being healthy is ok.
s = sh.chromadb_health(_Store(True), None)
assert s["status"] == sh.OK
assert s["meta"]["memory"] is None
# ── searxng_health ──
def test_searxng_disabled_when_other_provider():
s = sh.searxng_health({"search_provider": "brave"})
assert s["status"] == sh.DISABLED
def test_searxng_ok_on_healthz():
s = sh.searxng_health(
{"search_provider": "searxng", "search_url": "http://sx:8080"},
http_get=lambda url, timeout: _resp(200),
)
assert s["status"] == sh.OK
assert s["meta"]["probed"] == "/healthz"
def test_searxng_ok_on_root_fallback():
def getter(url, timeout):
return _resp(404) if url.endswith("/healthz") else _resp(200)
s = sh.searxng_health(
{"search_provider": "searxng", "search_url": "http://sx:8080"},
http_get=getter,
)
assert s["status"] == sh.OK
assert s["meta"]["probed"] == "/"
def test_searxng_down_on_exception():
s = sh.searxng_health(
{"search_provider": "searxng", "search_url": "http://sx:8080"},
http_get=_raise,
)
assert s["status"] == sh.DOWN
def test_searxng_down_on_5xx():
s = sh.searxng_health(
{"search_provider": "searxng", "search_url": "http://sx:8080"},
http_get=lambda url, timeout: _resp(502),
)
assert s["status"] == sh.DOWN
# ── ntfy_health ──
def _ntfy_intg():
return [{"preset": "ntfy", "enabled": True, "base_url": "http://ntfy:80"}]
def test_ntfy_disabled_without_integration():
s = sh.ntfy_health([], {"reminder_channel": "ntfy"})
assert s["status"] == sh.DISABLED
def test_ntfy_ok():
s = sh.ntfy_health(_ntfy_intg(), {"reminder_channel": "ntfy"},
http_get=lambda url, timeout: _resp(200))
assert s["status"] == sh.OK
assert s["meta"]["base"] == "http://ntfy:80"
def test_ntfy_probes_v1_health_not_a_topic():
seen = {}
def getter(url, timeout):
seen["url"] = url
return _resp(200)
sh.ntfy_health(_ntfy_intg(), {"reminder_channel": "ntfy"}, http_get=getter)
# Non-intrusive: hits /v1/health, never publishes to a topic.
assert seen["url"].endswith("/v1/health")
def test_ntfy_down_on_exception():
s = sh.ntfy_health(_ntfy_intg(), {"reminder_channel": "ntfy"},
http_get=_raise)
assert s["status"] == sh.DOWN
# ── email_health ──
def _acct(name, host="imap.example.com"):
return {"account_id": name, "account_name": name, "imap_host": host,
"imap_password": "hunter2"}
class _Conn:
def logout(self):
pass
def test_email_disabled_without_accounts():
assert sh.email_health([])["status"] == sh.DISABLED
def test_email_ok_all_connect():
s = sh.email_health([_acct("a"), _acct("b")], connect=lambda _id: _Conn())
assert s["status"] == sh.OK
def test_email_degraded_some_fail():
def connect(account_id):
if account_id == "bad":
raise RuntimeError("auth failed")
return _Conn()
s = sh.email_health([_acct("good"), _acct("bad")], connect=connect)
assert s["status"] == sh.DEGRADED
def test_email_down_all_fail():
s = sh.email_health([_acct("a")], connect=_raise)
assert s["status"] == sh.DOWN
def test_email_account_without_host_marked_failed():
s = sh.email_health([_acct("a", host="")], connect=lambda _id: _Conn())
assert s["status"] == sh.DOWN
def test_email_meta_never_leaks_password():
s = sh.email_health([_acct("a")], connect=lambda _id: _Conn())
assert "hunter2" not in repr(s)
# ── providers_health ──
def _ep(name):
return {"name": name, "base_url": f"http://{name}:8000/v1", "api_key": "sk-secret"}
def test_providers_disabled_without_endpoints():
assert sh.providers_health([])["status"] == sh.DISABLED
def test_providers_ok_all_reachable():
s = sh.providers_health([_ep("a")],
probe=lambda base, key, timeout: ["m1", "m2"])
assert s["status"] == sh.OK
assert s["meta"]["endpoints"][0]["model_count"] == 2
def test_providers_degraded_some_empty():
def probe(base, key, timeout):
return ["m1"] if "good" in base else []
s = sh.providers_health([_ep("good"), _ep("bad")], probe=probe)
assert s["status"] == sh.DEGRADED
def test_providers_down_all_fail():
s = sh.providers_health([_ep("a")], probe=_raise)
assert s["status"] == sh.DOWN
def test_providers_meta_never_leaks_api_key():
s = sh.providers_health([_ep("a")],
probe=lambda base, key, timeout: ["m1"])
assert "sk-secret" not in repr(s)
# ── rollup ──
def test_rollup_picks_worst_non_disabled():
services = [
{"status": sh.OK}, {"status": sh.DISABLED},
{"status": sh.DEGRADED}, {"status": sh.OK},
]
assert sh._rollup(services) == sh.DEGRADED
def test_rollup_down_beats_degraded():
assert sh._rollup([{"status": sh.DEGRADED}, {"status": sh.DOWN}]) == sh.DOWN
def test_rollup_all_disabled_is_ok():
assert sh._rollup([{"status": sh.DISABLED}, {"status": sh.DISABLED}]) == sh.OK
# ── collect_service_health (async aggregate) ──
def test_collect_service_health_shape(monkeypatch):
import asyncio
# Avoid touching real data sources / network.
monkeypatch.setattr(sh, "_gather_inputs", lambda: {
"settings": {"search_provider": "disabled"},
"integrations": [],
"accounts": [],
"endpoints": [],
})
out = asyncio.run(sh.collect_service_health(_Store(True), _Store(True)))
assert set(out) == {"overall", "services", "timestamp"}
names = {s["name"] for s in out["services"]}
assert names == {"chromadb", "searxng", "ntfy", "email", "providers"}
# Chroma healthy, everything else disabled → overall ok.
assert out["overall"] == sh.OK
# ── _safe_url: strip userinfo / query / fragment ──
@pytest.mark.parametrize("raw,expected", [
("http://user:pass@host:8080/path?api_key=secret#frag", "http://host:8080/path"),
("https://admin:hunter2@searx.example.com/", "https://searx.example.com"),
("http://ntfy.local:80?token=abc", "http://ntfy.local:80"),
("host:8080", "host:8080"),
("", ""),
(None, ""),
])
def test_safe_url_strips_secrets(raw, expected):
out = sh._safe_url(raw)
assert out == expected
for bad in ("pass", "secret", "hunter2", "abc", "token", "@"):
if raw and bad in raw and bad not in expected:
assert bad not in out
# ── _classify_error: controlled categories, never raw text ──
def test_classify_error_categories():
import socket
assert sh._classify_error(TimeoutError()) == "timeout"
assert sh._classify_error(socket.timeout()) == "timeout"
assert sh._classify_error(socket.gaierror()) == "dns_error"
assert sh._classify_error(ConnectionRefusedError()) == "connection_refused"
assert sh._classify_error(OSError("boom")) == "network_error"
assert sh._classify_error(ValueError("x")) == "error"
# ── Sanitization in subsystem output (blocker #2) ──
def test_searxng_meta_redacts_instance_url():
s = sh.searxng_health(
{"search_provider": "searxng",
"search_url": "http://user:s3cr3t@searx.local:8080/?token=zzz"},
http_get=lambda url, timeout: _resp(200),
)
blob = repr(s)
assert "s3cr3t" not in blob and "zzz" not in blob and "user:" not in blob
assert s["meta"]["instance"] == "http://searx.local:8080"
def test_searxng_down_uses_error_category_not_raw_exception():
def boom(url, timeout):
raise RuntimeError("failed connecting to http://user:pw@searx.local secret-token")
s = sh.searxng_health(
{"search_provider": "searxng", "search_url": "http://searx.local"},
http_get=boom,
)
assert s["status"] == sh.DOWN
assert s["meta"]["error"] == "error" # controlled category token
assert "secret-token" not in repr(s) and "pw@" not in repr(s)
def test_ntfy_meta_redacts_userinfo_in_base():
intg = [{"preset": "ntfy", "enabled": True,
"base_url": "https://user:topsecret@ntfy.example.com"}]
seen = {}
def getter(url, timeout):
seen["url"] = url # the probe itself may keep credentials
return _resp(200)
s = sh.ntfy_health(intg, {"reminder_channel": "ntfy"}, http_get=getter)
assert s["meta"]["base"] == "https://ntfy.example.com"
assert "topsecret" not in repr(s)
def test_providers_name_fallback_is_sanitized():
# No display name → falls back to the base_url, which must be sanitized.
ep = {"base_url": "http://user:k3y@prov.local:9000/v1?api_key=zzz", "api_key": "sk-x"}
s = sh.providers_health([ep], probe=lambda b, k, t: ["m1"])
entry = s["meta"]["endpoints"][0]
assert entry["name"] == "http://prov.local:9000/v1"
assert "k3y" not in repr(s) and "zzz" not in repr(s) and "sk-x" not in repr(s)
def test_providers_probe_exception_maps_to_category():
def boom(base, key, timeout):
raise RuntimeError(f"500 from {base} with key {key}") # would leak base+key
s = sh.providers_health([_ep("a")], probe=boom)
assert s["status"] == sh.DOWN
assert s["meta"]["endpoints"][0]["error"] == "error"
assert "sk-secret" not in repr(s) and "http://a" not in repr(s)
def test_email_connect_exception_maps_to_category():
def boom(account_id):
raise RuntimeError("login failed for user bob with password hunter2")
s = sh.email_health([_acct("a")], connect=boom)
assert s["status"] == sh.DOWN
assert s["meta"]["accounts"][0]["error"] == "error"
assert "hunter2" not in repr(s)
# ── Bounded wall-clock (blocker #1) ──
def test_providers_bounded_marks_slow_as_timeout(monkeypatch):
import time
monkeypatch.setattr(sh, "_FANOUT_BUDGET", 1)
def probe(base, key, timeout):
if "slow" in base:
time.sleep(10) # would blow the budget if unbounded
return ["m1"]
eps = [{"name": "fast", "base_url": "http://fast", "api_key": "k"},
{"name": "slow", "base_url": "http://slow", "api_key": "k"}]
t0 = time.monotonic()
out = sh.providers_health(eps, probe=probe)
elapsed = time.monotonic() - t0
assert elapsed < 4, f"providers_health not bounded: took {elapsed:.1f}s"
by = {e["name"]: e for e in out["meta"]["endpoints"]}
assert by["fast"]["ok"] is True
assert by["slow"]["ok"] is False and by["slow"]["error"] == "timeout"
assert out["status"] == sh.DEGRADED
def test_providers_bounded_with_many_slow_endpoints(monkeypatch):
import time
monkeypatch.setattr(sh, "_FANOUT_BUDGET", 1)
def probe(base, key, timeout):
time.sleep(10)
return ["m1"]
eps = [{"name": f"ep{i}", "base_url": f"http://ep{i}", "api_key": "k"}
for i in range(25)]
t0 = time.monotonic()
out = sh.providers_health(eps, probe=probe)
elapsed = time.monotonic() - t0
# 25 endpoints * sleep would be huge if sequential; bounded keeps it ~budget.
assert elapsed < 4, f"not bounded with many endpoints: {elapsed:.1f}s"
assert out["status"] == sh.DOWN
assert all(e["error"] == "timeout" for e in out["meta"]["endpoints"])
def test_email_bounded_marks_slow_as_timeout(monkeypatch):
import time
monkeypatch.setattr(sh, "_FANOUT_BUDGET", 1)
def connect(account_id):
if account_id == "slow":
time.sleep(10)
return _Conn()
accts = [_acct("fast"), _acct("slow")]
accts[1]["account_id"] = "slow"
t0 = time.monotonic()
out = sh.email_health(accts, connect=connect)
elapsed = time.monotonic() - t0
assert elapsed < 4, f"email_health not bounded: took {elapsed:.1f}s"
by = {a["name"]: a for a in out["meta"]["accounts"]}
assert by["slow"]["error"] == "timeout"
def test_collect_runs_subsystems_concurrently(monkeypatch):
# The aggregate is bounded by running the (internally-bounded) subsystems
# concurrently, so total wall-clock ≈ max(subsystem), not the sum. Each of
# the four network subsystems here sleeps ~0.6s; sequential would be ~2.4s.
import asyncio
import time
monkeypatch.setattr(sh, "_gather_inputs", lambda: {
"settings": {}, "integrations": [], "accounts": [], "endpoints": [],
})
def slow(name):
def _fn(*_a, **_k):
time.sleep(0.6)
return {"name": name, "status": sh.OK, "detail": "", "meta": {}}
return _fn
monkeypatch.setattr(sh, "searxng_health", slow("searxng"))
monkeypatch.setattr(sh, "ntfy_health", slow("ntfy"))
monkeypatch.setattr(sh, "email_health", slow("email"))
monkeypatch.setattr(sh, "providers_health", slow("providers"))
t0 = time.monotonic()
out = asyncio.run(sh.collect_service_health(None, None))
elapsed = time.monotonic() - t0
assert elapsed < 1.5, f"subsystems not concurrent: took {elapsed:.1f}s"
assert {s["name"] for s in out["services"]} == {
"chromadb", "searxng", "ntfy", "email", "providers"}
def test_collect_aggregate_deadline_yields_controlled_result(monkeypatch):
# If the gather overruns the aggregate ceiling, the response is still a
# controlled {overall, services, timestamp} with each network subsystem
# marked down/timeout — never a hang or a raised exception.
import asyncio
import time
monkeypatch.setattr(sh, "_AGGREGATE_DEADLINE", 0.5)
monkeypatch.setattr(sh, "_SUBSYSTEM_DEADLINE", 0.4)
monkeypatch.setattr(sh, "_gather_inputs", lambda: {
"settings": {}, "integrations": [], "accounts": [], "endpoints": [],
})
async def _slow_gather(*coros, **_k):
for c in coros: # close unawaited coros to avoid warnings
close = getattr(c, "close", None)
if close:
close()
await asyncio.sleep(5)
# Force the outer wait_for to trip by making gather itself slow.
monkeypatch.setattr(sh.asyncio, "gather", _slow_gather)
t0 = time.monotonic()
out = asyncio.run(sh.collect_service_health(None, None))
elapsed = time.monotonic() - t0
assert elapsed < 2, f"aggregate deadline did not bound: {elapsed:.1f}s"
assert set(out) == {"overall", "services", "timestamp"}
net = [s for s in out["services"] if s["name"] != "chromadb"]
assert all(s["status"] == sh.DOWN and s["meta"].get("error") == "timeout"
for s in net)