odysseus/tests/test_workspace_confine.py
Kenny Van de Maele 620fdd0859 feat(agent): confine agent file/shell tools to a selectable workspace (#3665)
* feat(agent): workspace confinement via context-local binding + get_workspace tool

Bind the per-turn workspace once in execute_tool_block; the shared path
resolvers (_resolve_tool_path / _resolve_search_root) and the subprocess cwd
helper (agent_cwd) read it, so file tools + bash/python are confined centrally
and a new tool that uses the shared helpers cannot accidentally bypass it.

Adds the admin-gated /api/workspace/browse picker, a workspace pill + directory
modal (reusing existing modal/button CSS), the /workspace slash command, and a
get_workspace tool (replaces a system-prompt block). Confinement is OS-agnostic
(realpath/normcase/commonpath) and docker-safe (container paths, no host
assumptions). Reopens #2023.
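
One way the realpath/normcase/commonpath check could look, as a sketch only. The function name `is_inside` is hypothetical and the real resolver may differ (for example, it also applies a sensitive-path deny list).

```python
import os


def is_inside(root, candidate):
    """Return True if candidate resolves to root or to a path beneath it."""
    root_real = os.path.normcase(os.path.realpath(root))
    # os.path.join keeps an absolute candidate as-is and anchors a relative one at root.
    cand_real = os.path.normcase(os.path.realpath(os.path.join(root, candidate)))
    try:
        return os.path.commonpath([root_real, cand_real]) == root_real
    except ValueError:
        # commonpath raises when the paths are on different drives (Windows).
        return False
```

Resolving symlinks and `..` segments before comparing is what makes a parent escape such as `../../escape.txt` fail the check.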

* ux(workspace): clarify workspace is not a sandbox

Picker modal note + pill tooltip + get_workspace tool/output wording now state
plainly: read_file/write_file/edit_file/grep/glob/ls are confined to the folder,
but bash/python only start there (cwd) and are not sandboxed. Modal note reuses
the existing .muted class.

* fix(agent): treat an active workspace as file-work intent

A vague low-signal message (e.g. "look at the local project") matches no
domain keywords, so tool retrieval is skipped and only always-available tools
are offered — leaving the agent with no file access even though a workspace is
set. When a workspace is active, include the file/code tools (incl.
get_workspace) on low-signal turns so the agent can act on the folder.

Normal retrieval still requires the tool index (ChromaDB) to be reachable;
that is an environment dependency, not part of this change.

* ux(workspace): hide pill + overflow entry in chat mode

Workspace only scopes the agent's file/shell tools, so the pill and the
overflow 'Workspace' entry are agent-only now — hidden in chat mode like the
bash toggle. The mode is read from the DOM in syncWorkspaceIndicator;
applyMode() is called from the agent/chat setMode handler.

* prompt(tools): steer bash/python to defer to the dedicated file tools

bash/python schema descriptions (what native-tool-calling models read) were
bare and gave no steer, so models would do file ops via the shell (e.g. writing
SVG/HTML, which then dumps raw markup into the tool preview). The bash/python
descriptions in the schema, tool index, and prompt section now say to prefer
read_file/write_file/edit_file/grep/glob/ls and to use the shell only for what
those do not cover.

* prompt(tools): keep bash/python deferral generic (no hardcoded tool names)

Reference 'a dedicated tool' rather than listing read_file/write_file/grep/etc.
by name, so the guidance does not go stale if those tools are renamed.

* style(workspace): drop em-dashes from added code comments/strings

* ux(workspace): terser non-sandbox note in picker (no tool-name list)

* ux(workspace): mirror terse non-sandbox wording in pill tooltip

* chore: untrack local venv symlink (run-only, not part of the feature)

* prompt(workspace): keep get_workspace text generic (no hardcoded tool names)

* fix(agent): low-signal + workspace surfaces only read-only file tools

Intersect the files tool group with PLAN_MODE_READONLY_TOOLS so a vague message
in a workspace exposes read_file/grep/glob/ls/get_workspace for exploration, but
not write_file/edit_file/bash/python -- those wait for a request that actually
calls for them (RAG retrieval still adds them on a real ask).

* feat(workspace): cap browse listing at 500 dirs with a truncated hint

Mirror the filesystem_tools._CODENAV_MAX_HITS pattern with a module-local
_MAX_BROWSE_DIRS so a directory with thousands of children does not dump every
row into the picker; the response carries a truncated flag and the modal tells
the user to type a path to jump in.

* chore: untrack local venv symlink (run-only artifact)

* fix(workspace): vet the workspace root against the sensitive-path deny list at bind time

The in-workspace resolver deny-lists sensitive paths inside the workspace,
but the empty-path search root is the workspace itself, so a workspace of
~/.ssh could be listed via ls with no path. vet_workspace() (public, in
tool_execution next to the resolvers) rejects non-directories and sensitive
roots before the path is ever bound; chat_routes uses it instead of its
inline isdir check.

* fix(workspace): reject filesystem roots and stop showing rejected workspaces as active

Review findings from #3665:

P2: vet_workspace accepted / (and would accept drive/UNC roots), which makes
every absolute path 'inside' the workspace and collapses confinement into
host-wide file access. A root is its own dirname, so reject when
dirname(resolved) == resolved; the browse response now carries a selectable
flag and the picker disables 'Use this folder' on unselectable dirs.
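
The "a root is its own dirname" test can be sketched as follows. The helper name `is_filesystem_root` is hypothetical; the commit only states the condition, not how vet_workspace spells it.

```python
import os


def is_filesystem_root(path):
    """Return True when the resolved path has no parent other than itself."""
    resolved = os.path.realpath(path)
    return os.path.dirname(resolved) == resolved
```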

P3: /workspace set stored any string client-side and the chat route silently
dropped rejected values, so the pill could claim a confinement that was not
in effect. New admin-gated /api/workspace/vet validates manual paths before
they persist (canonical path returned), and when a posted workspace is
rejected at send time the stream emits workspace_rejected so the client
clears the stored value and toasts instead of continuing silently.

* fix(workspace): check caller privilege before vetting the posted workspace

Review finding: /api/chat_stream called vet_workspace() on the posted value
for every caller and emitted workspace_rejected on failure, so a non-admin
who can chat but cannot use file/shell tools could distinguish existing
directories from missing/file/sensitive/root paths by whether the event
appeared. The resolution now lives in _resolve_request_workspace, which
drops the submitted value uniformly for non-admin callers, with no vetting
and no event, before the path ever touches the filesystem. Admin and
single-user behavior is unchanged. Test pins that valid and invalid paths
are indistinguishable for a non-admin and that vet_workspace is never
invoked for them.
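
A sketch of the gate's shape, inferred from the return values the test below pins: a `(workspace, rejected)` pair. The signature here is hypothetical; the real `_resolve_request_workspace` takes the request and the posted value and looks up the caller itself.

```python
def resolve_request_workspace(is_admin, posted, vet):
    """Return (workspace, rejected) without vetting anything for non-admins."""
    if not is_admin:
        # Uniform drop: no vetting and no rejection signal, so no path oracle.
        return "", ""
    vetted = vet(posted)
    if vetted is None:
        # The caller can emit a rejection event for this value.
        return "", posted
    return vetted, ""
```

Checking privilege before calling `vet` is the point: the posted path never reaches the filesystem for a caller who could not use it anyway.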
2026-06-11 18:17:54 +02:00


"""Workspace confinement.
The agent's per-turn workspace is a single context-local binding set in
execute_tool_block. The shared path resolvers (_resolve_tool_path /
_resolve_search_root) and the subprocess cwd helper (agent_cwd) read it, so
confinement is enforced in ONE place: a tool that uses the shared helpers is
confined automatically and a new tool cannot accidentally bypass it.
Covers: the resolver helper, the central binding (the safety net), end-to-end
confinement of read/write/edit/grep/ls + subprocess cwd via execute_tool_block,
the get_workspace tool, no-leak across calls, and the admin-gated browse route.
"""

import json
import os
import tempfile
from types import SimpleNamespace

import pytest

from src.tool_execution import (
    _AGENT_WORKDIR,
    _active_workspace,
    _resolve_search_root,
    _resolve_tool_path,
    _resolve_tool_path_in_workspace,
    agent_cwd,
    execute_tool_block,
    get_active_workspace,
)


def _block(tool, content=""):
    return SimpleNamespace(tool_type=tool, content=content)


@pytest.fixture
def ws():
    d = tempfile.mkdtemp()
    with open(os.path.join(d, "a.txt"), "w") as f:
        f.write("x")
    return d


@pytest.fixture
def admin(monkeypatch):
    """Pass the public-tool gate so file tools dispatch in tests."""
    monkeypatch.setattr(
        "src.tool_execution.owner_is_admin_or_single_user", lambda owner: True
    )


# ── the resolver helper ────────────────────────────────────────────────
def test_resolver_confines(ws):
    real = os.path.realpath(os.path.join(ws, "a.txt"))
    assert _resolve_tool_path_in_workspace(ws, "a.txt") == real  # relative
    assert _resolve_tool_path_in_workspace(ws, os.path.join(ws, "a.txt")) == real  # abs inside
    outside = tempfile.mkdtemp()
    with pytest.raises(ValueError):  # abs outside
        _resolve_tool_path_in_workspace(ws, os.path.join(outside, "x.txt"))
    with pytest.raises(ValueError):  # parent escape
        _resolve_tool_path_in_workspace(ws, os.path.join("..", "..", "escape.txt"))


def test_resolver_blocks_sensitive_inside_workspace(ws):
    os.makedirs(os.path.join(ws, ".ssh"), exist_ok=True)
    with pytest.raises(ValueError):
        _resolve_tool_path_in_workspace(ws, ".ssh/authorized_keys")


# ── the central binding: the safety net ─────────────────────────────────
def test_active_binding_confines_shared_resolvers(ws):
    """ANY tool resolving paths through the shared helpers is confined while the
    binding is active, without doing anything workspace-specific itself. This is
    what stops a newly added tool from accidentally ignoring the workspace."""
    token = _active_workspace.set(ws)
    try:
        assert get_active_workspace() == ws
        assert agent_cwd() == ws
        assert _resolve_tool_path("a.txt") == os.path.realpath(os.path.join(ws, "a.txt"))
        with pytest.raises(ValueError):  # normally-allowed root, now outside ws
            _resolve_tool_path("/tmp/whatever.txt")
        assert _resolve_search_root("") == os.path.realpath(ws)
    finally:
        _active_workspace.reset(token)


def test_no_binding_uses_default_roots():
    assert get_active_workspace() is None
    assert agent_cwd() == _AGENT_WORKDIR
    with pytest.raises(ValueError):
        _resolve_tool_path("/etc/hosts")


# ── end-to-end via execute_tool_block (sets + resets the binding) ───────
@pytest.mark.asyncio
async def test_read_write_edit_confined_e2e(ws, admin):
    _, r = await execute_tool_block(_block("write_file", "note.txt\nhello"), owner="a", workspace=ws)
    assert r["exit_code"] == 0 and os.path.isfile(os.path.join(ws, "note.txt"))
    _, r = await execute_tool_block(_block("read_file", "note.txt"), owner="a", workspace=ws)
    assert r["exit_code"] == 0 and r["output"] == "hello"

    with open(os.path.join(ws, "f.txt"), "w") as f:
        f.write("foo bar")
    _, r = await execute_tool_block(
        _block("edit_file", json.dumps({"path": "f.txt", "old_string": "foo", "new_string": "baz"})),
        owner="a", workspace=ws,
    )
    assert r["exit_code"] == 0
    with open(os.path.join(ws, "f.txt")) as f:
        assert f.read() == "baz bar"

    # outside the workspace is rejected, and nothing is created
    outside = tempfile.mkdtemp()
    of = os.path.join(outside, "secret.txt")
    with open(of, "w") as f:
        f.write("nope")
    _, r = await execute_tool_block(_block("read_file", of), owner="a", workspace=ws)
    assert r["exit_code"] == 1 and "outside the workspace" in r["error"]
    escape = os.path.join(outside, "_esc.txt")
    _, r = await execute_tool_block(_block("write_file", f"{escape}\nx"), owner="a", workspace=ws)
    assert r["exit_code"] == 1 and "outside the workspace" in r["error"]
    assert not os.path.exists(escape)


@pytest.mark.asyncio
async def test_grep_and_ls_confined_e2e(ws, admin):
    with open(os.path.join(ws, "doc.txt"), "w") as f:
        f.write("hello workspace\n")
    _, r = await execute_tool_block(_block("grep", json.dumps({"pattern": "hello"})), owner="a", workspace=ws)
    assert r["exit_code"] == 0 and "doc.txt" in r["output"]
    outside = tempfile.mkdtemp()
    _, r = await execute_tool_block(_block("grep", json.dumps({"pattern": "x", "path": outside})), owner="a", workspace=ws)
    assert r["exit_code"] == 1 and "outside the workspace" in r["error"]
    _, r = await execute_tool_block(_block("ls", ""), owner="a", workspace=ws)
    assert r["exit_code"] == 0 and "doc.txt" in r["output"]
    _, r = await execute_tool_block(_block("ls", outside), owner="a", workspace=ws)
    assert r["exit_code"] == 1 and "outside the workspace" in r["error"]


@pytest.mark.asyncio
async def test_subprocess_cwd_is_workspace_e2e(ws, admin):
    """python tool runs with cwd = workspace (OS-agnostic probe)."""
    _, r = await execute_tool_block(_block("python", "import os; print(os.getcwd())"), owner="a", workspace=ws)
    assert r["exit_code"] == 0
    assert os.path.realpath(r["output"].strip()) == os.path.realpath(ws)


# ── get_workspace tool ──────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_get_workspace_tool(ws, admin):
    _, r = await execute_tool_block(_block("get_workspace", ""), owner="a", workspace=ws)
    assert r["exit_code"] == 0 and r["output"].startswith(ws) and "not sandboxed" in r["output"]
    _, r = await execute_tool_block(_block("get_workspace", ""), owner="a")  # none active
    assert r["exit_code"] == 0 and "No workspace" in r["output"]


# ── no leak across calls ────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_binding_does_not_leak(ws, admin):
    await execute_tool_block(_block("ls", ""), owner="a", workspace=ws)
    assert get_active_workspace() is None


# ── tool selection: an active workspace is the file-work signal ─────────
# A vague ("low-signal") message like "look at the local project" matches no
# domain keywords, so retrieval is normally skipped. When a workspace is set it
# must still surface the file tools, otherwise the agent says it has no file
# access (the bug this guards against).
def _sent_tool_names(monkeypatch, *, workspace):
    import asyncio
    import src.agent_loop as al

    monkeypatch.setattr(al, "get_setting", lambda key, default=None: default, raising=False)
    monkeypatch.setattr(al, "get_mcp_manager", lambda: None, raising=False)
    monkeypatch.setattr(al, "estimate_tokens", lambda *a, **k: 10, raising=False)
    # Isolate the selection logic from owner gating (tested separately).
    monkeypatch.setattr(al, "blocked_tools_for_owner", lambda owner: set(), raising=False)
    captured = []

    async def _fake_stream(_candidates, messages, **kwargs):
        captured.append(kwargs.get("tools"))
        yield "data: " + json.dumps({"delta": "ok"}) + "\n\n"
        yield "data: [DONE]\n\n"

    monkeypatch.setattr(al, "stream_llm_with_fallback", _fake_stream, raising=False)

    async def _run():
        gen = al.stream_agent_loop(
            "https://api.openai.com/v1", "gpt-test",
            [{"role": "user", "content": "look at the local project"}],
            max_rounds=1, relevant_tools=None, owner="admin", workspace=workspace,
        )
        return [c async for c in gen]

    asyncio.run(_run())
    schemas = captured[0] or []
    return {t["function"]["name"] for t in schemas if isinstance(t, dict) and "function" in t}


def test_low_signal_with_workspace_surfaces_readonly_file_tools(monkeypatch):
    names = _sent_tool_names(monkeypatch, workspace="/tmp")
    # read-only nav tools surface so the agent can explore
    assert "read_file" in names
    assert "get_workspace" in names
    assert "grep" in names
    # write/shell tools do NOT surface on a vague message
    assert "write_file" not in names
    assert "edit_file" not in names
    assert "bash" not in names
    assert "python" not in names


def test_low_signal_without_workspace_excludes_file_tools(monkeypatch):
    names = _sent_tool_names(monkeypatch, workspace=None)
    assert "read_file" not in names
    assert "get_workspace" not in names


# ── browse route is admin-gated ─────────────────────────────────────────
def test_browse_is_admin_gated(monkeypatch):
    from fastapi import HTTPException
    import routes.workspace_routes as wr

    router = wr.setup_workspace_routes()
    browse = next(r.endpoint for r in router.routes if r.path == "/api/workspace/browse")
    monkeypatch.setattr(wr, "get_current_user", lambda req: "bob")

    monkeypatch.setattr(wr, "owner_is_admin_or_single_user", lambda owner: False)
    with pytest.raises(HTTPException) as ei:
        browse(request=object(), path="/")
    assert ei.value.status_code == 403

    monkeypatch.setattr(wr, "owner_is_admin_or_single_user", lambda owner: True)
    out = browse(request=object(), path=os.path.expanduser("~"))
    assert "dirs" in out and "path" in out
    assert all("name" in d and "path" in d for d in out["dirs"])


# ── bind-time vetting of the workspace root ─────────────────────────────
def test_vet_workspace_accepts_normal_dir(ws):
    from src.tool_execution import vet_workspace

    assert vet_workspace(ws) == os.path.realpath(ws)


def test_vet_workspace_rejects_sensitive_root(tmp_path):
    # The resolver deny-lists sensitive paths inside the workspace, but the
    # empty-path search root is the workspace itself - a sensitive root must
    # be rejected before it is bound or `ls` with no path would list it.
    from src.tool_execution import vet_workspace

    ssh_dir = tmp_path / ".ssh"
    ssh_dir.mkdir()
    assert vet_workspace(str(ssh_dir)) is None


def test_vet_workspace_rejects_nondir_and_empty(ws):
    from src.tool_execution import vet_workspace

    assert vet_workspace(os.path.join(ws, "a.txt")) is None  # file, not dir
    assert vet_workspace("/nonexistent/path/xyz") is None
    assert vet_workspace("") is None
    assert vet_workspace(" ") is None


def test_vet_workspace_rejects_filesystem_root():
    # Binding / would make every absolute path "inside" the workspace,
    # collapsing confinement into host-wide file access.
    from src.tool_execution import vet_workspace

    assert vet_workspace("/") is None


def test_browse_marks_root_unselectable_and_vet_endpoint(monkeypatch):
    import routes.workspace_routes as wr

    router = wr.setup_workspace_routes()
    browse = next(r.endpoint for r in router.routes if r.path == "/api/workspace/browse")
    vet = next(r.endpoint for r in router.routes if r.path == "/api/workspace/vet")
    monkeypatch.setattr(wr, "get_current_user", lambda req: "admin")
    monkeypatch.setattr(wr, "owner_is_admin_or_single_user", lambda owner: True)

    out = browse(request=object(), path="/")
    assert out["selectable"] is False
    out = browse(request=object(), path=os.path.expanduser("~"))
    assert out["selectable"] is True

    assert vet(request=object(), path="/") == {"ok": False, "path": None}
    home = os.path.realpath(os.path.expanduser("~"))
    assert vet(request=object(), path="~") == {"ok": True, "path": home}

    from fastapi import HTTPException

    monkeypatch.setattr(wr, "owner_is_admin_or_single_user", lambda owner: False)
    with pytest.raises(HTTPException) as ei:
        vet(request=object(), path="/tmp")
    assert ei.value.status_code == 403


# ── send-time privilege gate (no path oracle for non-admins) ────────────
def test_request_workspace_gate(ws, monkeypatch):
    """Non-admin chat callers must get a uniform drop with no vetting: the
    workspace_rejected signal would otherwise reveal which host paths exist."""
    import routes.chat_routes as cr

    monkeypatch.setattr(cr, "get_current_user", lambda req: "bob")
    vet_calls = []
    import src.tool_execution as te

    real_vet = te.vet_workspace
    monkeypatch.setattr(te, "vet_workspace", lambda p: vet_calls.append(p) or real_vet(p))

    import src.tool_security as ts

    monkeypatch.setattr(ts, "owner_is_admin_or_single_user", lambda owner: False)
    # Valid and invalid paths are indistinguishable for a non-admin: both
    # drop silently, and the path never reaches the filesystem.
    assert cr._resolve_request_workspace(object(), ws) == ("", "")
    assert cr._resolve_request_workspace(object(), "/nonexistent/xyz") == ("", "")
    assert vet_calls == []

    monkeypatch.setattr(ts, "owner_is_admin_or_single_user", lambda owner: True)
    assert cr._resolve_request_workspace(object(), ws) == (os.path.realpath(ws), "")
    assert cr._resolve_request_workspace(object(), "/nonexistent/xyz") == ("", "/nonexistent/xyz")