mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-28 23:52:09 -04:00
fix(security): prevent ReDoS in LLM-output tool/think parsers (#4704)
* fix(security): prevent ReDoS in LLM-output tool/think parsers The regexes that parse untrusted model output in text_helpers.py and tool_parsing.py are delimiter-bounded with a lazy [\s\S]*? (or an ambiguous (\s+[^>]*)?). Applied with re.sub/re.finditer over a whole response, they degrade to O(n^2) when the closing delimiter is absent: the engine rescans to end-of-string from every opener. Model output is untrusted, so a prompt-injected or malicious model can stall the agent loop with many unclosed openers (measured ~25s on a 60KB <thought flood). - text_helpers.py: replace ambiguous <thought(\s+[^>]*)?> with <thought([^>]*)> (identical capture, no \s+/[^>]* overlap); skip the Gemma <|channel>...<channel|> subs when no <channel|> closer is present. - tool_parsing.py: gate _TOOL_CALL_RE, _XML_TOOL_CALL_RE and _TOOL_CODE_RE (in parse_tool_blocks and strip_tool_blocks) on a cheap presence check for their closing delimiter. With no closer the regex cannot match, so skipping is equivalent; only the wasted O(n^2) rescan is removed. Resolves CodeQL py/polynomial-redos #230, #231, #232, #233, #235, #236, #524. The _XML_OPEN_TOOL_CALL_RE alerts (#234, #477) are false positives (its greedy [\s\S]*\Z is linear) and left untouched. * fix(security): close ReDoS gaps in tool/think parsers from review Addresses two review findings on the closer-guard approach: - Whole-string "closer exists?" checks were bypassable: a stale closer before an opener flood, or a closer with no reachable inner `}`, kept the guard true while every opener still rescanned to end-of-string (O(n^2)). Replace the substring guards with `_iter_delimited`, a forward-only scan that pairs each opener with a *later* closer and stops once none is reachable (O(n)). `parse_tool_blocks` and `strip_tool_blocks` (via `_strip_delimited`) both use it for the [TOOL_CALL], <tool_call>/<function_call>, and <tool_code> formats. Verified equivalent to the original regexes on well-formed inputs. - `<thought([^>]*)>` dropped the tag-name boundary and corrupted unrelated tags (`<thoughtful>` -> `<thinkful>`). Use `<thought(\s[^>]*)?>`: the single fixed `\s` keeps the pattern linear (no `\s+`/`[^>]*` overlap) while restoring the boundary; capture is byte-for-byte identical for real `<thought ...>` openers. Adds regressions for stale-closer-before-opener, closer-present-without- inner-brace, and the <thoughtful>/<thoughts> passthrough. * fix(security): close Gemma channel ReDoS guard flagged in review vdmkenny noted the same bypassable whole-string guard remained in text_helpers.py: `if "<channel|>" in out.lower()` gating the Gemma thought/response channel subs. A stale `<channel|>` before a `<|channel>thought` opener flood keeps the guard true while every opener still rescans to end-of-string (measured ~7.3s at 4k openers). Replace it with `_sub_delimited`, the same forward-only scan used for the tool-call parsers: pair each opener with a later closer, stop when none is reachable (O(n)). Verified output-equivalent to the original capture regexes on well-formed multi-channel inputs; the stale-closer case now runs in <2ms. Adds a regression for stale-closer-before-opener on the Gemma path. * fix(security): harden strip_think() think-tag ReDoS flagged in review The earlier fixes hardened normalize_thinking_markup and the delimiter scanners, but the production entrypoint strip_think() still ran _THINK_CLOSED_RE / _THINK_ATTR_RE / _THINK_OPEN_RE (and the stray-tag _THINK_TAG_RE) over untrusted model output. Those kept the same ReDoS shapes: the lazy `<open>[\s\S]*?</close>` rescanned to end-of-string from every opener, and `(?:\s+[^>]*)?` / `[^>]*` attribute scans ran to end-of-string from every opener on a "many openers, no closer" flood. On the prior head, malformed `<think` / `<thinking` / `<thought` floods took 6-14s through strip_think(). The shipped `<thought>` normalization had the same residual: the single-opener case was linear but an opener flood was still O(n^2) (~4.4s). - Replace the lazy multi-pass _THINK_CLOSED_RE loop with the existing forward-only _sub_delimited scan (pair each opener with the first reachable closer, stop when none is reachable). One pass collapses sequential and nested blocks as before. - Bound every opener/stray-tag attribute scan at `<` (`[^<>]` not `[^>]`) so a no-`>` opener flood can't drive a single match attempt to end-of-string. Identical capture for well-formed think/thought tags. - email_helpers._strip_think: compute had_think from the single linear _THINK_TAG_RE instead of the lazy closed/open `.search()` calls, which had the same O(n^2) on the email reply/summary/extraction paths. All flood variants now finish in <10ms (were 6-14s). Output verified byte-for-byte identical to the prior implementation over a 34-case corpus (nested, mismatched, attr, uppercase, Gemma, prose, prompt-echo). Adds strip_think() timing regressions for malformed openers, opener floods (all three tag names), the closed-opener flood, and the malformed-closer flood. * docs: trim verbose comments in think-tag ReDoS fix
This commit is contained in:
@@ -225,8 +225,9 @@ def _strip_think(text: str) -> str:
|
||||
"""
|
||||
if not text:
|
||||
return ""
|
||||
from src.text_helpers import strip_think as _central, _THINK_CLOSED_RE, _THINK_OPEN_RE, _THINK_TAG_RE
|
||||
had_think = bool(_THINK_CLOSED_RE.search(text) or _THINK_OPEN_RE.search(text) or _THINK_TAG_RE.search(text))
|
||||
from src.text_helpers import strip_think as _central, _THINK_TAG_RE
|
||||
# Single linear tag check; the old closed/open `.search()` calls could ReDoS.
|
||||
had_think = bool(_THINK_TAG_RE.search(text))
|
||||
return _central(text, prose=had_think, prompt_echo=True)
|
||||
|
||||
|
||||
|
||||
+54
-31
@@ -17,31 +17,27 @@ import re
|
||||
|
||||
_THINK_TAG_NAME = r"(?:think(?:ing)?|thought)"
|
||||
|
||||
# Closed reasoning blocks. Multi-pass loop in `strip_think` handles nested
|
||||
# `<think><think>...</think></think>` patterns some models emit.
|
||||
_THINK_CLOSED_RE = re.compile(rf"<{_THINK_TAG_NAME}(?:\s+[^>]*)?>[\s\S]*?</{_THINK_TAG_NAME}>\s*", re.IGNORECASE)
|
||||
# Orphan opening or closing tags that survive after the closed-pass.
|
||||
_THINK_TAG_RE = re.compile(rf"</?{_THINK_TAG_NAME}[^>]*>\s*", re.IGNORECASE)
|
||||
# Dangling opener anywhere in the response with no closer — strip everything
|
||||
# from `<think>` to the end of string.
|
||||
_THINK_OPEN_RE = re.compile(rf"<{_THINK_TAG_NAME}(?:\s+[^>]*)?>[\s\S]*$", re.IGNORECASE)
|
||||
# Streaming models occasionally emit `<thinking time="0.42">`-style attributes.
|
||||
# Normalize to a plain `<think>` so the regexes above catch them.
|
||||
_THINK_ATTR_RE = re.compile(rf"<{_THINK_TAG_NAME}\s+[^>]*>", re.IGNORECASE)
|
||||
_THINK_ATTR_CLOSE_RE = re.compile(rf"</{_THINK_TAG_NAME}\s+[^>]*>", re.IGNORECASE)
|
||||
# Think-tag matchers. `[^<>]` (not `[^>]`) bounds attribute scans at the next
|
||||
# `<` so an opener flood with no closing `>` can't backtrack to end-of-string
|
||||
# (ReDoS, CodeQL py/polynomial-redos); capture is identical for well-formed tags.
|
||||
# Opener/closer are split for the forward-only block strip (_sub_delimited).
|
||||
_THINK_OPEN_TAG_RE = re.compile(rf"<{_THINK_TAG_NAME}(?:\s[^<>]*)?>", re.IGNORECASE)
|
||||
_THINK_CLOSE_TAG_RE = re.compile(rf"</{_THINK_TAG_NAME}>\s*", re.IGNORECASE)
|
||||
# Orphan opening/closing tags left after the block strip.
|
||||
_THINK_TAG_RE = re.compile(rf"</?{_THINK_TAG_NAME}[^<>]*>\s*", re.IGNORECASE)
|
||||
# Dangling opener with no closer: strip from `<think>` to end of string.
|
||||
_THINK_OPEN_RE = re.compile(rf"<{_THINK_TAG_NAME}(?:\s[^<>]*)?>[\s\S]*$", re.IGNORECASE)
|
||||
# Normalize `<thinking time="0.42">`-style attributes to a plain `<think>`.
|
||||
_THINK_ATTR_RE = re.compile(rf"<{_THINK_TAG_NAME}\s[^<>]*>", re.IGNORECASE)
|
||||
_THINK_ATTR_CLOSE_RE = re.compile(rf"</{_THINK_TAG_NAME}\s[^<>]*>", re.IGNORECASE)
|
||||
_GEMMA_THOUGHT_OPEN_RE = re.compile(r"<\|channel>thought\s*\n?[\s\S]*$", re.IGNORECASE)
|
||||
_GEMMA_RESPONSE_CHANNEL_RE = re.compile(
|
||||
r"<\|channel>response\s*\n?([\s\S]*?)<channel\|>",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
_GEMMA_RESPONSE_OPEN_RE = re.compile(r"<\|channel>response\s*\n?", re.IGNORECASE)
|
||||
_GEMMA_CHANNEL_CLOSE_RE = re.compile(r"<channel\|>", re.IGNORECASE)
|
||||
_THOUGHT_TAG_OPEN_RE = re.compile(r"<thought(\s+[^>]*)?>", re.IGNORECASE)
|
||||
_THOUGHT_TAG_OPEN_RE = re.compile(r"<thought(\s[^<>]*)?>", re.IGNORECASE)
|
||||
_THOUGHT_TAG_CLOSE_RE = re.compile(r"</thought>", re.IGNORECASE)
|
||||
_GEMMA_THOUGHT_CHANNEL_CAPTURE_RE = re.compile(
|
||||
r"<\|channel>thought\s*\n?([\s\S]*?)<channel\|>\s*",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
# Gemma thought-channel delimiters, split for the forward-only sub (_sub_delimited).
|
||||
_GEMMA_THOUGHT_CHANNEL_OPEN_RE = re.compile(r"<\|channel>thought\s*\n?", re.IGNORECASE)
|
||||
_GEMMA_CHANNEL_CLOSE_TRIM_RE = re.compile(r"<channel\|>\s*", re.IGNORECASE)
|
||||
# Qwen and a few other models prefix the response with a "Thinking Process:"
|
||||
# block before the real answer.
|
||||
_QWEN_THINKING_RE = re.compile(
|
||||
@@ -93,6 +89,31 @@ def _strip_reasoning_prose(text: str) -> str:
|
||||
return "\n\n".join(keep).strip() if keep else text
|
||||
|
||||
|
||||
def _sub_delimited(text, open_re, close_re, repl):
|
||||
"""Forward-only ``re.sub`` of ``open_re...close_re`` that can't ReDoS.
|
||||
|
||||
Pairs each opener with the first closer after it and stops once no closer is
|
||||
reachable, so it stays O(n) instead of re.sub's rescan-to-end from every
|
||||
opener (O(n^2) on "many openers, no closer" input). ``repl`` gets the inner
|
||||
text. A whole-string "closer present?" guard is not enough: a stale closer
|
||||
before an opener flood keeps it true while every opener still rescans.
|
||||
"""
|
||||
out = []
|
||||
pos = 0
|
||||
while True:
|
||||
om = open_re.search(text, pos)
|
||||
if om is None:
|
||||
break
|
||||
cm = close_re.search(text, om.end())
|
||||
if cm is None:
|
||||
break
|
||||
out.append(text[pos:om.start()])
|
||||
out.append(repl(text[om.end():cm.start()]))
|
||||
pos = cm.end()
|
||||
out.append(text[pos:])
|
||||
return "".join(out)
|
||||
|
||||
|
||||
def normalize_thinking_markup(text: str) -> str:
|
||||
"""Canonicalize supported thinking wrappers to `<think>` markup.
|
||||
|
||||
@@ -106,12 +127,17 @@ def normalize_thinking_markup(text: str) -> str:
|
||||
out = _THOUGHT_TAG_OPEN_RE.sub(lambda m: "<think" + (m.group(1) or "") + ">", text)
|
||||
out = _THOUGHT_TAG_CLOSE_RE.sub("</think>", out)
|
||||
|
||||
def _replace_gemma_thought(match: re.Match) -> str:
|
||||
thought = match.group(1).strip()
|
||||
def _replace_gemma_thought(inner: str) -> str:
|
||||
thought = inner.strip()
|
||||
return f"<think>{thought}</think>\n" if thought else ""
|
||||
|
||||
out = _GEMMA_THOUGHT_CHANNEL_CAPTURE_RE.sub(_replace_gemma_thought, out)
|
||||
out = _GEMMA_RESPONSE_CHANNEL_RE.sub(lambda m: m.group(1), out)
|
||||
# Forward-only so a stale/unreachable `<channel|>` can't drive a ReDoS rescan.
|
||||
out = _sub_delimited(
|
||||
out, _GEMMA_THOUGHT_CHANNEL_OPEN_RE, _GEMMA_CHANNEL_CLOSE_TRIM_RE, _replace_gemma_thought
|
||||
)
|
||||
out = _sub_delimited(
|
||||
out, _GEMMA_RESPONSE_OPEN_RE, _GEMMA_CHANNEL_CLOSE_RE, lambda inner: inner
|
||||
)
|
||||
out = _GEMMA_RESPONSE_OPEN_RE.sub("", out)
|
||||
out = _GEMMA_CHANNEL_CLOSE_RE.sub("", out)
|
||||
return out
|
||||
@@ -149,12 +175,9 @@ def strip_think(text: str, *, prose: bool = False, prompt_echo: bool = True) ->
|
||||
# Normalize attributes so the closed/open regexes can catch them.
|
||||
text = _THINK_ATTR_RE.sub("<think>", text)
|
||||
text = _THINK_ATTR_CLOSE_RE.sub("</think>", text)
|
||||
# Multi-pass for nested blocks.
|
||||
prev = None
|
||||
out = text
|
||||
while prev != out:
|
||||
prev = out
|
||||
out = _THINK_CLOSED_RE.sub("", out)
|
||||
# Forward-only block strip (see _sub_delimited): one pass collapses nested
|
||||
# and sequential blocks without the old lazy re.sub loop's ReDoS rescan.
|
||||
out = _sub_delimited(text, _THINK_OPEN_TAG_RE, _THINK_CLOSE_TAG_RE, lambda _inner: "")
|
||||
out = _THINK_OPEN_RE.sub("", out)
|
||||
out = _THINK_TAG_RE.sub("", out)
|
||||
if prompt_echo:
|
||||
|
||||
+87
-10
@@ -31,6 +31,12 @@ _TOOL_CALL_RE = re.compile(
|
||||
r"\[TOOL_CALL\]\s*\{([\s\S]*?)\}\s*\[/TOOL_CALL\]",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
# Same delimiters as _TOOL_CALL_RE, split so they can be driven by
|
||||
# _iter_delimited (a forward-only scan). The closer is `}\s*[/TOOL_CALL]`, so a
|
||||
# present-but-unmatched `[/TOOL_CALL]` with no inner `}` ahead simply ends the
|
||||
# scan instead of triggering re.finditer's O(n^2) rescan. See _iter_delimited.
|
||||
_TOOL_CALL_OPEN_RE = re.compile(r"\[TOOL_CALL\]\s*\{", re.IGNORECASE)
|
||||
_TOOL_CALL_CLOSE_RE = re.compile(r"\}\s*\[/TOOL_CALL\]", re.IGNORECASE)
|
||||
|
||||
# Pattern 3: XML-style tool calls (minimax, some other models)
|
||||
# <minimax:tool_call><invoke name="bash"><parameter name="command">...</parameter></invoke></minimax:tool_call>
|
||||
@@ -43,6 +49,15 @@ _XML_OPEN_TOOL_CALL_RE = re.compile(
|
||||
r"<(?:[\w]+:)?(?:tool_call|function_call)>\s*([\s\S]*)\Z",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
# _XML_TOOL_CALL_RE's delimiters, split for _iter_delimited's forward-only scan.
|
||||
_XML_TOOL_CALL_OPEN_RE = re.compile(
|
||||
r"<(?:[\w]+:)?(?:tool_call|function_call)>\s*",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
_XML_TOOL_CALL_CLOSE_RE = re.compile(
|
||||
r"</(?:[\w]+:)?(?:tool_call|function_call)>",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
_XML_INVOKE_RE = re.compile(
|
||||
r'<invoke\s+name=["\'](\w+)["\']>\s*([\s\S]*?)</invoke>',
|
||||
re.IGNORECASE,
|
||||
@@ -73,6 +88,9 @@ _TOOL_CODE_RE = re.compile(
|
||||
r"<tool_code>\s*\{([\s\S]*?)\}\s*</tool_code>",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
# _TOOL_CODE_RE's delimiters, split for _iter_delimited's forward-only scan.
|
||||
_TOOL_CODE_OPEN_RE = re.compile(r"<tool_code>\s*\{", re.IGNORECASE)
|
||||
_TOOL_CODE_CLOSE_RE = re.compile(r"\}\s*</tool_code>", re.IGNORECASE)
|
||||
|
||||
# Pattern 5: DeepSeek DSML markup leaking into content. When deepseek
|
||||
# models can't emit structured tool_calls (e.g. we sent no tool schemas
|
||||
@@ -736,6 +754,52 @@ def _parse_tool_code_block(raw: str) -> Optional[ToolBlock]:
|
||||
return None
|
||||
|
||||
|
||||
def _iter_delimited(text, open_re, close_re):
|
||||
"""Yield ``(match_start, inner_start, inner_end, match_end)`` for each
|
||||
non-overlapping ``open_re ... close_re`` pair, scanning strictly forward.
|
||||
|
||||
For the lazy, non-nesting delimiters here this is equivalent to
|
||||
``re.finditer`` of ``open_re([\\s\\S]*?)close_re`` (each opener pairs with
|
||||
the first closer after it; the next scan resumes past that closer), but it
|
||||
runs in O(n): the moment an opener has no reachable closer, no later opener
|
||||
can have one either, so we stop. ``re.finditer`` instead retries from every
|
||||
opener and rescans to end-of-string each time -> O(n^2) on attacker-
|
||||
controlled "many openers, no closer" model output (CodeQL py/polynomial-redos).
|
||||
|
||||
A whole-string "is the closer present?" guard is not enough: a stale closer
|
||||
placed before an opener flood, or a closer with no matching inner delimiter
|
||||
(e.g. `[/TOOL_CALL]` but no `}`), keeps the guard true while every opener
|
||||
still rescans. Pairing each opener only with a closer *after* it closes both
|
||||
holes.
|
||||
"""
|
||||
pos = 0
|
||||
while True:
|
||||
om = open_re.search(text, pos)
|
||||
if om is None:
|
||||
return
|
||||
cm = close_re.search(text, om.end())
|
||||
if cm is None:
|
||||
return
|
||||
yield om.start(), om.end(), cm.start(), cm.end()
|
||||
pos = cm.end()
|
||||
|
||||
|
||||
def _strip_delimited(text: str, open_re, close_re) -> str:
|
||||
"""Remove every ``open_re ... close_re`` span (forward-only; see
|
||||
_iter_delimited). Equivalent to ``open_re([\\s\\S]*?)close_re`` ``re.sub('')``
|
||||
for these delimiters, without the O(n^2) rescan on unclosed openers."""
|
||||
spans = list(_iter_delimited(text, open_re, close_re))
|
||||
if not spans:
|
||||
return text
|
||||
out = []
|
||||
last = 0
|
||||
for match_start, _inner_start, _inner_end, match_end in spans:
|
||||
out.append(text[last:match_start])
|
||||
last = match_end
|
||||
out.append(text[last:])
|
||||
return "".join(out)
|
||||
|
||||
|
||||
def parse_tool_blocks(text: str, skip_fenced: bool = False) -> List[ToolBlock]:
|
||||
"""Extract executable tool blocks from LLM response text.
|
||||
|
||||
@@ -794,9 +858,14 @@ def parse_tool_blocks(text: str, skip_fenced: bool = False) -> List[ToolBlock]:
|
||||
blocks.append(ToolBlock(tag, content))
|
||||
|
||||
# Pattern 2: [TOOL_CALL] blocks (only if no fenced blocks found)
|
||||
# _iter_delimited scans the delimiter-bounded formats forward-only so
|
||||
# untrusted "many openers, no closer" output can't drive the O(n^2)
|
||||
# finditer rescan (ReDoS); see its docstring.
|
||||
if not blocks:
|
||||
for m in _TOOL_CALL_RE.finditer(text):
|
||||
block = _parse_tool_call_block(m.group(1))
|
||||
for _ms, inner_start, inner_end, _me in _iter_delimited(
|
||||
text, _TOOL_CALL_OPEN_RE, _TOOL_CALL_CLOSE_RE
|
||||
):
|
||||
block = _parse_tool_call_block(text[inner_start:inner_end])
|
||||
if block:
|
||||
blocks.append(block)
|
||||
|
||||
@@ -809,13 +878,16 @@ def parse_tool_blocks(text: str, skip_fenced: bool = False) -> List[ToolBlock]:
|
||||
if blocks:
|
||||
return blocks
|
||||
# Try wrapped: <tool_call><invoke ...>...</invoke></tool_call>
|
||||
for m in _XML_TOOL_CALL_RE.finditer(text):
|
||||
for inv in _XML_INVOKE_RE.finditer(m.group(1)):
|
||||
for _ms, inner_start, inner_end, _me in _iter_delimited(
|
||||
text, _XML_TOOL_CALL_OPEN_RE, _XML_TOOL_CALL_CLOSE_RE
|
||||
):
|
||||
body = text[inner_start:inner_end]
|
||||
for inv in _XML_INVOKE_RE.finditer(body):
|
||||
block = _parse_xml_invoke(inv)
|
||||
if block:
|
||||
blocks.append(block)
|
||||
if not blocks:
|
||||
for direct in _XML_DIRECT_TOOL_RE.finditer(m.group(1)):
|
||||
for direct in _XML_DIRECT_TOOL_RE.finditer(body):
|
||||
block = _parse_xml_direct_tool(direct)
|
||||
if block:
|
||||
blocks.append(block)
|
||||
@@ -843,8 +915,10 @@ def parse_tool_blocks(text: str, skip_fenced: bool = False) -> List[ToolBlock]:
|
||||
|
||||
# Pattern 4: <tool_code> blocks (MiniMax-M2.5 style)
|
||||
if not blocks:
|
||||
for m in _TOOL_CODE_RE.finditer(text):
|
||||
block = _parse_tool_code_block(m.group(1))
|
||||
for _ms, inner_start, inner_end, _me in _iter_delimited(
|
||||
text, _TOOL_CODE_OPEN_RE, _TOOL_CODE_CLOSE_RE
|
||||
):
|
||||
block = _parse_tool_code_block(text[inner_start:inner_end])
|
||||
if block:
|
||||
blocks.append(block)
|
||||
|
||||
@@ -874,11 +948,14 @@ def strip_tool_blocks(text: str, skip_fenced: bool = False) -> str:
|
||||
# / <tool_call> removers below instead of leaking to the user.
|
||||
text = _normalize_dsml(text)
|
||||
cleaned = text if skip_fenced else _TOOL_BLOCK_RE.sub('', text)
|
||||
cleaned = _TOOL_CALL_RE.sub('', cleaned)
|
||||
# Forward-only removal mirrors parse_tool_blocks: _strip_delimited pairs each
|
||||
# opener with a later closer and stops when none is reachable, so untrusted
|
||||
# output can't drive the O(n^2) lazy-rescan (ReDoS); see _iter_delimited.
|
||||
cleaned = _strip_delimited(cleaned, _TOOL_CALL_OPEN_RE, _TOOL_CALL_CLOSE_RE)
|
||||
cleaned = _strip_stepfun_tool_markup(cleaned)
|
||||
cleaned = _XML_TOOL_CALL_RE.sub('', cleaned)
|
||||
cleaned = _strip_delimited(cleaned, _XML_TOOL_CALL_OPEN_RE, _XML_TOOL_CALL_CLOSE_RE)
|
||||
cleaned = _XML_OPEN_TOOL_CALL_RE.sub('', cleaned)
|
||||
cleaned = _TOOL_CODE_RE.sub('', cleaned)
|
||||
cleaned = _strip_delimited(cleaned, _TOOL_CODE_OPEN_RE, _TOOL_CODE_CLOSE_RE)
|
||||
if not skip_fenced:
|
||||
raw_web_json = _parse_raw_web_json_lookup(cleaned)
|
||||
if raw_web_json:
|
||||
|
||||
@@ -0,0 +1,201 @@
|
||||
"""Regression tests for ReDoS in the regexes that parse untrusted LLM output.
|
||||
|
||||
CodeQL flagged several `py/polynomial-redos` sinks in `text_helpers.py` and
|
||||
`tool_parsing.py`. Each is a delimiter-bounded pattern (`<open>...<close>`)
|
||||
applied with `re.sub`/`re.finditer` over a whole model response. When the
|
||||
closing delimiter is missing, the engine rescans to end-of-string from every
|
||||
opening occurrence -> O(n^2) on attacker-influenced input (prompt injection
|
||||
via tool output / retrieved content).
|
||||
|
||||
These tests pin BOTH halves of the fix:
|
||||
* correctness is unchanged for legitimate inputs, and
|
||||
* pathological "many openers, no closer" inputs complete promptly.
|
||||
|
||||
The timing bound is deliberately loose (seconds, not ms) so it never flakes on
|
||||
a slow CI box; the unguarded code took tens of seconds on the same inputs, so
|
||||
the margin is ~100x.
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
import src.agent_tools # noqa: F401 (break agent_tools<->tool_parsing import cycle)
|
||||
from src.text_helpers import normalize_thinking_markup, strip_think
|
||||
from src.tool_parsing import parse_tool_blocks, strip_tool_blocks
|
||||
|
||||
# Loose ceiling: guarded paths finish in well under 100ms; the vulnerable
|
||||
# versions took 8-30s on these same inputs.
|
||||
_BUDGET_S = 4.0
|
||||
|
||||
|
||||
def _timed(fn, *args):
|
||||
start = time.perf_counter()
|
||||
result = fn(*args)
|
||||
return result, time.perf_counter() - start
|
||||
|
||||
|
||||
# ── correctness is preserved ────────────────────────────────────────────────
|
||||
|
||||
def test_thought_attr_normalization_unchanged():
|
||||
# `<thought time="0.4">` -> `<think time="0.4">` then stripped.
|
||||
assert strip_think('<thought time="0.4">reasoning</thought>Answer.') == "Answer."
|
||||
assert normalize_thinking_markup("<thought>x</thought>") == "<think>x</think>"
|
||||
|
||||
|
||||
def test_gemma_channel_unwrap_unchanged():
|
||||
text = "<|channel>thought\ninternal<channel|><|channel>response\nFinal.<channel|>"
|
||||
assert strip_think(text) == "Final."
|
||||
|
||||
|
||||
def test_thought_prefix_tags_not_overmatched():
|
||||
# The `<thought...>` opener must keep a tag-name boundary: tags whose names
|
||||
# merely start with "thought" are unrelated markup and must pass through
|
||||
# untouched (no `<thinkful>`/`<thinks>` corruption).
|
||||
for text in ("<thoughtful>keep</thoughtful>", "<thoughts>keep</thoughts>"):
|
||||
assert normalize_thinking_markup(text) == text
|
||||
|
||||
|
||||
def test_tool_call_blocks_still_parsed():
|
||||
blocks = parse_tool_blocks('[TOOL_CALL]{tool: "shell", command: "ls"}[/TOOL_CALL]')
|
||||
assert blocks, "well-formed [TOOL_CALL] block should still parse"
|
||||
assert "[TOOL_CALL]" not in strip_tool_blocks('before [TOOL_CALL]{tool: "shell", command: "ls"}[/TOOL_CALL] after')
|
||||
|
||||
|
||||
def test_xml_tool_call_blocks_still_parsed():
|
||||
xml = '<tool_call><invoke name="bash"><parameter name="command">ls</parameter></invoke></tool_call>'
|
||||
blocks = parse_tool_blocks(xml)
|
||||
assert blocks, "well-formed <tool_call> block should still parse"
|
||||
assert "tool_call" not in strip_tool_blocks(xml)
|
||||
|
||||
|
||||
def test_tool_code_blocks_still_parsed():
|
||||
assert "<tool_code>" not in strip_tool_blocks('<tool_code>{"tool": "shell"}</tool_code>')
|
||||
|
||||
|
||||
# ── pathological inputs no longer blow up ───────────────────────────────────
|
||||
|
||||
def test_thought_open_no_close_is_fast():
|
||||
evil = "<thought" + " " * 60_000 # no closing '>', ambiguous (\s+[^>]*)? loops
|
||||
out, dt = _timed(normalize_thinking_markup, evil)
|
||||
assert dt < _BUDGET_S, f"normalize_thinking_markup took {dt:.2f}s"
|
||||
assert out == evil # nothing to normalize, returned unchanged
|
||||
|
||||
|
||||
def test_gemma_channel_opener_flood_is_fast():
|
||||
evil = "<|channel>thought\n" * 4000 # no <channel|> closer
|
||||
_, dt = _timed(normalize_thinking_markup, evil)
|
||||
assert dt < _BUDGET_S, f"normalize_thinking_markup took {dt:.2f}s"
|
||||
|
||||
|
||||
def test_gemma_stale_closer_before_opener_flood_is_fast():
|
||||
# A lone leading <channel|> makes a whole-string "closer present?" check
|
||||
# true, but no <|channel>thought opener after it has a reachable closer.
|
||||
evil = "<channel|>" + "<|channel>thought\n" * 4000
|
||||
_, dt = _timed(normalize_thinking_markup, evil)
|
||||
assert dt < _BUDGET_S, f"normalize_thinking_markup took {dt:.2f}s"
|
||||
|
||||
|
||||
def test_tool_call_opener_flood_is_fast():
|
||||
evil = "[TOOL_CALL]{tool: x}" * 6000 # '}' present but no [/TOOL_CALL] closer
|
||||
blocks, dt = _timed(parse_tool_blocks, evil)
|
||||
assert dt < _BUDGET_S, f"parse_tool_blocks took {dt:.2f}s"
|
||||
assert blocks == []
|
||||
_, dt2 = _timed(strip_tool_blocks, evil)
|
||||
assert dt2 < _BUDGET_S, f"strip_tool_blocks took {dt2:.2f}s"
|
||||
|
||||
|
||||
def test_xml_tool_call_opener_flood_is_fast():
|
||||
# strip_tool_blocks exercises the CodeQL-flagged _XML_TOOL_CALL_RE in
|
||||
# isolation (the parse path also reaches _XML_DIRECT_TOOL_RE, a separate
|
||||
# unflagged backreference pattern tracked as a follow-up).
|
||||
evil = ("<tool_call>" + "a" * 20) * 4000 # no </tool_call> closer
|
||||
_, dt = _timed(strip_tool_blocks, evil)
|
||||
assert dt < _BUDGET_S, f"strip_tool_blocks took {dt:.2f}s"
|
||||
|
||||
|
||||
def test_tool_code_opener_flood_is_fast():
|
||||
evil = "<tool_code>{tool: x}" * 6000 # '}' present but no </tool_code> closer
|
||||
_, dt = _timed(parse_tool_blocks, evil)
|
||||
assert dt < _BUDGET_S, f"parse_tool_blocks took {dt:.2f}s"
|
||||
_, dt2 = _timed(strip_tool_blocks, evil)
|
||||
assert dt2 < _BUDGET_S, f"strip_tool_blocks took {dt2:.2f}s"
|
||||
|
||||
|
||||
# ── a present closer must not re-enable the O(n^2) rescan ────────────────────
|
||||
# A whole-string "closer exists?" guard is defeated by a stale closer placed
|
||||
# before an opener flood, or by a closer whose required inner delimiter is
|
||||
# missing. The parser must pair each opener only with a *later* closer.
|
||||
|
||||
def test_xml_stale_closer_before_opener_flood_is_fast():
|
||||
# A lone leading </tool_call> makes a whole-string closer check true, but no
|
||||
# opener after it has a reachable closer. (strip exercises the CodeQL-flagged
|
||||
# _XML_TOOL_CALL_RE path; parse additionally reaches _XML_DIRECT_TOOL_RE, the
|
||||
# separate backreference pattern tracked as a follow-up — see
|
||||
# test_xml_tool_call_opener_flood_is_fast.)
|
||||
evil = "</tool_call>" + ("<tool_call>" + "a" * 10) * 6000
|
||||
_, dt = _timed(strip_tool_blocks, evil)
|
||||
assert dt < _BUDGET_S, f"strip_tool_blocks took {dt:.2f}s"
|
||||
|
||||
|
||||
def test_tool_call_closer_present_without_inner_brace_is_fast():
|
||||
# Leading [/TOOL_CALL] satisfies a substring guard, but the openers carry no
|
||||
# inner '}', so '}\\s*[/TOOL_CALL]' is never reachable from any opener.
|
||||
evil = "[/TOOL_CALL]" + "[TOOL_CALL]{tool: x" * 6000
|
||||
blocks, dt = _timed(parse_tool_blocks, evil)
|
||||
assert dt < _BUDGET_S, f"parse_tool_blocks took {dt:.2f}s"
|
||||
assert blocks == []
|
||||
_, dt2 = _timed(strip_tool_blocks, evil)
|
||||
assert dt2 < _BUDGET_S, f"strip_tool_blocks took {dt2:.2f}s"
|
||||
|
||||
|
||||
def test_tool_code_closer_present_without_inner_brace_is_fast():
|
||||
evil = "</tool_code>" + "<tool_code>{tool: x" * 6000
|
||||
blocks, dt = _timed(parse_tool_blocks, evil)
|
||||
assert dt < _BUDGET_S, f"parse_tool_blocks took {dt:.2f}s"
|
||||
assert blocks == []
|
||||
_, dt2 = _timed(strip_tool_blocks, evil)
|
||||
assert dt2 < _BUDGET_S, f"strip_tool_blocks took {dt2:.2f}s"
|
||||
|
||||
|
||||
# ── strip_think() is the production entrypoint that callers actually run ─────
|
||||
# The timing tests above cover normalize_thinking_markup and the scanners;
|
||||
# these cover strip_think() itself, which applies the think-tag regexes too.
|
||||
|
||||
def test_strip_think_nested_and_attr_blocks_unchanged():
|
||||
# Values pin pre-existing behavior (incl. the nested-block quirk that leaves
|
||||
# the inter-tag `c`) so the forward-only rewrite stays byte-equal.
|
||||
assert strip_think("<think>a<think>b</think>c</think>Answer.") == "cAnswer."
|
||||
assert strip_think('<think time="0.4">reasoning</think>Answer.') == "Answer."
|
||||
assert strip_think("<thinking>x</thinking>Answer.") == "Answer."
|
||||
assert strip_think("<think>r</think>Answer.") == "Answer."
|
||||
assert strip_think("Answer.") == "Answer."
|
||||
|
||||
|
||||
def test_strip_think_malformed_open_no_gt_is_fast():
|
||||
for opener in ("<think", "<thinking", "<thought"):
|
||||
evil = opener + " " * 40_000 # no closing '>'
|
||||
out, dt = _timed(strip_think, evil)
|
||||
assert dt < _BUDGET_S, f"strip_think({opener!r}) took {dt:.2f}s"
|
||||
assert out == evil.strip() # nothing is a real tag
|
||||
|
||||
|
||||
def test_strip_think_attr_opener_flood_is_fast():
|
||||
for opener in ("<think x", "<thinking x", "<thought x"): # no `>`, no closer
|
||||
evil = opener * 8000
|
||||
_, dt = _timed(strip_think, evil)
|
||||
assert dt < _BUDGET_S, f"strip_think({opener!r}) took {dt:.2f}s"
|
||||
|
||||
|
||||
def test_strip_think_closed_opener_flood_is_fast():
|
||||
evil = "<think>" * 16000 # well-formed openers, no closer
|
||||
out, dt = _timed(strip_think, evil)
|
||||
assert dt < _BUDGET_S, f"strip_think took {dt:.2f}s"
|
||||
assert out == ""
|
||||
|
||||
|
||||
def test_strip_think_malformed_closer_flood_is_fast():
|
||||
evil = "</think x" * 8000 # closer flood, no `>`
|
||||
out, dt = _timed(strip_think, evil)
|
||||
assert dt < _BUDGET_S, f"strip_think took {dt:.2f}s"
|
||||
assert out == evil.strip()
|
||||
Reference in New Issue
Block a user