Files
odysseus/src/tool_execution.py
T
Kenny Van de Maele 620fdd0859 feat(agent): confine agent file/shell tools to a selectable workspace (#3665)
* feat(agent): workspace confinement via context-local binding + get_workspace tool

Bind the per-turn workspace once in execute_tool_block; the shared path
resolvers (_resolve_tool_path / _resolve_search_root) and the subprocess cwd
helper (agent_cwd) read it, so file tools + bash/python are confined centrally
and a new tool that uses the shared helpers cannot accidentally bypass it.

Adds the admin-gated /api/workspace/browse picker, a workspace pill + directory
modal (reusing existing modal/button CSS), the /workspace slash command, and a
get_workspace tool (replaces a system-prompt block). Confinement is OS-agnostic
(realpath/normcase/commonpath) and docker-safe (container paths, no host
assumptions). Reopens #2023.

* ux(workspace): clarify workspace is not a sandbox

Picker modal note + pill tooltip + get_workspace tool/output wording now state
plainly: read_file/write_file/edit_file/grep/glob/ls are confined to the folder,
but bash/python only start there (cwd) and are not sandboxed. Modal note reuses
the existing .muted class.

* fix(agent): treat an active workspace as file-work intent

A vague low-signal message (e.g. "look at the local project") matches no
domain keywords, so tool retrieval is skipped and only always-available tools
are offered — leaving the agent with no file access even though a workspace is
set. When a workspace is active, include the file/code tools (incl.
get_workspace) on low-signal turns so the agent can act on the folder.

Also requires the tool index (ChromaDB) to be reachable for normal retrieval;
that is an environment dependency, not part of this change.

* ux(workspace): hide pill + overflow entry in chat mode

Workspace only scopes the agent's file/shell tools, so the pill and the
overflow 'Workspace' entry are agent-only now — hidden in chat mode like the
bash toggle. Mode read from the DOM in syncWorkspaceIndicator; applyMode() is
called from the agent/chat setMode handler.

* prompt(tools): steer bash/python to defer to the dedicated file tools

bash/python schema descriptions (what native-tool-calling models read) were
bare and gave no steer, so models would do file ops via the shell (e.g. writing
SVG/HTML, which then dumps raw markup into the tool preview). Tell bash/python
in the schema + tool-index + prompt section to prefer read_file/write_file/
edit_file/grep/glob/ls and only be used for what those do not cover.

* prompt(tools): keep bash/python deferral generic (no hardcoded tool names)

Reference 'a dedicated tool' rather than listing read_file/write_file/grep/etc.
by name, so the guidance does not go stale if those tools are renamed.

* style(workspace): drop em-dashes from added code comments/strings

* ux(workspace): terser non-sandbox note in picker (no tool-name list)

* ux(workspace): mirror terse non-sandbox wording in pill tooltip

* chore: untrack local venv symlink (run-only, not part of the feature)

* prompt(workspace): keep get_workspace text generic (no hardcoded tool names)

* fix(agent): low-signal + workspace surfaces only read-only file tools

Intersect the files tool group with PLAN_MODE_READONLY_TOOLS so a vague message
in a workspace exposes read_file/grep/glob/ls/get_workspace for exploration, but
not write_file/edit_file/bash/python -- those wait for a request that actually
calls for them (RAG retrieval still adds them on a real ask).

* feat(workspace): cap browse listing at 500 dirs with a truncated hint

Mirror the filesystem_tools._CODENAV_MAX_HITS pattern with a module-local
_MAX_BROWSE_DIRS so a directory with thousands of children does not dump every
row into the picker; the response carries a truncated flag and the modal tells
the user to type a path to jump in.

* chore: untrack local venv symlink (run-only artifact)

* fix(workspace): vet the workspace root against the sensitive-path deny list at bind time

The in-workspace resolver deny-lists sensitive paths inside the workspace,
but the empty-path search root is the workspace itself, so a workspace of
~/.ssh could be listed via ls with no path. vet_workspace() (public, in
tool_execution next to the resolvers) rejects non-directories and sensitive
roots before the path is ever bound; chat_routes uses it instead of its
inline isdir check.

* fix(workspace): reject filesystem roots and stop showing rejected workspaces as active

Review findings from #3665:

P2: vet_workspace accepted / (and would accept drive/UNC roots), which makes
every absolute path 'inside' the workspace and collapses confinement into
host-wide file access. A root is its own dirname, so reject when
dirname(resolved) == resolved; the browse response now carries a selectable
flag and the picker disables 'Use this folder' on unselectable dirs.

P3: /workspace set stored any string client-side and the chat route silently
dropped rejected values, so the pill could claim a confinement that was not
in effect. New admin-gated /api/workspace/vet validates manual paths before
they persist (canonical path returned), and when a posted workspace is
rejected at send time the stream emits workspace_rejected so the client
clears the stored value and toasts instead of continuing silently.

* fix(workspace): check caller privilege before vetting the posted workspace

Review finding: /api/chat_stream called vet_workspace() on the posted value
for every caller and emitted workspace_rejected on failure, so a non-admin
who can chat but cannot use file/shell tools could distinguish existing
directories from missing/file/sensitive/root paths by whether the event
appeared. The resolution now lives in _resolve_request_workspace, which
drops the submitted value uniformly for non-admin callers, with no vetting
and no event, before the path ever touches the filesystem. Admin and
single-user behavior is unchanged. Test pins that valid and invalid paths
are indistinguishable for a non-admin and that vet_workspace is never
invoked for them.
2026-06-11 18:17:54 +02:00

950 lines
38 KiB
Python

"""
tool_execution.py
Tool dispatcher and result formatter for the agent loop.
Routes tool blocks to MCP servers or native implementations.
Extracted from agent_tools.py.
"""
import asyncio
import collections
import contextvars
import json
import logging
import os
import pathlib
import re
import sys
import time
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple
from src.tool_security import is_public_blocked_tool, owner_is_admin_or_single_user
from src.tool_policy import ToolPolicy
from src.constants import MAX_OUTPUT_CHARS, MAX_READ_CHARS, MAX_DIFF_LINES, DATA_DIR
from src.tool_utils import _truncate, get_mcp_manager
# Persistent working directory for agent subprocesses.
# Resolves to <repo_root>/data, which is the bind-mounted volume in Docker
# (/app/data) and the local data directory for manual installs.
# Using this as cwd and HOME prevents the agent from silently creating files
# in ephemeral container layers that are lost on the next rebuild.
_AGENT_WORKDIR = DATA_DIR
# ---------------------------------------------------------------------------
# Path confinement for read_file / write_file
# ---------------------------------------------------------------------------
# read_file + write_file are admin-only tools, but the path the agent
# supplies is model-controlled. Prompt-injection in an admin's chat can
# weaponise "read /etc/shadow" or "write ~/.ssh/authorized_keys" without
# the admin noticing.
#
# Policy:
# 1. Sensitive-subpath deny list — checked FIRST. Blocks .ssh,
# .gnupg, shell rc files, token/env files even if the root above
# them is on the allowlist.
# 2. Allowlist — only the directories the agent legitimately needs
# (project data/, system tmp). $HOME is NOT on the default list.
# 3. Opt-in extra roots — admin can add broader roots via the
# "tool_path_extra_roots" setting (list of path strings).
# ---------------------------------------------------------------------------
_SENSITIVE_BASENAMES: set[str] = {
".ssh", ".gnupg", ".gitconfig",
".bashrc", ".bash_profile", ".bash_logout",
".zshrc", ".zprofile", ".zshenv",
".profile", ".tcshrc", ".cshrc",
".env", ".netrc",
}
_SENSITIVE_FILE_PATTERNS: tuple[str, ...] = (
"authorized_keys", "id_rsa", "id_ed25519", "id_ecdsa",
"known_hosts",
)
def _is_sensitive_path(resolved: str) -> bool:
"""Return True if *resolved* falls under a sensitive directory or
matches a sensitive filename — regardless of what root it sits under.
"""
parts = resolved.split(os.sep)
filenames: set[str] = {parts[-1]} if parts else set()
# Check if any path component is a sensitive directory.
for part in parts:
if part in _SENSITIVE_BASENAMES:
return True
# Check filename against known sensitive files.
for pat in _SENSITIVE_FILE_PATTERNS:
if pat in filenames:
return True
return False
def _tool_path_roots() -> list[str]:
"""Return the list of directory roots that read_file / write_file
may touch. Default: project data/ + system temp dirs. Extra roots
are loaded from the ``tool_path_extra_roots`` setting.
"""
roots: list[str] = []
# Project data directory — the agent's primary workspace.
from src.constants import DATA_DIR
roots.append(DATA_DIR)
# /tmp (and its macOS realpath /private/tmp).
roots.append("/tmp")
try:
private_tmp = os.path.realpath("/tmp")
if private_tmp != "/tmp":
roots.append(private_tmp)
except OSError:
pass
# $TMPDIR — per-user temp root on macOS (e.g. /var/folders/.../T/).
tmpdir = os.environ.get("TMPDIR")
if tmpdir:
roots.append(tmpdir)
# Opt-in extra roots from settings.
try:
from src.settings import get_setting
extra = get_setting("tool_path_extra_roots")
if isinstance(extra, list):
roots.extend(str(r) for r in extra if r)
except Exception:
pass
# Deduplicate; resolve symlinks so containment is unambiguous.
seen: set[str] = set()
out: list[str] = []
for r in roots:
try:
real = os.path.realpath(r)
except OSError:
continue
if real in seen:
continue
seen.add(real)
out.append(real)
return out
def _resolve_tool_path(raw_path: str) -> str:
"""Resolve and confine a model-supplied path.
Order of checks:
1. Non-empty path.
2. Sensitive-subpath deny list (blocks .ssh, .gnupg, etc.
even when the root is on the allowlist).
3. Allowlist containment (must land under one of the roots).
Returns the realpath on success. Raises ValueError on rejection.
Symlinks are resolved before comparison.
When a workspace is active for this turn, paths are confined to it instead
of the default allowlist (see _resolve_tool_path_in_workspace).
"""
ws = get_active_workspace()
if ws:
return _resolve_tool_path_in_workspace(ws, raw_path)
if raw_path is None or not str(raw_path).strip():
raise ValueError("path is required")
expanded = os.path.expanduser(str(raw_path).strip())
resolved = os.path.realpath(expanded)
if _is_sensitive_path(resolved):
raise ValueError(
f"path '{raw_path}' is inside a sensitive directory "
f"(e.g. .ssh, .gnupg) or matches a sensitive filename"
)
for root in _tool_path_roots():
if resolved == root:
return resolved
try:
common = os.path.commonpath([resolved, root])
except ValueError:
continue
if common == root:
return resolved
raise ValueError(
f"path '{raw_path}' is outside the allowed roots"
)
def _resolve_tool_path_in_workspace(workspace: str, raw_path: str) -> str:
"""Confine a model-supplied path to the active workspace.
Layered on top of upstream's path policy: the workspace is the allowed
root (relative paths resolve under it; paths that escape it are rejected),
and the sensitive-file deny list (.ssh, .gnupg, id_rsa, …) still applies
inside it. When no workspace is set, callers use _resolve_tool_path (the
default data/tmp allowlist) instead.
"""
if raw_path is None or not str(raw_path).strip():
raise ValueError("path is required")
base = os.path.realpath(workspace)
expanded = os.path.expanduser(str(raw_path).strip())
candidate = expanded if os.path.isabs(expanded) else os.path.join(base, expanded)
resolved = os.path.realpath(candidate)
if _is_sensitive_path(resolved):
raise ValueError(
f"path '{raw_path}' is inside a sensitive directory "
f"(e.g. .ssh, .gnupg) or matches a sensitive filename"
)
if resolved != base:
# normcase so containment holds on case-insensitive filesystems
# (Windows, default macOS): it lowercases on Windows and is a no-op on
# POSIX. commonpath raises ValueError across Windows drives (C: vs D:)
# or mixed abs/rel — both mean "outside", so the except rejects them.
nbase = os.path.normcase(base)
try:
if os.path.commonpath([os.path.normcase(resolved), nbase]) != nbase:
raise ValueError
except ValueError:
raise ValueError(f"path '{raw_path}' is outside the workspace ({workspace})")
return resolved
# ---------------------------------------------------------------------------
# Active workspace (per-turn, context-local)
# ---------------------------------------------------------------------------
# Set ONCE in execute_tool_block from the request's `workspace`. The path
# resolvers (_resolve_tool_path / _resolve_search_root) and the subprocess cwd
# helper (agent_cwd) read it from here, so confinement is enforced in a single
# place: any tool that resolves paths through these helpers is confined
# automatically and cannot accidentally bypass the workspace. contextvars are
# task-local, so concurrent turns don't leak into each other.
_active_workspace: contextvars.ContextVar = contextvars.ContextVar(
"agent_active_workspace", default=None
)
def get_active_workspace() -> Optional[str]:
"""The folder the agent is confined to this turn, or None."""
return _active_workspace.get()
def vet_workspace(raw: str) -> Optional[str]:
"""Validate a requested workspace path at bind time.
Returns the canonical path, or None when it is unusable: not a real
directory, or itself a sensitive path (.ssh, .gnupg, ...). The in-workspace
resolver deny-lists sensitive paths *inside* the workspace, but the
empty-path search root is the workspace itself, so the root has to be
vetted before it is ever bound.
"""
raw = (raw or "").strip()
if not raw:
return None
resolved = os.path.realpath(os.path.expanduser(raw))
if not os.path.isdir(resolved) or _is_sensitive_path(resolved):
return None
# Reject filesystem roots: binding / (or a Windows drive/UNC root) as the
# workspace would make every absolute path "inside" it, collapsing the
# confinement into host-wide file access. A root is its own dirname, which
# also covers C:\ and \\server\share without platform-specific lists.
if os.path.dirname(resolved) == resolved:
return None
return resolved
def agent_cwd() -> str:
"""Working directory for agent subprocesses (bash/python/background jobs):
the active workspace when set, else the persistent data dir."""
return get_active_workspace() or _AGENT_WORKDIR
def get_mcp_manager():
from src import agent_tools
return agent_tools.get_mcp_manager()
def _resolve_search_root(raw_path: str) -> str:
"""Resolve + confine a code-nav path (grep/glob/ls).
With a workspace active, the workspace folder is the root and a supplied
path is confined inside it. Otherwise an empty path defaults to the agent's
primary root (project data dir) and a supplied path is confined by the
global allowlist + sensitive-file policy.
"""
raw = (raw_path or "").strip()
ws = get_active_workspace()
if ws:
return os.path.realpath(ws) if not raw else _resolve_tool_path_in_workspace(ws, raw)
if not raw:
roots = _tool_path_roots()
return roots[0] if roots else os.path.realpath(".")
return _resolve_tool_path(raw)
logger = logging.getLogger(__name__)
_ADMIN_TOOLS = {
"app_api",
"manage_endpoints",
"manage_mcp",
"manage_webhooks",
"manage_tokens",
"manage_settings",
"download_model",
"serve_model",
"serve_preset",
"stop_served_model",
"cancel_download",
}
def _owner_is_admin(owner: Optional[str]) -> bool:
"""Mirror route-level admin behavior for agent tool execution."""
return owner_is_admin_or_single_user(owner)
# ---------------------------------------------------------------------------
# MCP-backed tool helpers
# ---------------------------------------------------------------------------
# Map legacy tool names -> (MCP server_id, MCP tool_name)
_MCP_TOOL_MAP = {
"bash": ("bash", "bash"),
"python": ("python", "python"),
"read_file": ("filesystem", "read_file"),
"write_file": ("filesystem", "write_file"),
"web_search": ("web_search", "web_search"),
"web_fetch": ("web_fetch", "web_fetch"),
"generate_image": ("image_gen", "generate_image"),
}
def _parse_generate_image(content: str) -> Dict:
lines = content.strip().split("\n")
args = {"prompt": lines[0].strip() if lines else ""}
for i, key in enumerate(["model", "size", "quality"], 1):
if len(lines) > i and lines[i].strip():
args[key] = lines[i].strip()
return args
def _parse_manage_memory(content: str) -> Dict:
lines = content.strip().split("\n")
action = lines[0].strip().lower() if lines else ""
args = {"action": action}
if action == "add":
args["text"] = lines[1].strip() if len(lines) > 1 else ""
if len(lines) > 2 and lines[2].strip():
args["category"] = lines[2].strip().lower()
elif action == "edit":
args["memory_id"] = lines[1].strip() if len(lines) > 1 else ""
args["text"] = lines[2].strip() if len(lines) > 2 else ""
elif action == "delete":
args["memory_id"] = lines[1].strip() if len(lines) > 1 else ""
elif action == "search":
args["text"] = lines[1].strip() if len(lines) > 1 else ""
elif action == "list":
if len(lines) > 1 and lines[1].strip():
args["category"] = lines[1].strip().lower()
return args
def _parse_write_file(content: str) -> Dict:
lines = content.split("\n", 1)
return {"path": lines[0].strip(), "content": lines[1] if len(lines) > 1 else ""}
_MCP_ARG_PARSERS: Dict[str, Callable[[str], Dict[str, str]]] = {
"bash": lambda c: {"command": c},
"python": lambda c: {"code": c},
"web_search": lambda c: {"query": c.split("\n")[0].strip()},
"web_fetch": lambda c: {"url": c.split("\n")[0].strip()},
"read_file": lambda c: {"path": c.split("\n")[0].strip()},
"write_file": _parse_write_file,
"generate_image": _parse_generate_image,
"manage_memory": _parse_manage_memory,
}
def _build_mcp_args(tool: str, content: str) -> Dict:
"""Convert fenced-block text content to structured MCP arguments."""
parser = _MCP_ARG_PARSERS.get(tool)
return parser(content) if parser else {}
async def _call_mcp_tool(
tool: str,
content: str,
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
) -> Dict:
"""Route a legacy tool call through the MCP manager, with direct fallbacks."""
mcp = get_mcp_manager()
if not mcp:
return await _direct_fallback(tool, content, progress_cb=progress_cb) or {"error": f"MCP manager not available for tool '{tool}'", "exit_code": 1}
server_id, tool_name = _MCP_TOOL_MAP[tool]
qualified = f"mcp__{server_id}__{tool_name}"
args = _build_mcp_args(tool, content)
result = await mcp.call_tool(qualified, args)
# If MCP server not connected, try direct fallback
if isinstance(result, dict) and result.get("exit_code") == 1 and "not connected" in result.get("error", ""):
fallback = await _direct_fallback(tool, content, progress_cb=progress_cb)
if fallback:
return fallback
# generate_image runs as a text-only MCP tool, so the saved image URL never
# reaches the agent loop's structured forwarding (which renders the image via
# buildImageBubble on result["image_url"]). Lift it out of the tool's stdout so
# the image renders deterministically — no dependence on the model echoing the
# URL into its prose (which it mangles/hallucinates).
if tool == "generate_image":
_promote_image_fields(result)
return result
def _promote_image_fields(result: Dict) -> None:
"""Lift the image URL (+ prompt/model/size) from a successful generate_image MCP
text result into structured fields the agent loop already forwards to
buildImageBubble. Only acts on a dict result with exit_code 0; matches the
generated-image URL by pattern (absolute or relative) so it's robust to the
result's wording."""
if not isinstance(result, dict) or result.get("exit_code") != 0:
return
out = result.get("stdout") or ""
m = re.search(r'(?:https?://[^\s)\]]+)?/api/generated-image/[A-Za-z0-9._-]+', out)
if not m:
return
result["image_url"] = m.group(0).strip()
for field, pat in (
("image_prompt", r'^Generated image for:\s*(.+)$'),
("image_model", r'^model:\s*(.+)$'),
("image_size", r'^size:\s*(.+)$'),
):
fm = re.search(pat, out, re.M)
if fm:
result[field] = fm.group(1).strip()
_BG_MARKERS = {"#!bg", "#bg", "# bg", "#background", "# background", "@background", "# @background"}
def _split_bg_marker(content: str):
"""If the bash content's first non-empty line is a background marker
(e.g. `#!bg`), return (True, command_without_marker); else (False, content)."""
lines = content.split("\n")
i = 0
while i < len(lines) and not lines[i].strip():
i += 1
if i < len(lines) and lines[i].strip().lower() in _BG_MARKERS:
del lines[i]
return True, "\n".join(lines).strip()
return False, content
async def _direct_fallback(
tool: str,
content: str,
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
) -> Optional[Dict]:
_subproc_env = {
**os.environ,
"TERM": "xterm-256color",
"COLUMNS": "120",
"LINES": "40",
"HOME": _AGENT_WORKDIR,
}
try:
ctx = {
"progress_cb": progress_cb,
"subproc_env": _subproc_env,
}
from src.agent_tools import TOOL_HANDLERS
if tool in TOOL_HANDLERS:
return await TOOL_HANDLERS[tool](content, ctx)
except Exception as e:
return {"error": f"{tool}: {e}", "exit_code": 1}
return None
async def _document_tool_dispatch(
tool: str,
content: str,
session_id: Optional[str] = None,
owner: Optional[str] = None,
) -> Optional[Dict]:
"""Route a document tool through TOOL_HANDLERS with the right ctx shape."""
from src.agent_tools import TOOL_HANDLERS
ctx = {"session_id": session_id, "owner": owner}
if tool in TOOL_HANDLERS:
return await TOOL_HANDLERS[tool](content, ctx)
return None
# ---------------------------------------------------------------------------
# Dispatcher
# ---------------------------------------------------------------------------
async def execute_tool_block(
block: Any,
session_id: Optional[str] = None,
disabled_tools: Optional[set] = None,
owner: Optional[str] = None,
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
workspace: Optional[str] = None,
tool_policy: Optional[Any] = None,
) -> Tuple[str, Dict]:
"""Execute a single tool block. Returns (description, result_dict).
Thin wrapper: bind the per-turn workspace (so the path resolvers + subprocess
cwd confine to it) for the duration of this call, then delegate. Reset on the
way out so the binding never leaks to the next tool call.
"""
token = _active_workspace.set(workspace or None)
try:
return await _execute_tool_block_impl(
block,
session_id=session_id,
disabled_tools=disabled_tools,
owner=owner,
progress_cb=progress_cb,
tool_policy=tool_policy,
)
finally:
_active_workspace.reset(token)
async def _execute_tool_block_impl(
block: Any,
session_id: Optional[str] = None,
disabled_tools: Optional[set] = None,
owner: Optional[str] = None,
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
tool_policy: Optional[Any] = None,
) -> Tuple[str, Dict]:
"""Execute a single tool block. Returns (description, result_dict).
`progress_cb` is forwarded to long-running subprocess tools
(bash, python) so the agent loop can emit `tool_progress` SSE
events while the command is in flight. Ignored by other tools.
"""
from src.tool_implementations import (
do_search_chats, do_manage_tasks,
do_manage_skills, do_api_call, do_manage_endpoints,
do_manage_mcp, do_manage_webhooks, do_manage_tokens,
do_manage_settings, do_manage_notes,
do_manage_calendar,
do_download_model, do_serve_model, do_list_served_models, do_stop_served_model,
do_tail_serve_output,
do_list_downloads, do_cancel_download, do_search_hf_models, do_list_cached_models,
do_list_serve_presets, do_serve_preset, do_adopt_served_model,
do_list_cookbook_servers,
do_edit_image, do_trigger_research, do_manage_research, do_resolve_contact,
do_manage_contact,
do_vault_search, do_vault_get, do_vault_unlock,
do_app_api,
)
tool = block.tool_type
content = block.content
# Misformatted tool call detection: model put JSON inside ```python``` (or
# similar) without naming the tool. Common with MiniMax-style outputs.
# Return a helpful error so the model retries with the correct format.
if tool in ("python", "json", "xml") and content.strip().startswith("{") and content.strip().endswith("}"):
try:
parsed = json.loads(content.strip())
if isinstance(parsed, dict):
desc = f"{tool}: misformatted tool call"
result = {
"error": (
f"You wrote a JSON object inside a ```{tool}``` block, but that's not a tool call.\n"
"To call a tool, use the tool name as the fence tag, e.g.\n"
"```resolve_contact\n"
"{\"name\": \"...\"}\n"
"```\n"
"or\n"
"```send_email\n"
"{\"to\": \"...\", \"subject\": \"...\", \"body\": \"...\"}\n"
"```"
),
"exit_code": 1,
}
return desc, result
except (ValueError, TypeError):
pass
# Reject tools that the user has disabled for this request
if disabled_tools and tool in disabled_tools:
desc = f"{tool}: BLOCKED"
result = {"error": f"Tool '{tool}' is disabled by user.", "exit_code": 1}
logger.info(f"Tool blocked by user: {tool}")
return desc, result
if tool_policy and tool_policy.blocks(tool):
desc = f"{tool}: BLOCKED"
result = {
"error": f"Execution of tool '{tool}' is forbade by the active guide-only policy.",
"exit_code": 1,
}
logger.warning("Tool policy blocked tool=%s", tool)
return desc, result
if tool in _ADMIN_TOOLS and not _owner_is_admin(owner):
desc = f"{tool}: BLOCKED"
result = {"error": f"Tool '{tool}' requires an admin user.", "exit_code": 1}
logger.warning("Admin tool blocked for non-admin owner=%r tool=%s", owner, tool)
return desc, result
if is_public_blocked_tool(tool) and not _owner_is_admin(owner):
desc = f"{tool}: BLOCKED"
result = {
"error": (
f"Tool '{tool}' is restricted to admin users on this deployment. "
"Ask an admin to perform this action or grant the needed permission."
),
"exit_code": 1,
}
logger.warning("Public tool policy blocked owner=%r tool=%s", owner, tool)
return desc, result
# ask_user: the agent poses a multiple-choice question to the user to get a
# decision/clarification. This is a pure UI-control marker — no subprocess,
# no filesystem. It returns an `ask_user` payload that the agent loop turns
# into an `ask_user` SSE event and then ENDS the turn, so the chat waits for
# the user's selection (their choice arrives as the next message).
if tool == "ask_user":
question, options, multi = "", [], False
raw = (content or "").strip()
try:
parsed = json.loads(raw) if raw else {}
except (ValueError, TypeError):
parsed = {}
if isinstance(parsed, dict):
question = str(parsed.get("question", "")).strip()
multi = bool(parsed.get("multi") or parsed.get("multiSelect"))
for opt in (parsed.get("options") or []):
if isinstance(opt, dict):
label = str(opt.get("label", "")).strip()
descr = str(opt.get("description", "")).strip()
elif isinstance(opt, str):
label, descr = opt.strip(), ""
else:
continue
if label:
options.append({"label": label, "description": descr})
else:
question = raw
if not question or len(options) < 2:
return "ask_user: invalid", {
"error": (
"ask_user needs a non-empty `question` and at least 2 `options` "
"(each an object with a `label`, optional `description`)."
),
"exit_code": 1,
}
options = options[:6] # keep the choice list sane
desc = f"ask_user: {question[:80]}"
labels = ", ".join(o["label"] for o in options)
result = {
"ask_user": {"question": question, "options": options, "multi": multi},
"output": f"Asked the user: {question}\nOptions: {labels}\nAwaiting their selection.",
"exit_code": 0,
}
logger.info("Tool executed: %s (%d options, multi=%s)", desc, len(options), multi)
return desc, result
# update_plan: the agent writes back to the active plan — tick an item done
# or revise steps (e.g. when the user asks to change something). Pure UI
# marker: returns a `plan_update` payload the agent loop turns into a
# `plan_update` SSE event; the frontend replaces the stored plan and refreshes
# the docked plan window. Does NOT end the turn.
if tool == "update_plan":
import json as _json
raw = (content or "").strip()
plan = ""
try:
parsed = _json.loads(raw) if raw else {}
except (ValueError, TypeError):
parsed = {}
if isinstance(parsed, dict) and parsed.get("plan"):
plan = str(parsed.get("plan", "")).strip()
else:
# Plain-string call (raw checklist) or JSON without a usable `plan`.
plan = raw
if not plan:
return "update_plan: invalid", {
"error": "update_plan needs a non-empty `plan` (the full updated checklist as markdown).",
"exit_code": 1,
}
plan = plan[:8192]
done = plan.count("- [x]") + plan.count("- [X]")
total = done + plan.count("- [ ]")
desc = f"update_plan: {done}/{total} done" if total else "update_plan"
result = {
"plan_update": {"plan": plan},
"output": f"Plan updated ({done}/{total} steps complete)." if total else "Plan updated.",
"exit_code": 0,
}
logger.info("Tool executed: %s", desc)
return desc, result
# Background execution: a `bash` block whose first line is the `#!bg`
# marker runs DETACHED — returns a job id immediately so the chat stream
# isn't held open for a multi-minute install/ffmpeg/download. The always-on
# monitor re-invokes the agent with the full output when the job finishes.
if tool == "bash" and session_id:
_is_bg, _bg_cmd = _split_bg_marker(content)
if _is_bg and _bg_cmd:
from src import bg_jobs
rec = bg_jobs.launch(_bg_cmd, session_id=session_id, cwd=agent_cwd())
short = _bg_cmd.strip().split(chr(10))[0][:80]
desc = f"bash (background): {short}"
result = {
"output": (
f"Started background job `{rec['id']}`. It is running detached — "
f"do NOT wait for it or poll it. You will be automatically re-invoked "
f"with its full output when it finishes. Continue with other work, or "
f"end your turn now and resume when the result arrives."
),
"exit_code": 0,
"bg_job_id": rec["id"],
}
logger.info(f"Tool executed: {desc} -> bg job {rec['id']}")
return desc, result
# Route MCP-extracted tools through the MCP manager. Forward
# the progress callback so long-running subprocess tools
# (bash, python) can stream `tool_progress` events to the UI.
if tool in _MCP_TOOL_MAP:
first_line = content.split(chr(10))[0][:80]
desc = f"{tool}: {first_line}"
result = await _call_mcp_tool(tool, content, progress_cb=progress_cb)
elif tool in ("grep", "glob", "ls", "get_workspace"):
# Code-navigation tools — no MCP server; run the direct implementation.
first_line = content.split(chr(10))[0][:80]
desc = f"{tool}: {first_line}"
result = await _direct_fallback(tool, content, progress_cb=progress_cb) \
or {"error": f"{tool}: execution failed", "exit_code": 1}
elif tool in ("create_document", "update_document", "edit_document",
"suggest_document", "manage_documents"):
desc = f"{tool}: {content.split(chr(10))[0][:80]}"
result = await _document_tool_dispatch(tool, content, session_id, owner) \
or {"error": f"{tool}: execution failed", "exit_code": 1}
if tool in ("edit_document", "suggest_document") and "title" in (result or {}):
desc = f"{tool}: {result.get('title', '')}"
elif tool == "search_chats":
query = content.split("\n")[0].strip()
desc = f"search_chats: {query[:80]}"
result = await do_search_chats(query, owner=owner)
elif tool in ("chat_with_model", "create_session", "list_sessions",
"send_to_session", "pipeline",
"manage_session", "manage_memory", "list_models",
"ui_control", "ask_teacher"):
from src.ai_interaction import dispatch_ai_tool
desc, result = await dispatch_ai_tool(tool, content, session_id, owner=owner)
elif tool == "manage_tasks":
desc = "manage_tasks"
result = await do_manage_tasks(content, owner=owner)
elif tool == "manage_skills":
desc = "manage_skills"
result = await do_manage_skills(content, owner=owner)
elif tool == "api_call":
first_line = content.split("\n")[0].strip()[:60]
desc = f"api_call: {first_line}"
result = await do_api_call(content)
elif tool == "manage_endpoints":
desc = "manage_endpoints"
result = await do_manage_endpoints(content, owner=owner)
elif tool == "manage_mcp":
desc = "manage_mcp"
result = await do_manage_mcp(content, owner=owner)
elif tool == "manage_webhooks":
desc = "manage_webhooks"
result = await do_manage_webhooks(content, owner=owner)
elif tool == "manage_tokens":
desc = "manage_tokens"
result = await do_manage_tokens(content, owner=owner)
elif tool == "manage_settings":
desc = "manage_settings"
result = await do_manage_settings(content, owner=owner)
elif tool == "manage_notes":
desc = "manage_notes"
result = await do_manage_notes(content, owner=owner)
elif tool == "manage_calendar":
desc = "manage_calendar"
result = await do_manage_calendar(content, owner=owner)
elif tool == "download_model":
desc = "download_model"
result = await do_download_model(content, owner=owner)
elif tool == "serve_model":
desc = "serve_model"
result = await do_serve_model(content, owner=owner)
elif tool == "list_served_models":
desc = "list_served_models"
result = await do_list_served_models(content, owner=owner)
elif tool == "stop_served_model":
desc = "stop_served_model"
result = await do_stop_served_model(content, owner=owner)
elif tool == "tail_serve_output":
desc = "tail_serve_output"
result = await do_tail_serve_output(content, owner=owner)
elif tool == "list_downloads":
desc = "list_downloads"
result = await do_list_downloads(content, owner=owner)
elif tool == "cancel_download":
desc = "cancel_download"
result = await do_cancel_download(content, owner=owner)
elif tool == "search_hf_models":
desc = "search_hf_models"
result = await do_search_hf_models(content, owner=owner)
elif tool == "list_cached_models":
desc = "list_cached_models"
result = await do_list_cached_models(content, owner=owner)
elif tool == "app_api":
desc = "app_api"
result = await do_app_api(content, owner=owner)
elif tool == "list_serve_presets":
desc = "list_serve_presets"
result = await do_list_serve_presets(content, owner=owner)
elif tool == "serve_preset":
desc = "serve_preset"
result = await do_serve_preset(content, owner=owner)
elif tool == "adopt_served_model":
desc = "adopt_served_model"
result = await do_adopt_served_model(content, owner=owner)
elif tool == "list_cookbook_servers":
desc = "list_cookbook_servers"
result = await do_list_cookbook_servers(content, owner=owner)
elif tool == "edit_image":
desc = "edit_image"
result = await do_edit_image(content, owner=owner)
elif tool == "edit_file":
result = await _direct_fallback(tool, content) or {"error": "edit failed", "exit_code": 1}
desc = result.get("output") or result.get("error") or "edit_file"
elif tool == "trigger_research":
desc = "trigger_research"
result = await do_trigger_research(content, owner=owner)
elif tool == "manage_research":
desc = "manage_research"
result = await do_manage_research(content, owner=owner)
elif tool == "resolve_contact":
desc = "resolve_contact"
result = await do_resolve_contact(content, owner=owner)
elif tool == "manage_contact":
desc = "manage_contact"
result = await do_manage_contact(content, owner=owner)
elif tool == "vault_search":
desc = "vault_search"
result = await do_vault_search(content, owner=owner)
elif tool == "vault_get":
desc = "vault_get"
result = await do_vault_get(content, owner=owner)
elif tool == "vault_unlock":
desc = "vault_unlock"
result = await do_vault_unlock(content, owner=owner)
elif tool.startswith("mcp__"):
# MCP tool dispatch
mcp = get_mcp_manager()
if mcp:
try:
args = json.loads(content) if content.strip().startswith("{") else {}
except (json.JSONDecodeError, TypeError):
args = {}
desc = f"mcp: {tool}"
result = await mcp.call_tool(tool, args)
else:
desc = f"mcp: {tool}"
result = {"error": "MCP manager not available", "exit_code": 1}
else:
desc = f"unknown: {tool}"
result = {"error": f"Unknown tool type: {tool}", "exit_code": 1}
logger.info(f"Tool executed: {desc} -> exit_code={result.get('exit_code', 'n/a')}")
return desc, result
# ---------------------------------------------------------------------------
# Result formatting
# ---------------------------------------------------------------------------
# Keys handled by the dedicated branches below — never echo them as raw JSON.
_FORMATTER_HANDLED_KEYS = {
"stdout", "stderr", "exit_code", "content", "size",
"response", "results", "session_id", "name", "model", "session_name",
"success", "path", "action", "title", "doc_id", "version", "applied",
"error", "output",
}
def format_tool_result(description: str, result: Dict) -> str:
"""Format a tool result into text for feeding back to the LLM."""
parts = [f"### {description}"]
if "stdout" in result:
if result["stdout"]:
parts.append(f"**stdout:**\n```\n{result['stdout']}\n```")
if result["stderr"]:
parts.append(f"**stderr:**\n```\n{result['stderr']}\n```")
parts.append(f"**exit_code:** {result.get('exit_code', 'unknown')}")
elif "output" in result:
# bash / python canonical result shape: {"output": ..., "exit_code": ...}
parts.append(f"```\n{result['output']}\n```")
if result.get("exit_code") not in (0, None):
parts.append(f"**exit_code:** {result['exit_code']}")
elif "content" in result:
parts.append(f"**content ({result.get('size', '?')} chars):**\n```\n{result['content']}\n```")
elif "response" in result:
model = result.get("model", result.get("session_name", ""))
if model:
parts.append(f"**{model} responded:**\n{result['response']}")
else:
parts.append(result["response"])
elif "results" in result:
parts.append(result["results"])
elif "session_id" in result and "name" in result:
parts.append(f"Session created: **{result['name']}** (id: `{result['session_id']}`, model: {result.get('model', 'unknown')})")
elif "success" in result:
if result["success"]:
parts.append(f"File written: {result['path']} ({result['size']} bytes)")
else:
parts.append(f"Error: {result.get('error', 'unknown')}")
elif "action" in result:
action = result["action"]
if action == "create":
parts.append(f"Document created: \"{result.get('title', '')}\" (id: {result['doc_id']}, v{result['version']})")
elif action == "update":
parts.append(f"Document updated: \"{result.get('title', '')}\" (v{result['version']})")
elif action == "edit":
parts.append(f'Document edited: "{result.get("title", "")}" (v{result.get("version", "?")}, {result.get("applied", 0)} edit(s) applied)')
elif "error" in result:
parts.append(f"**Error:** {result['error']}")
# Surface any additional structured payload (events, tasks, notes, calendars,
# documents, attachments, etc.) that the dedicated branches above don't show.
# Without this, tools that return {"response": "...", "events": [...]} would
# silently drop the events list and the model would only see the summary line.
extra = {k: v for k, v in result.items() if k not in _FORMATTER_HANDLED_KEYS}
if extra:
try:
extra_json = json.dumps(extra, indent=2, default=str, ensure_ascii=False)
# Cap to avoid blowing the context window on huge payloads.
if len(extra_json) > 8000:
extra_json = extra_json[:8000] + f"\n... (truncated, {len(extra_json)} chars total)"
parts.append(f"**data:**\n```json\n{extra_json}\n```")
except (TypeError, ValueError):
pass
return "\n".join(parts)