Files
odysseus/src/agent_tools/filesystem_tools.py
T
Michael 39a802bea2 fix(tools): prune skipped dirs before descending in glob tool (#4538)
* fix(tools): prune skipped dirs before descending in glob tool

GlobTool used pathlib.Path.rglob which descends into every directory
(including node_modules, .git, dist, etc.) and filters AFTER the walk.
On repos with large junk directories this causes the glob tool to hang
for minutes.

Replace rglob with os.walk that prunes _CODENAV_SKIP_DIRS before
descending — matching the approach GrepTool already uses. Also add a
fast path for literal patterns (no wildcards → direct path lookup).

Fixes #4493

* fix(tools): use regex glob matching to fix * semantics and literal fallback

Replace fnmatch with _glob_to_regex so that * stays within a single
path segment (matching pathlib/rglob semantics) and **/ spans zero or
more directories.  Literal patterns now fall through to os.walk when
the direct path lookup misses, so e.g. 'foo.py' still finds files at
any depth.

Add tests for:
- bare literal matching in subdirectories
- multi-segment single-star patterns (sub/*.txt)
- * not crossing / boundaries
- ** matching at arbitrary depth

Closes #4493

---------

Co-authored-by: michaelxer <michaelxer@users.noreply.github.com>
2026-06-18 22:02:29 +02:00

440 lines
19 KiB
Python

import asyncio
import json
import os
import re
import difflib
import fnmatch
import shutil
from typing import Optional, Dict, Any, Tuple
from src.constants import MAX_READ_CHARS, MAX_DIFF_LINES, MAX_OUTPUT_CHARS
_CODENAV_SKIP_DIRS = frozenset({
".git", ".hg", ".svn", "node_modules", "venv", ".venv", "__pycache__",
".mypy_cache", ".pytest_cache", ".ruff_cache", "dist", "build",
".next", ".cache", "site-packages", ".idea", ".tox",
})
_CODENAV_MAX_HITS = 200
_CODENAV_MAX_LINE = 400
def _glob_to_regex(pat: str) -> "re.Pattern":
"""Translate a forward-slash glob (**, *, ?) into a compiled regex.
`**/` matches zero or more complete directories.
`*` matches within a single path segment (does not cross /).
"""
i, n, out = 0, len(pat), []
while i < n:
if pat[i : i + 3] == "**/":
out.append("(?:[^/]+/)*")
i += 3
elif pat[i : i + 2] == "**":
out.append(".*")
i += 2
elif pat[i] == "*":
out.append("[^/]*")
i += 1
elif pat[i] == "?":
out.append("[^/]")
i += 1
else:
out.append(re.escape(pat[i]))
i += 1
return re.compile("".join(out))
def _unified_diff(old: str, new: str, path: str) -> Optional[Dict[str, Any]]:
if old == new:
return None
old_lines = old.splitlines()
new_lines = new.splitlines()
label = path or "file"
diff_lines = list(difflib.unified_diff(
old_lines, new_lines,
fromfile=f"a/{label}", tofile=f"b/{label}",
lineterm="",
))
added = sum(1 for line in diff_lines if line.startswith("+") and not line.startswith("+++"))
removed = sum(1 for line in diff_lines if line.startswith("-") and not line.startswith("---"))
truncated = False
if len(diff_lines) > MAX_DIFF_LINES:
diff_lines = diff_lines[:MAX_DIFF_LINES]
truncated = True
text = "\n".join(diff_lines)
if truncated:
text += f"\n… diff truncated at {MAX_DIFF_LINES} lines"
return {
"text": text,
"added": added,
"removed": removed,
"new_file": old == "",
"file": os.path.basename(path) or (path or "file"),
}
class EditFileTool:
async def execute(self, content: str, ctx: dict) -> dict:
from src.tool_execution import _resolve_tool_path, _resolve_search_root, _truncate
try:
args = json.loads(content) if content.strip().startswith("{") else {}
except (json.JSONDecodeError, TypeError):
args = {}
raw_path = (args.get("path") or "").strip()
old = args.get("old_string", "")
new = args.get("new_string", "")
replace_all = bool(args.get("replace_all", False))
if not raw_path:
return {"error": "edit_file: path required", "exit_code": 1}
try:
path = _resolve_tool_path(raw_path)
except ValueError as e:
return {"error": f"edit_file: {e}", "exit_code": 1}
if old == "":
return {"error": "edit_file: old_string required (use write_file to create a file)", "exit_code": 1}
if old == new:
return {"error": "edit_file: old_string and new_string are identical", "exit_code": 1}
def _apply():
"""Helper function that performs the actual string replacement and file writing logic."""
with open(path, "r", encoding="utf-8") as f:
original = f.read()
count = original.count(old)
if count == 0:
return original, None, "not_found"
if count > 1 and not replace_all:
return original, None, f"not_unique:{count}"
updated = original.replace(old, new) if replace_all else original.replace(old, new, 1)
with open(path, "w", encoding="utf-8") as f:
f.write(updated)
return original, updated, "ok"
try:
original, updated, status = await asyncio.to_thread(_apply)
except FileNotFoundError:
return {"error": f"edit_file: {path}: not found (use write_file to create it)", "exit_code": 1}
except (IsADirectoryError, UnicodeDecodeError):
return {"error": f"edit_file: {path}: not an editable text file", "exit_code": 1}
except PermissionError:
return {"error": f"edit_file: {path}: permission denied", "exit_code": 1}
except OSError as e:
return {"error": f"edit_file: {path}: {e}", "exit_code": 1}
if status == "not_found":
return {"error": f"edit_file: old_string not found in {path}. Read the file and match it exactly.", "exit_code": 1}
if status.startswith("not_unique"):
n = status.split(":", 1)[1]
return {"error": f"edit_file: old_string is not unique in {path} ({n} matches). Add surrounding context or set replace_all=true.", "exit_code": 1}
n = original.count(old)
result = {"output": f"Edited {path} ({n} replacement{'s' if n != 1 else ''})", "exit_code": 0}
diff = _unified_diff(original, updated, path)
if diff:
result["diff"] = diff
return result
class ReadFileTool:
async def execute(self, content: str, ctx: dict) -> dict:
from src.tool_execution import _resolve_tool_path, _resolve_search_root, _truncate
raw_path, offset, limit = content.split("\n", 1)[0].strip(), 0, 0
_stripped = content.strip()
if _stripped.startswith("{"):
try:
_a = json.loads(_stripped)
raw_path = str(_a.get("path", "")).strip()
offset = int(_a.get("offset") or 0)
limit = int(_a.get("limit") or 0)
except (json.JSONDecodeError, TypeError, ValueError):
pass
try:
path = _resolve_tool_path(raw_path)
except ValueError as e:
return {"error": f"read_file: {e}", "exit_code": 1}
try:
def _read():
if offset > 0 or limit > 0:
start = max(offset, 1)
out, n, budget = [], 0, MAX_READ_CHARS
with open(path, "r", encoding="utf-8", errors="replace") as f:
for i, line in enumerate(f, 1):
if i < start:
continue
if limit > 0 and n >= limit:
break
out.append(line)
n += 1
budget -= len(line)
if budget <= 0:
out.append(f"\n... [truncated at {MAX_READ_CHARS} chars]")
break
return "".join(out)
with open(path, "r", encoding="utf-8", errors="replace") as f:
return f.read(MAX_READ_CHARS + 1)
data = await asyncio.to_thread(_read)
except FileNotFoundError:
return {"error": f"read_file: {path}: not found", "exit_code": 1}
except PermissionError:
return {"error": f"read_file: {path}: permission denied", "exit_code": 1}
except IsADirectoryError:
return {"error": f"read_file: {path}: is a directory (use ls)", "exit_code": 1}
except OSError as e:
return {"error": f"read_file: {path}: {e}", "exit_code": 1}
if not (offset > 0 or limit > 0) and len(data) > MAX_READ_CHARS:
data = data[:MAX_READ_CHARS] + f"\n... [truncated at {MAX_READ_CHARS} chars]"
return {"output": data, "exit_code": 0}
class WriteFileTool:
async def execute(self, content: str, ctx: dict) -> dict:
from src.tool_execution import _resolve_tool_path, _resolve_search_root, _truncate
lines = content.split("\n", 1)
raw_path = lines[0].strip()
body = lines[1] if len(lines) > 1 else ""
try:
path = _resolve_tool_path(raw_path)
except ValueError as e:
return {"error": f"write_file: {e}", "exit_code": 1}
try:
def _write():
old = ""
try:
with open(path, "r", encoding="utf-8") as f:
old = f.read()
except (FileNotFoundError, IsADirectoryError, UnicodeDecodeError, OSError):
old = ""
d = os.path.dirname(path)
if d:
os.makedirs(d, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(body)
return old, len(body)
old_content, size = await asyncio.to_thread(_write)
except PermissionError:
return {"error": f"write_file: {path}: permission denied", "exit_code": 1}
except OSError as e:
return {"error": f"write_file: {path}: {e}", "exit_code": 1}
diff = _unified_diff(old_content, body, path)
result = {"output": f"Wrote {size} bytes to {path}", "exit_code": 0}
if diff:
result["diff"] = diff
return result
class LsTool:
async def execute(self, content: str, ctx: dict) -> dict:
from src.tool_execution import _resolve_tool_path, _resolve_search_root, _truncate
raw_path = ""
_s = (content or "").strip()
if _s.startswith("{"):
try:
raw_path = str(json.loads(_s).get("path", "")).strip()
except json.JSONDecodeError:
raw_path = ""
else:
raw_path = _s.split("\n", 1)[0].strip()
try:
root = _resolve_search_root(raw_path)
except ValueError as e:
return {"error": f"ls: {e}", "exit_code": 1}
def _ls():
if not os.path.isdir(root):
return None, f"ls: {root}: not a directory"
rows = []
try:
with os.scandir(root) as it:
for entry in it:
if entry.name.startswith("."):
continue
try:
is_dir = entry.is_dir(follow_symlinks=False)
size = entry.stat(follow_symlinks=False).st_size if not is_dir else 0
except OSError:
continue
rows.append((is_dir, entry.name, size))
except (PermissionError, OSError) as _e:
return None, f"ls: {_e}"
rows.sort(key=lambda r: (not r[0], r[1].lower()))
lines = [f"{root}:"]
for is_dir, name, size in rows[:_CODENAV_MAX_HITS]:
lines.append(f" {name}/" if is_dir else f" {name} ({size} B)")
if len(rows) > _CODENAV_MAX_HITS:
lines.append(f" ... [{len(rows) - _CODENAV_MAX_HITS} more]")
if not rows:
lines.append(" (empty)")
return "\n".join(lines), None
out, err = await asyncio.to_thread(_ls)
if err:
return {"error": err, "exit_code": 1}
return {"output": _truncate(out), "exit_code": 0}
class GlobTool:
async def execute(self, content: str, ctx: dict) -> dict:
from src.tool_execution import _resolve_tool_path, _resolve_search_root, _truncate
args = {}
_s = (content or "").strip()
if _s.startswith("{"):
try:
args = json.loads(_s)
except json.JSONDecodeError:
args = {}
else:
args = {"pattern": _s}
pattern = str(args.get("pattern", "")).strip()
if not pattern:
return {"error": "glob: pattern is required", "exit_code": 1}
try:
root = _resolve_search_root(str(args.get("path", "")))
except ValueError as e:
return {"error": f"glob: {e}", "exit_code": 1}
def _glob():
base = os.path.abspath(root)
if not os.path.isdir(base):
return None, f"glob: {root}: not a directory"
norm_pat = pattern.replace("\\", "/")
# Fast path: literal pattern (no wildcards) → direct path lookup.
if not any(c in norm_pat for c in "*?["):
cand = os.path.normpath(os.path.join(base, norm_pat))
if os.path.exists(cand):
return [cand], None
# Literal not at exact path — fall through to walk so
# e.g. "foo.py" still matches at any depth (like rglob).
# Compile glob to regex: * stays within one segment, **/ spans dirs.
regex = _glob_to_regex(norm_pat)
matched = []
cap = _CODENAV_MAX_HITS * 5
try:
for dp, dns, fns in os.walk(base):
# Prune skipped dirs before descending (unlike rglob which
# descends first then filters — fatal on large node_modules).
dns[:] = [d for d in dns if d not in _CODENAV_SKIP_DIRS]
for name in fns + dns:
full = os.path.join(dp, name)
rel = os.path.relpath(full, base).replace(os.sep, "/")
if regex.fullmatch(rel) or regex.fullmatch(name):
try:
mtime = os.stat(full).st_mtime
except OSError:
mtime = 0
matched.append((mtime, full))
if len(matched) > cap:
break
except OSError as _e:
return None, f"glob: {_e}"
matched.sort(key=lambda t: t[0], reverse=True)
return [pth for _, pth in matched[:_CODENAV_MAX_HITS]], None
paths, err = await asyncio.to_thread(_glob)
if err:
return {"error": err, "exit_code": 1}
if not paths:
return {"output": f"No files matching {pattern!r} under {root}", "exit_code": 0}
out = "\n".join(paths)
if len(paths) >= _CODENAV_MAX_HITS:
out += f"\n... [capped at {_CODENAV_MAX_HITS} files]"
return {"output": _truncate(out), "exit_code": 0}
class GrepTool:
async def execute(self, content: str, ctx: dict) -> dict:
from src.tool_execution import _resolve_tool_path, _resolve_search_root, _truncate
args: Dict[str, Any] = {}
_s = (content or "").strip()
if _s.startswith("{"):
try:
args = json.loads(_s)
except json.JSONDecodeError:
args = {}
else:
args = {"pattern": _s}
pattern = str(args.get("pattern", "")).strip()
if not pattern:
return {"error": "grep: pattern is required", "exit_code": 1}
ignore_case = bool(args.get("ignore_case"))
glob_pat = str(args.get("glob", "") or "").strip()
try:
max_hits = int(args.get("max_results") or _CODENAV_MAX_HITS)
except (TypeError, ValueError):
max_hits = _CODENAV_MAX_HITS
max_hits = max(1, min(max_hits, _CODENAV_MAX_HITS))
try:
root = _resolve_search_root(str(args.get("path", "")))
except ValueError as e:
return {"error": f"grep: {e}", "exit_code": 1}
def _grep():
import re as _re
import shutil
rg = shutil.which("rg")
if rg:
cmd = [rg, "--line-number", "--no-heading", "--color=never",
"--max-count", str(max_hits)]
if ignore_case:
cmd.append("--ignore-case")
if glob_pat:
cmd += ["--glob", glob_pat]
for _d in _CODENAV_SKIP_DIRS:
cmd += ["--glob", f"!**/{_d}/**"]
cmd += ["--regexp", pattern, root]
try:
import subprocess
p = subprocess.run(cmd, capture_output=True, text=True, timeout=20)
lines = [ln for ln in (p.stdout or "").splitlines() if ln][:max_hits]
return lines, None
except subprocess.TimeoutExpired:
return None, "grep: timed out"
except Exception as _e:
return None, f"grep: {_e}"
try:
rx = _re.compile(pattern, _re.IGNORECASE if ignore_case else 0)
except _re.error as _e:
return None, f"grep: bad pattern: {_e}"
hits = []
if os.path.isfile(root):
file_iter = [root]
else:
file_iter = []
for dp, dns, fns in os.walk(root):
dns[:] = [d for d in dns if d not in _CODENAV_SKIP_DIRS]
for fn in fns:
if glob_pat and not fnmatch.fnmatch(fn, glob_pat):
continue
file_iter.append(os.path.join(dp, fn))
for fp in file_iter:
if len(hits) >= max_hits:
break
try:
with open(fp, "r", encoding="utf-8", errors="strict") as f:
for i, line in enumerate(f, 1):
if rx.search(line):
hits.append(f"{fp}:{i}:{line.rstrip()[:_CODENAV_MAX_LINE]}")
if len(hits) >= max_hits:
break
except (UnicodeDecodeError, OSError):
continue
return hits, None
lines, err = await asyncio.to_thread(_grep)
if err:
return {"error": err, "exit_code": 1}
if not lines:
return {"output": f"No matches for {pattern!r} under {root}", "exit_code": 0}
out = "\n".join(ln[:_CODENAV_MAX_LINE] for ln in lines)
if len(lines) >= max_hits:
out += f"\n... [capped at {max_hits} matches]"
return {"output": _truncate(out), "exit_code": 0}
class GetWorkspaceTool:
"""Report the active workspace folder (no args). File tools are confined to
it; the shell starts there (cwd) but is NOT sandboxed."""
async def execute(self, content: str, ctx: dict) -> dict:
from src.tool_execution import get_active_workspace
ws = get_active_workspace()
if ws:
return {
"output": f"{ws}\n(File tools are confined to this folder; the shell starts "
f"here but is not sandboxed and can reach outside it.)",
"exit_code": 0,
}
return {
"output": "No workspace is set. File tools use the default allowed roots; "
"resolve paths from the user or use absolute paths.",
"exit_code": 0,
}