mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-15 17:25:26 -04:00
refactor(tools): remove dead workspace-confinement plumbing (#3590)
Commit e6b1009 removed the workspace feature's entry point (deleted
routes/workspace_routes.py + static/js/workspace.js and dropped the
workspace-param parsing in chat_routes), but left the downstream backend
plumbing dangling: chat_routes passed a hardcoded workspace=None into
stream_agent_loop, which forwarded it to execute_tool_block, so the
workspace value was permanently None and every workspace-gated branch
was unreachable.
Remove the now-dead code (no behavior change, since workspace was always
None):
- src/tool_execution.py: drop _resolve_tool_path_in_workspace and the
workspace params/branches on execute_tool_block, _direct_fallback,
_call_mcp_tool, _do_edit_file, and _resolve_search_root; restore the
bash/python/bg cwd to _AGENT_WORKDIR.
- src/agent_loop.py: drop the workspace param on stream_agent_loop, the
dead 'ACTIVE WORKSPACE' system-prompt block, and the workspace forward.
- routes/chat_routes.py: drop the hardcoded workspace=None arg and var.
- tests: delete test_workspace_confine.py (tested the removed feature) and
the workspace assertion in test_tool_policy.py.
Full suite: 2903 passed, 1 skipped.
This commit is contained in:
committed by
GitHub
parent
fbed9027b0
commit
0aba00f4cf
@@ -456,7 +456,6 @@ def setup_chat_routes(
|
||||
# manual form posts that still send plan_mode=true.
|
||||
plan_mode = False
|
||||
chat_mode = str(form_data.get("mode", "")).lower() # 'chat' or 'agent'
|
||||
workspace = ""
|
||||
# Plan mode is a modifier on agent mode — it only makes sense with tools.
|
||||
if plan_mode:
|
||||
chat_mode = "agent"
|
||||
@@ -1135,7 +1134,6 @@ def setup_chat_routes(
|
||||
tool_policy=tool_policy,
|
||||
owner=_user,
|
||||
fallbacks=_fallback_candidates,
|
||||
workspace=None,
|
||||
plan_mode=plan_mode,
|
||||
approved_plan=approved_plan or None,
|
||||
):
|
||||
|
||||
@@ -1707,7 +1707,6 @@ async def stream_agent_loop(
|
||||
owner: Optional[str] = None,
|
||||
relevant_tools: Optional[Set[str]] = None,
|
||||
fallbacks: Optional[List[tuple]] = None,
|
||||
workspace: Optional[str] = None,
|
||||
plan_mode: bool = False,
|
||||
approved_plan: Optional[str] = None,
|
||||
tool_policy: Optional[ToolPolicy] = None,
|
||||
@@ -1935,27 +1934,6 @@ async def stream_agent_loop(
|
||||
owner=owner,
|
||||
suppress_local_context=guide_only,
|
||||
)
|
||||
if workspace and not guide_only:
|
||||
# PREPEND (not append) so it dominates the large base prompt — appended
|
||||
# at the end, small models ignored it and asked the user for code. The
|
||||
# folder IS the project; the agent must explore it, not ask.
|
||||
_ws_note = (
|
||||
f"## ACTIVE WORKSPACE — READ FIRST\n"
|
||||
f"The user is working in this folder: {workspace}\n"
|
||||
f"It IS the project. bash/python run with cwd set here and "
|
||||
f"read_file/write_file are confined to it (paths outside are rejected).\n"
|
||||
f"When the user says \"the code\" / \"this project\" / \"the workspace\" "
|
||||
f"or asks to review/find/edit something WITHOUT a path, they mean THIS "
|
||||
f"folder. Do NOT ask the user for code or a path, and do NOT read a file "
|
||||
f"literally named \"workspace\". ALWAYS start by exploring it yourself: "
|
||||
f"run `bash` → `git ls-files` (or `ls -R`) to see the files, then "
|
||||
f"read_file the relevant ones by path RELATIVE to the workspace."
|
||||
)
|
||||
if messages and messages[0].get("role") == "system":
|
||||
messages[0]["content"] = _ws_note + "\n\n" + (messages[0].get("content") or "")
|
||||
else:
|
||||
messages.insert(0, {"role": "system", "content": _ws_note})
|
||||
logger.info("[workspace] active for this turn: %s", workspace)
|
||||
if plan_mode and not guide_only:
|
||||
# Steer the model to investigate-then-propose. Hard tool gating handles
|
||||
# every write path except shell; this directive is what keeps the
|
||||
@@ -2649,7 +2627,6 @@ async def stream_agent_loop(
|
||||
tool_policy=tool_policy,
|
||||
owner=owner,
|
||||
progress_cb=_push_progress,
|
||||
workspace=workspace,
|
||||
)
|
||||
finally:
|
||||
# Sentinel so the drainer knows to stop.
|
||||
|
||||
+19
-67
@@ -67,13 +67,12 @@ def _unified_diff(old: str, new: str, path: str) -> Optional[Dict[str, Any]]:
|
||||
}
|
||||
|
||||
|
||||
async def _do_edit_file(content: str, workspace: Optional[str] = None) -> Dict[str, Any]:
|
||||
async def _do_edit_file(content: str) -> Dict[str, Any]:
|
||||
"""Exact string-replacement edit of an on-disk file.
|
||||
|
||||
content is JSON: {"path", "old_string", "new_string", "replace_all"?}.
|
||||
Fails if old_string is missing or non-unique (unless replace_all) so the
|
||||
model can't silently edit the wrong place. Returns a unified diff for the UI.
|
||||
Confined to the workspace when one is set (same policy as write_file).
|
||||
"""
|
||||
try:
|
||||
args = json.loads(content) if content.strip().startswith("{") else {}
|
||||
@@ -85,11 +84,9 @@ async def _do_edit_file(content: str, workspace: Optional[str] = None) -> Dict[s
|
||||
replace_all = bool(args.get("replace_all", False))
|
||||
if not raw_path:
|
||||
return {"error": "edit_file: path required", "exit_code": 1}
|
||||
# Confine to the workspace when set, else the same allowlist + sensitive-file
|
||||
# policy as read/write_file.
|
||||
# Allowlist + sensitive-file policy as read/write_file.
|
||||
try:
|
||||
path = (_resolve_tool_path_in_workspace(workspace, raw_path)
|
||||
if workspace else _resolve_tool_path(raw_path))
|
||||
path = _resolve_tool_path(raw_path)
|
||||
except ValueError as e:
|
||||
return {"error": f"edit_file: {e}", "exit_code": 1}
|
||||
if old == "":
|
||||
@@ -272,39 +269,6 @@ def _resolve_tool_path(raw_path: str) -> str:
|
||||
)
|
||||
|
||||
|
||||
def _resolve_tool_path_in_workspace(workspace: str, raw_path: str) -> str:
|
||||
"""Confine a model-supplied path to the active workspace.
|
||||
|
||||
Layered on top of upstream's path policy: the workspace is the allowed
|
||||
root (relative paths resolve under it; paths that escape it are rejected),
|
||||
and the sensitive-file deny list (.ssh, .gnupg, id_rsa, …) still applies
|
||||
inside it. When no workspace is set, callers use _resolve_tool_path (the
|
||||
default data/tmp allowlist) instead.
|
||||
"""
|
||||
if raw_path is None or not str(raw_path).strip():
|
||||
raise ValueError("path is required")
|
||||
base = os.path.realpath(workspace)
|
||||
expanded = os.path.expanduser(str(raw_path).strip())
|
||||
candidate = expanded if os.path.isabs(expanded) else os.path.join(base, expanded)
|
||||
resolved = os.path.realpath(candidate)
|
||||
if _is_sensitive_path(resolved):
|
||||
raise ValueError(
|
||||
f"path '{raw_path}' is inside a sensitive directory "
|
||||
f"(e.g. .ssh, .gnupg) or matches a sensitive filename"
|
||||
)
|
||||
if resolved != base:
|
||||
# normcase so containment holds on case-insensitive filesystems
|
||||
# (Windows, default macOS): it lowercases on Windows and is a no-op on
|
||||
# POSIX. commonpath raises ValueError across Windows drives (C: vs D:)
|
||||
# or mixed abs/rel — both mean "outside", so the except rejects them.
|
||||
nbase = os.path.normcase(base)
|
||||
try:
|
||||
if os.path.commonpath([os.path.normcase(resolved), nbase]) != nbase:
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
raise ValueError(f"path '{raw_path}' is outside the workspace ({workspace})")
|
||||
return resolved
|
||||
|
||||
# Bash + python tools used to share a single 60s timeout. That's
|
||||
# enough for one-shot commands but starves real workloads (pip
|
||||
# install, ffmpeg conversions, etc.) — and worse, the agent saw the
|
||||
@@ -341,19 +305,13 @@ _CODENAV_MAX_HITS = 200
|
||||
_CODENAV_MAX_LINE = 400
|
||||
|
||||
|
||||
def _resolve_search_root(raw_path: str, workspace: Optional[str] = None) -> str:
|
||||
def _resolve_search_root(raw_path: str) -> str:
|
||||
"""Resolve + confine a code-nav path (grep/glob/ls).
|
||||
|
||||
With a workspace set, the workspace folder is the root and supplied paths are
|
||||
confined inside it (same policy as read_file). Without one, an empty path
|
||||
defaults to the agent's primary root (project data dir) and a supplied path
|
||||
is confined by the global allowlist + sensitive-file policy.
|
||||
An empty path defaults to the agent's primary root (project data dir) and a
|
||||
supplied path is confined by the global allowlist + sensitive-file policy.
|
||||
"""
|
||||
raw = (raw_path or "").strip()
|
||||
if workspace:
|
||||
if not raw:
|
||||
return os.path.realpath(workspace)
|
||||
return _resolve_tool_path_in_workspace(workspace, raw)
|
||||
if not raw:
|
||||
roots = _tool_path_roots()
|
||||
return roots[0] if roots else os.path.realpath(".")
|
||||
@@ -564,12 +522,11 @@ async def _call_mcp_tool(
|
||||
tool: str,
|
||||
content: str,
|
||||
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
|
||||
workspace: Optional[str] = None,
|
||||
) -> Dict:
|
||||
"""Route a legacy tool call through the MCP manager, with direct fallbacks."""
|
||||
mcp = get_mcp_manager()
|
||||
if not mcp:
|
||||
return await _direct_fallback(tool, content, progress_cb=progress_cb, workspace=workspace) or {"error": f"MCP manager not available for tool '{tool}'", "exit_code": 1}
|
||||
return await _direct_fallback(tool, content, progress_cb=progress_cb) or {"error": f"MCP manager not available for tool '{tool}'", "exit_code": 1}
|
||||
|
||||
server_id, tool_name = _MCP_TOOL_MAP[tool]
|
||||
qualified = f"mcp__{server_id}__{tool_name}"
|
||||
@@ -578,7 +535,7 @@ async def _call_mcp_tool(
|
||||
|
||||
# If MCP server not connected, try direct fallback
|
||||
if isinstance(result, dict) and result.get("exit_code") == 1 and "not connected" in result.get("error", ""):
|
||||
fallback = await _direct_fallback(tool, content, progress_cb=progress_cb, workspace=workspace)
|
||||
fallback = await _direct_fallback(tool, content, progress_cb=progress_cb)
|
||||
if fallback:
|
||||
return fallback
|
||||
|
||||
@@ -636,7 +593,6 @@ async def _direct_fallback(
|
||||
tool: str,
|
||||
content: str,
|
||||
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
|
||||
workspace: Optional[str] = None,
|
||||
) -> Optional[Dict]:
|
||||
"""In-process execution path for the eight tools that used to live as
|
||||
stdio MCP servers under mcp_servers/. Those servers were deleted in
|
||||
@@ -670,7 +626,7 @@ async def _direct_fallback(
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
env=_subproc_env,
|
||||
cwd=workspace or _AGENT_WORKDIR,
|
||||
cwd=_AGENT_WORKDIR,
|
||||
)
|
||||
stdout, stderr, rc, timed_out = await _run_subprocess_streaming(
|
||||
proc,
|
||||
@@ -697,7 +653,7 @@ async def _direct_fallback(
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
env=_subproc_env,
|
||||
cwd=workspace or _AGENT_WORKDIR,
|
||||
cwd=_AGENT_WORKDIR,
|
||||
)
|
||||
stdout, stderr, rc, timed_out = await _run_subprocess_streaming(
|
||||
proc,
|
||||
@@ -727,8 +683,7 @@ async def _direct_fallback(
|
||||
except (json.JSONDecodeError, TypeError, ValueError):
|
||||
pass
|
||||
try:
|
||||
path = (_resolve_tool_path_in_workspace(workspace, raw_path)
|
||||
if workspace else _resolve_tool_path(raw_path))
|
||||
path = _resolve_tool_path(raw_path)
|
||||
except ValueError as e:
|
||||
return {"error": f"read_file: {e}", "exit_code": 1}
|
||||
try:
|
||||
@@ -771,8 +726,7 @@ async def _direct_fallback(
|
||||
raw_path = lines[0].strip()
|
||||
body = lines[1] if len(lines) > 1 else ""
|
||||
try:
|
||||
path = (_resolve_tool_path_in_workspace(workspace, raw_path)
|
||||
if workspace else _resolve_tool_path(raw_path))
|
||||
path = _resolve_tool_path(raw_path)
|
||||
except ValueError as e:
|
||||
return {"error": f"write_file: {e}", "exit_code": 1}
|
||||
try:
|
||||
@@ -825,7 +779,7 @@ async def _direct_fallback(
|
||||
max_hits = _CODENAV_MAX_HITS
|
||||
max_hits = max(1, min(max_hits, _CODENAV_MAX_HITS))
|
||||
try:
|
||||
root = _resolve_search_root(str(args.get("path", "")), workspace)
|
||||
root = _resolve_search_root(str(args.get("path", "")))
|
||||
except ValueError as e:
|
||||
return {"error": f"grep: {e}", "exit_code": 1}
|
||||
|
||||
@@ -909,7 +863,7 @@ async def _direct_fallback(
|
||||
if not pattern:
|
||||
return {"error": "glob: pattern is required", "exit_code": 1}
|
||||
try:
|
||||
root = _resolve_search_root(str(args.get("path", "")), workspace)
|
||||
root = _resolve_search_root(str(args.get("path", "")))
|
||||
except ValueError as e:
|
||||
return {"error": f"glob: {e}", "exit_code": 1}
|
||||
|
||||
@@ -956,7 +910,7 @@ async def _direct_fallback(
|
||||
else:
|
||||
raw_path = _s.split("\n", 1)[0].strip()
|
||||
try:
|
||||
root = _resolve_search_root(raw_path, workspace)
|
||||
root = _resolve_search_root(raw_path)
|
||||
except ValueError as e:
|
||||
return {"error": f"ls: {e}", "exit_code": 1}
|
||||
|
||||
@@ -1121,7 +1075,6 @@ async def execute_tool_block(
|
||||
tool_policy: Optional[ToolPolicy] = None,
|
||||
owner: Optional[str] = None,
|
||||
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
|
||||
workspace: Optional[str] = None,
|
||||
) -> Tuple[str, Dict]:
|
||||
"""Execute a single tool block. Returns (description, result_dict).
|
||||
|
||||
@@ -1296,7 +1249,7 @@ async def execute_tool_block(
|
||||
_is_bg, _bg_cmd = _split_bg_marker(content)
|
||||
if _is_bg and _bg_cmd:
|
||||
from src import bg_jobs
|
||||
rec = bg_jobs.launch(_bg_cmd, session_id=session_id, cwd=workspace or _AGENT_WORKDIR)
|
||||
rec = bg_jobs.launch(_bg_cmd, session_id=session_id, cwd=_AGENT_WORKDIR)
|
||||
short = _bg_cmd.strip().split(chr(10))[0][:80]
|
||||
desc = f"bash (background): {short}"
|
||||
result = {
|
||||
@@ -1318,13 +1271,12 @@ async def execute_tool_block(
|
||||
if tool in _MCP_TOOL_MAP:
|
||||
first_line = content.split(chr(10))[0][:80]
|
||||
desc = f"{tool}: {first_line}"
|
||||
result = await _call_mcp_tool(tool, content, progress_cb=progress_cb, workspace=workspace)
|
||||
result = await _call_mcp_tool(tool, content, progress_cb=progress_cb)
|
||||
elif tool in ("grep", "glob", "ls"):
|
||||
# Code-navigation tools — no MCP server; run the direct implementation.
|
||||
# Confined to the workspace when one is set (same policy as read_file).
|
||||
first_line = content.split(chr(10))[0][:80]
|
||||
desc = f"{tool}: {first_line}"
|
||||
result = await _direct_fallback(tool, content, progress_cb=progress_cb, workspace=workspace) \
|
||||
result = await _direct_fallback(tool, content, progress_cb=progress_cb) \
|
||||
or {"error": f"{tool}: execution failed", "exit_code": 1}
|
||||
elif tool == "create_document":
|
||||
title = content.split("\n")[0].strip()[:60]
|
||||
@@ -1429,7 +1381,7 @@ async def execute_tool_block(
|
||||
desc = "edit_image"
|
||||
result = await do_edit_image(content, owner=owner)
|
||||
elif tool == "edit_file":
|
||||
result = await _do_edit_file(content, workspace=workspace)
|
||||
result = await _do_edit_file(content)
|
||||
desc = result.get("output") or result.get("error") or "edit_file"
|
||||
elif tool == "trigger_research":
|
||||
desc = "trigger_research"
|
||||
|
||||
@@ -238,36 +238,6 @@ def test_guide_only_blocks_later_round_document_streaming(monkeypatch):
|
||||
assert not any(event.get("type") == "doc_stream_delta" for event in events)
|
||||
|
||||
|
||||
def test_guide_only_directive_dominates_workspace_prompt(monkeypatch):
|
||||
_patch_loop_basics(monkeypatch)
|
||||
system_prompts = []
|
||||
|
||||
async def _fake_stream(_candidates, messages, **kwargs):
|
||||
system_prompts.append(messages[0]["content"])
|
||||
yield _delta_chunk("ok")
|
||||
yield "data: [DONE]\n\n"
|
||||
|
||||
monkeypatch.setattr(al, "stream_llm_with_fallback", _fake_stream, raising=False)
|
||||
policy = build_effective_tool_policy(last_user_message="Do not use tools.")
|
||||
|
||||
_collect(
|
||||
al.stream_agent_loop(
|
||||
"http://local.test/v1",
|
||||
"local-model",
|
||||
[{"role": "user", "content": "Do not use tools."}],
|
||||
max_rounds=1,
|
||||
relevant_tools={"bash"},
|
||||
tool_policy=policy,
|
||||
workspace="/tmp/project",
|
||||
)
|
||||
)
|
||||
|
||||
assert system_prompts
|
||||
assert system_prompts[0].startswith("## GUIDE-ONLY MODE")
|
||||
assert "ACTIVE WORKSPACE" not in system_prompts[0]
|
||||
assert "ALWAYS start by exploring" not in system_prompts[0]
|
||||
|
||||
|
||||
def test_guide_only_skips_intent_without_action_nudge(monkeypatch):
|
||||
_patch_loop_basics(monkeypatch)
|
||||
|
||||
|
||||
@@ -1,107 +0,0 @@
|
||||
"""Workspace confinement: file tools are hard-bounded to the workspace folder
|
||||
(layered on upstream's sensitive-path policy); bash runs with cwd there."""
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
from src.tool_execution import _resolve_tool_path_in_workspace, _direct_fallback
|
||||
|
||||
|
||||
def test_workspace_resolver_confines():
|
||||
ws = tempfile.mkdtemp()
|
||||
open(os.path.join(ws, "a.txt"), "w").write("x")
|
||||
real = os.path.realpath(os.path.join(ws, "a.txt"))
|
||||
# relative path resolves under the workspace
|
||||
assert _resolve_tool_path_in_workspace(ws, "a.txt") == real
|
||||
# absolute path inside the workspace is allowed
|
||||
assert _resolve_tool_path_in_workspace(ws, os.path.join(ws, "a.txt")) == real
|
||||
# absolute path outside is rejected (sibling temp dir, portable across OSes)
|
||||
outside = tempfile.mkdtemp()
|
||||
with pytest.raises(ValueError):
|
||||
_resolve_tool_path_in_workspace(ws, os.path.join(outside, "x.txt"))
|
||||
# parent-escape is rejected
|
||||
with pytest.raises(ValueError):
|
||||
_resolve_tool_path_in_workspace(ws, os.path.join("..", "..", "escape.txt"))
|
||||
|
||||
|
||||
def test_workspace_resolver_blocks_sensitive():
|
||||
"""Upstream's sensitive-file deny list still applies inside the workspace."""
|
||||
ws = tempfile.mkdtemp()
|
||||
os.makedirs(os.path.join(ws, ".ssh"), exist_ok=True)
|
||||
with pytest.raises(ValueError):
|
||||
_resolve_tool_path_in_workspace(ws, ".ssh/authorized_keys")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_read_write_confined_in_workspace():
|
||||
ws = tempfile.mkdtemp()
|
||||
# Write inside the workspace (relative path) succeeds.
|
||||
res = await _direct_fallback("write_file", "note.txt\nhello", workspace=ws)
|
||||
assert res["exit_code"] == 0
|
||||
assert os.path.isfile(os.path.join(ws, "note.txt"))
|
||||
# Read it back.
|
||||
res = await _direct_fallback("read_file", "note.txt", workspace=ws)
|
||||
assert res["exit_code"] == 0 and res["output"] == "hello"
|
||||
# Reading outside the workspace is rejected (sibling temp dir, portable).
|
||||
outside = tempfile.mkdtemp()
|
||||
outside_file = os.path.join(outside, "secret.txt")
|
||||
open(outside_file, "w").write("nope")
|
||||
res = await _direct_fallback("read_file", outside_file, workspace=ws)
|
||||
assert res["exit_code"] == 1 and "outside the workspace" in res["error"]
|
||||
# Writing outside is rejected (file must not be created).
|
||||
escape = os.path.join(outside, "_ws_escape.txt")
|
||||
res = await _direct_fallback("write_file", f"{escape}\nx", workspace=ws)
|
||||
assert res["exit_code"] == 1 and "outside the workspace" in res["error"]
|
||||
assert not os.path.exists(escape)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subprocess_runs_with_workspace_cwd():
|
||||
"""bash/python subprocesses run with cwd set to the workspace. Use the
|
||||
python tool for an OS-agnostic cwd probe (Windows cmd has no `pwd`)."""
|
||||
ws = tempfile.mkdtemp()
|
||||
res = await _direct_fallback("python", "import os; print(os.getcwd())", workspace=ws)
|
||||
assert res["exit_code"] == 0
|
||||
assert os.path.realpath(res["output"].strip()) == os.path.realpath(ws)
|
||||
|
||||
|
||||
# --- Tools that landed after this PR, now wired into the workspace -----------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_edit_file_confined_in_workspace():
|
||||
import json
|
||||
from src.tool_execution import _do_edit_file
|
||||
ws = tempfile.mkdtemp()
|
||||
open(os.path.join(ws, "f.txt"), "w").write("foo bar")
|
||||
# Edit inside the workspace succeeds.
|
||||
res = await _do_edit_file(json.dumps(
|
||||
{"path": "f.txt", "old_string": "foo", "new_string": "baz"}), workspace=ws)
|
||||
assert res["exit_code"] == 0
|
||||
assert open(os.path.join(ws, "f.txt")).read() == "baz bar"
|
||||
# Editing outside the workspace is rejected (sibling temp dir, portable).
|
||||
outside = tempfile.mkdtemp()
|
||||
outside_file = os.path.join(outside, "f.txt")
|
||||
open(outside_file, "w").write("a")
|
||||
res = await _do_edit_file(json.dumps(
|
||||
{"path": outside_file, "old_string": "a", "new_string": "b"}), workspace=ws)
|
||||
assert res["exit_code"] == 1 and "outside the workspace" in res["error"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_grep_and_ls_confined_in_workspace():
|
||||
import json
|
||||
ws = tempfile.mkdtemp()
|
||||
open(os.path.join(ws, "doc.txt"), "w").write("hello workspace\n")
|
||||
# grep with no path searches the workspace root and finds the match.
|
||||
res = await _direct_fallback("grep", json.dumps({"pattern": "hello"}), workspace=ws)
|
||||
assert res["exit_code"] == 0 and "doc.txt" in res["output"]
|
||||
# grep pointed outside the workspace is rejected (sibling temp dir, portable).
|
||||
outside = tempfile.mkdtemp()
|
||||
res = await _direct_fallback("grep", json.dumps({"pattern": "x", "path": outside}), workspace=ws)
|
||||
assert res["exit_code"] == 1 and "outside the workspace" in res["error"]
|
||||
# ls of the workspace lists its files; ls outside is rejected.
|
||||
res = await _direct_fallback("ls", "", workspace=ws)
|
||||
assert res["exit_code"] == 0 and "doc.txt" in res["output"]
|
||||
res = await _direct_fallback("ls", outside, workspace=ws)
|
||||
assert res["exit_code"] == 1 and "outside the workspace" in res["error"]
|
||||
Reference in New Issue
Block a user