mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-17 02:05:22 -04:00
Surface silent model fallback instead of masking it (#868)
When the selected model fails before producing output, stream_llm_with_fallback
quietly switches to the next candidate and the reply is shown under the
originally selected model's name, so a misconfigured provider looks like it
works. (Concretely: a Bedrock gateway that 400s every Anthropic/Claude request
appears fine because another model silently answers under the Claude label.)
Emit a `fallback` SSE event ({selected_model, answered_by, reason}) the first
time a non-primary candidate produces output, forward it through the agent loop
and both chat-route paths, stamp the response metrics with the model that
actually answered, and show a notice + relabel the reply in the UI.
Tested: python -m pytest tests/test_llm_core_fallback.py (3 pass);
python -m py_compile src/llm_core.py src/agent_loop.py routes/chat_routes.py;
node --check static/js/chat.js.
This commit is contained in:
+16
-2
@@ -769,6 +769,7 @@ def setup_chat_routes(
|
|||||||
return
|
return
|
||||||
elif chat_mode == "chat":
|
elif chat_mode == "chat":
|
||||||
_chat_start = time.time()
|
_chat_start = time.time()
|
||||||
|
_answered_by = None # set if the selected model failed and a fallback answered
|
||||||
# ── Chat mode: call stream_llm directly, NO tools, NO document access ──
|
# ── Chat mode: call stream_llm directly, NO tools, NO document access ──
|
||||||
try:
|
try:
|
||||||
_chat_candidates = [(sess.endpoint_url, sess.model, sess.headers)] + _fallback_candidates
|
_chat_candidates = [(sess.endpoint_url, sess.model, sess.headers)] + _fallback_candidates
|
||||||
@@ -797,9 +798,14 @@ def setup_chat_routes(
|
|||||||
full_response += data["delta"]
|
full_response += data["delta"]
|
||||||
_stream_set(session, partial=full_response)
|
_stream_set(session, partial=full_response)
|
||||||
yield chunk
|
yield chunk
|
||||||
|
elif data.get("type") == "fallback":
|
||||||
|
# Selected model failed; a fallback answered.
|
||||||
|
# Forward the notice and remember the real model.
|
||||||
|
_answered_by = data.get("answered_by") or _answered_by
|
||||||
|
yield chunk
|
||||||
elif data.get("type") == "usage":
|
elif data.get("type") == "usage":
|
||||||
last_metrics = data.get("data", {})
|
last_metrics = data.get("data", {})
|
||||||
last_metrics["model"] = sess.model
|
last_metrics["model"] = _answered_by or sess.model
|
||||||
if ctx.context_length and last_metrics.get("input_tokens"):
|
if ctx.context_length and last_metrics.get("input_tokens"):
|
||||||
pct = min(round((last_metrics["input_tokens"] / ctx.context_length) * 100, 1), 100.0)
|
pct = min(round((last_metrics["input_tokens"] / ctx.context_length) * 100, 1), 100.0)
|
||||||
last_metrics["context_percent"] = pct
|
last_metrics["context_percent"] = pct
|
||||||
@@ -867,6 +873,7 @@ def setup_chat_routes(
|
|||||||
# ── Agent mode: full agent loop with tools ──
|
# ── Agent mode: full agent loop with tools ──
|
||||||
_agent_rounds = 0
|
_agent_rounds = 0
|
||||||
_agent_tool_calls = 0
|
_agent_tool_calls = 0
|
||||||
|
_answered_by = None # set if the selected model failed and a fallback answered
|
||||||
try:
|
try:
|
||||||
from src.settings import get_setting
|
from src.settings import get_setting
|
||||||
_tool_budget = int(get_setting("agent_max_tool_calls", 0))
|
_tool_budget = int(get_setting("agent_max_tool_calls", 0))
|
||||||
@@ -911,9 +918,16 @@ def setup_chat_routes(
|
|||||||
elif data.get("type") == "tool_start":
|
elif data.get("type") == "tool_start":
|
||||||
_agent_tool_calls += 1
|
_agent_tool_calls += 1
|
||||||
yield chunk
|
yield chunk
|
||||||
|
elif data.get("type") == "fallback":
|
||||||
|
# Selected model failed; a fallback answered.
|
||||||
|
# Forward the notice and remember the real
|
||||||
|
# model so metrics reflect it, not the masked
|
||||||
|
# selected model.
|
||||||
|
_answered_by = data.get("answered_by") or _answered_by
|
||||||
|
yield chunk
|
||||||
elif data.get("type") == "metrics":
|
elif data.get("type") == "metrics":
|
||||||
last_metrics = data.get("data", {})
|
last_metrics = data.get("data", {})
|
||||||
last_metrics["model"] = sess.model
|
last_metrics["model"] = _answered_by or sess.model
|
||||||
yield f'data: {json.dumps({"type": "metrics", "data": last_metrics})}\n\n'
|
yield f'data: {json.dumps({"type": "metrics", "data": last_metrics})}\n\n'
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
yield chunk
|
yield chunk
|
||||||
|
|||||||
@@ -1638,6 +1638,12 @@ async def stream_agent_loop(
|
|||||||
real_output_tokens += u.get("output_tokens", 0)
|
real_output_tokens += u.get("output_tokens", 0)
|
||||||
last_round_input_tokens = round_input
|
last_round_input_tokens = round_input
|
||||||
has_real_usage = True
|
has_real_usage = True
|
||||||
|
elif data.get("type") == "fallback":
|
||||||
|
# The selected model failed and another answered; surface
|
||||||
|
# the notice so a misconfigured provider isn't masked.
|
||||||
|
logger.warning(f"[agent] round {round_num} fell back: "
|
||||||
|
f"{data.get('selected_model')} -> {data.get('answered_by')}")
|
||||||
|
yield chunk
|
||||||
elif "delta" in data:
|
elif "delta" in data:
|
||||||
if not first_token_received:
|
if not first_token_received:
|
||||||
time_to_first_token = time.time() - total_start
|
time_to_first_token = time.time() - total_start
|
||||||
|
|||||||
@@ -1148,6 +1148,24 @@ async def stream_llm(url: str, model: str, messages: List[Dict], temperature: fl
|
|||||||
yield f'event: error\ndata: {json.dumps({"error": str(e), "status": 502})}\n\n'
|
yield f'event: error\ndata: {json.dumps({"error": str(e), "status": 502})}\n\n'
|
||||||
|
|
||||||
|
|
||||||
|
def _summarize_stream_error(err_chunk: Optional[str]) -> str:
|
||||||
|
"""Pull a short human reason out of an `event: error` SSE chunk for the
|
||||||
|
fallback notice. Returns a generic message if it can't be parsed."""
|
||||||
|
if not err_chunk:
|
||||||
|
return "primary model failed"
|
||||||
|
try:
|
||||||
|
for line in err_chunk.split("\n"):
|
||||||
|
if line.startswith("data: "):
|
||||||
|
j = json.loads(line[6:])
|
||||||
|
txt = j.get("text") or j.get("error") or ""
|
||||||
|
status = j.get("status")
|
||||||
|
msg = (f"HTTP {status}: " if status else "") + str(txt)
|
||||||
|
return msg[:200].strip() or "primary model failed"
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return "primary model failed"
|
||||||
|
|
||||||
|
|
||||||
async def stream_llm_with_fallback(candidates, messages, **kwargs):
|
async def stream_llm_with_fallback(candidates, messages, **kwargs):
|
||||||
"""Wrap stream_llm with an ordered fallback chain.
|
"""Wrap stream_llm with an ordered fallback chain.
|
||||||
|
|
||||||
@@ -1166,6 +1184,7 @@ async def stream_llm_with_fallback(candidates, messages, **kwargs):
|
|||||||
yield f'event: error\ndata: {json.dumps({"error": "No model endpoint configured", "status": 503})}\n\n'
|
yield f'event: error\ndata: {json.dumps({"error": "No model endpoint configured", "status": 503})}\n\n'
|
||||||
return
|
return
|
||||||
|
|
||||||
|
primary_model = cands[0][1]
|
||||||
last_error = None
|
last_error = None
|
||||||
for i, (url, model, headers) in enumerate(cands):
|
for i, (url, model, headers) in enumerate(cands):
|
||||||
is_last = (i == len(cands) - 1)
|
is_last = (i == len(cands) - 1)
|
||||||
@@ -1187,6 +1206,19 @@ async def stream_llm_with_fallback(candidates, messages, **kwargs):
|
|||||||
continue
|
continue
|
||||||
# Any data chunk other than the terminal [DONE] means real output.
|
# Any data chunk other than the terminal [DONE] means real output.
|
||||||
if chunk.startswith("data: ") and not chunk.startswith("data: [DONE]"):
|
if chunk.startswith("data: ") and not chunk.startswith("data: [DONE]"):
|
||||||
|
# First real output from a NON-primary candidate: tell the client
|
||||||
|
# the selected model failed and another answered. Without this the
|
||||||
|
# fallback is invisible — a misconfigured provider looks like it
|
||||||
|
# works because the reply is shown under the originally selected
|
||||||
|
# model's name (e.g. a Bedrock/Claude endpoint that 400s every
|
||||||
|
# request but appears fine because another model silently answered).
|
||||||
|
if not emitted and i > 0:
|
||||||
|
yield ('data: ' + json.dumps({
|
||||||
|
"type": "fallback",
|
||||||
|
"selected_model": primary_model,
|
||||||
|
"answered_by": model,
|
||||||
|
"reason": _summarize_stream_error(last_error),
|
||||||
|
}) + '\n\n')
|
||||||
emitted = True
|
emitted = True
|
||||||
yield chunk
|
yield chunk
|
||||||
if not retried:
|
if not retried:
|
||||||
|
|||||||
@@ -1771,6 +1771,26 @@ import createResearchSynapse from './researchSynapse.js';
|
|||||||
if (tsSpan) roleEl.appendChild(tsSpan);
|
if (tsSpan) roleEl.appendChild(tsSpan);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if (json.type === 'fallback') {
|
||||||
|
// The selected model failed and another provider answered. Make
|
||||||
|
// it visible so a misconfigured provider is never silently
|
||||||
|
// masked under the selected model's name.
|
||||||
|
if (!_isBg) {
|
||||||
|
var _selM = _shortModel(json.selected_model || '');
|
||||||
|
var _ansM = _shortModel(json.answered_by || '');
|
||||||
|
uiModule.showToast('⚠ ' + _selM + ' failed — answered by ' + _ansM, 6000);
|
||||||
|
if (holder) {
|
||||||
|
var _rEl = holder.querySelector('.role');
|
||||||
|
if (_rEl) {
|
||||||
|
var _tsS = _rEl.querySelector('.role-timestamp');
|
||||||
|
_rEl.textContent = _ansM + ' (fallback) ';
|
||||||
|
_rEl.title = (json.selected_model || '') + ' failed' +
|
||||||
|
(json.reason ? ': ' + json.reason : '') + ' — answered by ' + (json.answered_by || '');
|
||||||
|
_applyModelColor(_rEl, json.answered_by);
|
||||||
|
if (_tsS) _rEl.appendChild(_tsS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
} else if (json.type === 'attachments') {
|
} else if (json.type === 'attachments') {
|
||||||
if (_isBg) continue;
|
if (_isBg) continue;
|
||||||
// Update user bubble — replace file chips with image previews
|
// Update user bubble — replace file chips with image previews
|
||||||
|
|||||||
@@ -0,0 +1,61 @@
|
|||||||
|
"""Tests for the fallback indicator in stream_llm_with_fallback.
|
||||||
|
|
||||||
|
When the selected model fails *before output* and another candidate answers,
|
||||||
|
a `fallback` event must be emitted so the switch is never masked under the
|
||||||
|
selected model's name (which is how a misconfigured provider can look like it
|
||||||
|
works while a different model silently answers).
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
from src import llm_core
|
||||||
|
|
||||||
|
|
||||||
|
def _run_fallback(monkeypatch, per_model):
|
||||||
|
"""Drive stream_llm_with_fallback with a stubbed stream_llm that returns a
|
||||||
|
canned SSE line list per candidate model. Returns the emitted chunks."""
|
||||||
|
async def fake_stream(url, model, messages, **kw):
|
||||||
|
for ln in per_model(model):
|
||||||
|
yield ln
|
||||||
|
monkeypatch.setattr(llm_core, "stream_llm", fake_stream)
|
||||||
|
|
||||||
|
async def run():
|
||||||
|
out = []
|
||||||
|
async for c in llm_core.stream_llm_with_fallback(
|
||||||
|
[("u1", "primary", {}), ("u2", "backup", {})], [{"role": "user", "content": "hi"}]
|
||||||
|
):
|
||||||
|
out.append(c)
|
||||||
|
return out
|
||||||
|
|
||||||
|
return asyncio.run(run())
|
||||||
|
|
||||||
|
|
||||||
|
def test_fallback_emits_indicator_when_primary_fails(monkeypatch):
|
||||||
|
def per_model(model):
|
||||||
|
if model == "primary":
|
||||||
|
return ['event: error\ndata: {"status": 400, "text": "Provider X returned HTTP 400"}\n\n']
|
||||||
|
return ['data: {"delta": "hello"}\n\n', "data: [DONE]\n\n"]
|
||||||
|
chunks = _run_fallback(monkeypatch, per_model)
|
||||||
|
fb = [json.loads(c[6:]) for c in chunks if c.startswith("data: ") and '"fallback"' in c]
|
||||||
|
assert fb, f"no fallback event in {chunks}"
|
||||||
|
assert fb[0]["type"] == "fallback"
|
||||||
|
assert fb[0]["selected_model"] == "primary"
|
||||||
|
assert fb[0]["answered_by"] == "backup"
|
||||||
|
assert "400" in fb[0]["reason"]
|
||||||
|
# the fallback notice must precede the answer content
|
||||||
|
order = [i for i, c in enumerate(chunks) if '"fallback"' in c or '"delta": "hello"' in c]
|
||||||
|
assert order == sorted(order)
|
||||||
|
assert any('"delta": "hello"' in c for c in chunks)
|
||||||
|
|
||||||
|
|
||||||
|
def test_no_fallback_event_when_primary_succeeds(monkeypatch):
|
||||||
|
def per_model(model):
|
||||||
|
return ['data: {"delta": "ok"}\n\n', "data: [DONE]\n\n"]
|
||||||
|
chunks = _run_fallback(monkeypatch, per_model)
|
||||||
|
assert not any('"fallback"' in c for c in chunks)
|
||||||
|
|
||||||
|
|
||||||
|
def test_summarize_stream_error():
|
||||||
|
assert "400" in llm_core._summarize_stream_error('event: error\ndata: {"status": 400, "text": "nope"}\n\n')
|
||||||
|
assert llm_core._summarize_stream_error(None) == "primary model failed"
|
||||||
|
assert llm_core._summarize_stream_error("garbage") == "primary model failed"
|
||||||
Reference in New Issue
Block a user