From c0983557788df9b71d8e6a0359c0bbb16b8555c4 Mon Sep 17 00:00:00 2001 From: nopoz Date: Sat, 27 Jun 2026 10:12:28 -0700 Subject: [PATCH] fix(security): prevent ReDoS in LLM-output tool/think parsers (#4704) * fix(security): prevent ReDoS in LLM-output tool/think parsers The regexes that parse untrusted model output in text_helpers.py and tool_parsing.py are delimiter-bounded with a lazy [\s\S]*? (or an ambiguous (\s+[^>]*)?). Applied with re.sub/re.finditer over a whole response, they degrade to O(n^2) when the closing delimiter is absent: the engine rescans to end-of-string from every opener. Model output is untrusted, so a prompt-injected or malicious model can stall the agent loop with many unclosed openers (measured ~25s on a 60KB <thought opener flood). - text_helpers.py: replace <thought(\s+[^>]*)?> with <thought([^>]*)> (identical capture, no \s+/[^>]* overlap); skip the Gemma <|channel>...<channel|> subs when no closer is present. - tool_parsing.py: gate _TOOL_CALL_RE, _XML_TOOL_CALL_RE and _TOOL_CODE_RE (in parse_tool_blocks and strip_tool_blocks) on a cheap presence check for their closing delimiter. With no closer the regex cannot match, so skipping is equivalent; only the wasted O(n^2) rescan is removed. Resolves CodeQL py/polynomial-redos #230, #231, #232, #233, #235, #236, #524. The _XML_OPEN_TOOL_CALL_RE alerts (#234, #477) are false positives (its greedy [\s\S]*\Z is linear) and left untouched. * fix(security): close ReDoS gaps in tool/think parsers from review Addresses two review findings on the closer-guard approach: - Whole-string "closer exists?" checks were bypassable: a stale closer before an opener flood, or a closer with no reachable inner `}`, kept the guard true while every opener still rescanned to end-of-string (O(n^2)). Replace the substring guards with `_iter_delimited`, a forward-only scan that pairs each opener with a *later* closer and stops once none is reachable (O(n)). `parse_tool_blocks` and `strip_tool_blocks` (via `_strip_delimited`) both use it for the [TOOL_CALL], <tool_call>/<function_call>, and <tool_code> formats. 
Verified equivalent to the original regexes on well-formed inputs. - `<thought([^>]*)>` dropped the tag-name boundary and corrupted unrelated tags (`` -> ``). Use `<thought(\s[^>]*)?>`: the single fixed `\s` keeps the pattern linear (no `\s+`/`[^>]*` overlap) while restoring the boundary; capture is byte-for-byte identical for real `<thought>` openers. Adds regressions for stale-closer-before-opener, closer-present-without-inner-brace, and the / passthrough. * fix(security): close Gemma channel ReDoS guard flagged in review vdmkenny noted the same bypassable whole-string guard remained in text_helpers.py: `if "<channel|>" in out.lower()` gating the Gemma thought/response channel subs. A stale `<channel|>` before a `<|channel>thought` opener flood keeps the guard true while every opener still rescans to end-of-string (measured ~7.3s at 4k openers). Replace it with `_sub_delimited`, the same forward-only scan used for the tool-call parsers: pair each opener with a later closer, stop when none is reachable (O(n)). Verified output-equivalent to the original capture regexes on well-formed multi-channel inputs; the stale-closer case now runs in <2ms. Adds a regression for stale-closer-before-opener on the Gemma path. * fix(security): harden strip_think() think-tag ReDoS flagged in review The earlier fixes hardened normalize_thinking_markup and the delimiter scanners, but the production entrypoint strip_think() still ran _THINK_CLOSED_RE / _THINK_ATTR_RE / _THINK_OPEN_RE (and the stray-tag _THINK_TAG_RE) over untrusted model output. Those kept the same ReDoS shapes: the lazy `[\s\S]*?` rescanned to end-of-string from every opener, and `(?:\s+[^>]*)?` / `[^>]*` attribute scans ran to end-of-string from every opener on a "many openers, no closer" flood. On the prior head, malformed `` normalization had the same residual: the single-opener case was linear but an opener flood was still O(n^2) (~4.4s). 
- Replace the lazy multi-pass _THINK_CLOSED_RE loop with the existing forward-only _sub_delimited scan (pair each opener with the first reachable closer, stop when none is reachable). One pass collapses sequential and nested blocks as before. - Bound every opener/stray-tag attribute scan at `<` (`[^<>]` not `[^>]`) so a no-`>` opener flood can't drive a single match attempt to end-of-string. Identical capture for well-formed think/thought tags. - email_helpers._strip_think: compute had_think from the single linear _THINK_TAG_RE instead of the lazy closed/open `.search()` calls, which had the same O(n^2) on the email reply/summary/extraction paths. All flood variants now finish in <10ms (were 6-14s). Output verified byte-for-byte identical to the prior implementation over a 34-case corpus (nested, mismatched, attr, uppercase, Gemma, prose, prompt-echo). Adds strip_think() timing regressions for malformed openers, opener floods (all three tag names), the closed-opener flood, and the malformed-closer flood. * docs: trim verbose comments in think-tag ReDoS fix --- routes/email_helpers.py | 5 +- src/text_helpers.py | 85 +++++++++----- src/tool_parsing.py | 97 +++++++++++++-- tests/test_redos_llm_parsers.py | 201 ++++++++++++++++++++++++++++++++ 4 files changed, 345 insertions(+), 43 deletions(-) create mode 100644 tests/test_redos_llm_parsers.py diff --git a/routes/email_helpers.py b/routes/email_helpers.py index 513ec1f0a..91baab767 100644 --- a/routes/email_helpers.py +++ b/routes/email_helpers.py @@ -225,8 +225,9 @@ def _strip_think(text: str) -> str: """ if not text: return "" - from src.text_helpers import strip_think as _central, _THINK_CLOSED_RE, _THINK_OPEN_RE, _THINK_TAG_RE - had_think = bool(_THINK_CLOSED_RE.search(text) or _THINK_OPEN_RE.search(text) or _THINK_TAG_RE.search(text)) + from src.text_helpers import strip_think as _central, _THINK_TAG_RE + # Single linear tag check; the old closed/open `.search()` calls could ReDoS. 
+ had_think = bool(_THINK_TAG_RE.search(text)) return _central(text, prose=had_think, prompt_echo=True) diff --git a/src/text_helpers.py b/src/text_helpers.py index 733ced05d..80207ee89 100644 --- a/src/text_helpers.py +++ b/src/text_helpers.py @@ -17,31 +17,27 @@ import re _THINK_TAG_NAME = r"(?:think(?:ing)?|thought)" -# Closed reasoning blocks. Multi-pass loop in `strip_think` handles nested -# `...` patterns some models emit. -_THINK_CLOSED_RE = re.compile(rf"<{_THINK_TAG_NAME}(?:\s+[^>]*)?>[\s\S]*?\s*", re.IGNORECASE) -# Orphan opening or closing tags that survive after the closed-pass. -_THINK_TAG_RE = re.compile(rf"]*>\s*", re.IGNORECASE) -# Dangling opener anywhere in the response with no closer — strip everything -# from `` to the end of string. -_THINK_OPEN_RE = re.compile(rf"<{_THINK_TAG_NAME}(?:\s+[^>]*)?>[\s\S]*$", re.IGNORECASE) -# Streaming models occasionally emit ``-style attributes. -# Normalize to a plain `` so the regexes above catch them. -_THINK_ATTR_RE = re.compile(rf"<{_THINK_TAG_NAME}\s+[^>]*>", re.IGNORECASE) -_THINK_ATTR_CLOSE_RE = re.compile(rf"]*>", re.IGNORECASE) +# Think-tag matchers. `[^<>]` (not `[^>]`) bounds attribute scans at the next +# `<` so an opener flood with no closing `>` can't backtrack to end-of-string +# (ReDoS, CodeQL py/polynomial-redos); capture is identical for well-formed tags. +# Opener/closer are split for the forward-only block strip (_sub_delimited). +_THINK_OPEN_TAG_RE = re.compile(rf"<{_THINK_TAG_NAME}(?:\s[^<>]*)?>", re.IGNORECASE) +_THINK_CLOSE_TAG_RE = re.compile(rf"\s*", re.IGNORECASE) +# Orphan opening/closing tags left after the block strip. +_THINK_TAG_RE = re.compile(rf"]*>\s*", re.IGNORECASE) +# Dangling opener with no closer: strip from `` to end of string. +_THINK_OPEN_RE = re.compile(rf"<{_THINK_TAG_NAME}(?:\s[^<>]*)?>[\s\S]*$", re.IGNORECASE) +# Normalize ``-style attributes to a plain ``. 
+_THINK_ATTR_RE = re.compile(rf"<{_THINK_TAG_NAME}\s[^<>]*>", re.IGNORECASE) +_THINK_ATTR_CLOSE_RE = re.compile(rf"]*>", re.IGNORECASE) _GEMMA_THOUGHT_OPEN_RE = re.compile(r"<\|channel>thought\s*\n?[\s\S]*$", re.IGNORECASE) -_GEMMA_RESPONSE_CHANNEL_RE = re.compile( - r"<\|channel>response\s*\n?([\s\S]*?)", - re.IGNORECASE, -) _GEMMA_RESPONSE_OPEN_RE = re.compile(r"<\|channel>response\s*\n?", re.IGNORECASE) _GEMMA_CHANNEL_CLOSE_RE = re.compile(r"", re.IGNORECASE) -_THOUGHT_TAG_OPEN_RE = re.compile(r"]*)?>", re.IGNORECASE) +_THOUGHT_TAG_OPEN_RE = re.compile(r"]*)?>", re.IGNORECASE) _THOUGHT_TAG_CLOSE_RE = re.compile(r"", re.IGNORECASE) -_GEMMA_THOUGHT_CHANNEL_CAPTURE_RE = re.compile( - r"<\|channel>thought\s*\n?([\s\S]*?)\s*", - re.IGNORECASE, -) +# Gemma thought-channel delimiters, split for the forward-only sub (_sub_delimited). +_GEMMA_THOUGHT_CHANNEL_OPEN_RE = re.compile(r"<\|channel>thought\s*\n?", re.IGNORECASE) +_GEMMA_CHANNEL_CLOSE_TRIM_RE = re.compile(r"\s*", re.IGNORECASE) # Qwen and a few other models prefix the response with a "Thinking Process:" # block before the real answer. _QWEN_THINKING_RE = re.compile( @@ -93,6 +89,31 @@ def _strip_reasoning_prose(text: str) -> str: return "\n\n".join(keep).strip() if keep else text +def _sub_delimited(text, open_re, close_re, repl): + """Forward-only ``re.sub`` of ``open_re...close_re`` that can't ReDoS. + + Pairs each opener with the first closer after it and stops once no closer is + reachable, so it stays O(n) instead of re.sub's rescan-to-end from every + opener (O(n^2) on "many openers, no closer" input). ``repl`` gets the inner + text. A whole-string "closer present?" guard is not enough: a stale closer + before an opener flood keeps it true while every opener still rescans. 
+ """ + out = [] + pos = 0 + while True: + om = open_re.search(text, pos) + if om is None: + break + cm = close_re.search(text, om.end()) + if cm is None: + break + out.append(text[pos:om.start()]) + out.append(repl(text[om.end():cm.start()])) + pos = cm.end() + out.append(text[pos:]) + return "".join(out) + + def normalize_thinking_markup(text: str) -> str: """Canonicalize supported thinking wrappers to `` markup. @@ -106,12 +127,17 @@ def normalize_thinking_markup(text: str) -> str: out = _THOUGHT_TAG_OPEN_RE.sub(lambda m: "", text) out = _THOUGHT_TAG_CLOSE_RE.sub("", out) - def _replace_gemma_thought(match: re.Match) -> str: - thought = match.group(1).strip() + def _replace_gemma_thought(inner: str) -> str: + thought = inner.strip() return f"{thought}\n" if thought else "" - out = _GEMMA_THOUGHT_CHANNEL_CAPTURE_RE.sub(_replace_gemma_thought, out) - out = _GEMMA_RESPONSE_CHANNEL_RE.sub(lambda m: m.group(1), out) + # Forward-only so a stale/unreachable `` can't drive a ReDoS rescan. + out = _sub_delimited( + out, _GEMMA_THOUGHT_CHANNEL_OPEN_RE, _GEMMA_CHANNEL_CLOSE_TRIM_RE, _replace_gemma_thought + ) + out = _sub_delimited( + out, _GEMMA_RESPONSE_OPEN_RE, _GEMMA_CHANNEL_CLOSE_RE, lambda inner: inner + ) out = _GEMMA_RESPONSE_OPEN_RE.sub("", out) out = _GEMMA_CHANNEL_CLOSE_RE.sub("", out) return out @@ -149,12 +175,9 @@ def strip_think(text: str, *, prose: bool = False, prompt_echo: bool = True) -> # Normalize attributes so the closed/open regexes can catch them. text = _THINK_ATTR_RE.sub("", text) text = _THINK_ATTR_CLOSE_RE.sub("", text) - # Multi-pass for nested blocks. - prev = None - out = text - while prev != out: - prev = out - out = _THINK_CLOSED_RE.sub("", out) + # Forward-only block strip (see _sub_delimited): one pass collapses nested + # and sequential blocks without the old lazy re.sub loop's ReDoS rescan. 
+ out = _sub_delimited(text, _THINK_OPEN_TAG_RE, _THINK_CLOSE_TAG_RE, lambda _inner: "") out = _THINK_OPEN_RE.sub("", out) out = _THINK_TAG_RE.sub("", out) if prompt_echo: diff --git a/src/tool_parsing.py b/src/tool_parsing.py index a68a5a6b6..fefd4efea 100644 --- a/src/tool_parsing.py +++ b/src/tool_parsing.py @@ -31,6 +31,12 @@ _TOOL_CALL_RE = re.compile( r"\[TOOL_CALL\]\s*\{([\s\S]*?)\}\s*\[/TOOL_CALL\]", re.IGNORECASE, ) +# Same delimiters as _TOOL_CALL_RE, split so they can be driven by +# _iter_delimited (a forward-only scan). The closer is `}\s*[/TOOL_CALL]`, so a +# present-but-unmatched `[/TOOL_CALL]` with no inner `}` ahead simply ends the +# scan instead of triggering re.finditer's O(n^2) rescan. See _iter_delimited. +_TOOL_CALL_OPEN_RE = re.compile(r"\[TOOL_CALL\]\s*\{", re.IGNORECASE) +_TOOL_CALL_CLOSE_RE = re.compile(r"\}\s*\[/TOOL_CALL\]", re.IGNORECASE) # Pattern 3: XML-style tool calls (minimax, some other models) # ... @@ -43,6 +49,15 @@ _XML_OPEN_TOOL_CALL_RE = re.compile( r"<(?:[\w]+:)?(?:tool_call|function_call)>\s*([\s\S]*)\Z", re.IGNORECASE, ) +# _XML_TOOL_CALL_RE's delimiters, split for _iter_delimited's forward-only scan. +_XML_TOOL_CALL_OPEN_RE = re.compile( + r"<(?:[\w]+:)?(?:tool_call|function_call)>\s*", + re.IGNORECASE, +) +_XML_TOOL_CALL_CLOSE_RE = re.compile( + r"", + re.IGNORECASE, +) _XML_INVOKE_RE = re.compile( r'\s*([\s\S]*?)', re.IGNORECASE, @@ -73,6 +88,9 @@ _TOOL_CODE_RE = re.compile( r"\s*\{([\s\S]*?)\}\s*", re.IGNORECASE, ) +# _TOOL_CODE_RE's delimiters, split for _iter_delimited's forward-only scan. +_TOOL_CODE_OPEN_RE = re.compile(r"\s*\{", re.IGNORECASE) +_TOOL_CODE_CLOSE_RE = re.compile(r"\}\s*", re.IGNORECASE) # Pattern 5: DeepSeek DSML markup leaking into content. When deepseek # models can't emit structured tool_calls (e.g. 
we sent no tool schemas @@ -736,6 +754,52 @@ def _parse_tool_code_block(raw: str) -> Optional[ToolBlock]: return None +def _iter_delimited(text, open_re, close_re): + """Yield ``(match_start, inner_start, inner_end, match_end)`` for each + non-overlapping ``open_re ... close_re`` pair, scanning strictly forward. + + For the lazy, non-nesting delimiters here this is equivalent to + ``re.finditer`` of ``open_re([\\s\\S]*?)close_re`` (each opener pairs with + the first closer after it; the next scan resumes past that closer), but it + runs in O(n): the moment an opener has no reachable closer, no later opener + can have one either, so we stop. ``re.finditer`` instead retries from every + opener and rescans to end-of-string each time -> O(n^2) on attacker- + controlled "many openers, no closer" model output (CodeQL py/polynomial-redos). + + A whole-string "is the closer present?" guard is not enough: a stale closer + placed before an opener flood, or a closer with no matching inner delimiter + (e.g. `[/TOOL_CALL]` but no `}`), keeps the guard true while every opener + still rescans. Pairing each opener only with a closer *after* it closes both + holes. + """ + pos = 0 + while True: + om = open_re.search(text, pos) + if om is None: + return + cm = close_re.search(text, om.end()) + if cm is None: + return + yield om.start(), om.end(), cm.start(), cm.end() + pos = cm.end() + + +def _strip_delimited(text: str, open_re, close_re) -> str: + """Remove every ``open_re ... close_re`` span (forward-only; see + _iter_delimited). 
Equivalent to ``open_re([\\s\\S]*?)close_re`` ``re.sub('')`` + for these delimiters, without the O(n^2) rescan on unclosed openers.""" + spans = list(_iter_delimited(text, open_re, close_re)) + if not spans: + return text + out = [] + last = 0 + for match_start, _inner_start, _inner_end, match_end in spans: + out.append(text[last:match_start]) + last = match_end + out.append(text[last:]) + return "".join(out) + + def parse_tool_blocks(text: str, skip_fenced: bool = False) -> List[ToolBlock]: """Extract executable tool blocks from LLM response text. @@ -794,9 +858,14 @@ def parse_tool_blocks(text: str, skip_fenced: bool = False) -> List[ToolBlock]: blocks.append(ToolBlock(tag, content)) # Pattern 2: [TOOL_CALL] blocks (only if no fenced blocks found) + # _iter_delimited scans the delimiter-bounded formats forward-only so + # untrusted "many openers, no closer" output can't drive the O(n^2) + # finditer rescan (ReDoS); see its docstring. if not blocks: - for m in _TOOL_CALL_RE.finditer(text): - block = _parse_tool_call_block(m.group(1)) + for _ms, inner_start, inner_end, _me in _iter_delimited( + text, _TOOL_CALL_OPEN_RE, _TOOL_CALL_CLOSE_RE + ): + block = _parse_tool_call_block(text[inner_start:inner_end]) if block: blocks.append(block) @@ -809,13 +878,16 @@ def parse_tool_blocks(text: str, skip_fenced: bool = False) -> List[ToolBlock]: if blocks: return blocks # Try wrapped: ... 
- for m in _XML_TOOL_CALL_RE.finditer(text): - for inv in _XML_INVOKE_RE.finditer(m.group(1)): + for _ms, inner_start, inner_end, _me in _iter_delimited( + text, _XML_TOOL_CALL_OPEN_RE, _XML_TOOL_CALL_CLOSE_RE + ): + body = text[inner_start:inner_end] + for inv in _XML_INVOKE_RE.finditer(body): block = _parse_xml_invoke(inv) if block: blocks.append(block) if not blocks: - for direct in _XML_DIRECT_TOOL_RE.finditer(m.group(1)): + for direct in _XML_DIRECT_TOOL_RE.finditer(body): block = _parse_xml_direct_tool(direct) if block: blocks.append(block) @@ -843,8 +915,10 @@ def parse_tool_blocks(text: str, skip_fenced: bool = False) -> List[ToolBlock]: # Pattern 4: blocks (MiniMax-M2.5 style) if not blocks: - for m in _TOOL_CODE_RE.finditer(text): - block = _parse_tool_code_block(m.group(1)) + for _ms, inner_start, inner_end, _me in _iter_delimited( + text, _TOOL_CODE_OPEN_RE, _TOOL_CODE_CLOSE_RE + ): + block = _parse_tool_code_block(text[inner_start:inner_end]) if block: blocks.append(block) @@ -874,11 +948,14 @@ def strip_tool_blocks(text: str, skip_fenced: bool = False) -> str: # / removers below instead of leaking to the user. text = _normalize_dsml(text) cleaned = text if skip_fenced else _TOOL_BLOCK_RE.sub('', text) - cleaned = _TOOL_CALL_RE.sub('', cleaned) + # Forward-only removal mirrors parse_tool_blocks: _strip_delimited pairs each + # opener with a later closer and stops when none is reachable, so untrusted + # output can't drive the O(n^2) lazy-rescan (ReDoS); see _iter_delimited. 
+ cleaned = _strip_delimited(cleaned, _TOOL_CALL_OPEN_RE, _TOOL_CALL_CLOSE_RE) cleaned = _strip_stepfun_tool_markup(cleaned) - cleaned = _XML_TOOL_CALL_RE.sub('', cleaned) + cleaned = _strip_delimited(cleaned, _XML_TOOL_CALL_OPEN_RE, _XML_TOOL_CALL_CLOSE_RE) cleaned = _XML_OPEN_TOOL_CALL_RE.sub('', cleaned) - cleaned = _TOOL_CODE_RE.sub('', cleaned) + cleaned = _strip_delimited(cleaned, _TOOL_CODE_OPEN_RE, _TOOL_CODE_CLOSE_RE) if not skip_fenced: raw_web_json = _parse_raw_web_json_lookup(cleaned) if raw_web_json: diff --git a/tests/test_redos_llm_parsers.py b/tests/test_redos_llm_parsers.py new file mode 100644 index 000000000..be3417f47 --- /dev/null +++ b/tests/test_redos_llm_parsers.py @@ -0,0 +1,201 @@ +"""Regression tests for ReDoS in the regexes that parse untrusted LLM output. + +CodeQL flagged several `py/polynomial-redos` sinks in `text_helpers.py` and +`tool_parsing.py`. Each is a delimiter-bounded pattern (`...`) +applied with `re.sub`/`re.finditer` over a whole model response. When the +closing delimiter is missing, the engine rescans to end-of-string from every +opening occurrence -> O(n^2) on attacker-influenced input (prompt injection +via tool output / retrieved content). + +These tests pin BOTH halves of the fix: + * correctness is unchanged for legitimate inputs, and + * pathological "many openers, no closer" inputs complete promptly. + +The timing bound is deliberately loose (seconds, not ms) so it never flakes on +a slow CI box; the unguarded code took tens of seconds on the same inputs, so +the margin is ~100x. +""" + +import time + +import pytest + +import src.agent_tools # noqa: F401 (break agent_tools<->tool_parsing import cycle) +from src.text_helpers import normalize_thinking_markup, strip_think +from src.tool_parsing import parse_tool_blocks, strip_tool_blocks + +# Loose ceiling: guarded paths finish in well under 100ms; the vulnerable +# versions took 8-30s on these same inputs. 
+_BUDGET_S = 4.0 + + +def _timed(fn, *args): + start = time.perf_counter() + result = fn(*args) + return result, time.perf_counter() - start + + +# ── correctness is preserved ──────────────────────────────────────────────── + +def test_thought_attr_normalization_unchanged(): + # `` -> `` then stripped. + assert strip_think('reasoningAnswer.') == "Answer." + assert normalize_thinking_markup("x") == "x" + + +def test_gemma_channel_unwrap_unchanged(): + text = "<|channel>thought\ninternal<|channel>response\nFinal." + assert strip_think(text) == "Final." + + +def test_thought_prefix_tags_not_overmatched(): + # The `` opener must keep a tag-name boundary: tags whose names + # merely start with "thought" are unrelated markup and must pass through + # untouched (no ``/`` corruption). + for text in ("keep", "keep"): + assert normalize_thinking_markup(text) == text + + +def test_tool_call_blocks_still_parsed(): + blocks = parse_tool_blocks('[TOOL_CALL]{tool: "shell", command: "ls"}[/TOOL_CALL]') + assert blocks, "well-formed [TOOL_CALL] block should still parse" + assert "[TOOL_CALL]" not in strip_tool_blocks('before [TOOL_CALL]{tool: "shell", command: "ls"}[/TOOL_CALL] after') + + +def test_xml_tool_call_blocks_still_parsed(): + xml = 'ls' + blocks = parse_tool_blocks(xml) + assert blocks, "well-formed block should still parse" + assert "tool_call" not in strip_tool_blocks(xml) + + +def test_tool_code_blocks_still_parsed(): + assert "" not in strip_tool_blocks('{"tool": "shell"}') + + +# ── pathological inputs no longer blow up ─────────────────────────────────── + +def test_thought_open_no_close_is_fast(): + evil = "', ambiguous (\s+[^>]*)? 
loops + out, dt = _timed(normalize_thinking_markup, evil) + assert dt < _BUDGET_S, f"normalize_thinking_markup took {dt:.2f}s" + assert out == evil # nothing to normalize, returned unchanged + + +def test_gemma_channel_opener_flood_is_fast(): + evil = "<|channel>thought\n" * 4000 # no closer + _, dt = _timed(normalize_thinking_markup, evil) + assert dt < _BUDGET_S, f"normalize_thinking_markup took {dt:.2f}s" + + +def test_gemma_stale_closer_before_opener_flood_is_fast(): + # A lone leading makes a whole-string "closer present?" check + # true, but no <|channel>thought opener after it has a reachable closer. + evil = "" + "<|channel>thought\n" * 4000 + _, dt = _timed(normalize_thinking_markup, evil) + assert dt < _BUDGET_S, f"normalize_thinking_markup took {dt:.2f}s" + + +def test_tool_call_opener_flood_is_fast(): + evil = "[TOOL_CALL]{tool: x}" * 6000 # '}' present but no [/TOOL_CALL] closer + blocks, dt = _timed(parse_tool_blocks, evil) + assert dt < _BUDGET_S, f"parse_tool_blocks took {dt:.2f}s" + assert blocks == [] + _, dt2 = _timed(strip_tool_blocks, evil) + assert dt2 < _BUDGET_S, f"strip_tool_blocks took {dt2:.2f}s" + + +def test_xml_tool_call_opener_flood_is_fast(): + # strip_tool_blocks exercises the CodeQL-flagged _XML_TOOL_CALL_RE in + # isolation (the parse path also reaches _XML_DIRECT_TOOL_RE, a separate + # unflagged backreference pattern tracked as a follow-up). + evil = ("" + "a" * 20) * 4000 # no closer + _, dt = _timed(strip_tool_blocks, evil) + assert dt < _BUDGET_S, f"strip_tool_blocks took {dt:.2f}s" + + +def test_tool_code_opener_flood_is_fast(): + evil = "{tool: x}" * 6000 # '}' present but no closer + _, dt = _timed(parse_tool_blocks, evil) + assert dt < _BUDGET_S, f"parse_tool_blocks took {dt:.2f}s" + _, dt2 = _timed(strip_tool_blocks, evil) + assert dt2 < _BUDGET_S, f"strip_tool_blocks took {dt2:.2f}s" + + +# ── a present closer must not re-enable the O(n^2) rescan ──────────────────── +# A whole-string "closer exists?" 
guard is defeated by a stale closer placed +# before an opener flood, or by a closer whose required inner delimiter is +# missing. The parser must pair each opener only with a *later* closer. + +def test_xml_stale_closer_before_opener_flood_is_fast(): + # A lone leading makes a whole-string closer check true, but no + # opener after it has a reachable closer. (strip exercises the CodeQL-flagged + # _XML_TOOL_CALL_RE path; parse additionally reaches _XML_DIRECT_TOOL_RE, the + # separate backreference pattern tracked as a follow-up — see + # test_xml_tool_call_opener_flood_is_fast.) + evil = "" + ("" + "a" * 10) * 6000 + _, dt = _timed(strip_tool_blocks, evil) + assert dt < _BUDGET_S, f"strip_tool_blocks took {dt:.2f}s" + + +def test_tool_call_closer_present_without_inner_brace_is_fast(): + # Leading [/TOOL_CALL] satisfies a substring guard, but the openers carry no + # inner '}', so '}\\s*[/TOOL_CALL]' is never reachable from any opener. + evil = "[/TOOL_CALL]" + "[TOOL_CALL]{tool: x" * 6000 + blocks, dt = _timed(parse_tool_blocks, evil) + assert dt < _BUDGET_S, f"parse_tool_blocks took {dt:.2f}s" + assert blocks == [] + _, dt2 = _timed(strip_tool_blocks, evil) + assert dt2 < _BUDGET_S, f"strip_tool_blocks took {dt2:.2f}s" + + +def test_tool_code_closer_present_without_inner_brace_is_fast(): + evil = "" + "{tool: x" * 6000 + blocks, dt = _timed(parse_tool_blocks, evil) + assert dt < _BUDGET_S, f"parse_tool_blocks took {dt:.2f}s" + assert blocks == [] + _, dt2 = _timed(strip_tool_blocks, evil) + assert dt2 < _BUDGET_S, f"strip_tool_blocks took {dt2:.2f}s" + + +# ── strip_think() is the production entrypoint that callers actually run ───── +# The timing tests above cover normalize_thinking_markup and the scanners; +# these cover strip_think() itself, which applies the think-tag regexes too. + +def test_strip_think_nested_and_attr_blocks_unchanged(): + # Values pin pre-existing behavior (incl. 
the nested-block quirk that leaves + # the inter-tag `c`) so the forward-only rewrite stays byte-equal. + assert strip_think("abcAnswer.") == "cAnswer." + assert strip_think('reasoningAnswer.') == "Answer." + assert strip_think("xAnswer.") == "Answer." + assert strip_think("rAnswer.") == "Answer." + assert strip_think("Answer.") == "Answer." + + +def test_strip_think_malformed_open_no_gt_is_fast(): + for opener in ("' + out, dt = _timed(strip_think, evil) + assert dt < _BUDGET_S, f"strip_think({opener!r}) took {dt:.2f}s" + assert out == evil.strip() # nothing is a real tag + + +def test_strip_think_attr_opener_flood_is_fast(): + for opener in ("`, no closer + evil = opener * 8000 + _, dt = _timed(strip_think, evil) + assert dt < _BUDGET_S, f"strip_think({opener!r}) took {dt:.2f}s" + + +def test_strip_think_closed_opener_flood_is_fast(): + evil = "" * 16000 # well-formed openers, no closer + out, dt = _timed(strip_think, evil) + assert dt < _BUDGET_S, f"strip_think took {dt:.2f}s" + assert out == "" + + +def test_strip_think_malformed_closer_flood_is_fast(): + evil = "` + out, dt = _timed(strip_think, evil) + assert dt < _BUDGET_S, f"strip_think took {dt:.2f}s" + assert out == evil.strip()