mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-24 21:55:25 -04:00
fix(agent): parse misfenced read_file calls (#4799)
This commit is contained in:
+84
-1
@@ -308,6 +308,88 @@ def _parse_misfenced_web_lookup(content: str) -> Optional[ToolBlock]:
|
||||
return ToolBlock("web_fetch", url)
|
||||
|
||||
|
||||
|
||||
def _parse_misfenced_read_file_lookup(content: str, *, allow_shell_style: bool = False) -> Optional[ToolBlock]:
|
||||
"""Recover simple read_file calls wrapped in python/bash fences."""
|
||||
stripped = content.strip()
|
||||
if not stripped:
|
||||
return None
|
||||
|
||||
try:
|
||||
module = ast.parse(stripped, mode="exec")
|
||||
except SyntaxError:
|
||||
module = None
|
||||
if module and len(module.body) == 1 and isinstance(module.body[0], ast.Expr):
|
||||
call = module.body[0].value
|
||||
if isinstance(call, ast.Call) and isinstance(call.func, ast.Name):
|
||||
if call.func.id.lower() != "read_file" or len(call.args) > 1:
|
||||
return None
|
||||
args = {}
|
||||
if call.args:
|
||||
path = _literal_string(call.args[0])
|
||||
if not path:
|
||||
return None
|
||||
args["path"] = path
|
||||
allowed = {"path", "file", "file_path", "offset", "limit"}
|
||||
for keyword in call.keywords:
|
||||
if keyword.arg not in allowed:
|
||||
return None
|
||||
key = "path" if keyword.arg in ("file", "file_path") else keyword.arg
|
||||
if key == "path":
|
||||
path = _literal_string(keyword.value)
|
||||
if not path:
|
||||
return None
|
||||
args["path"] = path
|
||||
continue
|
||||
try:
|
||||
value = ast.literal_eval(keyword.value)
|
||||
except (ValueError, SyntaxError, TypeError):
|
||||
return None
|
||||
if not isinstance(value, int) or value < 0:
|
||||
return None
|
||||
args[key] = value
|
||||
if not args.get("path"):
|
||||
return None
|
||||
from src.tool_schemas import function_call_to_tool_block
|
||||
return function_call_to_tool_block("read_file", json.dumps(args))
|
||||
|
||||
if not allow_shell_style:
|
||||
return None
|
||||
lines = [line.strip() for line in stripped.splitlines() if line.strip()]
|
||||
if len(lines) != 1:
|
||||
return None
|
||||
match = re.fullmatch(r"read_file\s+(.+)", lines[0], re.IGNORECASE)
|
||||
if not match:
|
||||
return None
|
||||
path = match.group(1).strip()
|
||||
if not path:
|
||||
return None
|
||||
if path.startswith("{"):
|
||||
try:
|
||||
args = json.loads(path)
|
||||
except json.JSONDecodeError:
|
||||
return None
|
||||
if not isinstance(args, dict):
|
||||
return None
|
||||
normalized = {}
|
||||
raw_path = args.get("path") or args.get("file") or args.get("file_path")
|
||||
if isinstance(raw_path, str) and raw_path.strip():
|
||||
normalized["path"] = raw_path.strip()
|
||||
for key in ("offset", "limit"):
|
||||
value = args.get(key)
|
||||
if isinstance(value, int) and value >= 0:
|
||||
normalized[key] = value
|
||||
if not normalized.get("path"):
|
||||
return None
|
||||
from src.tool_schemas import function_call_to_tool_block
|
||||
return function_call_to_tool_block("read_file", json.dumps(normalized))
|
||||
if len(path) >= 2 and path[0] == path[-1] and path[0] in "'\"":
|
||||
path = path[1:-1].strip()
|
||||
if not path:
|
||||
return None
|
||||
return ToolBlock("read_file", path)
|
||||
|
||||
|
||||
def _coerce_raw_web_query(value) -> Optional[str]:
|
||||
if isinstance(value, str) and value.strip():
|
||||
return value.strip()
|
||||
@@ -704,7 +786,8 @@ def parse_tool_blocks(text: str, skip_fenced: bool = False) -> List[ToolBlock]:
|
||||
# _XML_INVOKE_RE's \w+ can't match would otherwise be executed as code.
|
||||
continue
|
||||
if tag in ("python", "bash"):
|
||||
block = _parse_misfenced_web_lookup(content)
|
||||
block = (_parse_misfenced_web_lookup(content)
|
||||
or _parse_misfenced_read_file_lookup(content, allow_shell_style=(tag == "bash")))
|
||||
if block:
|
||||
blocks.append(block)
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user