import asyncio import json import os import difflib import fnmatch import shutil from typing import Optional, Dict, Any, Tuple from src.constants import MAX_READ_CHARS, MAX_DIFF_LINES, MAX_OUTPUT_CHARS _CODENAV_SKIP_DIRS = frozenset({ ".git", ".hg", ".svn", "node_modules", "venv", ".venv", "__pycache__", ".mypy_cache", ".pytest_cache", ".ruff_cache", "dist", "build", ".next", ".cache", "site-packages", ".idea", ".tox", }) _CODENAV_MAX_HITS = 200 _CODENAV_MAX_LINE = 400 def _unified_diff(old: str, new: str, path: str) -> Optional[Dict[str, Any]]: if old == new: return None old_lines = old.splitlines() new_lines = new.splitlines() label = path or "file" diff_lines = list(difflib.unified_diff( old_lines, new_lines, fromfile=f"a/{label}", tofile=f"b/{label}", lineterm="", )) added = sum(1 for line in diff_lines if line.startswith("+") and not line.startswith("+++")) removed = sum(1 for line in diff_lines if line.startswith("-") and not line.startswith("---")) truncated = False if len(diff_lines) > MAX_DIFF_LINES: diff_lines = diff_lines[:MAX_DIFF_LINES] truncated = True text = "\n".join(diff_lines) if truncated: text += f"\n… diff truncated at {MAX_DIFF_LINES} lines" return { "text": text, "added": added, "removed": removed, "new_file": old == "", "file": os.path.basename(path) or (path or "file"), } class EditFileTool: async def execute(self, content: str, ctx: dict) -> dict: from src.tool_execution import _resolve_tool_path, _resolve_search_root, _truncate try: args = json.loads(content) if content.strip().startswith("{") else {} except (json.JSONDecodeError, TypeError): args = {} raw_path = (args.get("path") or "").strip() old = args.get("old_string", "") new = args.get("new_string", "") replace_all = bool(args.get("replace_all", False)) if not raw_path: return {"error": "edit_file: path required", "exit_code": 1} try: path = _resolve_tool_path(raw_path) except ValueError as e: return {"error": f"edit_file: {e}", "exit_code": 1} if old == "": return {"error": "edit_file: old_string required (use write_file to create a file)", "exit_code": 1} if old == new: return {"error": "edit_file: old_string and new_string are identical", "exit_code": 1} def _apply(): """Helper function that performs the actual string replacement and file writing logic.""" with open(path, "r", encoding="utf-8") as f: original = f.read() count = original.count(old) if count == 0: return original, None, "not_found" if count > 1 and not replace_all: return original, None, f"not_unique:{count}" updated = original.replace(old, new) if replace_all else original.replace(old, new, 1) with open(path, "w", encoding="utf-8") as f: f.write(updated) return original, updated, "ok" try: original, updated, status = await asyncio.to_thread(_apply) except FileNotFoundError: return {"error": f"edit_file: {path}: not found (use write_file to create it)", "exit_code": 1} except (IsADirectoryError, UnicodeDecodeError): return {"error": f"edit_file: {path}: not an editable text file", "exit_code": 1} except PermissionError: return {"error": f"edit_file: {path}: permission denied", "exit_code": 1} except OSError as e: return {"error": f"edit_file: {path}: {e}", "exit_code": 1} if status == "not_found": return {"error": f"edit_file: old_string not found in {path}. Read the file and match it exactly.", "exit_code": 1} if status.startswith("not_unique"): n = status.split(":", 1)[1] return {"error": f"edit_file: old_string is not unique in {path} ({n} matches). Add surrounding context or set replace_all=true.", "exit_code": 1} n = original.count(old) result = {"output": f"Edited {path} ({n} replacement{'s' if n != 1 else ''})", "exit_code": 0} diff = _unified_diff(original, updated, path) if diff: result["diff"] = diff return result class ReadFileTool: async def execute(self, content: str, ctx: dict) -> dict: from src.tool_execution import _resolve_tool_path, _resolve_search_root, _truncate raw_path, offset, limit = content.split("\n", 1)[0].strip(), 0, 0 _stripped = content.strip() if _stripped.startswith("{"): try: _a = json.loads(_stripped) raw_path = str(_a.get("path", "")).strip() offset = int(_a.get("offset") or 0) limit = int(_a.get("limit") or 0) except (json.JSONDecodeError, TypeError, ValueError): pass try: path = _resolve_tool_path(raw_path) except ValueError as e: return {"error": f"read_file: {e}", "exit_code": 1} try: def _read(): if offset > 0 or limit > 0: start = max(offset, 1) out, n, budget = [], 0, MAX_READ_CHARS with open(path, "r", encoding="utf-8", errors="replace") as f: for i, line in enumerate(f, 1): if i < start: continue if limit > 0 and n >= limit: break out.append(line) n += 1 budget -= len(line) if budget <= 0: out.append(f"\n... [truncated at {MAX_READ_CHARS} chars]") break return "".join(out) with open(path, "r", encoding="utf-8", errors="replace") as f: return f.read(MAX_READ_CHARS + 1) data = await asyncio.to_thread(_read) except FileNotFoundError: return {"error": f"read_file: {path}: not found", "exit_code": 1} except PermissionError: return {"error": f"read_file: {path}: permission denied", "exit_code": 1} except IsADirectoryError: return {"error": f"read_file: {path}: is a directory (use ls)", "exit_code": 1} except OSError as e: return {"error": f"read_file: {path}: {e}", "exit_code": 1} if not (offset > 0 or limit > 0) and len(data) > MAX_READ_CHARS: data = data[:MAX_READ_CHARS] + f"\n... [truncated at {MAX_READ_CHARS} chars]" return {"output": data, "exit_code": 0} class WriteFileTool: async def execute(self, content: str, ctx: dict) -> dict: from src.tool_execution import _resolve_tool_path, _resolve_search_root, _truncate lines = content.split("\n", 1) raw_path = lines[0].strip() body = lines[1] if len(lines) > 1 else "" try: path = _resolve_tool_path(raw_path) except ValueError as e: return {"error": f"write_file: {e}", "exit_code": 1} try: def _write(): old = "" try: with open(path, "r", encoding="utf-8") as f: old = f.read() except (FileNotFoundError, IsADirectoryError, UnicodeDecodeError, OSError): old = "" d = os.path.dirname(path) if d: os.makedirs(d, exist_ok=True) with open(path, "w", encoding="utf-8") as f: f.write(body) return old, len(body) old_content, size = await asyncio.to_thread(_write) except PermissionError: return {"error": f"write_file: {path}: permission denied", "exit_code": 1} except OSError as e: return {"error": f"write_file: {path}: {e}", "exit_code": 1} diff = _unified_diff(old_content, body, path) result = {"output": f"Wrote {size} bytes to {path}", "exit_code": 0} if diff: result["diff"] = diff return result class LsTool: async def execute(self, content: str, ctx: dict) -> dict: from src.tool_execution import _resolve_tool_path, _resolve_search_root, _truncate raw_path = "" _s = (content or "").strip() if _s.startswith("{"): try: raw_path = str(json.loads(_s).get("path", "")).strip() except json.JSONDecodeError: raw_path = "" else: raw_path = _s.split("\n", 1)[0].strip() try: root = _resolve_search_root(raw_path) except ValueError as e: return {"error": f"ls: {e}", "exit_code": 1} def _ls(): if not os.path.isdir(root): return None, f"ls: {root}: not a directory" rows = [] try: with os.scandir(root) as it: for entry in it: if entry.name.startswith("."): continue try: is_dir = entry.is_dir(follow_symlinks=False) size = entry.stat(follow_symlinks=False).st_size if not is_dir else 0 except OSError: continue rows.append((is_dir, entry.name, size)) except (PermissionError, OSError) as _e: return None, f"ls: {_e}" rows.sort(key=lambda r: (not r[0], r[1].lower())) lines = [f"{root}:"] for is_dir, name, size in rows[:_CODENAV_MAX_HITS]: lines.append(f" {name}/" if is_dir else f" {name} ({size} B)") if len(rows) > _CODENAV_MAX_HITS: lines.append(f" ... [{len(rows) - _CODENAV_MAX_HITS} more]") if not rows: lines.append(" (empty)") return "\n".join(lines), None out, err = await asyncio.to_thread(_ls) if err: return {"error": err, "exit_code": 1} return {"output": _truncate(out), "exit_code": 0} class GlobTool: async def execute(self, content: str, ctx: dict) -> dict: from src.tool_execution import _resolve_tool_path, _resolve_search_root, _truncate args = {} _s = (content or "").strip() if _s.startswith("{"): try: args = json.loads(_s) except json.JSONDecodeError: args = {} else: args = {"pattern": _s} pattern = str(args.get("pattern", "")).strip() if not pattern: return {"error": "glob: pattern is required", "exit_code": 1} try: root = _resolve_search_root(str(args.get("path", ""))) except ValueError as e: return {"error": f"glob: {e}", "exit_code": 1} def _glob(): from pathlib import Path base = Path(root) if not base.is_dir(): return None, f"glob: {root}: not a directory" matched = [] try: for p in base.rglob(pattern): if set(p.relative_to(base).parts) & _CODENAV_SKIP_DIRS: continue try: mtime = p.stat().st_mtime except OSError: mtime = 0 matched.append((mtime, str(p))) if len(matched) > _CODENAV_MAX_HITS * 5: break except (OSError, ValueError) as _e: return None, f"glob: {_e}" matched.sort(key=lambda t: t[0], reverse=True) return [pth for _, pth in matched[:_CODENAV_MAX_HITS]], None paths, err = await asyncio.to_thread(_glob) if err: return {"error": err, "exit_code": 1} if not paths: return {"output": f"No files matching {pattern!r} under {root}", "exit_code": 0} out = "\n".join(paths) if len(paths) >= _CODENAV_MAX_HITS: out += f"\n... [capped at {_CODENAV_MAX_HITS} files]" return {"output": _truncate(out), "exit_code": 0} class GrepTool: async def execute(self, content: str, ctx: dict) -> dict: from src.tool_execution import _resolve_tool_path, _resolve_search_root, _truncate args: Dict[str, Any] = {} _s = (content or "").strip() if _s.startswith("{"): try: args = json.loads(_s) except json.JSONDecodeError: args = {} else: args = {"pattern": _s} pattern = str(args.get("pattern", "")).strip() if not pattern: return {"error": "grep: pattern is required", "exit_code": 1} ignore_case = bool(args.get("ignore_case")) glob_pat = str(args.get("glob", "") or "").strip() try: max_hits = int(args.get("max_results") or _CODENAV_MAX_HITS) except (TypeError, ValueError): max_hits = _CODENAV_MAX_HITS max_hits = max(1, min(max_hits, _CODENAV_MAX_HITS)) try: root = _resolve_search_root(str(args.get("path", ""))) except ValueError as e: return {"error": f"grep: {e}", "exit_code": 1} def _grep(): import re as _re import shutil rg = shutil.which("rg") if rg: cmd = [rg, "--line-number", "--no-heading", "--color=never", "--max-count", str(max_hits)] if ignore_case: cmd.append("--ignore-case") if glob_pat: cmd += ["--glob", glob_pat] for _d in _CODENAV_SKIP_DIRS: cmd += ["--glob", f"!**/{_d}/**"] cmd += ["--regexp", pattern, root] try: import subprocess p = subprocess.run(cmd, capture_output=True, text=True, timeout=20) lines = [ln for ln in (p.stdout or "").splitlines() if ln][:max_hits] return lines, None except subprocess.TimeoutExpired: return None, "grep: timed out" except Exception as _e: return None, f"grep: {_e}" try: rx = _re.compile(pattern, _re.IGNORECASE if ignore_case else 0) except _re.error as _e: return None, f"grep: bad pattern: {_e}" hits = [] if os.path.isfile(root): file_iter = [root] else: file_iter = [] for dp, dns, fns in os.walk(root): dns[:] = [d for d in dns if d not in _CODENAV_SKIP_DIRS] for fn in fns: if glob_pat and not fnmatch.fnmatch(fn, glob_pat): continue file_iter.append(os.path.join(dp, fn)) for fp in file_iter: if len(hits) >= max_hits: break try: with open(fp, "r", encoding="utf-8", errors="strict") as f: for i, line in enumerate(f, 1): if rx.search(line): hits.append(f"{fp}:{i}:{line.rstrip()[:_CODENAV_MAX_LINE]}") if len(hits) >= max_hits: break except (UnicodeDecodeError, OSError): continue return hits, None lines, err = await asyncio.to_thread(_grep) if err: return {"error": err, "exit_code": 1} if not lines: return {"output": f"No matches for {pattern!r} under {root}", "exit_code": 0} out = "\n".join(ln[:_CODENAV_MAX_LINE] for ln in lines) if len(lines) >= max_hits: out += f"\n... [capped at {max_hits} matches]" return {"output": _truncate(out), "exit_code": 0} class GetWorkspaceTool: """Report the active workspace folder (no args). File tools are confined to it; the shell starts there (cwd) but is NOT sandboxed.""" async def execute(self, content: str, ctx: dict) -> dict: from src.tool_execution import get_active_workspace ws = get_active_workspace() if ws: return { "output": f"{ws}\n(File tools are confined to this folder; the shell starts " f"here but is not sandboxed and can reach outside it.)", "exit_code": 0, } return { "output": "No workspace is set. File tools use the default allowed roots; " "resolve paths from the user or use absolute paths.", "exit_code": 0, }