mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-16 09:45:24 -04:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4371425514 | |||
| fbed9027b0 | |||
| d9141c6e56 | |||
| 8ae2b5f58c |
@@ -101,11 +101,17 @@ def setup_backup_routes(memory_manager, preset_manager, skills_manager) -> APIRo
|
||||
# ── Skills ──
|
||||
if "skills" in body and isinstance(body["skills"], list):
|
||||
existing = skills_manager.load_all()
|
||||
existing_names = {s.get("name") for s in existing if s.get("name")}
|
||||
existing_ids = {s.get("id") for s in existing if s.get("id")}
|
||||
# Dedup against THIS user's own skills only. Using every tenant's
|
||||
# rows (load_all) meant a skill whose id/name/title matched any
|
||||
# other user's was silently skipped, so the importing user lost
|
||||
# their own data — same cross-tenant bug fixed for memories above.
|
||||
# The full store is still saved back below.
|
||||
own = [s for s in existing if s.get("owner") == user]
|
||||
existing_names = {s.get("name") for s in own if s.get("name")}
|
||||
existing_ids = {s.get("id") for s in own if s.get("id")}
|
||||
existing_titles = {
|
||||
(s.get("title") or s.get("description") or "").strip().lower()
|
||||
for s in existing
|
||||
for s in own
|
||||
}
|
||||
added = 0
|
||||
for skill in body["skills"]:
|
||||
|
||||
@@ -456,7 +456,6 @@ def setup_chat_routes(
|
||||
# manual form posts that still send plan_mode=true.
|
||||
plan_mode = False
|
||||
chat_mode = str(form_data.get("mode", "")).lower() # 'chat' or 'agent'
|
||||
workspace = ""
|
||||
# Plan mode is a modifier on agent mode — it only makes sense with tools.
|
||||
if plan_mode:
|
||||
chat_mode = "agent"
|
||||
@@ -1135,7 +1134,6 @@ def setup_chat_routes(
|
||||
tool_policy=tool_policy,
|
||||
owner=_user,
|
||||
fallbacks=_fallback_candidates,
|
||||
workspace=None,
|
||||
plan_mode=plan_mode,
|
||||
approved_plan=approved_plan or None,
|
||||
):
|
||||
|
||||
@@ -42,9 +42,16 @@ _SESSION_ID_RE = re.compile(r"^[A-Za-z0-9_-]{1,64}$")
|
||||
_SSH_PORT_RE = re.compile(r"^\d{1,5}$")
|
||||
_GPU_LIST_RE = re.compile(r"^\d+(?:,\d+)*$")
|
||||
# A download target directory. Absolute or ~-relative path; safe path glyphs
|
||||
# only (no quotes, shell metacharacters, or spaces) since it lands in a shell
|
||||
# command. A leading ~ is expanded to $HOME at command-build time.
|
||||
_LOCAL_DIR_RE = re.compile(r"^~?/[A-Za-z0-9._/-]*$|^~$")
|
||||
# only (no quotes or shell metacharacters). Spaces are allowed because command
|
||||
# builders pass the value through quoted shell/Python contexts. The character
|
||||
# class uses ``\w`` — Unicode word characters under Python 3's default str
|
||||
# matching — so non-ASCII folder names pass validation too: Cyrillic, accented
|
||||
# Latin, CJK, e.g. ``/Volumes/Модели`` or ``D:\AI Models\Модели``. This stays
|
||||
# shell-safe: none of ``; & | ` $ '' "" () {}`` newlines etc. are in ``[\w. -]``,
|
||||
# so injection vectors remain rejected. A leading ~ is expanded to $HOME at
|
||||
# command-build time. (Drive letters stay ASCII: ``[A-Za-z]:``.)
|
||||
_LOCAL_DIR_RE = re.compile(r"^~?(?:/[\w. -]*)+$|^~$")
|
||||
_WINDOWS_LOCAL_DIR_RE = re.compile(r"^[A-Za-z]:[\\/](?:[\w. -]+(?:[\\/][\w. -]+)*[\\/]?)?$")
|
||||
_WINDOWS_DRIVE_PATH_RE = re.compile(r"^[A-Za-z]:[\\/]")
|
||||
|
||||
|
||||
@@ -97,9 +104,19 @@ def _validate_token(v: str | None) -> str | None:
|
||||
def _validate_local_dir(v: str | None) -> str | None:
|
||||
if v is None or v == "":
|
||||
return None
|
||||
if len(v) >= 2 and v[0] == v[-1] and v[0] in {"'", '"'}:
|
||||
v = v[1:-1]
|
||||
v = v.rstrip("/") or "/"
|
||||
if not _LOCAL_DIR_RE.match(v):
|
||||
raise HTTPException(400, "Invalid local_dir — must be an absolute or ~ path with no spaces or shell metacharacters")
|
||||
if not (_LOCAL_DIR_RE.match(v) or _WINDOWS_LOCAL_DIR_RE.match(v)):
|
||||
raise HTTPException(400, "Invalid local_dir — must be an absolute or ~ path with no shell metacharacters")
|
||||
# Reject path segments that start with '-' (option injection). '-' is in the
|
||||
# allowlist, so a dir like ``/models/-rf`` or ``D:\models\-rf`` could be read
|
||||
# as a CLI flag by hf/etc. — and quoting does NOT stop a value from being
|
||||
# parsed as an option. This is the one residual that command-build-time
|
||||
# quoting can't cover, so the guard lives here, keeping the safety wholly
|
||||
# inside the validator rather than relying on consumers.
|
||||
if any(seg.startswith("-") for seg in re.split(r"[\\/]", v) if seg):
|
||||
raise HTTPException(400, "Invalid local_dir — path segments cannot start with '-'")
|
||||
return v
|
||||
|
||||
|
||||
@@ -125,7 +142,7 @@ def _validate_gpus(v: str | None) -> str | None:
|
||||
def _shell_path(p: str) -> str:
|
||||
"""Render a validated path for a double-quoted shell context, expanding a
|
||||
leading ~ to $HOME (single quotes wouldn't expand it). Safe because
|
||||
_validate_local_dir already restricts the charset."""
|
||||
_validate_local_dir already rejects quotes and shell metacharacters."""
|
||||
if p == "~":
|
||||
return '"$HOME"'
|
||||
if p.startswith("~/"):
|
||||
@@ -386,6 +403,7 @@ def _cached_model_scan_script(model_dirs: list[str] | None = None, add_hf_cache:
|
||||
" for root, dirs, fns in safe_walk(base):",
|
||||
" for fn in sorted(fns):",
|
||||
" if not fn.lower().endswith('.gguf'): continue",
|
||||
" if fn.startswith('._'): continue # macOS AppleDouble sidecar, not a real GGUF",
|
||||
" fp = os.path.join(root, fn)",
|
||||
" try: size = os.path.getsize(fp)",
|
||||
" except Exception: size = 0",
|
||||
|
||||
@@ -1707,7 +1707,6 @@ async def stream_agent_loop(
|
||||
owner: Optional[str] = None,
|
||||
relevant_tools: Optional[Set[str]] = None,
|
||||
fallbacks: Optional[List[tuple]] = None,
|
||||
workspace: Optional[str] = None,
|
||||
plan_mode: bool = False,
|
||||
approved_plan: Optional[str] = None,
|
||||
tool_policy: Optional[ToolPolicy] = None,
|
||||
@@ -1935,27 +1934,6 @@ async def stream_agent_loop(
|
||||
owner=owner,
|
||||
suppress_local_context=guide_only,
|
||||
)
|
||||
if workspace and not guide_only:
|
||||
# PREPEND (not append) so it dominates the large base prompt — appended
|
||||
# at the end, small models ignored it and asked the user for code. The
|
||||
# folder IS the project; the agent must explore it, not ask.
|
||||
_ws_note = (
|
||||
f"## ACTIVE WORKSPACE — READ FIRST\n"
|
||||
f"The user is working in this folder: {workspace}\n"
|
||||
f"It IS the project. bash/python run with cwd set here and "
|
||||
f"read_file/write_file are confined to it (paths outside are rejected).\n"
|
||||
f"When the user says \"the code\" / \"this project\" / \"the workspace\" "
|
||||
f"or asks to review/find/edit something WITHOUT a path, they mean THIS "
|
||||
f"folder. Do NOT ask the user for code or a path, and do NOT read a file "
|
||||
f"literally named \"workspace\". ALWAYS start by exploring it yourself: "
|
||||
f"run `bash` → `git ls-files` (or `ls -R`) to see the files, then "
|
||||
f"read_file the relevant ones by path RELATIVE to the workspace."
|
||||
)
|
||||
if messages and messages[0].get("role") == "system":
|
||||
messages[0]["content"] = _ws_note + "\n\n" + (messages[0].get("content") or "")
|
||||
else:
|
||||
messages.insert(0, {"role": "system", "content": _ws_note})
|
||||
logger.info("[workspace] active for this turn: %s", workspace)
|
||||
if plan_mode and not guide_only:
|
||||
# Steer the model to investigate-then-propose. Hard tool gating handles
|
||||
# every write path except shell; this directive is what keeps the
|
||||
@@ -2649,7 +2627,6 @@ async def stream_agent_loop(
|
||||
tool_policy=tool_policy,
|
||||
owner=owner,
|
||||
progress_cb=_push_progress,
|
||||
workspace=workspace,
|
||||
)
|
||||
finally:
|
||||
# Sentinel so the drainer knows to stop.
|
||||
|
||||
@@ -276,6 +276,24 @@ def _is_ollama_native_url(url: str) -> bool:
|
||||
return local_ollama_host and (path == "" or path == "/api" or path.startswith("/api/"))
|
||||
|
||||
|
||||
def _is_ollama_openai_compat_url(url: str) -> bool:
|
||||
"""Return True for local Ollama's OpenAI-compatible /v1 surface.
|
||||
|
||||
Mirrors the host detection used by ``_is_ollama_native_url`` so that the
|
||||
two helpers stay in lockstep: a localhost Ollama on a non-default port
|
||||
(custom ``OLLAMA_HOST``, reverse proxy, container port remap) is treated
|
||||
the same way here as it is on the native ``/api`` path.
|
||||
"""
|
||||
try:
|
||||
parsed = urlparse(url or "")
|
||||
except Exception:
|
||||
return False
|
||||
host = parsed.hostname or ""
|
||||
path = (parsed.path or "").rstrip("/")
|
||||
local_ollama_host = host in {"localhost", "127.0.0.1", "0.0.0.0", "::1"} or parsed.port == 11434
|
||||
return local_ollama_host and (path == "/v1" or path.startswith("/v1/"))
|
||||
|
||||
|
||||
def _ollama_api_root(url: str) -> str:
|
||||
"""Return a native Ollama API root such as https://ollama.com/api."""
|
||||
url = (url or "").strip().rstrip("/")
|
||||
@@ -1344,6 +1362,9 @@ async def llm_call_async(
|
||||
if max_tokens and max_tokens > 0:
|
||||
tok_key = "max_completion_tokens" if _uses_max_completion_tokens(model) else "max_tokens"
|
||||
payload[tok_key] = max_tokens
|
||||
# Suppress thinking for qwen3/gemma4 on Ollama /v1 — same as stream_llm.
|
||||
if _is_ollama_openai_compat_url(url) and _supports_thinking(model):
|
||||
payload["think"] = False
|
||||
|
||||
if _is_host_dead(target_url):
|
||||
raise HTTPException(503, f"Upstream {_host_key(target_url)} marked unreachable (cooldown active)")
|
||||
@@ -1461,6 +1482,11 @@ async def stream_llm(url: str, model: str, messages: List[Dict], temperature: fl
|
||||
payload[tok_key] = max_tokens
|
||||
if tools:
|
||||
payload["tools"] = tools
|
||||
# For Ollama's OpenAI-compat /v1 endpoint with thinking models (qwen3,
|
||||
# gemma4, etc.), suppress thinking so tool calls aren't swallowed inside
|
||||
# <think> blocks. Ollama /v1 accepts "think": false as a top-level param.
|
||||
if _is_ollama_openai_compat_url(url) and _supports_thinking(model):
|
||||
payload["think"] = False
|
||||
h = _provider_headers(provider, headers)
|
||||
if provider == "copilot":
|
||||
from src.copilot import apply_request_headers
|
||||
|
||||
+19
-67
@@ -67,13 +67,12 @@ def _unified_diff(old: str, new: str, path: str) -> Optional[Dict[str, Any]]:
|
||||
}
|
||||
|
||||
|
||||
async def _do_edit_file(content: str, workspace: Optional[str] = None) -> Dict[str, Any]:
|
||||
async def _do_edit_file(content: str) -> Dict[str, Any]:
|
||||
"""Exact string-replacement edit of an on-disk file.
|
||||
|
||||
content is JSON: {"path", "old_string", "new_string", "replace_all"?}.
|
||||
Fails if old_string is missing or non-unique (unless replace_all) so the
|
||||
model can't silently edit the wrong place. Returns a unified diff for the UI.
|
||||
Confined to the workspace when one is set (same policy as write_file).
|
||||
"""
|
||||
try:
|
||||
args = json.loads(content) if content.strip().startswith("{") else {}
|
||||
@@ -85,11 +84,9 @@ async def _do_edit_file(content: str, workspace: Optional[str] = None) -> Dict[s
|
||||
replace_all = bool(args.get("replace_all", False))
|
||||
if not raw_path:
|
||||
return {"error": "edit_file: path required", "exit_code": 1}
|
||||
# Confine to the workspace when set, else the same allowlist + sensitive-file
|
||||
# policy as read/write_file.
|
||||
# Allowlist + sensitive-file policy as read/write_file.
|
||||
try:
|
||||
path = (_resolve_tool_path_in_workspace(workspace, raw_path)
|
||||
if workspace else _resolve_tool_path(raw_path))
|
||||
path = _resolve_tool_path(raw_path)
|
||||
except ValueError as e:
|
||||
return {"error": f"edit_file: {e}", "exit_code": 1}
|
||||
if old == "":
|
||||
@@ -272,39 +269,6 @@ def _resolve_tool_path(raw_path: str) -> str:
|
||||
)
|
||||
|
||||
|
||||
def _resolve_tool_path_in_workspace(workspace: str, raw_path: str) -> str:
|
||||
"""Confine a model-supplied path to the active workspace.
|
||||
|
||||
Layered on top of upstream's path policy: the workspace is the allowed
|
||||
root (relative paths resolve under it; paths that escape it are rejected),
|
||||
and the sensitive-file deny list (.ssh, .gnupg, id_rsa, …) still applies
|
||||
inside it. When no workspace is set, callers use _resolve_tool_path (the
|
||||
default data/tmp allowlist) instead.
|
||||
"""
|
||||
if raw_path is None or not str(raw_path).strip():
|
||||
raise ValueError("path is required")
|
||||
base = os.path.realpath(workspace)
|
||||
expanded = os.path.expanduser(str(raw_path).strip())
|
||||
candidate = expanded if os.path.isabs(expanded) else os.path.join(base, expanded)
|
||||
resolved = os.path.realpath(candidate)
|
||||
if _is_sensitive_path(resolved):
|
||||
raise ValueError(
|
||||
f"path '{raw_path}' is inside a sensitive directory "
|
||||
f"(e.g. .ssh, .gnupg) or matches a sensitive filename"
|
||||
)
|
||||
if resolved != base:
|
||||
# normcase so containment holds on case-insensitive filesystems
|
||||
# (Windows, default macOS): it lowercases on Windows and is a no-op on
|
||||
# POSIX. commonpath raises ValueError across Windows drives (C: vs D:)
|
||||
# or mixed abs/rel — both mean "outside", so the except rejects them.
|
||||
nbase = os.path.normcase(base)
|
||||
try:
|
||||
if os.path.commonpath([os.path.normcase(resolved), nbase]) != nbase:
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
raise ValueError(f"path '{raw_path}' is outside the workspace ({workspace})")
|
||||
return resolved
|
||||
|
||||
# Bash + python tools used to share a single 60s timeout. That's
|
||||
# enough for one-shot commands but starves real workloads (pip
|
||||
# install, ffmpeg conversions, etc.) — and worse, the agent saw the
|
||||
@@ -341,19 +305,13 @@ _CODENAV_MAX_HITS = 200
|
||||
_CODENAV_MAX_LINE = 400
|
||||
|
||||
|
||||
def _resolve_search_root(raw_path: str, workspace: Optional[str] = None) -> str:
|
||||
def _resolve_search_root(raw_path: str) -> str:
|
||||
"""Resolve + confine a code-nav path (grep/glob/ls).
|
||||
|
||||
With a workspace set, the workspace folder is the root and supplied paths are
|
||||
confined inside it (same policy as read_file). Without one, an empty path
|
||||
defaults to the agent's primary root (project data dir) and a supplied path
|
||||
is confined by the global allowlist + sensitive-file policy.
|
||||
An empty path defaults to the agent's primary root (project data dir) and a
|
||||
supplied path is confined by the global allowlist + sensitive-file policy.
|
||||
"""
|
||||
raw = (raw_path or "").strip()
|
||||
if workspace:
|
||||
if not raw:
|
||||
return os.path.realpath(workspace)
|
||||
return _resolve_tool_path_in_workspace(workspace, raw)
|
||||
if not raw:
|
||||
roots = _tool_path_roots()
|
||||
return roots[0] if roots else os.path.realpath(".")
|
||||
@@ -564,12 +522,11 @@ async def _call_mcp_tool(
|
||||
tool: str,
|
||||
content: str,
|
||||
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
|
||||
workspace: Optional[str] = None,
|
||||
) -> Dict:
|
||||
"""Route a legacy tool call through the MCP manager, with direct fallbacks."""
|
||||
mcp = get_mcp_manager()
|
||||
if not mcp:
|
||||
return await _direct_fallback(tool, content, progress_cb=progress_cb, workspace=workspace) or {"error": f"MCP manager not available for tool '{tool}'", "exit_code": 1}
|
||||
return await _direct_fallback(tool, content, progress_cb=progress_cb) or {"error": f"MCP manager not available for tool '{tool}'", "exit_code": 1}
|
||||
|
||||
server_id, tool_name = _MCP_TOOL_MAP[tool]
|
||||
qualified = f"mcp__{server_id}__{tool_name}"
|
||||
@@ -578,7 +535,7 @@ async def _call_mcp_tool(
|
||||
|
||||
# If MCP server not connected, try direct fallback
|
||||
if isinstance(result, dict) and result.get("exit_code") == 1 and "not connected" in result.get("error", ""):
|
||||
fallback = await _direct_fallback(tool, content, progress_cb=progress_cb, workspace=workspace)
|
||||
fallback = await _direct_fallback(tool, content, progress_cb=progress_cb)
|
||||
if fallback:
|
||||
return fallback
|
||||
|
||||
@@ -636,7 +593,6 @@ async def _direct_fallback(
|
||||
tool: str,
|
||||
content: str,
|
||||
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
|
||||
workspace: Optional[str] = None,
|
||||
) -> Optional[Dict]:
|
||||
"""In-process execution path for the eight tools that used to live as
|
||||
stdio MCP servers under mcp_servers/. Those servers were deleted in
|
||||
@@ -670,7 +626,7 @@ async def _direct_fallback(
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
env=_subproc_env,
|
||||
cwd=workspace or _AGENT_WORKDIR,
|
||||
cwd=_AGENT_WORKDIR,
|
||||
)
|
||||
stdout, stderr, rc, timed_out = await _run_subprocess_streaming(
|
||||
proc,
|
||||
@@ -697,7 +653,7 @@ async def _direct_fallback(
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
env=_subproc_env,
|
||||
cwd=workspace or _AGENT_WORKDIR,
|
||||
cwd=_AGENT_WORKDIR,
|
||||
)
|
||||
stdout, stderr, rc, timed_out = await _run_subprocess_streaming(
|
||||
proc,
|
||||
@@ -727,8 +683,7 @@ async def _direct_fallback(
|
||||
except (json.JSONDecodeError, TypeError, ValueError):
|
||||
pass
|
||||
try:
|
||||
path = (_resolve_tool_path_in_workspace(workspace, raw_path)
|
||||
if workspace else _resolve_tool_path(raw_path))
|
||||
path = _resolve_tool_path(raw_path)
|
||||
except ValueError as e:
|
||||
return {"error": f"read_file: {e}", "exit_code": 1}
|
||||
try:
|
||||
@@ -771,8 +726,7 @@ async def _direct_fallback(
|
||||
raw_path = lines[0].strip()
|
||||
body = lines[1] if len(lines) > 1 else ""
|
||||
try:
|
||||
path = (_resolve_tool_path_in_workspace(workspace, raw_path)
|
||||
if workspace else _resolve_tool_path(raw_path))
|
||||
path = _resolve_tool_path(raw_path)
|
||||
except ValueError as e:
|
||||
return {"error": f"write_file: {e}", "exit_code": 1}
|
||||
try:
|
||||
@@ -825,7 +779,7 @@ async def _direct_fallback(
|
||||
max_hits = _CODENAV_MAX_HITS
|
||||
max_hits = max(1, min(max_hits, _CODENAV_MAX_HITS))
|
||||
try:
|
||||
root = _resolve_search_root(str(args.get("path", "")), workspace)
|
||||
root = _resolve_search_root(str(args.get("path", "")))
|
||||
except ValueError as e:
|
||||
return {"error": f"grep: {e}", "exit_code": 1}
|
||||
|
||||
@@ -909,7 +863,7 @@ async def _direct_fallback(
|
||||
if not pattern:
|
||||
return {"error": "glob: pattern is required", "exit_code": 1}
|
||||
try:
|
||||
root = _resolve_search_root(str(args.get("path", "")), workspace)
|
||||
root = _resolve_search_root(str(args.get("path", "")))
|
||||
except ValueError as e:
|
||||
return {"error": f"glob: {e}", "exit_code": 1}
|
||||
|
||||
@@ -956,7 +910,7 @@ async def _direct_fallback(
|
||||
else:
|
||||
raw_path = _s.split("\n", 1)[0].strip()
|
||||
try:
|
||||
root = _resolve_search_root(raw_path, workspace)
|
||||
root = _resolve_search_root(raw_path)
|
||||
except ValueError as e:
|
||||
return {"error": f"ls: {e}", "exit_code": 1}
|
||||
|
||||
@@ -1121,7 +1075,6 @@ async def execute_tool_block(
|
||||
tool_policy: Optional[ToolPolicy] = None,
|
||||
owner: Optional[str] = None,
|
||||
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
|
||||
workspace: Optional[str] = None,
|
||||
) -> Tuple[str, Dict]:
|
||||
"""Execute a single tool block. Returns (description, result_dict).
|
||||
|
||||
@@ -1296,7 +1249,7 @@ async def execute_tool_block(
|
||||
_is_bg, _bg_cmd = _split_bg_marker(content)
|
||||
if _is_bg and _bg_cmd:
|
||||
from src import bg_jobs
|
||||
rec = bg_jobs.launch(_bg_cmd, session_id=session_id, cwd=workspace or _AGENT_WORKDIR)
|
||||
rec = bg_jobs.launch(_bg_cmd, session_id=session_id, cwd=_AGENT_WORKDIR)
|
||||
short = _bg_cmd.strip().split(chr(10))[0][:80]
|
||||
desc = f"bash (background): {short}"
|
||||
result = {
|
||||
@@ -1318,13 +1271,12 @@ async def execute_tool_block(
|
||||
if tool in _MCP_TOOL_MAP:
|
||||
first_line = content.split(chr(10))[0][:80]
|
||||
desc = f"{tool}: {first_line}"
|
||||
result = await _call_mcp_tool(tool, content, progress_cb=progress_cb, workspace=workspace)
|
||||
result = await _call_mcp_tool(tool, content, progress_cb=progress_cb)
|
||||
elif tool in ("grep", "glob", "ls"):
|
||||
# Code-navigation tools — no MCP server; run the direct implementation.
|
||||
# Confined to the workspace when one is set (same policy as read_file).
|
||||
first_line = content.split(chr(10))[0][:80]
|
||||
desc = f"{tool}: {first_line}"
|
||||
result = await _direct_fallback(tool, content, progress_cb=progress_cb, workspace=workspace) \
|
||||
result = await _direct_fallback(tool, content, progress_cb=progress_cb) \
|
||||
or {"error": f"{tool}: execution failed", "exit_code": 1}
|
||||
elif tool == "create_document":
|
||||
title = content.split("\n")[0].strip()[:60]
|
||||
@@ -1429,7 +1381,7 @@ async def execute_tool_block(
|
||||
desc = "edit_image"
|
||||
result = await do_edit_image(content, owner=owner)
|
||||
elif tool == "edit_file":
|
||||
result = await _do_edit_file(content, workspace=workspace)
|
||||
result = await _do_edit_file(content)
|
||||
desc = result.get("output") or result.get("error") or "edit_file"
|
||||
elif tool == "trigger_research":
|
||||
desc = "trigger_research"
|
||||
|
||||
@@ -0,0 +1,112 @@
|
||||
"""Regression test for routes/backup_routes.py import_data skills dedup.
|
||||
|
||||
BUG: the skills import block deduplicates against EVERY tenant's skills
|
||||
(skills_manager.load_all()) instead of the importing user's own skills.
|
||||
So importing your own backup silently drops any skill whose title (or id)
|
||||
collides with ANOTHER user's skill — the same cross-tenant data-loss bug
|
||||
that was already fixed for memories in the block just above.
|
||||
"""
|
||||
import pytest
|
||||
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi.testclient import TestClient
|
||||
import routes.backup_routes as backup_routes
|
||||
from routes.backup_routes import setup_backup_routes
|
||||
|
||||
# require_admin / get_current_user are bound into routes.backup_routes at import
|
||||
# time (`from x import name`). We patch them on that module directly per-test
|
||||
# via monkeypatch — robust to import order and reverted at teardown. (Stubbing
|
||||
# them through sys.modules only works if backup_routes has not been imported
|
||||
# yet, which is not guaranteed in a full-suite run.)
|
||||
|
||||
|
||||
class FakeMemoryManager:
|
||||
def __init__(self):
|
||||
self.rows = []
|
||||
|
||||
def load(self, owner=None):
|
||||
return [r for r in self.rows if r.get("owner") == owner]
|
||||
|
||||
def load_all(self):
|
||||
return list(self.rows)
|
||||
|
||||
def save(self, rows):
|
||||
self.rows = list(rows)
|
||||
|
||||
|
||||
class FakePresetManager:
|
||||
def get_all(self):
|
||||
return {}
|
||||
|
||||
def save(self, d):
|
||||
pass
|
||||
|
||||
|
||||
class FakeSkillsManager:
|
||||
"""Mimics services.memory.skills: load_all() = all owners,
|
||||
load(owner) = that owner's skills only."""
|
||||
|
||||
def __init__(self, rows):
|
||||
self.rows = list(rows)
|
||||
|
||||
def load(self, owner=None):
|
||||
return [s for s in self.rows if s.get("owner") == owner]
|
||||
|
||||
def load_all(self):
|
||||
return list(self.rows)
|
||||
|
||||
def save(self, rows):
|
||||
self.rows = list(rows)
|
||||
|
||||
def add_skill(self, title=None, name=None, owner=None, **kwargs):
|
||||
# Mirrors services.memory.skills.add_skill: persists a SKILL.md row and
|
||||
# returns its identity. source="user" skips auto-dedup, so no _deduped.
|
||||
entry = {"id": f"new-{len(self.rows)}", "title": title, "name": name, "owner": owner}
|
||||
self.rows.append(entry)
|
||||
return {"name": name, "id": entry["id"]}
|
||||
|
||||
|
||||
def _make_client(skills_mgr, monkeypatch):
|
||||
# Bypass the admin gate and read the importer straight off request.state.
|
||||
monkeypatch.setattr(backup_routes, "require_admin", lambda *a, **k: None)
|
||||
monkeypatch.setattr(backup_routes, "get_current_user",
|
||||
lambda req: getattr(req.state, "user", None))
|
||||
app = FastAPI()
|
||||
|
||||
@app.middleware("http")
|
||||
async def _set_user(request: Request, call_next):
|
||||
request.state.user = "alice"
|
||||
return await call_next(request)
|
||||
|
||||
router = setup_backup_routes(FakeMemoryManager(), FakePresetManager(), skills_mgr)
|
||||
app.include_router(router)
|
||||
return TestClient(app)
|
||||
|
||||
|
||||
def test_import_skill_not_dropped_by_other_users_title_collision(monkeypatch):
|
||||
# Bob already owns a skill titled "Deploy". Alice (the importer) has none.
|
||||
skills_mgr = FakeSkillsManager([
|
||||
{"id": "bob-1", "title": "Deploy", "name": "Deploy", "owner": "bob"},
|
||||
])
|
||||
client = _make_client(skills_mgr, monkeypatch)
|
||||
|
||||
# Alice imports HER OWN backup containing a skill also titled "Deploy".
|
||||
payload = {
|
||||
"skills": [
|
||||
{"id": "alice-1", "title": "Deploy", "name": "Deploy"},
|
||||
],
|
||||
}
|
||||
resp = client.post("/api/import", json=payload)
|
||||
assert resp.status_code == 200, resp.text
|
||||
|
||||
# Alice's skill must have been imported and assigned to her.
|
||||
alice_skills = skills_mgr.load(owner="alice")
|
||||
titles = {s["title"] for s in alice_skills}
|
||||
assert "Deploy" in titles, (
|
||||
"Alice's own 'Deploy' skill was silently dropped because Bob owns a "
|
||||
"skill with the same title (cross-tenant dedup bug)."
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(pytest.main([__file__, "-v"]))
|
||||
@@ -22,10 +22,12 @@ from routes.cookbook_helpers import (
|
||||
_user_shell_path_bootstrap,
|
||||
_venv_safe_local_pip_install_cmd,
|
||||
_validate_gpus,
|
||||
_validate_local_dir,
|
||||
_validate_repo_id,
|
||||
_validate_serve_cmd,
|
||||
_validate_serve_model_id,
|
||||
_validate_ssh_port,
|
||||
_shell_path,
|
||||
run_ssh_command_async,
|
||||
)
|
||||
|
||||
@@ -110,6 +112,89 @@ def test_validate_ssh_port_rejects_shell_payload():
|
||||
assert _validate_ssh_port("2222") == "2222"
|
||||
|
||||
|
||||
def test_validate_local_dir_accepts_external_drive_paths_with_spaces():
|
||||
path = "/Volumes/T7 2TB/AI Models/llamacpp"
|
||||
|
||||
assert _validate_local_dir(path) == path
|
||||
assert _validate_local_dir(f'"{path}"') == path
|
||||
assert _shell_path(f"{path}/Qwen3-8B") == '"/Volumes/T7 2TB/AI Models/llamacpp/Qwen3-8B"'
|
||||
|
||||
|
||||
def test_validate_local_dir_accepts_windows_drive_paths_with_spaces():
|
||||
backslash_path = r"D:\AI Models\llamacpp"
|
||||
slash_path = "D:/AI Models/llamacpp"
|
||||
|
||||
assert _validate_local_dir(backslash_path) == backslash_path
|
||||
assert _validate_local_dir(f"'{backslash_path}'") == backslash_path
|
||||
assert _validate_local_dir(slash_path) == slash_path
|
||||
assert _shell_path(backslash_path + r"\Qwen3-8B") == '"D:\\AI Models\\llamacpp\\Qwen3-8B"'
|
||||
|
||||
|
||||
def test_validate_local_dir_still_rejects_shell_metacharacters():
|
||||
for path in [
|
||||
"/Volumes/T7 2TB/AI Models; touch /tmp/pwned",
|
||||
"/Volumes/T7 2TB/AI Models/$(touch pwned)",
|
||||
"/Volumes/T7 2TB/AI Models/`touch pwned`",
|
||||
"/Volumes/T7 2TB/AI Models/model\nnext",
|
||||
]:
|
||||
with pytest.raises(HTTPException):
|
||||
_validate_local_dir(path)
|
||||
|
||||
|
||||
def test_validate_local_dir_rejects_windows_shell_metacharacters():
|
||||
for path in [
|
||||
r"D:\AI Models\llamacpp; touch C:\pwned",
|
||||
r"D:\AI Models\llamacpp\$(touch pwned)",
|
||||
r"D:\AI Models\llamacpp\`touch pwned`",
|
||||
"D:\\AI Models\\llamacpp\nnext",
|
||||
]:
|
||||
with pytest.raises(HTTPException):
|
||||
_validate_local_dir(path)
|
||||
|
||||
|
||||
def test_validate_local_dir_accepts_non_ascii_unicode_paths():
|
||||
# Folder names are routinely non-ASCII on localized systems; the validator
|
||||
# must accept them the same way it accepts spaces (see issue: spaces AND
|
||||
# non-ASCII chars were both rejected by the old ASCII-only allowlist).
|
||||
for path in [
|
||||
"/Volumes/Модели/llamacpp", # Cyrillic (POSIX / external drive)
|
||||
"/home/josé/models", # accented Latin
|
||||
"/Volumes/モデル/llm", # CJK
|
||||
r"D:\AI Models\Модели", # Cyrillic (Windows drive path)
|
||||
]:
|
||||
assert _validate_local_dir(path) == path
|
||||
|
||||
|
||||
def test_validate_local_dir_rejects_metacharacters_in_unicode_paths():
|
||||
# Widening the allowlist to Unicode must not reopen the injection surface:
|
||||
# shell metacharacters stay rejected even alongside non-ASCII segments.
|
||||
for path in [
|
||||
"/Volumes/Модели; touch /tmp/pwned",
|
||||
"/Volumes/Модели/$(touch pwned)",
|
||||
"/Volumes/Модели/`touch pwned`",
|
||||
"/Volumes/Модели/a|b",
|
||||
"/Volumes/Модели\nnext",
|
||||
r"D:\Модели\llamacpp & calc.exe",
|
||||
]:
|
||||
with pytest.raises(HTTPException):
|
||||
_validate_local_dir(path)
|
||||
|
||||
|
||||
def test_validate_local_dir_rejects_leading_dash_segments():
|
||||
# A path segment starting with '-' could be parsed as a CLI option by hf/etc.
|
||||
# (option injection) even when quoted, since quoting doesn't stop a value from
|
||||
# being read as a flag. The validator must reject it on every platform.
|
||||
for path in [
|
||||
"/models/-rf",
|
||||
"/models/-rf/llamacpp",
|
||||
"/-oStrictHostKeyChecking=no",
|
||||
r"D:\models\-rf",
|
||||
"D:/models/-rf",
|
||||
]:
|
||||
with pytest.raises(HTTPException):
|
||||
_validate_local_dir(path)
|
||||
|
||||
|
||||
def test_validate_gpus_accepts_indexes_only():
|
||||
assert _validate_gpus("0,1,2") == "0,1,2"
|
||||
with pytest.raises(HTTPException):
|
||||
|
||||
@@ -0,0 +1,165 @@
|
||||
"""Tests for Ollama /v1 thinking-suppression helpers.
|
||||
|
||||
Covers:
|
||||
- _is_ollama_openai_compat_url: URL classification (local host + /v1 path)
|
||||
- think: false is injected into the payload for Ollama /v1 thinking models
|
||||
- think: false is NOT injected for non-thinking models or non-Ollama /v1 endpoints
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
from src import llm_core
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fake HTTP client — captures the outgoing payload without network I/O
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class _FakeResp:
|
||||
status_code = 200
|
||||
|
||||
async def aiter_lines(self):
|
||||
# Yield a minimal done event so stream_llm exits cleanly
|
||||
yield json.dumps({"choices": [{"delta": {"content": "ok"}, "finish_reason": "stop"}]})
|
||||
yield "data: [DONE]"
|
||||
|
||||
async def aread(self):
|
||||
return b""
|
||||
|
||||
|
||||
class _FakeStreamCtx:
|
||||
def __init__(self, captured):
|
||||
self._captured = captured
|
||||
|
||||
async def __aenter__(self):
|
||||
return _FakeResp()
|
||||
|
||||
async def __aexit__(self, *a):
|
||||
return False
|
||||
|
||||
|
||||
class _FakeClient:
|
||||
"""Minimal stand-in for httpx.AsyncClient that captures request payload."""
|
||||
|
||||
def __init__(self):
|
||||
self.captured_payload = {}
|
||||
|
||||
def stream(self, method, url, **kw):
|
||||
self.captured_payload = kw.get("json") or {}
|
||||
return _FakeStreamCtx(self.captured_payload)
|
||||
|
||||
|
||||
def _capture_payload(monkeypatch, url, model):
|
||||
"""Run stream_llm, intercept the HTTP payload, and return it."""
|
||||
client = _FakeClient()
|
||||
monkeypatch.setattr(llm_core, "_get_http_client", lambda: client)
|
||||
monkeypatch.setattr(llm_core, "_is_host_dead", lambda u: False)
|
||||
monkeypatch.setattr(llm_core, "note_model_activity", lambda *a, **k: None)
|
||||
monkeypatch.setattr(llm_core, "_clear_host_dead", lambda *a, **k: None)
|
||||
monkeypatch.setattr(llm_core, "get_context_length", lambda u, m: 32768)
|
||||
|
||||
async def run():
|
||||
return [c async for c in llm_core.stream_llm(
|
||||
url, model, [{"role": "user", "content": "hi"}],
|
||||
)]
|
||||
|
||||
asyncio.run(run())
|
||||
return client.captured_payload
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _is_ollama_openai_compat_url — pure function, no I/O
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestIsOllamaOpenAICompatUrl:
|
||||
"""Unit tests for the URL classifier that gates think-suppression."""
|
||||
|
||||
# Positive cases — should be True
|
||||
def test_default_port_v1_root(self):
|
||||
assert llm_core._is_ollama_openai_compat_url("http://127.0.0.1:11434/v1")
|
||||
|
||||
def test_default_port_chat_completions(self):
|
||||
assert llm_core._is_ollama_openai_compat_url("http://127.0.0.1:11434/v1/chat/completions")
|
||||
|
||||
def test_localhost_default_port(self):
|
||||
assert llm_core._is_ollama_openai_compat_url("http://localhost:11434/v1")
|
||||
|
||||
def test_localhost_default_port_with_path(self):
|
||||
assert llm_core._is_ollama_openai_compat_url("http://localhost:11434/v1/chat/completions")
|
||||
|
||||
def test_loopback_ipv6(self):
|
||||
# IPv6 addresses in URLs require square brackets per RFC 3986
|
||||
assert llm_core._is_ollama_openai_compat_url("http://[::1]:11434/v1")
|
||||
|
||||
def test_any_local_non_default_port(self):
|
||||
"""Localhost on a non-default port (custom OLLAMA_HOST) must also match."""
|
||||
assert llm_core._is_ollama_openai_compat_url("http://127.0.0.1:11435/v1")
|
||||
|
||||
def test_localhost_non_default_port(self):
|
||||
assert llm_core._is_ollama_openai_compat_url("http://localhost:8080/v1/chat/completions")
|
||||
|
||||
def test_zero_dot_zero_host(self):
|
||||
assert llm_core._is_ollama_openai_compat_url("http://0.0.0.0:11434/v1")
|
||||
|
||||
# Negative cases — should be False
|
||||
def test_openai_api_v1(self):
|
||||
"""Real OpenAI endpoint must never match, even though path is /v1."""
|
||||
assert not llm_core._is_ollama_openai_compat_url("https://api.openai.com/v1")
|
||||
|
||||
def test_openai_chat_completions(self):
|
||||
assert not llm_core._is_ollama_openai_compat_url("https://api.openai.com/v1/chat/completions")
|
||||
|
||||
def test_ollama_native_api_path(self):
|
||||
"""The native /api path is a different surface and must not match /v1."""
|
||||
assert not llm_core._is_ollama_openai_compat_url("http://localhost:11434/api")
|
||||
|
||||
def test_ollama_native_api_chat(self):
|
||||
assert not llm_core._is_ollama_openai_compat_url("http://localhost:11434/api/chat")
|
||||
|
||||
def test_remote_openrouter(self):
|
||||
assert not llm_core._is_ollama_openai_compat_url("https://openrouter.ai/api/v1")
|
||||
|
||||
def test_empty_string(self):
|
||||
assert not llm_core._is_ollama_openai_compat_url("")
|
||||
|
||||
def test_none_like_empty(self):
|
||||
assert not llm_core._is_ollama_openai_compat_url(None) # type: ignore[arg-type]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Payload injection — think: false only when both conditions hold
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestThinkSuppression:
|
||||
"""Assert think:false is present/absent in the outgoing HTTP payload."""
|
||||
|
||||
def test_think_false_for_ollama_v1_thinking_model(self, monkeypatch):
|
||||
"""think:false must be set for qwen3 on Ollama /v1."""
|
||||
payload = _capture_payload(
|
||||
monkeypatch, "http://127.0.0.1:11434/v1/chat/completions", "qwen3:14b"
|
||||
)
|
||||
assert payload.get("think") is False
|
||||
|
||||
def test_no_think_for_ollama_v1_non_thinking_model(self, monkeypatch):
|
||||
"""think must NOT be set for a plain (non-thinking) model on Ollama /v1."""
|
||||
payload = _capture_payload(
|
||||
monkeypatch, "http://127.0.0.1:11434/v1/chat/completions", "llama3.2:3b"
|
||||
)
|
||||
assert "think" not in payload
|
||||
|
||||
def test_no_think_for_openai_endpoint_with_thinking_model_name(self, monkeypatch):
|
||||
"""think must NOT leak to a real OpenAI endpoint even if the model name
|
||||
matches a thinking pattern — the URL guard is what matters."""
|
||||
payload = _capture_payload(
|
||||
monkeypatch, "https://api.openai.com/v1/chat/completions", "qwen3:14b"
|
||||
)
|
||||
assert "think" not in payload
|
||||
|
||||
def test_think_false_for_non_default_port_thinking_model(self, monkeypatch):
|
||||
"""Custom-port localhost Ollama (e.g. OLLAMA_HOST=0.0.0.0:11435) must
|
||||
also receive think:false — this is the regression guarded by the
|
||||
host-set check added in this fix."""
|
||||
payload = _capture_payload(
|
||||
monkeypatch, "http://127.0.0.1:11435/v1/chat/completions", "qwen3:14b"
|
||||
)
|
||||
assert payload.get("think") is False
|
||||
@@ -238,36 +238,6 @@ def test_guide_only_blocks_later_round_document_streaming(monkeypatch):
|
||||
assert not any(event.get("type") == "doc_stream_delta" for event in events)
|
||||
|
||||
|
||||
def test_guide_only_directive_dominates_workspace_prompt(monkeypatch):
|
||||
_patch_loop_basics(monkeypatch)
|
||||
system_prompts = []
|
||||
|
||||
async def _fake_stream(_candidates, messages, **kwargs):
|
||||
system_prompts.append(messages[0]["content"])
|
||||
yield _delta_chunk("ok")
|
||||
yield "data: [DONE]\n\n"
|
||||
|
||||
monkeypatch.setattr(al, "stream_llm_with_fallback", _fake_stream, raising=False)
|
||||
policy = build_effective_tool_policy(last_user_message="Do not use tools.")
|
||||
|
||||
_collect(
|
||||
al.stream_agent_loop(
|
||||
"http://local.test/v1",
|
||||
"local-model",
|
||||
[{"role": "user", "content": "Do not use tools."}],
|
||||
max_rounds=1,
|
||||
relevant_tools={"bash"},
|
||||
tool_policy=policy,
|
||||
workspace="/tmp/project",
|
||||
)
|
||||
)
|
||||
|
||||
assert system_prompts
|
||||
assert system_prompts[0].startswith("## GUIDE-ONLY MODE")
|
||||
assert "ACTIVE WORKSPACE" not in system_prompts[0]
|
||||
assert "ALWAYS start by exploring" not in system_prompts[0]
|
||||
|
||||
|
||||
def test_guide_only_skips_intent_without_action_nudge(monkeypatch):
|
||||
_patch_loop_basics(monkeypatch)
|
||||
|
||||
|
||||
@@ -1,107 +0,0 @@
|
||||
"""Workspace confinement: file tools are hard-bounded to the workspace folder
|
||||
(layered on upstream's sensitive-path policy); bash runs with cwd there."""
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
from src.tool_execution import _resolve_tool_path_in_workspace, _direct_fallback
|
||||
|
||||
|
||||
def test_workspace_resolver_confines():
|
||||
ws = tempfile.mkdtemp()
|
||||
open(os.path.join(ws, "a.txt"), "w").write("x")
|
||||
real = os.path.realpath(os.path.join(ws, "a.txt"))
|
||||
# relative path resolves under the workspace
|
||||
assert _resolve_tool_path_in_workspace(ws, "a.txt") == real
|
||||
# absolute path inside the workspace is allowed
|
||||
assert _resolve_tool_path_in_workspace(ws, os.path.join(ws, "a.txt")) == real
|
||||
# absolute path outside is rejected (sibling temp dir, portable across OSes)
|
||||
outside = tempfile.mkdtemp()
|
||||
with pytest.raises(ValueError):
|
||||
_resolve_tool_path_in_workspace(ws, os.path.join(outside, "x.txt"))
|
||||
# parent-escape is rejected
|
||||
with pytest.raises(ValueError):
|
||||
_resolve_tool_path_in_workspace(ws, os.path.join("..", "..", "escape.txt"))
|
||||
|
||||
|
||||
def test_workspace_resolver_blocks_sensitive():
|
||||
"""Upstream's sensitive-file deny list still applies inside the workspace."""
|
||||
ws = tempfile.mkdtemp()
|
||||
os.makedirs(os.path.join(ws, ".ssh"), exist_ok=True)
|
||||
with pytest.raises(ValueError):
|
||||
_resolve_tool_path_in_workspace(ws, ".ssh/authorized_keys")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_read_write_confined_in_workspace():
|
||||
ws = tempfile.mkdtemp()
|
||||
# Write inside the workspace (relative path) succeeds.
|
||||
res = await _direct_fallback("write_file", "note.txt\nhello", workspace=ws)
|
||||
assert res["exit_code"] == 0
|
||||
assert os.path.isfile(os.path.join(ws, "note.txt"))
|
||||
# Read it back.
|
||||
res = await _direct_fallback("read_file", "note.txt", workspace=ws)
|
||||
assert res["exit_code"] == 0 and res["output"] == "hello"
|
||||
# Reading outside the workspace is rejected (sibling temp dir, portable).
|
||||
outside = tempfile.mkdtemp()
|
||||
outside_file = os.path.join(outside, "secret.txt")
|
||||
open(outside_file, "w").write("nope")
|
||||
res = await _direct_fallback("read_file", outside_file, workspace=ws)
|
||||
assert res["exit_code"] == 1 and "outside the workspace" in res["error"]
|
||||
# Writing outside is rejected (file must not be created).
|
||||
escape = os.path.join(outside, "_ws_escape.txt")
|
||||
res = await _direct_fallback("write_file", f"{escape}\nx", workspace=ws)
|
||||
assert res["exit_code"] == 1 and "outside the workspace" in res["error"]
|
||||
assert not os.path.exists(escape)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subprocess_runs_with_workspace_cwd():
|
||||
"""bash/python subprocesses run with cwd set to the workspace. Use the
|
||||
python tool for an OS-agnostic cwd probe (Windows cmd has no `pwd`)."""
|
||||
ws = tempfile.mkdtemp()
|
||||
res = await _direct_fallback("python", "import os; print(os.getcwd())", workspace=ws)
|
||||
assert res["exit_code"] == 0
|
||||
assert os.path.realpath(res["output"].strip()) == os.path.realpath(ws)
|
||||
|
||||
|
||||
# --- Tools that landed after this PR, now wired into the workspace -----------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_edit_file_confined_in_workspace():
|
||||
import json
|
||||
from src.tool_execution import _do_edit_file
|
||||
ws = tempfile.mkdtemp()
|
||||
open(os.path.join(ws, "f.txt"), "w").write("foo bar")
|
||||
# Edit inside the workspace succeeds.
|
||||
res = await _do_edit_file(json.dumps(
|
||||
{"path": "f.txt", "old_string": "foo", "new_string": "baz"}), workspace=ws)
|
||||
assert res["exit_code"] == 0
|
||||
assert open(os.path.join(ws, "f.txt")).read() == "baz bar"
|
||||
# Editing outside the workspace is rejected (sibling temp dir, portable).
|
||||
outside = tempfile.mkdtemp()
|
||||
outside_file = os.path.join(outside, "f.txt")
|
||||
open(outside_file, "w").write("a")
|
||||
res = await _do_edit_file(json.dumps(
|
||||
{"path": outside_file, "old_string": "a", "new_string": "b"}), workspace=ws)
|
||||
assert res["exit_code"] == 1 and "outside the workspace" in res["error"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_grep_and_ls_confined_in_workspace():
|
||||
import json
|
||||
ws = tempfile.mkdtemp()
|
||||
open(os.path.join(ws, "doc.txt"), "w").write("hello workspace\n")
|
||||
# grep with no path searches the workspace root and finds the match.
|
||||
res = await _direct_fallback("grep", json.dumps({"pattern": "hello"}), workspace=ws)
|
||||
assert res["exit_code"] == 0 and "doc.txt" in res["output"]
|
||||
# grep pointed outside the workspace is rejected (sibling temp dir, portable).
|
||||
outside = tempfile.mkdtemp()
|
||||
res = await _direct_fallback("grep", json.dumps({"pattern": "x", "path": outside}), workspace=ws)
|
||||
assert res["exit_code"] == 1 and "outside the workspace" in res["error"]
|
||||
# ls of the workspace lists its files; ls outside is rejected.
|
||||
res = await _direct_fallback("ls", "", workspace=ws)
|
||||
assert res["exit_code"] == 0 and "doc.txt" in res["output"]
|
||||
res = await _direct_fallback("ls", outside, workspace=ws)
|
||||
assert res["exit_code"] == 1 and "outside the workspace" in res["error"]
|
||||
Reference in New Issue
Block a user