mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-15 17:25:26 -04:00
620fdd0859
* feat(agent): workspace confinement via context-local binding + get_workspace tool Bind the per-turn workspace once in execute_tool_block; the shared path resolvers (_resolve_tool_path / _resolve_search_root) and the subprocess cwd helper (agent_cwd) read it, so file tools + bash/python are confined centrally and a new tool that uses the shared helpers cannot accidentally bypass it. Adds the admin-gated /api/workspace/browse picker, a workspace pill + directory modal (reusing existing modal/button CSS), the /workspace slash command, and a get_workspace tool (replaces a system-prompt block). Confinement is OS-agnostic (realpath/normcase/commonpath) and docker-safe (container paths, no host assumptions). Reopens #2023. * ux(workspace): clarify workspace is not a sandbox Picker modal note + pill tooltip + get_workspace tool/output wording now state plainly: read_file/write_file/edit_file/grep/glob/ls are confined to the folder, but bash/python only start there (cwd) and are not sandboxed. Modal note reuses the existing .muted class. * fix(agent): treat an active workspace as file-work intent A vague low-signal message (e.g. "look at the local project") matches no domain keywords, so tool retrieval is skipped and only always-available tools are offered — leaving the agent with no file access even though a workspace is set. When a workspace is active, include the file/code tools (incl. get_workspace) on low-signal turns so the agent can act on the folder. Also requires the tool index (ChromaDB) to be reachable for normal retrieval; that is an environment dependency, not part of this change. * ux(workspace): hide pill + overflow entry in chat mode Workspace only scopes the agent's file/shell tools, so the pill and the overflow 'Workspace' entry are agent-only now — hidden in chat mode like the bash toggle. Mode read from the DOM in syncWorkspaceIndicator; applyMode() is called from the agent/chat setMode handler. * prompt(tools): steer bash/python to defer to the dedicated file tools bash/python schema descriptions (what native-tool-calling models read) were bare and gave no steer, so models would do file ops via the shell (e.g. writing SVG/HTML, which then dumps raw markup into the tool preview). Tell bash/python in the schema + tool-index + prompt section to prefer read_file/write_file/ edit_file/grep/glob/ls and only be used for what those do not cover. * prompt(tools): keep bash/python deferral generic (no hardcoded tool names) Reference 'a dedicated tool' rather than listing read_file/write_file/grep/etc. by name, so the guidance does not go stale if those tools are renamed. * style(workspace): drop em-dashes from added code comments/strings * ux(workspace): terser non-sandbox note in picker (no tool-name list) * ux(workspace): mirror terse non-sandbox wording in pill tooltip * chore: untrack local venv symlink (run-only, not part of the feature) * prompt(workspace): keep get_workspace text generic (no hardcoded tool names) * fix(agent): low-signal + workspace surfaces only read-only file tools Intersect the files tool group with PLAN_MODE_READONLY_TOOLS so a vague message in a workspace exposes read_file/grep/glob/ls/get_workspace for exploration, but not write_file/edit_file/bash/python -- those wait for a request that actually calls for them (RAG retrieval still adds them on a real ask). * feat(workspace): cap browse listing at 500 dirs with a truncated hint Mirror the filesystem_tools._CODENAV_MAX_HITS pattern with a module-local _MAX_BROWSE_DIRS so a directory with thousands of children does not dump every row into the picker; the response carries a truncated flag and the modal tells the user to type a path to jump in. * chore: untrack local venv symlink (run-only artifact) * fix(workspace): vet the workspace root against the sensitive-path deny list at bind time The in-workspace resolver deny-lists sensitive paths inside the workspace, but the empty-path search root is the workspace itself, so a workspace of ~/.ssh could be listed via ls with no path. vet_workspace() (public, in tool_execution next to the resolvers) rejects non-directories and sensitive roots before the path is ever bound; chat_routes uses it instead of its inline isdir check. * fix(workspace): reject filesystem roots and stop showing rejected workspaces as active Review findings from #3665: P2: vet_workspace accepted / (and would accept drive/UNC roots), which makes every absolute path 'inside' the workspace and collapses confinement into host-wide file access. A root is its own dirname, so reject when dirname(resolved) == resolved; the browse response now carries a selectable flag and the picker disables 'Use this folder' on unselectable dirs. P3: /workspace set stored any string client-side and the chat route silently dropped rejected values, so the pill could claim a confinement that was not in effect. New admin-gated /api/workspace/vet validates manual paths before they persist (canonical path returned), and when a posted workspace is rejected at send time the stream emits workspace_rejected so the client clears the stored value and toasts instead of continuing silently. * fix(workspace): check caller privilege before vetting the posted workspace Review finding: /api/chat_stream called vet_workspace() on the posted value for every caller and emitted workspace_rejected on failure, so a non-admin who can chat but cannot use file/shell tools could distinguish existing directories from missing/file/sensitive/root paths by whether the event appeared. The resolution now lives in _resolve_request_workspace, which drops the submitted value uniformly for non-admin callers, with no vetting and no event, before the path ever touches the filesystem. Admin and single-user behavior is unchanged. Test pins that valid and invalid paths are indistinguishable for a non-admin and that vet_workspace is never invoked for them.
329 lines
14 KiB
Python
329 lines
14 KiB
Python
"""Workspace confinement.
|
|
|
|
The agent's per-turn workspace is a single context-local binding set in
|
|
execute_tool_block. The shared path resolvers (_resolve_tool_path /
|
|
_resolve_search_root) and the subprocess cwd helper (agent_cwd) read it, so
|
|
confinement is enforced in ONE place: a tool that uses the shared helpers is
|
|
confined automatically and a new tool cannot accidentally bypass it.
|
|
|
|
Covers: the resolver helper, the central binding (the safety net), end-to-end
|
|
confinement of read/write/edit/grep/ls + subprocess cwd via execute_tool_block,
|
|
the get_workspace tool, no-leak across calls, and the admin-gated browse route.
|
|
"""
|
|
import json
|
|
import os
|
|
import tempfile
|
|
from types import SimpleNamespace
|
|
|
|
import pytest
|
|
|
|
from src.tool_execution import (
|
|
_AGENT_WORKDIR,
|
|
_active_workspace,
|
|
_resolve_search_root,
|
|
_resolve_tool_path,
|
|
_resolve_tool_path_in_workspace,
|
|
agent_cwd,
|
|
execute_tool_block,
|
|
get_active_workspace,
|
|
)
|
|
|
|
|
|
def _block(tool, content=""):
|
|
return SimpleNamespace(tool_type=tool, content=content)
|
|
|
|
|
|
@pytest.fixture
|
|
def ws():
|
|
d = tempfile.mkdtemp()
|
|
with open(os.path.join(d, "a.txt"), "w") as f:
|
|
f.write("x")
|
|
return d
|
|
|
|
|
|
@pytest.fixture
|
|
def admin(monkeypatch):
|
|
"""Pass the public-tool gate so file tools dispatch in tests."""
|
|
monkeypatch.setattr(
|
|
"src.tool_execution.owner_is_admin_or_single_user", lambda owner: True
|
|
)
|
|
|
|
|
|
# ── the resolver helper ────────────────────────────────────────────────
|
|
|
|
def test_resolver_confines(ws):
|
|
real = os.path.realpath(os.path.join(ws, "a.txt"))
|
|
assert _resolve_tool_path_in_workspace(ws, "a.txt") == real # relative
|
|
assert _resolve_tool_path_in_workspace(ws, os.path.join(ws, "a.txt")) == real # abs inside
|
|
outside = tempfile.mkdtemp()
|
|
with pytest.raises(ValueError): # abs outside
|
|
_resolve_tool_path_in_workspace(ws, os.path.join(outside, "x.txt"))
|
|
with pytest.raises(ValueError): # parent escape
|
|
_resolve_tool_path_in_workspace(ws, os.path.join("..", "..", "escape.txt"))
|
|
|
|
|
|
def test_resolver_blocks_sensitive_inside_workspace(ws):
|
|
os.makedirs(os.path.join(ws, ".ssh"), exist_ok=True)
|
|
with pytest.raises(ValueError):
|
|
_resolve_tool_path_in_workspace(ws, ".ssh/authorized_keys")
|
|
|
|
|
|
# ── the central binding: the safety net ─────────────────────────────────
|
|
|
|
def test_active_binding_confines_shared_resolvers(ws):
|
|
"""ANY tool resolving paths through the shared helpers is confined while the
|
|
binding is active, without doing anything workspace-specific itself. This is
|
|
what stops a newly added tool from accidentally ignoring the workspace."""
|
|
token = _active_workspace.set(ws)
|
|
try:
|
|
assert get_active_workspace() == ws
|
|
assert agent_cwd() == ws
|
|
assert _resolve_tool_path("a.txt") == os.path.realpath(os.path.join(ws, "a.txt"))
|
|
with pytest.raises(ValueError): # normally-allowed root, now outside ws
|
|
_resolve_tool_path("/tmp/whatever.txt")
|
|
assert _resolve_search_root("") == os.path.realpath(ws)
|
|
finally:
|
|
_active_workspace.reset(token)
|
|
|
|
|
|
def test_no_binding_uses_default_roots():
|
|
assert get_active_workspace() is None
|
|
assert agent_cwd() == _AGENT_WORKDIR
|
|
with pytest.raises(ValueError):
|
|
_resolve_tool_path("/etc/hosts")
|
|
|
|
|
|
# ── end-to-end via execute_tool_block (sets + resets the binding) ───────
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_read_write_edit_confined_e2e(ws, admin):
|
|
_, r = await execute_tool_block(_block("write_file", "note.txt\nhello"), owner="a", workspace=ws)
|
|
assert r["exit_code"] == 0 and os.path.isfile(os.path.join(ws, "note.txt"))
|
|
_, r = await execute_tool_block(_block("read_file", "note.txt"), owner="a", workspace=ws)
|
|
assert r["exit_code"] == 0 and r["output"] == "hello"
|
|
|
|
with open(os.path.join(ws, "f.txt"), "w") as f:
|
|
f.write("foo bar")
|
|
_, r = await execute_tool_block(
|
|
_block("edit_file", json.dumps({"path": "f.txt", "old_string": "foo", "new_string": "baz"})),
|
|
owner="a", workspace=ws,
|
|
)
|
|
assert r["exit_code"] == 0
|
|
with open(os.path.join(ws, "f.txt")) as f:
|
|
assert f.read() == "baz bar"
|
|
|
|
# outside the workspace is rejected, and nothing is created
|
|
outside = tempfile.mkdtemp()
|
|
of = os.path.join(outside, "secret.txt")
|
|
with open(of, "w") as f:
|
|
f.write("nope")
|
|
_, r = await execute_tool_block(_block("read_file", of), owner="a", workspace=ws)
|
|
assert r["exit_code"] == 1 and "outside the workspace" in r["error"]
|
|
escape = os.path.join(outside, "_esc.txt")
|
|
_, r = await execute_tool_block(_block("write_file", f"{escape}\nx"), owner="a", workspace=ws)
|
|
assert r["exit_code"] == 1 and "outside the workspace" in r["error"]
|
|
assert not os.path.exists(escape)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_grep_and_ls_confined_e2e(ws, admin):
|
|
with open(os.path.join(ws, "doc.txt"), "w") as f:
|
|
f.write("hello workspace\n")
|
|
_, r = await execute_tool_block(_block("grep", json.dumps({"pattern": "hello"})), owner="a", workspace=ws)
|
|
assert r["exit_code"] == 0 and "doc.txt" in r["output"]
|
|
outside = tempfile.mkdtemp()
|
|
_, r = await execute_tool_block(_block("grep", json.dumps({"pattern": "x", "path": outside})), owner="a", workspace=ws)
|
|
assert r["exit_code"] == 1 and "outside the workspace" in r["error"]
|
|
_, r = await execute_tool_block(_block("ls", ""), owner="a", workspace=ws)
|
|
assert r["exit_code"] == 0 and "doc.txt" in r["output"]
|
|
_, r = await execute_tool_block(_block("ls", outside), owner="a", workspace=ws)
|
|
assert r["exit_code"] == 1 and "outside the workspace" in r["error"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_subprocess_cwd_is_workspace_e2e(ws, admin):
|
|
"""python tool runs with cwd = workspace (OS-agnostic probe)."""
|
|
_, r = await execute_tool_block(_block("python", "import os; print(os.getcwd())"), owner="a", workspace=ws)
|
|
assert r["exit_code"] == 0
|
|
assert os.path.realpath(r["output"].strip()) == os.path.realpath(ws)
|
|
|
|
|
|
# ── get_workspace tool ──────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_workspace_tool(ws, admin):
|
|
_, r = await execute_tool_block(_block("get_workspace", ""), owner="a", workspace=ws)
|
|
assert r["exit_code"] == 0 and r["output"].startswith(ws) and "not sandboxed" in r["output"]
|
|
_, r = await execute_tool_block(_block("get_workspace", ""), owner="a") # none active
|
|
assert r["exit_code"] == 0 and "No workspace" in r["output"]
|
|
|
|
|
|
# ── no leak across calls ────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_binding_does_not_leak(ws, admin):
|
|
await execute_tool_block(_block("ls", ""), owner="a", workspace=ws)
|
|
assert get_active_workspace() is None
|
|
|
|
|
|
# ── tool selection: an active workspace is the file-work signal ─────────
|
|
# A vague ("low-signal") message like "look at the local project" matches no
|
|
# domain keywords, so retrieval is normally skipped. When a workspace is set it
|
|
# must still surface the file tools, otherwise the agent says it has no file
|
|
# access (the bug this guards against).
|
|
|
|
def _sent_tool_names(monkeypatch, *, workspace):
|
|
import asyncio
|
|
import src.agent_loop as al
|
|
|
|
monkeypatch.setattr(al, "get_setting", lambda key, default=None: default, raising=False)
|
|
monkeypatch.setattr(al, "get_mcp_manager", lambda: None, raising=False)
|
|
monkeypatch.setattr(al, "estimate_tokens", lambda *a, **k: 10, raising=False)
|
|
# Isolate the selection logic from owner gating (tested separately).
|
|
monkeypatch.setattr(al, "blocked_tools_for_owner", lambda owner: set(), raising=False)
|
|
|
|
captured = []
|
|
|
|
async def _fake_stream(_candidates, messages, **kwargs):
|
|
captured.append(kwargs.get("tools"))
|
|
yield "data: " + json.dumps({"delta": "ok"}) + "\n\n"
|
|
yield "data: [DONE]\n\n"
|
|
|
|
monkeypatch.setattr(al, "stream_llm_with_fallback", _fake_stream, raising=False)
|
|
|
|
async def _run():
|
|
gen = al.stream_agent_loop(
|
|
"https://api.openai.com/v1", "gpt-test",
|
|
[{"role": "user", "content": "look at the local project"}],
|
|
max_rounds=1, relevant_tools=None, owner="admin", workspace=workspace,
|
|
)
|
|
return [c async for c in gen]
|
|
|
|
asyncio.run(_run())
|
|
schemas = captured[0] or []
|
|
return {t["function"]["name"] for t in schemas if isinstance(t, dict) and "function" in t}
|
|
|
|
|
|
def test_low_signal_with_workspace_surfaces_readonly_file_tools(monkeypatch):
|
|
names = _sent_tool_names(monkeypatch, workspace="/tmp")
|
|
# read-only nav tools surface so the agent can explore
|
|
assert "read_file" in names
|
|
assert "get_workspace" in names
|
|
assert "grep" in names
|
|
# write/shell tools do NOT surface on a vague message
|
|
assert "write_file" not in names
|
|
assert "edit_file" not in names
|
|
assert "bash" not in names
|
|
assert "python" not in names
|
|
|
|
|
|
def test_low_signal_without_workspace_excludes_file_tools(monkeypatch):
|
|
names = _sent_tool_names(monkeypatch, workspace=None)
|
|
assert "read_file" not in names
|
|
assert "get_workspace" not in names
|
|
|
|
|
|
# ── browse route is admin-gated ─────────────────────────────────────────
|
|
|
|
def test_browse_is_admin_gated(monkeypatch):
|
|
from fastapi import HTTPException
|
|
import routes.workspace_routes as wr
|
|
|
|
router = wr.setup_workspace_routes()
|
|
browse = next(r.endpoint for r in router.routes if r.path == "/api/workspace/browse")
|
|
|
|
monkeypatch.setattr(wr, "get_current_user", lambda req: "bob")
|
|
monkeypatch.setattr(wr, "owner_is_admin_or_single_user", lambda owner: False)
|
|
with pytest.raises(HTTPException) as ei:
|
|
browse(request=object(), path="/")
|
|
assert ei.value.status_code == 403
|
|
|
|
monkeypatch.setattr(wr, "owner_is_admin_or_single_user", lambda owner: True)
|
|
out = browse(request=object(), path=os.path.expanduser("~"))
|
|
assert "dirs" in out and "path" in out
|
|
assert all("name" in d and "path" in d for d in out["dirs"])
|
|
|
|
|
|
# ── bind-time vetting of the workspace root ─────────────────────────────
|
|
|
|
def test_vet_workspace_accepts_normal_dir(ws):
|
|
from src.tool_execution import vet_workspace
|
|
assert vet_workspace(ws) == os.path.realpath(ws)
|
|
|
|
|
|
def test_vet_workspace_rejects_sensitive_root(tmp_path):
|
|
# The resolver deny-lists sensitive paths inside the workspace, but the
|
|
# empty-path search root is the workspace itself - a sensitive root must
|
|
# be rejected before it is bound or `ls` with no path would list it.
|
|
from src.tool_execution import vet_workspace
|
|
ssh_dir = tmp_path / ".ssh"
|
|
ssh_dir.mkdir()
|
|
assert vet_workspace(str(ssh_dir)) is None
|
|
|
|
|
|
def test_vet_workspace_rejects_nondir_and_empty(ws):
|
|
from src.tool_execution import vet_workspace
|
|
assert vet_workspace(os.path.join(ws, "a.txt")) is None # file, not dir
|
|
assert vet_workspace("/nonexistent/path/xyz") is None
|
|
assert vet_workspace("") is None
|
|
assert vet_workspace(" ") is None
|
|
|
|
|
|
def test_vet_workspace_rejects_filesystem_root():
|
|
# Binding / would make every absolute path "inside" the workspace,
|
|
# collapsing confinement into host-wide file access.
|
|
from src.tool_execution import vet_workspace
|
|
assert vet_workspace("/") is None
|
|
|
|
|
|
def test_browse_marks_root_unselectable_and_vet_endpoint(monkeypatch):
|
|
import routes.workspace_routes as wr
|
|
|
|
router = wr.setup_workspace_routes()
|
|
browse = next(r.endpoint for r in router.routes if r.path == "/api/workspace/browse")
|
|
vet = next(r.endpoint for r in router.routes if r.path == "/api/workspace/vet")
|
|
|
|
monkeypatch.setattr(wr, "get_current_user", lambda req: "admin")
|
|
monkeypatch.setattr(wr, "owner_is_admin_or_single_user", lambda owner: True)
|
|
|
|
out = browse(request=object(), path="/")
|
|
assert out["selectable"] is False
|
|
out = browse(request=object(), path=os.path.expanduser("~"))
|
|
assert out["selectable"] is True
|
|
|
|
assert vet(request=object(), path="/") == {"ok": False, "path": None}
|
|
home = os.path.realpath(os.path.expanduser("~"))
|
|
assert vet(request=object(), path="~") == {"ok": True, "path": home}
|
|
|
|
from fastapi import HTTPException
|
|
monkeypatch.setattr(wr, "owner_is_admin_or_single_user", lambda owner: False)
|
|
with pytest.raises(HTTPException) as ei:
|
|
vet(request=object(), path="/tmp")
|
|
assert ei.value.status_code == 403
|
|
|
|
|
|
# ── send-time privilege gate (no path oracle for non-admins) ────────────
|
|
|
|
def test_request_workspace_gate(ws, monkeypatch):
|
|
"""Non-admin chat callers must get a uniform drop with no vetting: the
|
|
workspace_rejected signal would otherwise reveal which host paths exist."""
|
|
import routes.chat_routes as cr
|
|
|
|
monkeypatch.setattr(cr, "get_current_user", lambda req: "bob")
|
|
vet_calls = []
|
|
import src.tool_execution as te
|
|
real_vet = te.vet_workspace
|
|
monkeypatch.setattr(te, "vet_workspace", lambda p: vet_calls.append(p) or real_vet(p))
|
|
|
|
import src.tool_security as ts
|
|
monkeypatch.setattr(ts, "owner_is_admin_or_single_user", lambda owner: False)
|
|
# Valid and invalid paths are indistinguishable for a non-admin: both
|
|
# drop silently, and the path never reaches the filesystem.
|
|
assert cr._resolve_request_workspace(object(), ws) == ("", "")
|
|
assert cr._resolve_request_workspace(object(), "/nonexistent/xyz") == ("", "")
|
|
assert vet_calls == []
|
|
|
|
monkeypatch.setattr(ts, "owner_is_admin_or_single_user", lambda owner: True)
|
|
assert cr._resolve_request_workspace(object(), ws) == (os.path.realpath(ws), "")
|
|
assert cr._resolve_request_workspace(object(), "/nonexistent/xyz") == ("", "/nonexistent/xyz")
|