2 Commits

Author SHA1 Message Date
Alexandre Teixeira bd0c67b6d3 fix(agent): preserve loop guard stream behavior 2026-06-15 17:17:16 +01:00
Alexandre Teixeira ff5bcd9864 fix(agent): surface early loop-guard stops 2026-06-15 17:07:15 +01:00
4 changed files with 592 additions and 14 deletions
+2
View File
@@ -1297,6 +1297,8 @@ def setup_chat_routes(
"doc_stream_open", "doc_stream_delta",
"doc_update", "doc_suggestions", "ui_control",
"rounds_exhausted",
"loop_breaker_triggered",
"intent_nudge_exhausted",
"ask_user",
"plan_update",
):
+223 -14
View File
@@ -38,6 +38,167 @@ from src.agent_tools import (
logger = logging.getLogger(__name__)
# Redaction patterns for common secret-bearing shapes. Explicit and tested
# (see tests/test_loop_guard_signals.py) rather than one clever broad regex —
# safety first, but we try not to mangle harmless prose. Applied in order.
_REDACTED = "[redacted]"
# Cookie: ... / Set-Cookie: ... — redact the rest of the line (cookies hold spaces).
_SENSITIVE_COOKIE_RE = re.compile(
r"(?i)\b((?:set-)?cookie\s*[:=]\s*)[^\r\n]+"
)
# URL credentials, e.g. postgres://user:pass@host/db. The password half allows
# inner colons (postgres://user:pa:ss@host/db) but still stops at / and @.
_SENSITIVE_URL_CRED_RE = re.compile(
r"(?i)\b([a-z][a-z0-9+.\-]*://)[^\s:/@]+:[^\s/@]+@"
)
# Prefix-only discovery regexes. Each matches the key and its separator (the part
# we KEEP); the value that follows is found by a linear scanner rather than by a
# regex, so there is no backtracking-prone quantifier over uncontrolled input.
#
# Authorization: Bearer <tok> / Authorization: Basic "two word secret"
_AUTH_PREFIX_RE = re.compile(
r"(?i)authorization\s*[:=]\s*(?:bearer|basic)\s+"
)
# Provider-prefixed env names, e.g. OPENAI_API_KEY=..., AWS_SECRET_ACCESS_KEY=...,
# GITHUB_TOKEN=... — require a sensitive suffix preceded by `_` so benign names
# that merely end in KEY (MONKEY, TURKEY) are left alone.
_ENV_PREFIX_RE = re.compile(
r"(?:export\s+)?\b[A-Z][A-Z0-9_]*"
r"_(?:KEY|TOKEN|SECRET|PASSWORD|PASSWD|PWD|CREDENTIALS?)\s*=\s*"
)
# Generic sensitive key, e.g. password=..., api_key: ..., client_secret=...
_KEY_PREFIX_RE = re.compile(
r"(?i)\b(?:password|passwd|pwd|token|api[_-]?key|client_secret|secret)\b\s*[:=]\s*"
)
# Obvious provider-shaped bare tokens (no surrounding key needed).
_SENSITIVE_BARE_TOKEN_RE = re.compile(
r"\b("
r"sk-[A-Za-z0-9_\-]{16,}" # OpenAI / Anthropic style
r"|gh[pousr]_[A-Za-z0-9]{20,}" # GitHub PAT
r"|xox[baprs]-[A-Za-z0-9\-]{10,}" # Slack
r"|AKIA[0-9A-Z]{16}" # AWS access key id
r"|hf_[A-Za-z0-9]{16,}" # Hugging Face token
r"|AIza[0-9A-Za-z_\-]{20,}" # Google API key
r")\b"
)
def _consume_secret_value_end(text: str, start: int) -> int:
"""Return the exclusive end index of the secret value beginning at ``start``.
If the value is quoted, scan to the matching unescaped quote (backslash
escapes are skipped two chars at a time). Otherwise scan to the first
whitespace, comma, or semicolon. The scan is linear in the length of the
input, so it cannot exhibit catastrophic backtracking.
"""
n = len(text)
if start >= n:
return start
quote = text[start]
if quote in ("'", '"'):
i = start + 1
while i < n:
ch = text[i]
if ch == "\\":
i += 2
continue
if ch == quote:
return i + 1
i += 1
return n # unterminated quote: redact to the end
i = start
while i < n and not text[i].isspace() and text[i] not in (",", ";"):
i += 1
return i
def _redact_after_prefix(text: str, prefix_re: "re.Pattern") -> str:
"""Redact the value following each ``prefix_re`` match using a linear scan."""
result = []
pos = 0
n = len(text)
while pos < n:
match = prefix_re.search(text, pos)
if match is None:
result.append(text[pos:])
break
result.append(text[pos:match.end()])
value_end = _consume_secret_value_end(text, match.end())
if value_end > match.end():
result.append(_REDACTED)
pos = value_end
else:
# Empty value: nothing to redact; step past the prefix and continue.
pos = match.end()
if pos < n:
result.append(text[pos])
pos += 1
return "".join(result)
def _redact_private_keys(text: str) -> str:
"""Replace PEM private-key blocks with a placeholder via linear scanning.
Finds ``-----BEGIN `` markers, verifies the header names a PRIVATE KEY,
locates the matching ``-----END `` marker, and collapses the whole block.
No regex is used, so the (multi-line, uncontrolled) body cannot trigger
polynomial matching.
"""
begin_marker = "-----BEGIN "
end_marker = "-----END "
dash = "-----"
max_header = 64 # generous bound on "[TYPE ]PRIVATE KEY"
result = []
pos = 0
while True:
begin = text.find(begin_marker, pos)
if begin == -1:
result.append(text[pos:])
return "".join(result)
header_start = begin + len(begin_marker)
header_close = text.find(dash, header_start)
if (
header_close == -1
or header_close - header_start > max_header
or not text[header_start:header_close].endswith("PRIVATE KEY")
):
result.append(text[pos:header_start])
pos = header_start
continue
end = text.find(end_marker, header_close)
if end == -1:
result.append(text[pos:])
return "".join(result)
end_header_start = end + len(end_marker)
end_close = text.find(dash, end_header_start)
if (
end_close == -1
or end_close - end_header_start > max_header
or not text[end_header_start:end_close].endswith("PRIVATE KEY")
):
result.append(text[pos:header_start])
pos = header_start
continue
result.append(text[pos:begin])
result.append("[redacted private key]")
pos = end_close + len(dash)
def _redact_sensitive_text(value: object) -> str:
"""Redact obvious credential values before surfacing tool output."""
if value is None:
return ""
text = str(value)
text = _redact_private_keys(text)
text = _redact_after_prefix(text, _AUTH_PREFIX_RE)
text = _SENSITIVE_COOKIE_RE.sub(r"\1" + _REDACTED, text)
text = _SENSITIVE_URL_CRED_RE.sub(r"\1" + _REDACTED + "@", text)
text = _redact_after_prefix(text, _ENV_PREFIX_RE)
text = _redact_after_prefix(text, _KEY_PREFIX_RE)
return _SENSITIVE_BARE_TOKEN_RE.sub(_REDACTED, text)
def _load_mcp_disabled_map() -> Dict[str, set]:
"""Load per-server disabled tool sets from the database."""
@@ -2215,6 +2376,7 @@ async def stream_agent_loop(
# signatures + consecutive no-text tool rounds to bail early.
_recent_call_sigs = collections.deque(maxlen=6)
_stuck_rounds = 0
_MAX_STUCK_ROUNDS = 4 # consecutive no-progress rounds before loop-breaker bails
# Frequency of each exact call signature (tool + args), for the runaway
# backstop. Counting identical repeats — not distinct same-tool calls —
# lets a legit batch (e.g. 18 calendar events at once) through.
@@ -2637,17 +2799,22 @@ async def stream_agent_loop(
# promise: short response (<400 chars), no fenced code/answer,
# and an action-intent phrase was matched. Long answers that
# happen to contain "let me know" are not stalls.
_looks_like_promise = (
_promise_shape = (
not guide_only
and _intent_match is not None
and len(_intent_text) < 400
and "```" not in _intent_text
and _intent_nudge_count < _MAX_INTENT_NUDGES
)
_looks_like_promise = _promise_shape and _intent_nudge_count < _MAX_INTENT_NUDGES
if _looks_like_promise:
_intent_nudge_count += 1
_matched_phrase = _intent_match.group(0).strip()
logger.info(f"[agent] intent-without-action nudge #{_intent_nudge_count} on round {round_num}: {_matched_phrase!r}")
# Don't log the matched phrase — it's raw model text that may
# carry credentials. Structural metadata only.
logger.info(
"[agent] intent-without-action nudge #%d on round %d",
_intent_nudge_count, round_num,
)
messages.append({
"role": "system",
"content": (
@@ -2663,6 +2830,24 @@ async def stream_agent_loop(
# Visible signal in the stream so the user knows we caught it.
yield f'data: {json.dumps({"type": "agent_step", "round": round_num + 1})}\n\n'
continue
# The model keeps announcing actions it never takes and we've spent
# every nudge — surface why the turn is ending instead of letting it
# look like a clean completion.
if _promise_shape and _intent_nudge_count >= _MAX_INTENT_NUDGES:
_matched_phrase = _intent_match.group(0).strip()
_matched_phrase_safe = _redact_sensitive_text(_matched_phrase)
_in_message = (
f"Intent-nudge cap reached on round {round_num}: the model "
f"announced an action ({_matched_phrase_safe!r}) without a tool call "
f"after {_intent_nudge_count} nudge(s); ending the turn."
)
# Do not log the matched phrase, even redacted. It is raw model
# text and may contain credentials; keep logs structural only.
logger.warning(
"[agent] intent-nudge cap exhausted on round %d (%d/%d)",
round_num, _intent_nudge_count, _MAX_INTENT_NUDGES,
)
yield f'data: {json.dumps({"type": "intent_nudge_exhausted", "round": round_num, "nudges": _intent_nudge_count, "max_nudges": _MAX_INTENT_NUDGES, "message": _in_message})}\n\n'
break # no tools — done
# ── Loop-breaker (Terminus-style stall detector) ──────────────
@@ -2695,10 +2880,23 @@ async def stream_agent_loop(
# Distinct calls to one tool (a real batch) are legitimate work, so we
# count identical call signatures, not raw per-tool-type totals.
_runaway = _detect_runaway_call(_call_freq)
if _stuck_rounds >= 4 or _runaway:
if _stuck_rounds >= _MAX_STUCK_ROUNDS or _runaway:
reason = (f"calling {_runaway} with identical arguments over and over" if _runaway
else "repeating the same tool calls without new progress")
logger.warning(f"[agent] loop-breaker tripped on round {round_num} ({reason}); sig={_sig[:80]!r}")
_lb_message = (
f"Loop-breaker stopped the agent on round {round_num}: {reason}. "
"Forced one tool-free round to converge on an answer or state what's blocked."
)
# Log structural metadata only — `_sig` is raw tool-call content
# that may carry credentials.
logger.warning(
"[agent] loop-breaker tripped on round %d (%s); "
"stuck_rounds=%d/%d runaway=%r",
round_num, reason, _stuck_rounds, _MAX_STUCK_ROUNDS, _runaway,
)
# Surface the stop cause to the stream so the user (and journalctl)
# can tell a guard fired, not a clean completion.
yield f'data: {json.dumps({"type": "loop_breaker_triggered", "round": round_num, "reason": reason, "stuck_rounds": _stuck_rounds, "max_stuck_rounds": _MAX_STUCK_ROUNDS, "runaway": _runaway, "message": _lb_message})}\n\n'
# The model has been executing tools, so its results are already
# in context. Force ONE tool-free round to converge: write the
# answer from what it has, or state plainly what's blocking it.
@@ -2777,6 +2975,10 @@ async def stream_agent_loop(
cmd_display = block.content.split("\n")[0].strip()[:80]
else:
cmd_display = block.content.strip()
# The display string is streamed (tool_start/tool_output) and persisted;
# redact any secrets in it. block.content itself is left untouched so
# tool execution still sees the real command.
cmd_display = _redact_sensitive_text(cmd_display)
if tool_policy and tool_policy.blocks(block.tool_type):
desc = f"{block.tool_type}: BLOCKED"
@@ -2822,8 +3024,15 @@ async def stream_agent_loop(
evt = await _progress_q.get()
if evt is None:
break
# Redact secrets in the live tail before streaming — the
# final tool_output is redacted, so the progress tail must
# be too, or a secret could flash by mid-run. Copy so we
# don't mutate the tool's own event payload.
_evt = dict(evt)
if isinstance(_evt.get("tail"), str):
_evt["tail"] = _redact_sensitive_text(_evt["tail"])
yield (
f'data: {json.dumps({"type": "tool_progress", "tool": block.tool_type, "round": round_num, **evt})}\n\n'
f'data: {json.dumps({"type": "tool_progress", "tool": block.tool_type, "round": round_num, **_evt})}\n\n'
)
desc, result = await _tool_task
@@ -2889,7 +3098,7 @@ async def stream_agent_loop(
result["results"] = _clean
elif "stdout" in result:
result["stdout"] = _clean
except (json.JSONDecodeError, Exception):
except Exception:
pass
# Emit doc-specific event for document tools — the frontend
@@ -2958,29 +3167,29 @@ async def stream_agent_loop(
# empty) stdout/stderr; fall back to the error so the "timed
# out" reason reaches the UI instead of a blank result.
raw = result["stdout"] or result["stderr"] or result.get("error", "")
output_text = _truncate(raw)
output_text = _truncate(_redact_sensitive_text(raw))
elif "output" in result:
# bash / python canonical result: {"output": ..., "exit_code": ...}
raw = result["output"] or ""
output_text = _truncate(raw)
output_text = _truncate(_redact_sensitive_text(raw))
elif "response" in result:
# AI interaction tools (chat_with_model, send_to_session)
label = result.get("model", result.get("session_name", "AI"))
output_text = _truncate(f"{label}: {result['response']}")
output_text = _truncate(_redact_sensitive_text(f"{label}: {result['response']}"))
elif "content" in result:
output_text = _truncate(result["content"])
output_text = _truncate(_redact_sensitive_text(result["content"]))
elif "results" in result:
output_text = _truncate(result["results"])
output_text = _truncate(_redact_sensitive_text(result["results"]))
elif "session_id" in result and "name" in result:
output_text = f"Session created: {result['name']} (id: {result['session_id']})"
elif "success" in result:
output_text = (
f"Written: {result.get('path', '')}"
if result["success"]
else f"Error: {result.get('error', '')}"
else f"Error: {_redact_sensitive_text(result.get('error', ''))}"
)
elif "error" in result:
output_text = _truncate(result["error"])
output_text = _truncate(_redact_sensitive_text(result["error"]))
# Emit tool_output (include ui_event data if present)
tool_output_data = {"type": "tool_output", "tool": block.tool_type, "command": cmd_display, "output": output_text, "exit_code": result.get("exit_code")}
+17
View File
@@ -1911,6 +1911,23 @@ import { wireArrowUpRecall, getLastUserMessageFromChatHistory } from './composer
_chatBox.appendChild(note);
try { note.scrollIntoView({ block: 'end', behavior: 'smooth' }); } catch (_) { uiModule.scrollHistory && uiModule.scrollHistory(); }
}
} else if (json.type === 'loop_breaker_triggered' || json.type === 'intent_nudge_exhausted') {
// A loop guard ended the turn — surface why so it isn't mistaken
// for a clean completion or a silent stall.
const _chatBox = document.getElementById('chat-history');
if (!_isBg && _chatBox) {
const note = document.createElement('div');
note.className = 'stopped-indicator loop-guard-stop';
const label = document.createElement('span');
label.className = 'rounds-exhausted-label';
label.textContent = json.message ||
(json.type === 'loop_breaker_triggered'
? 'Stopped by the loop-breaker (no new progress).'
: 'Stopped: announced an action but never called the tool.');
note.appendChild(label);
_chatBox.appendChild(note);
try { note.scrollIntoView({ block: 'end', behavior: 'smooth' }); } catch (_) { uiModule.scrollHistory && uiModule.scrollHistory(); }
}
} else if (json.type === 'model_actual') {
if (!_isBg && holder) {
holder._requestedModel = json.requested_model || holder._requestedModel || modelName;
+350
View File
@@ -0,0 +1,350 @@
"""Regression: stream_agent_loop surfaces *why* a guard ended the turn.
Two internal guards used to stop the agent in ways that looked like a clean
completion or a vague blocked message:
* the loop-breaker stall detector -> now emits `loop_breaker_triggered`
* the intent-without-action nudge cap -> now emits `intent_nudge_exhausted`
These tests run the real loop body against a fake LLM stream (no model calls,
no sleeps) and assert the structured stop event is emitted.
"""
import asyncio
import json
import logging
import pytest
import src.agent_loop as al
def _collect(gen):
async def _run():
return [c async for c in gen]
return asyncio.run(_run())
def _types(chunks):
out = []
for c in chunks:
if c.startswith("data: ") and not c.startswith("data: [DONE]"):
try:
out.append(json.loads(c[6:]))
except Exception:
pass
return out
def _patch_common(monkeypatch):
monkeypatch.setattr(al, "get_setting", lambda key, default=None: default, raising=False)
monkeypatch.setattr(al, "get_mcp_manager", lambda: None, raising=False)
monkeypatch.setattr(al, "estimate_tokens", lambda *a, **k: 10, raising=False)
async def _fake_exec(block, *a, **k):
return ("bash", {"output": "ok", "exit_code": 0})
monkeypatch.setattr(al, "execute_tool_block", _fake_exec, raising=False)
def _run_loop(monkeypatch, round_text, max_rounds, relevant_tools={"bash"}):
async def _fake_stream(_candidates, messages, **kwargs):
yield f'data: {json.dumps({"delta": round_text})}\n\n'
yield "data: [DONE]\n\n"
monkeypatch.setattr(al, "stream_llm_with_fallback", _fake_stream, raising=False)
gen = al.stream_agent_loop(
"http://x/v1", "m",
[{"role": "user", "content": "do a long multi-step task"}],
max_rounds=max_rounds,
relevant_tools=relevant_tools,
)
return _types(_collect(gen))
def test_emits_loop_breaker_triggered_on_repeated_no_progress(monkeypatch):
_patch_common(monkeypatch)
# Same exact tool call every round, no answer text -> stuck-round streak
# trips the loop-breaker once the cap is reached.
events = _run_loop(monkeypatch, "```bash\necho hi\n```", max_rounds=8)
lb = [e for e in events if e.get("type") == "loop_breaker_triggered"]
assert lb, events
e = lb[0]
assert e["reason"]
assert e["max_stuck_rounds"] == 4
assert e["stuck_rounds"] >= 4
assert "message" in e
def test_no_loop_breaker_on_normal_finish(monkeypatch):
_patch_common(monkeypatch)
events = _run_loop(monkeypatch, "All done, here is your answer.", max_rounds=8)
assert not any(e.get("type") == "loop_breaker_triggered" for e in events), events
def test_emits_intent_nudge_exhausted_when_cap_reached(monkeypatch):
_patch_common(monkeypatch)
# The model keeps announcing an action with no tool call. After the nudge
# cap is spent, the turn ends with an explicit intent_nudge_exhausted event.
events = _run_loop(monkeypatch, "Let me check the logs now", max_rounds=5)
inx = [e for e in events if e.get("type") == "intent_nudge_exhausted"]
assert inx, events
e = inx[0]
assert e["max_nudges"] == 2
assert e["nudges"] >= 2
assert "message" in e
def test_no_intent_nudge_exhausted_on_normal_finish(monkeypatch):
_patch_common(monkeypatch)
events = _run_loop(monkeypatch, "Here is the complete answer to your question.", max_rounds=5)
assert not any(e.get("type") == "intent_nudge_exhausted" for e in events), events
def _assert_guard_log_safe(caplog, *, structural, secret="secret123"):
"""The guard's own structural log line fired, and that record carries no raw
secret. Scoped to the guard's records on purpose: an unrelated, pre-existing
round-summary log echoes raw model text and is out of scope for this PR."""
records = [r for r in caplog.records if structural in r.getMessage()]
assert records, caplog.text
for r in records:
assert secret not in r.getMessage(), r.getMessage()
def test_intent_nudge_logging_does_not_leak_secret(monkeypatch, caplog):
# The model announces an action (no tool call) with a secret in the text.
# The nudge logger must record only structural metadata, never the matched
# phrase — so the credential never lands in journalctl.
_patch_common(monkeypatch)
with caplog.at_level(logging.INFO, logger="src.agent_loop"):
events = _run_loop(monkeypatch, "Let me check api_key=secret123 now", max_rounds=5)
assert any(e.get("type") == "intent_nudge_exhausted" for e in events), events
_assert_guard_log_safe(caplog, structural="intent-without-action nudge")
def test_loop_breaker_logging_does_not_leak_secret(monkeypatch, caplog):
# A repeated tool command carrying a secret trips the loop-breaker. The
# structural log must not contain `_sig` / raw tool-call content.
_patch_common(monkeypatch)
with caplog.at_level(logging.INFO, logger="src.agent_loop"):
events = _run_loop(monkeypatch, "```bash\necho api_key=secret123\n```", max_rounds=8)
assert any(e.get("type") == "loop_breaker_triggered" for e in events), events
_assert_guard_log_safe(caplog, structural="loop-breaker tripped")
def test_redacts_sensitive_tool_output_before_surfacing():
text = al._redact_sensitive_text(
"password: private-value\n"
"api_key=private-key\n"
"Authorization: Bearer private-token\n"
"normal output"
)
assert "private-value" not in text
assert "private-key" not in text
assert "private-token" not in text
assert "password: [redacted]" in text
assert "api_key=[redacted]" in text
assert "Authorization: Bearer [redacted]" in text
assert "normal output" in text
_GCP_API_KEY_SAMPLE = "AI" + "za" + ("A" * 35)
# (input, secret substring that must be gone, expected substring that must remain)
_REDACTION_CASES = [
("Authorization: Bearer abc123tok", "abc123tok", "Authorization: Bearer [redacted]"),
("Authorization: Basic dXNlcjpwYXNz", "dXNlcjpwYXNz", "Authorization: Basic [redacted]"),
# Quoted Authorization value (spaces) must be redacted whole.
('Authorization: Bearer "two word secret"', "two word secret", "Authorization: Bearer [redacted]"),
# Escaped quote inside a quoted secret must not leak the tail.
(r'password="abc\"def secret"', "def secret", "password=[redacted]"),
# URL password containing a colon must still be redacted whole.
("postgres://user:pa:ss@host/db", "pa:ss", "postgres://[redacted]@host/db"),
# Provider-shaped bare tokens.
("token is hf_abcdefghij1234567890XYZ", "hf_abcdefghij1234567890XYZ", "[redacted]"),
("key " + _GCP_API_KEY_SAMPLE, _GCP_API_KEY_SAMPLE, "[redacted]"),
("Cookie: session=abc123secret", "abc123secret", "Cookie: [redacted]"),
("Set-Cookie: sid=xyz789; HttpOnly", "xyz789", "Set-Cookie: [redacted]"),
("postgres://user:pa55word@host/db", "pa55word", "postgres://[redacted]@host/db"),
("client_secret=supersecretvalue", "supersecretvalue", "client_secret=[redacted]"),
("OPENAI_API_KEY=abcd1234deadbeef", "abcd1234deadbeef", "OPENAI_API_KEY=[redacted]"),
# Quoted multi-word env value must be fully redacted, not clipped at the space.
('OPENAI_API_KEY="two word secret"', "two word secret", "OPENAI_API_KEY=[redacted]"),
('password: "my secret value"', "my secret value", "password: [redacted]"),
("here is sk-abcdefghij1234567890", "sk-abcdefghij1234567890", "[redacted]"),
(
"-----BEGIN PRIVATE KEY-----\nMIIfakeKEYbody\n-----END PRIVATE KEY-----",
"MIIfakeKEYbody",
"[redacted private key]",
),
]
@pytest.mark.parametrize("raw, secret, expected", _REDACTION_CASES)
def test_redaction_covers_requested_secret_shapes(raw, secret, expected):
out = al._redact_sensitive_text(raw)
assert secret not in out, out
assert expected in out, out
@pytest.mark.parametrize("raw", [
"the build completed in 3.2s with 0 errors",
"password reset email sent to the user",
"Listing 5 files: a.py b.py c.py d.py e.py",
"https://example.com/path?page=2",
# Benign uppercase names that merely end in KEY must not be redacted.
"MONKEY=banana",
"TURKEY=dinner",
])
def test_redaction_keeps_normal_output_readable(raw):
assert al._redact_sensitive_text(raw) == raw
def test_redacts_before_truncating():
# A secret near the start must be gone even if truncation would otherwise
# only clip the tail — redaction runs first.
raw = "api_key=topsecretvalue " + ("x" * 50_000)
out = al._truncate(al._redact_sensitive_text(raw))
assert "topsecretvalue" not in out
assert "api_key=[redacted]" in out
def _run_tool_result(monkeypatch, tool, exec_result, max_rounds=2):
"""Drive one tool round whose execution returns `exec_result`, and collect
the streamed events. Used to assert restored per-tool-result emissions."""
_patch_common(monkeypatch)
async def _fake_exec(block, *a, **k):
return (tool, exec_result)
monkeypatch.setattr(al, "execute_tool_block", _fake_exec, raising=False)
round_text = f"```{tool}\n{{}}\n```"
async def _fake_stream(_candidates, messages, **kwargs):
yield f'data: {json.dumps({"delta": round_text})}\n\n'
yield "data: [DONE]\n\n"
monkeypatch.setattr(al, "stream_llm_with_fallback", _fake_stream, raising=False)
gen = al.stream_agent_loop(
"http://x/v1", "m",
[{"role": "user", "content": "do something"}],
max_rounds=max_rounds,
relevant_tools={tool},
)
return _types(_collect(gen))
def test_restores_doc_suggestions_event(monkeypatch):
events = _run_tool_result(
monkeypatch, "suggest_document",
{"action": "suggest", "doc_id": "d1", "suggestions": [{"text": "x"}], "exit_code": 0},
)
assert any(e.get("type") == "doc_suggestions" for e in events), events
def test_restores_doc_update_event(monkeypatch):
events = _run_tool_result(
monkeypatch, "edit_document",
{"action": "edit", "doc_id": "d1", "content": "body", "version": 2,
"title": "T", "language": "md", "exit_code": 0},
)
# A native document block also emits doc_update AFTER tool_output, so a plain
# "any doc_update" check would pass even if the restored generic block were
# gone. Prove the restored block fires BEFORE the first tool_output.
types = [e.get("type") for e in events]
assert "doc_update" in types, events
assert "tool_output" in types, events
assert types.index("doc_update") < types.index("tool_output"), types
def test_restores_ui_control_event(monkeypatch):
events = _run_tool_result(
monkeypatch, "ui_control",
{"ui_event": "toggle", "toggle_name": "bash", "state": "off", "exit_code": 0},
)
assert any(e.get("type") == "ui_control" for e in events), events
def test_restores_plan_update_event(monkeypatch):
events = _run_tool_result(
monkeypatch, "update_plan",
{"plan_update": {"steps": [{"text": "step", "done": True}]}, "exit_code": 0},
)
assert any(e.get("type") == "plan_update" for e in events), events
def test_restores_ask_user_event_and_persists_question(monkeypatch):
events = _run_tool_result(
monkeypatch, "ask_user",
{"ask_user": {"question": "Which option?", "options": [{"label": "A"}, {"label": "B"}]},
"exit_code": 0},
)
# Exactly one ask_user event — not re-emitted on a follow-up round.
_ask_events = [e for e in events if e.get("type") == "ask_user"]
assert len(_ask_events) == 1, events
# The question is streamed as assistant text so it persists for replay.
# Upstream prepends "\n\n" when full_response already holds streamed text,
# so match on containment — and it must be streamed exactly once.
_q_deltas = [e for e in events if "Which option?" in (e.get("delta") or "")]
assert len(_q_deltas) == 1, events
# Setting `_awaiting_user` breaks the loop, so the turn does NOT advance into
# another agent round (which would emit an agent_step event) after the ask.
assert not any(e.get("type") == "agent_step" for e in events), events
def test_redacts_command_display_in_streamed_events(monkeypatch):
# A tool command line can carry a secret. The streamed command display
# (tool_start / tool_output) must be redacted, even though the real command
# passed to execution is left untouched.
_patch_common(monkeypatch)
round_text = "```bash\necho api_key=secret123\n```"
async def _fake_stream(_candidates, messages, **kwargs):
yield f'data: {json.dumps({"delta": round_text})}\n\n'
yield "data: [DONE]\n\n"
monkeypatch.setattr(al, "stream_llm_with_fallback", _fake_stream, raising=False)
gen = al.stream_agent_loop(
"http://x/v1", "m",
[{"role": "user", "content": "run it"}],
max_rounds=2,
relevant_tools={"bash"},
)
events = _types(_collect(gen))
cmds = [e for e in events if e.get("type") in ("tool_start", "tool_output")]
assert cmds, events
assert all("secret123" not in (e.get("command") or "") for e in cmds), cmds
assert any("api_key=[redacted]" in (e.get("command") or "") for e in cmds), cmds
def test_redacts_live_tool_progress_tail(monkeypatch):
# A secret in the live progress tail must be redacted before streaming —
# otherwise it flashes by before the (already redacted) final tool_output.
_patch_common(monkeypatch)
async def _fake_exec(block, *a, **k):
await k["progress_cb"]({"tail": "api_key=secret123", "elapsed_s": 1})
return ("bash", {"output": "done", "exit_code": 0})
monkeypatch.setattr(al, "execute_tool_block", _fake_exec, raising=False)
round_text = "```bash\necho hi\n```"
async def _fake_stream(_candidates, messages, **kwargs):
yield f'data: {json.dumps({"delta": round_text})}\n\n'
yield "data: [DONE]\n\n"
monkeypatch.setattr(al, "stream_llm_with_fallback", _fake_stream, raising=False)
gen = al.stream_agent_loop(
"http://x/v1", "m",
[{"role": "user", "content": "run it"}],
max_rounds=2,
relevant_tools={"bash"},
)
events = _types(_collect(gen))
prog = [e for e in events if e.get("type") == "tool_progress"]
assert prog, events
assert all("secret123" not in (e.get("tail") or "") for e in prog), prog
assert any("api_key=[redacted]" in (e.get("tail") or "") for e in prog), prog
# Other fields are preserved.
assert any(e.get("elapsed_s") == 1 for e in prog), prog