mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-22 20:55:29 -04:00
CodeQL hardening for cookbook sync
This commit is contained in:
+52
-16
@@ -61,14 +61,11 @@ _XML_DIRECT_TOOL_RE = re.compile(
|
||||
# <|tool▁call▁begin|>tool_name<|tool▁sep|>{...}<|tool▁call▁end|>
|
||||
# These can leak as text through llama.cpp/Ollama-style endpoints when the
|
||||
# engine does not return structured OpenAI tool_calls.
|
||||
_STEPFUN_TOOL_CALL_RE = re.compile(
|
||||
r"<|tool▁call▁begin|>\s*([A-Za-z_][\w.-]*)\s*<|tool▁sep|>\s*([\s\S]*?)\s*<|tool▁call▁end|>",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
_STEPFUN_TOOL_CALLS_WRAPPER_RE = re.compile(
|
||||
r"</?|tool▁calls▁(?:begin|end)|>",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
_STEPFUN_CALL_BEGIN = "<|tool▁call▁begin|>"
|
||||
_STEPFUN_CALL_SEP = "<|tool▁sep|>"
|
||||
_STEPFUN_CALL_END = "<|tool▁call▁end|>"
|
||||
_STEPFUN_CALLS_BEGIN = "<|tool▁calls▁begin|>"
|
||||
_STEPFUN_CALLS_END = "<|tool▁calls▁end|>"
|
||||
|
||||
# Pattern 4: <tool_code> blocks (MiniMax-M2.5 style)
|
||||
# {tool => 'tool_name', args => '<param>value</param>'}
|
||||
@@ -508,13 +505,53 @@ def _parse_xml_direct_tool(tool_match) -> Optional[ToolBlock]:
|
||||
return function_call_to_tool_block(mapped, json.dumps(params))
|
||||
|
||||
|
||||
def _parse_stepfun_tool_call(call_match) -> Optional[ToolBlock]:
|
||||
def _iter_stepfun_tool_calls(text: str):
|
||||
"""Yield StepFun native tool-call token bodies without regex backtracking."""
|
||||
pos = 0
|
||||
while True:
|
||||
start = text.find(_STEPFUN_CALL_BEGIN, pos)
|
||||
if start < 0:
|
||||
return
|
||||
name_start = start + len(_STEPFUN_CALL_BEGIN)
|
||||
sep = text.find(_STEPFUN_CALL_SEP, name_start)
|
||||
if sep < 0:
|
||||
return
|
||||
end = text.find(_STEPFUN_CALL_END, sep + len(_STEPFUN_CALL_SEP))
|
||||
if end < 0:
|
||||
return
|
||||
raw_name = text[name_start:sep].strip()
|
||||
body = text[sep + len(_STEPFUN_CALL_SEP):end].strip()
|
||||
if raw_name and len(raw_name) <= 128:
|
||||
yield raw_name, body
|
||||
pos = end + len(_STEPFUN_CALL_END)
|
||||
|
||||
|
||||
def _strip_stepfun_tool_markup(text: str) -> str:
|
||||
"""Remove StepFun tool-call token blocks and wrappers using literal scans."""
|
||||
out = []
|
||||
pos = 0
|
||||
while True:
|
||||
start = text.find(_STEPFUN_CALL_BEGIN, pos)
|
||||
if start < 0:
|
||||
out.append(text[pos:])
|
||||
break
|
||||
end = text.find(_STEPFUN_CALL_END, start + len(_STEPFUN_CALL_BEGIN))
|
||||
if end < 0:
|
||||
out.append(text[pos:])
|
||||
break
|
||||
out.append(text[pos:start])
|
||||
pos = end + len(_STEPFUN_CALL_END)
|
||||
cleaned = "".join(out)
|
||||
return cleaned.replace(_STEPFUN_CALLS_BEGIN, "").replace(_STEPFUN_CALLS_END, "")
|
||||
|
||||
|
||||
def _parse_stepfun_tool_call(tool_name: str, body: str) -> Optional[ToolBlock]:
|
||||
"""Parse StepFun native tool-call tokens into an Odysseus ToolBlock."""
|
||||
tool_name = call_match.group(1).lower().replace("-", "_").replace(".", "_")
|
||||
tool_name = tool_name.lower().replace("-", "_").replace(".", "_")
|
||||
mapped = _TOOL_NAME_MAP.get(tool_name) or (tool_name if tool_name in TOOL_TAGS else None)
|
||||
if not mapped:
|
||||
return None
|
||||
body = call_match.group(2).strip()
|
||||
body = (body or "").strip()
|
||||
if not body:
|
||||
return None
|
||||
try:
|
||||
@@ -660,8 +697,8 @@ def parse_tool_blocks(text: str, skip_fenced: bool = False) -> List[ToolBlock]:
|
||||
|
||||
# Pattern 3: XML-style <tool_call>/<invoke> blocks
|
||||
if not blocks:
|
||||
for step_call in _STEPFUN_TOOL_CALL_RE.finditer(text):
|
||||
block = _parse_stepfun_tool_call(step_call)
|
||||
for tool_name, body in _iter_stepfun_tool_calls(text):
|
||||
block = _parse_stepfun_tool_call(tool_name, body)
|
||||
if block:
|
||||
blocks.append(block)
|
||||
if blocks:
|
||||
@@ -733,8 +770,7 @@ def strip_tool_blocks(text: str, skip_fenced: bool = False) -> str:
|
||||
text = _normalize_dsml(text)
|
||||
cleaned = text if skip_fenced else _TOOL_BLOCK_RE.sub('', text)
|
||||
cleaned = _TOOL_CALL_RE.sub('', cleaned)
|
||||
cleaned = _STEPFUN_TOOL_CALL_RE.sub('', cleaned)
|
||||
cleaned = _STEPFUN_TOOL_CALLS_WRAPPER_RE.sub('', cleaned)
|
||||
cleaned = _strip_stepfun_tool_markup(cleaned)
|
||||
cleaned = _XML_TOOL_CALL_RE.sub('', cleaned)
|
||||
cleaned = _XML_OPEN_TOOL_CALL_RE.sub('', cleaned)
|
||||
cleaned = _TOOL_CODE_RE.sub('', cleaned)
|
||||
@@ -744,6 +780,6 @@ def strip_tool_blocks(text: str, skip_fenced: bool = False) -> str:
|
||||
_, (start, end) = raw_web_json
|
||||
cleaned = cleaned[:start] + cleaned[end:]
|
||||
# Strip bare <invoke> blocks not wrapped in <tool_call>
|
||||
cleaned = re.sub(r'<invoke\s+name=["\'].*?</invoke>', '', cleaned, flags=re.DOTALL | re.IGNORECASE)
|
||||
cleaned = _XML_INVOKE_RE.sub('', cleaned)
|
||||
cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
|
||||
return cleaned.strip()
|
||||
|
||||
Reference in New Issue
Block a user