mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-18 18:55:28 -04:00
1f00fff837
Gives the agent first-class code navigation instead of shelling out via bash (token-heavy, unreliable on weaker models, unstructured). Mirrors the Grep/Glob/Read primitives that Claude Code / opencode expose. - grep: regex search over file contents across a tree. Uses ripgrep when available (with explicit excludes so junk dirs are skipped even without a .gitignore); falls back to a pure-Python walk+regex when rg is absent. Returns file:line:match, capped. - glob: find files by glob pattern (recursive), newest first. - ls: list a directory (folders first, then files with sizes). - read_file: optional offset/limit for line-range reads of large files (plain-path calls stay back-compatible). All confined by the same path policy as read_file (_resolve_tool_path: data/tmp allowlist + sensitive-file deny). Junk dirs (.git, node_modules, venv, __pycache__, dist/build, …) skipped. Output capped (200 hits, 400 chars/line). Admin-gated like the other filesystem tools. Wiring: schemas + native arg->content serializer (src/tool_schemas.py), tool tags (src/agent_tools.py), always-available + descriptions (src/tool_index.py), admin gate (src/tool_security.py), dispatch + impls (src/tool_execution.py). Tests: tests/test_code_nav_tools.py — match/skip-junk/ignore-case/glob-filter, allowlist rejection, glob/ls, read-range, and the no-ripgrep Python fallback.
88 lines
2.4 KiB
Python
88 lines
2.4 KiB
Python
"""Server-side tool safety policy."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Optional, Set
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# Tools regular/public users must not execute directly. These either expose
|
|
# server/runtime access, sensitive user data, external messaging, persistent
|
|
# state changes, or generic loopback/integration surfaces.
|
|
NON_ADMIN_BLOCKED_TOOLS = {
|
|
"bash",
|
|
"python",
|
|
"read_file",
|
|
"write_file",
|
|
"edit_file",
|
|
"grep",
|
|
"glob",
|
|
"ls",
|
|
"search_chats",
|
|
"manage_memory",
|
|
"manage_skills",
|
|
"manage_tasks",
|
|
"manage_endpoints",
|
|
"manage_mcp",
|
|
"manage_webhooks",
|
|
"manage_tokens",
|
|
"manage_documents",
|
|
"manage_settings",
|
|
"api_call",
|
|
"app_api",
|
|
"send_email",
|
|
"reply_to_email",
|
|
"list_emails",
|
|
"read_email",
|
|
"resolve_contact",
|
|
"manage_contact",
|
|
"manage_calendar",
|
|
"vault_search",
|
|
"vault_get",
|
|
"vault_unlock",
|
|
"download_model",
|
|
"serve_model",
|
|
"serve_preset",
|
|
"stop_served_model",
|
|
"cancel_download",
|
|
"adopt_served_model",
|
|
}
|
|
|
|
|
|
def is_public_blocked_tool(tool_name: Optional[str]) -> bool:
|
|
"""Return True when a non-admin/public user must not execute this tool.
|
|
|
|
This is a security gate, so it fails CLOSED: a malformed non-string tool
|
|
name can't be matched against the blocklist or the ``mcp__`` namespace, so
|
|
it is treated as blocked rather than silently allowed through. ``None`` /
|
|
empty string means there is no tool to gate.
|
|
"""
|
|
if tool_name is None or tool_name == "":
|
|
return False
|
|
if not isinstance(tool_name, str):
|
|
return True
|
|
return tool_name in NON_ADMIN_BLOCKED_TOOLS or tool_name.startswith("mcp__")
|
|
|
|
|
|
def owner_is_admin_or_single_user(owner: Optional[str]) -> bool:
|
|
"""Return True for admins, or when auth is not configured yet."""
|
|
try:
|
|
from core.auth import AuthManager
|
|
|
|
auth = AuthManager()
|
|
if not auth.is_configured:
|
|
return True
|
|
return bool(owner and auth.is_admin(owner))
|
|
except Exception as exc:
|
|
logger.warning("Unable to evaluate owner admin status: %s", exc)
|
|
return False
|
|
|
|
|
|
def blocked_tools_for_owner(owner: Optional[str]) -> Set[str]:
|
|
"""Tools to hide/disable for this owner under public-user policy."""
|
|
if owner_is_admin_or_single_user(owner):
|
|
return set()
|
|
return set(NON_ADMIN_BLOCKED_TOOLS)
|