fix(agent): surface early loop-guard stops

This commit is contained in:
Alexandre Teixeira
2026-06-13 17:14:45 +01:00
parent 270b8570fc
commit ff5bcd9864
4 changed files with 193 additions and 77 deletions
+2
View File
@@ -1297,6 +1297,8 @@ def setup_chat_routes(
"doc_stream_open", "doc_stream_delta", "doc_stream_open", "doc_stream_delta",
"doc_update", "doc_suggestions", "ui_control", "doc_update", "doc_suggestions", "ui_control",
"rounds_exhausted", "rounds_exhausted",
"loop_breaker_triggered",
"intent_nudge_exhausted",
"ask_user", "ask_user",
"plan_update", "plan_update",
): ):
+60 -77
View File
@@ -11,6 +11,7 @@ import collections
import json import json
import re import re
import time import time
import re
import logging import logging
from typing import AsyncGenerator, List, Dict, Optional, Set from typing import AsyncGenerator, List, Dict, Optional, Set
from urllib.parse import urlparse from urllib.parse import urlparse
@@ -38,6 +39,27 @@ from src.agent_tools import (
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_SENSITIVE_BEARER_RE = re.compile(
r"(?i)\b(authorization\s*[:=]\s*bearer\s+)[A-Za-z0-9._~+/=-]+"
)
_SENSITIVE_VALUE_RE = re.compile(
r"(?i)\b(password|passwd|pwd|token|api[_-]?key|secret)\b"
r"(\s*[:=]\s*)"
r"([^\s,;]+)"
)
def _redact_sensitive_text(value: object) -> str:
"""Redact obvious credential values before surfacing tool output."""
if value is None:
return ""
text = str(value)
text = _SENSITIVE_BEARER_RE.sub(r"\1[redacted]", text)
return _SENSITIVE_VALUE_RE.sub(r"\1\2[redacted]", text)
def _load_mcp_disabled_map() -> Dict[str, set]: def _load_mcp_disabled_map() -> Dict[str, set]:
"""Load per-server disabled tool sets from the database.""" """Load per-server disabled tool sets from the database."""
@@ -2215,6 +2237,7 @@ async def stream_agent_loop(
# signatures + consecutive no-text tool rounds to bail early. # signatures + consecutive no-text tool rounds to bail early.
_recent_call_sigs = collections.deque(maxlen=6) _recent_call_sigs = collections.deque(maxlen=6)
_stuck_rounds = 0 _stuck_rounds = 0
_MAX_STUCK_ROUNDS = 4 # consecutive no-progress rounds before loop-breaker bails
# Frequency of each exact call signature (tool + args), for the runaway # Frequency of each exact call signature (tool + args), for the runaway
# backstop. Counting identical repeats — not distinct same-tool calls — # backstop. Counting identical repeats — not distinct same-tool calls —
# lets a legit batch (e.g. 18 calendar events at once) through. # lets a legit batch (e.g. 18 calendar events at once) through.
@@ -2637,13 +2660,13 @@ async def stream_agent_loop(
# promise: short response (<400 chars), no fenced code/answer, # promise: short response (<400 chars), no fenced code/answer,
# and an action-intent phrase was matched. Long answers that # and an action-intent phrase was matched. Long answers that
# happen to contain "let me know" are not stalls. # happen to contain "let me know" are not stalls.
_looks_like_promise = ( _promise_shape = (
not guide_only not guide_only
and _intent_match is not None and _intent_match is not None
and len(_intent_text) < 400 and len(_intent_text) < 400
and "```" not in _intent_text and "```" not in _intent_text
and _intent_nudge_count < _MAX_INTENT_NUDGES
) )
_looks_like_promise = _promise_shape and _intent_nudge_count < _MAX_INTENT_NUDGES
if _looks_like_promise: if _looks_like_promise:
_intent_nudge_count += 1 _intent_nudge_count += 1
_matched_phrase = _intent_match.group(0).strip() _matched_phrase = _intent_match.group(0).strip()
@@ -2663,6 +2686,21 @@ async def stream_agent_loop(
# Visible signal in the stream so the user knows we caught it. # Visible signal in the stream so the user knows we caught it.
yield f'data: {json.dumps({"type": "agent_step", "round": round_num + 1})}\n\n' yield f'data: {json.dumps({"type": "agent_step", "round": round_num + 1})}\n\n'
continue continue
# The model keeps announcing actions it never takes and we've spent
# every nudge — surface why the turn is ending instead of letting it
# look like a clean completion.
if _promise_shape and _intent_nudge_count >= _MAX_INTENT_NUDGES:
_matched_phrase = _intent_match.group(0).strip()
_in_message = (
f"Intent-nudge cap reached on round {round_num}: the model "
f"announced an action ({_matched_phrase!r}) without a tool call "
f"after {_intent_nudge_count} nudge(s); ending the turn."
)
logger.warning(
"[agent] intent-nudge cap exhausted on round %d (%d/%d): %r",
round_num, _intent_nudge_count, _MAX_INTENT_NUDGES, _matched_phrase,
)
yield f'data: {json.dumps({"type": "intent_nudge_exhausted", "round": round_num, "nudges": _intent_nudge_count, "max_nudges": _MAX_INTENT_NUDGES, "message": _in_message})}\n\n'
break # no tools — done break # no tools — done
# ── Loop-breaker (Terminus-style stall detector) ────────────── # ── Loop-breaker (Terminus-style stall detector) ──────────────
@@ -2695,10 +2733,21 @@ async def stream_agent_loop(
# Distinct calls to one tool (a real batch) are legitimate work, so we # Distinct calls to one tool (a real batch) are legitimate work, so we
# count identical call signatures, not raw per-tool-type totals. # count identical call signatures, not raw per-tool-type totals.
_runaway = _detect_runaway_call(_call_freq) _runaway = _detect_runaway_call(_call_freq)
if _stuck_rounds >= 4 or _runaway: if _stuck_rounds >= _MAX_STUCK_ROUNDS or _runaway:
reason = (f"calling {_runaway} with identical arguments over and over" if _runaway reason = (f"calling {_runaway} with identical arguments over and over" if _runaway
else "repeating the same tool calls without new progress") else "repeating the same tool calls without new progress")
logger.warning(f"[agent] loop-breaker tripped on round {round_num} ({reason}); sig={_sig[:80]!r}") _lb_message = (
f"Loop-breaker stopped the agent on round {round_num}: {reason}. "
"Forced one tool-free round to converge on an answer or state what's blocked."
)
logger.warning(
"[agent] loop-breaker tripped on round %d (%s); "
"stuck_rounds=%d/%d runaway=%r sig=%r",
round_num, reason, _stuck_rounds, _MAX_STUCK_ROUNDS, _runaway, _sig[:80],
)
# Surface the stop cause to the stream so the user (and journalctl)
# can tell a guard fired, not a clean completion.
yield f'data: {json.dumps({"type": "loop_breaker_triggered", "round": round_num, "reason": reason, "stuck_rounds": _stuck_rounds, "max_stuck_rounds": _MAX_STUCK_ROUNDS, "runaway": _runaway, "message": _lb_message})}\n\n'
# The model has been executing tools, so its results are already # The model has been executing tools, so its results are already
# in context. Force ONE tool-free round to converge: write the # in context. Force ONE tool-free round to converge: write the
# answer from what it has, or state plainly what's blocking it. # answer from what it has, or state plainly what's blocking it.
@@ -2888,99 +2937,33 @@ async def stream_agent_loop(
elif "results" in result: elif "results" in result:
result["results"] = _clean result["results"] = _clean
elif "stdout" in result: elif "stdout" in result:
result["stdout"] = _clean
except (json.JSONDecodeError, Exception):
pass
# Emit doc-specific event for document tools — the frontend
# document panel handles this; no need to show content in chat.
if is_doc_tool and "action" in result:
if result["action"] == "suggest":
yield (
f'data: {json.dumps({"type": "doc_suggestions", "doc_id": result["doc_id"], "suggestions": result["suggestions"]})}\n\n'
)
else:
yield (
f'data: {json.dumps({"type": "doc_update", "doc_id": result["doc_id"], "content": result["content"], "version": result["version"], "title": result.get("title", ""), "language": result.get("language")})}\n\n'
)
# Emit ui_control event for frontend to apply UI changes
if "ui_event" in result:
yield (
f'data: {json.dumps({"type": "ui_control", "data": result})}\n\n'
)
# ask_user: the agent posed a multiple-choice question. Emit it so the
# frontend renders clickable options, then end the turn (below) and
# wait — the user's pick becomes the next message.
if "ask_user" in result:
# The question lives in the tool args. ChatMessage.to_dict()
# replays only role+content to the model next turn — tool_event
# metadata is dropped — so if the question is never in the saved
# assistant text, the model can't see it already asked and will
# loop and re-ask after the user answers. Stream it as assistant
# text (once) so it persists and is replayed. The card shows the
# options only, so this is the single visible copy of the question.
_auq = result["ask_user"]
_auq_q = (_auq.get("question") or "").strip()
if _auq_q and _auq_q not in full_response:
_auq_delta = ("\n\n" if full_response.strip() else "") + _auq_q
full_response += _auq_delta
yield 'data: ' + json.dumps({"delta": _auq_delta}) + '\n\n'
yield (
f'data: {json.dumps({"type": "ask_user", "data": result["ask_user"]})}\n\n'
)
_awaiting_user = True
# update_plan: agent wrote back to the plan (ticked a step / revised).
# Push it to the frontend so the stored plan + docked window update
# live. Does NOT end the turn — the agent keeps working.
if "plan_update" in result:
yield (
f'data: {json.dumps({"type": "plan_update", "data": result["plan_update"]})}\n\n'
)
# Build output for frontend tool bubble.
# Document tools get a short summary — content goes to the editor panel.
output_text = ""
if is_doc_tool and "action" in result:
action = result["action"]
title = result.get("title", "")
ver = result.get("version", "?")
if action == "create":
output_text = f'Document created: "{title}" (v{ver})'
elif action == "edit":
output_text = f'Document edited: "{title}" (v{ver}, {result.get("applied", 0)} edit(s))'
elif action == "update":
output_text = f'Document updated: "{title}" (v{ver})'
elif "stdout" in result:
# On a bash/python timeout the result carries error + (often # On a bash/python timeout the result carries error + (often
# empty) stdout/stderr; fall back to the error so the "timed # empty) stdout/stderr; fall back to the error so the "timed
# out" reason reaches the UI instead of a blank result. # out" reason reaches the UI instead of a blank result.
raw = result["stdout"] or result["stderr"] or result.get("error", "") raw = result["stdout"] or result["stderr"] or result.get("error", "")
output_text = _truncate(raw) output_text = _truncate(_redact_sensitive_text(raw))
elif "output" in result: elif "output" in result:
# bash / python canonical result: {"output": ..., "exit_code": ...} # bash / python canonical result: {"output": ..., "exit_code": ...}
raw = result["output"] or "" raw = result["output"] or ""
output_text = _truncate(raw) output_text = _truncate(_redact_sensitive_text(raw))
elif "response" in result: elif "response" in result:
# AI interaction tools (chat_with_model, send_to_session) # AI interaction tools (chat_with_model, send_to_session)
label = result.get("model", result.get("session_name", "AI")) label = result.get("model", result.get("session_name", "AI"))
output_text = _truncate(f"{label}: {result['response']}") output_text = _truncate(_redact_sensitive_text(f"{label}: {result['response']}"))
elif "content" in result: elif "content" in result:
output_text = _truncate(result["content"]) output_text = _truncate(_redact_sensitive_text(result["content"]))
elif "results" in result: elif "results" in result:
output_text = _truncate(result["results"]) output_text = _truncate(_redact_sensitive_text(result["results"]))
elif "session_id" in result and "name" in result: elif "session_id" in result and "name" in result:
output_text = f"Session created: {result['name']} (id: {result['session_id']})" output_text = f"Session created: {result['name']} (id: {result['session_id']})"
elif "success" in result: elif "success" in result:
output_text = ( output_text = (
f"Written: {result.get('path', '')}" f"Written: {result.get('path', '')}"
if result["success"] if result["success"]
else f"Error: {result.get('error', '')}" else f"Error: {_redact_sensitive_text(result.get('error', ''))}"
) )
elif "error" in result: elif "error" in result:
output_text = _truncate(result["error"]) output_text = _truncate(_redact_sensitive_text(result["error"]))
# Emit tool_output (include ui_event data if present) # Emit tool_output (include ui_event data if present)
tool_output_data = {"type": "tool_output", "tool": block.tool_type, "command": cmd_display, "output": output_text, "exit_code": result.get("exit_code")} tool_output_data = {"type": "tool_output", "tool": block.tool_type, "command": cmd_display, "output": output_text, "exit_code": result.get("exit_code")}
+17
View File
@@ -1911,6 +1911,23 @@ import { wireArrowUpRecall, getLastUserMessageFromChatHistory } from './composer
_chatBox.appendChild(note); _chatBox.appendChild(note);
try { note.scrollIntoView({ block: 'end', behavior: 'smooth' }); } catch (_) { uiModule.scrollHistory && uiModule.scrollHistory(); } try { note.scrollIntoView({ block: 'end', behavior: 'smooth' }); } catch (_) { uiModule.scrollHistory && uiModule.scrollHistory(); }
} }
} else if (json.type === 'loop_breaker_triggered' || json.type === 'intent_nudge_exhausted') {
// A loop guard ended the turn — surface why so it isn't mistaken
// for a clean completion or a silent stall.
const _chatBox = document.getElementById('chat-history');
if (!_isBg && _chatBox) {
const note = document.createElement('div');
note.className = 'stopped-indicator loop-guard-stop';
const label = document.createElement('span');
label.className = 'rounds-exhausted-label';
label.textContent = json.message ||
(json.type === 'loop_breaker_triggered'
? 'Stopped by the loop-breaker (no new progress).'
: 'Stopped: announced an action but never called the tool.');
note.appendChild(label);
_chatBox.appendChild(note);
try { note.scrollIntoView({ block: 'end', behavior: 'smooth' }); } catch (_) { uiModule.scrollHistory && uiModule.scrollHistory(); }
}
} else if (json.type === 'model_actual') { } else if (json.type === 'model_actual') {
if (!_isBg && holder) { if (!_isBg && holder) {
holder._requestedModel = json.requested_model || holder._requestedModel || modelName; holder._requestedModel = json.requested_model || holder._requestedModel || modelName;
+114
View File
@@ -0,0 +1,114 @@
"""Regression: stream_agent_loop surfaces *why* a guard ended the turn.
Two internal guards used to stop the agent in ways that looked like a clean
completion or a vague blocked message:
* the loop-breaker stall detector -> now emits `loop_breaker_triggered`
* the intent-without-action nudge cap -> now emits `intent_nudge_exhausted`
These tests run the real loop body against a fake LLM stream (no model calls,
no sleeps) and assert the structured stop event is emitted.
"""
import asyncio
import json
import src.agent_loop as al
def _collect(gen):
async def _run():
return [c async for c in gen]
return asyncio.run(_run())
def _types(chunks):
out = []
for c in chunks:
if c.startswith("data: ") and not c.startswith("data: [DONE]"):
try:
out.append(json.loads(c[6:]))
except Exception:
pass
return out
def _patch_common(monkeypatch):
monkeypatch.setattr(al, "get_setting", lambda key, default=None: default, raising=False)
monkeypatch.setattr(al, "get_mcp_manager", lambda: None, raising=False)
monkeypatch.setattr(al, "estimate_tokens", lambda *a, **k: 10, raising=False)
async def _fake_exec(block, *a, **k):
return ("bash", {"output": "ok", "exit_code": 0})
monkeypatch.setattr(al, "execute_tool_block", _fake_exec, raising=False)
def _run_loop(monkeypatch, round_text, max_rounds, relevant_tools={"bash"}):
async def _fake_stream(_candidates, messages, **kwargs):
yield f'data: {json.dumps({"delta": round_text})}\n\n'
yield "data: [DONE]\n\n"
monkeypatch.setattr(al, "stream_llm_with_fallback", _fake_stream, raising=False)
gen = al.stream_agent_loop(
"http://x/v1", "m",
[{"role": "user", "content": "do a long multi-step task"}],
max_rounds=max_rounds,
relevant_tools=relevant_tools,
)
return _types(_collect(gen))
def test_emits_loop_breaker_triggered_on_repeated_no_progress(monkeypatch):
_patch_common(monkeypatch)
# Same exact tool call every round, no answer text -> stuck-round streak
# trips the loop-breaker once the cap is reached.
events = _run_loop(monkeypatch, "```bash\necho hi\n```", max_rounds=8)
lb = [e for e in events if e.get("type") == "loop_breaker_triggered"]
assert lb, events
e = lb[0]
assert e["reason"]
assert e["max_stuck_rounds"] == 4
assert e["stuck_rounds"] >= 4
assert "message" in e
def test_no_loop_breaker_on_normal_finish(monkeypatch):
_patch_common(monkeypatch)
events = _run_loop(monkeypatch, "All done, here is your answer.", max_rounds=8)
assert not any(e.get("type") == "loop_breaker_triggered" for e in events), events
def test_emits_intent_nudge_exhausted_when_cap_reached(monkeypatch):
_patch_common(monkeypatch)
# The model keeps announcing an action with no tool call. After the nudge
# cap is spent, the turn ends with an explicit intent_nudge_exhausted event.
events = _run_loop(monkeypatch, "Let me check the logs now", max_rounds=5)
inx = [e for e in events if e.get("type") == "intent_nudge_exhausted"]
assert inx, events
e = inx[0]
assert e["max_nudges"] == 2
assert e["nudges"] >= 2
assert "message" in e
def test_no_intent_nudge_exhausted_on_normal_finish(monkeypatch):
_patch_common(monkeypatch)
events = _run_loop(monkeypatch, "Here is the complete answer to your question.", max_rounds=5)
assert not any(e.get("type") == "intent_nudge_exhausted" for e in events), events
def test_redacts_sensitive_tool_output_before_surfacing():
text = al._redact_sensitive_text(
"password: private-value\n"
"api_key=private-key\n"
"Authorization: Bearer private-token\n"
"normal output"
)
assert "private-value" not in text
assert "private-key" not in text
assert "private-token" not in text
assert "password: [redacted]" in text
assert "api_key=[redacted]" in text
assert "Authorization: Bearer [redacted]" in text
assert "normal output" in text