mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-16 09:45:24 -04:00
fix(agent): enforce guide-only tool policy (#3088)
This commit is contained in:
@@ -0,0 +1,209 @@
|
||||
"""Per-turn tool policy composition for agent execution."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from types import MappingProxyType
|
||||
from typing import Iterable, Mapping, Optional, Set, Tuple
|
||||
|
||||
|
||||
GUIDE_ONLY_DIRECTIVE = (
|
||||
"## GUIDE-ONLY MODE - TOOL POLICY\n"
|
||||
"The latest user turn explicitly forbids tool use. Do not call tools, do not "
|
||||
"run shell commands, and do not inspect local files or the environment. "
|
||||
"Respond in normal text by guiding the user or asking them to paste the "
|
||||
"output they will produce locally."
|
||||
)
|
||||
|
||||
|
||||
_COMMON_TOOL_NAMES = {
|
||||
"api_call",
|
||||
"app_api",
|
||||
"archive_email",
|
||||
"ask_teacher",
|
||||
"ask_user",
|
||||
"bash",
|
||||
"bulk_email",
|
||||
"builtin_browser",
|
||||
"cancel_download",
|
||||
"chat_with_model",
|
||||
"create_document",
|
||||
"create_session",
|
||||
"delete_email",
|
||||
"download_model",
|
||||
"edit_document",
|
||||
"edit_file",
|
||||
"edit_image",
|
||||
"generate_image",
|
||||
"glob",
|
||||
"grep",
|
||||
"list_cached_models",
|
||||
"list_cookbook_servers",
|
||||
"list_downloads",
|
||||
"list_emails",
|
||||
"list_models",
|
||||
"list_serve_presets",
|
||||
"list_served_models",
|
||||
"list_sessions",
|
||||
"ls",
|
||||
"manage_calendar",
|
||||
"manage_contact",
|
||||
"manage_documents",
|
||||
"manage_endpoints",
|
||||
"manage_mcp",
|
||||
"manage_memory",
|
||||
"manage_notes",
|
||||
"manage_research",
|
||||
"manage_session",
|
||||
"manage_settings",
|
||||
"manage_skills",
|
||||
"manage_tasks",
|
||||
"manage_tokens",
|
||||
"manage_webhooks",
|
||||
"mark_email_read",
|
||||
"pipeline",
|
||||
"python",
|
||||
"read_email",
|
||||
"read_file",
|
||||
"reply_to_email",
|
||||
"resolve_contact",
|
||||
"search_chats",
|
||||
"search_hf_models",
|
||||
"send_email",
|
||||
"send_to_session",
|
||||
"serve_model",
|
||||
"serve_preset",
|
||||
"stop_served_model",
|
||||
"suggest_document",
|
||||
"trigger_research",
|
||||
"ui_control",
|
||||
"update_document",
|
||||
"update_plan",
|
||||
"vault_get",
|
||||
"vault_search",
|
||||
"vault_unlock",
|
||||
"web_fetch",
|
||||
"web_search",
|
||||
"write_file",
|
||||
}
|
||||
|
||||
|
||||
_GUIDE_ONLY_PATTERNS: Tuple[Tuple[re.Pattern[str], str], ...] = tuple(
|
||||
(re.compile(pattern, re.IGNORECASE), reason)
|
||||
for pattern, reason in (
|
||||
(r"\bguide[-\s]?only mode\b", "guide-only mode requested"),
|
||||
(r"\bno[-\s]?tools? mode\b", "no-tools mode requested"),
|
||||
(r"\bdo not use (?:any )?tools?\b", "user forbade tool use"),
|
||||
(r"\bdon'?t use (?:any )?tools?\b", "user forbade tool use"),
|
||||
(r"\bnot allowed to use (?:any )?tools?\b", "user forbade tool use"),
|
||||
(r"\bnot allowed to:?.{0,120}\buse (?:any )?tools?\b", "user forbade tool use"),
|
||||
(r"\bask (?:me )?(?:for confirmation )?before using tools?\b", "user requested confirmation before tools"),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ToolPolicy:
|
||||
"""Effective tool behavior for one agent turn."""
|
||||
|
||||
disabled_tools: frozenset[str] = frozenset()
|
||||
hidden_tools: frozenset[str] = frozenset()
|
||||
reasons: Mapping[str, str] = field(default_factory=dict)
|
||||
mode: str = "normal"
|
||||
block_all_tool_calls: bool = False
|
||||
disable_mcp: bool = False
|
||||
|
||||
def all_disabled_names(self) -> Set[str]:
|
||||
return set(self.disabled_tools) | set(self.hidden_tools)
|
||||
|
||||
def blocks(self, tool_name: Optional[str]) -> bool:
|
||||
if not tool_name:
|
||||
return False
|
||||
return self.block_all_tool_calls or tool_name in self.disabled_tools or tool_name in self.hidden_tools
|
||||
|
||||
def reason_for(self, tool_name: Optional[str]) -> str:
|
||||
if tool_name and tool_name in self.reasons:
|
||||
return self.reasons[tool_name]
|
||||
if self.block_all_tool_calls and self.mode == "guide_only":
|
||||
return "Tool use is disabled for this guide-only turn."
|
||||
return "Tool use is disabled for this turn."
|
||||
|
||||
|
||||
def detect_guide_only_turn(message: object) -> Optional[str]:
|
||||
"""Return a reason when the latest user turn strongly requests no tools."""
|
||||
|
||||
if not isinstance(message, str) or not message.strip():
|
||||
return None
|
||||
text = re.sub(r"\s+", " ", message.strip())
|
||||
for pattern, reason in _GUIDE_ONLY_PATTERNS:
|
||||
if pattern.search(text):
|
||||
return reason
|
||||
return None
|
||||
|
||||
|
||||
def known_tool_names() -> Set[str]:
|
||||
"""Best-effort set of native tool names for prompt hiding and denylisting."""
|
||||
|
||||
names = set(_COMMON_TOOL_NAMES)
|
||||
try:
|
||||
from src.tool_schemas import FUNCTION_TOOL_SCHEMAS
|
||||
|
||||
for schema in FUNCTION_TOOL_SCHEMAS:
|
||||
name = (schema.get("function") or {}).get("name") or schema.get("name")
|
||||
if name:
|
||||
names.add(name)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
from src.agent_loop import TOOL_SECTIONS
|
||||
|
||||
names.update(TOOL_SECTIONS.keys())
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
from src.tool_security import PLAN_MODE_READONLY_TOOLS, _PLAN_MODE_KNOWN_MUTATORS
|
||||
|
||||
names.update(PLAN_MODE_READONLY_TOOLS)
|
||||
names.update(_PLAN_MODE_KNOWN_MUTATORS)
|
||||
except Exception:
|
||||
pass
|
||||
return names
|
||||
|
||||
|
||||
def build_effective_tool_policy(
|
||||
*,
|
||||
disabled_tools: Optional[Iterable[str]] = None,
|
||||
last_user_message: object = "",
|
||||
) -> ToolPolicy:
|
||||
"""Compose the effective policy for one agent turn.
|
||||
|
||||
Existing callers still provide the already-composed disabled-tool denylist.
|
||||
This function adds higher-level turn policy on top so enforcement is not
|
||||
delegated to prompt compliance.
|
||||
"""
|
||||
|
||||
disabled = {str(t) for t in (disabled_tools or []) if t}
|
||||
hidden: Set[str] = set()
|
||||
reasons = {tool: "Tool is disabled for this request." for tool in disabled}
|
||||
|
||||
guide_reason = detect_guide_only_turn(last_user_message)
|
||||
if guide_reason:
|
||||
all_tools = known_tool_names()
|
||||
disabled.update(all_tools)
|
||||
hidden.update(all_tools)
|
||||
reasons.update({tool: f"{guide_reason}." for tool in all_tools})
|
||||
return ToolPolicy(
|
||||
disabled_tools=frozenset(disabled),
|
||||
hidden_tools=frozenset(hidden),
|
||||
reasons=MappingProxyType(dict(reasons)),
|
||||
mode="guide_only",
|
||||
block_all_tool_calls=True,
|
||||
disable_mcp=True,
|
||||
)
|
||||
|
||||
return ToolPolicy(
|
||||
disabled_tools=frozenset(disabled),
|
||||
hidden_tools=frozenset(hidden),
|
||||
reasons=MappingProxyType(dict(reasons)),
|
||||
)
|
||||
Reference in New Issue
Block a user