"""Per-turn tool policy composition for agent execution.""" from __future__ import annotations import re from dataclasses import dataclass, field from types import MappingProxyType from typing import Iterable, Mapping, Optional, Set, Tuple GUIDE_ONLY_DIRECTIVE = ( "## GUIDE-ONLY MODE - TOOL POLICY\n" "The latest user turn explicitly forbids tool use. Do not call tools, do not " "run shell commands, and do not inspect local files or the environment. " "Respond in normal text by guiding the user or asking them to paste the " "output they will produce locally." ) _COMMON_TOOL_NAMES = { "api_call", "app_api", "archive_email", "ask_teacher", "ask_user", "bash", "bulk_email", "builtin_browser", "cancel_download", "chat_with_model", "create_document", "create_session", "delete_email", "download_model", "edit_document", "edit_file", "edit_image", "generate_image", "glob", "grep", "list_cached_models", "list_cookbook_servers", "list_downloads", "list_emails", "list_models", "list_serve_presets", "list_served_models", "list_sessions", "ls", "manage_calendar", "manage_contact", "manage_documents", "manage_endpoints", "manage_mcp", "manage_memory", "manage_notes", "manage_research", "manage_session", "manage_settings", "manage_skills", "manage_tasks", "manage_tokens", "manage_webhooks", "mark_email_read", "pipeline", "python", "read_email", "read_file", "reply_to_email", "resolve_contact", "search_chats", "search_hf_models", "send_email", "send_to_session", "serve_model", "serve_preset", "stop_served_model", "suggest_document", "trigger_research", "ui_control", "update_document", "update_plan", "vault_get", "vault_search", "vault_unlock", "web_fetch", "web_search", "write_file", } _GUIDE_ONLY_PATTERNS: Tuple[Tuple[re.Pattern[str], str], ...] = tuple( (re.compile(pattern, re.IGNORECASE), reason) for pattern, reason in ( (r"\bguide[-\s]?only mode\b", "guide-only mode requested"), (r"\bno[-\s]?tools? mode\b", "no-tools mode requested"), (r"\bdo not use (?:any )?tools?\b", "user forbade tool use"), (r"\bdon'?t use (?:any )?tools?\b", "user forbade tool use"), (r"\bnot allowed to use (?:any )?tools?\b", "user forbade tool use"), (r"\bnot allowed to:?.{0,120}\buse (?:any )?tools?\b", "user forbade tool use"), (r"\bask (?:me )?(?:for confirmation )?before using tools?\b", "user requested confirmation before tools"), ) ) @dataclass(frozen=True) class ToolPolicy: """Effective tool behavior for one agent turn.""" disabled_tools: frozenset[str] = frozenset() hidden_tools: frozenset[str] = frozenset() reasons: Mapping[str, str] = field(default_factory=dict) mode: str = "normal" block_all_tool_calls: bool = False disable_mcp: bool = False def all_disabled_names(self) -> Set[str]: return set(self.disabled_tools) | set(self.hidden_tools) def blocks(self, tool_name: Optional[str]) -> bool: if not tool_name: return False return self.block_all_tool_calls or tool_name in self.disabled_tools or tool_name in self.hidden_tools def reason_for(self, tool_name: Optional[str]) -> str: if tool_name and tool_name in self.reasons: return self.reasons[tool_name] if self.block_all_tool_calls and self.mode == "guide_only": return "Tool use is disabled for this guide-only turn." return "Tool use is disabled for this turn." def detect_guide_only_turn(message: object) -> Optional[str]: """Return a reason when the latest user turn strongly requests no tools.""" if not isinstance(message, str) or not message.strip(): return None text = re.sub(r"\s+", " ", message.strip()) for pattern, reason in _GUIDE_ONLY_PATTERNS: if pattern.search(text): return reason return None def known_tool_names() -> Set[str]: """Best-effort set of native tool names for prompt hiding and denylisting.""" names = set(_COMMON_TOOL_NAMES) try: from src.tool_schemas import FUNCTION_TOOL_SCHEMAS for schema in FUNCTION_TOOL_SCHEMAS: name = (schema.get("function") or {}).get("name") or schema.get("name") if name: names.add(name) except Exception: pass try: from src.agent_loop import TOOL_SECTIONS names.update(TOOL_SECTIONS.keys()) except Exception: pass try: from src.tool_security import PLAN_MODE_READONLY_TOOLS, _PLAN_MODE_KNOWN_MUTATORS names.update(PLAN_MODE_READONLY_TOOLS) names.update(_PLAN_MODE_KNOWN_MUTATORS) except Exception: pass return names def build_effective_tool_policy( *, disabled_tools: Optional[Iterable[str]] = None, last_user_message: object = "", ) -> ToolPolicy: """Compose the effective policy for one agent turn. Existing callers still provide the already-composed disabled-tool denylist. This function adds higher-level turn policy on top so enforcement is not delegated to prompt compliance. """ disabled = {str(t) for t in (disabled_tools or []) if t} hidden: Set[str] = set() reasons = {tool: "Tool is disabled for this request." for tool in disabled} guide_reason = detect_guide_only_turn(last_user_message) if guide_reason: all_tools = known_tool_names() disabled.update(all_tools) hidden.update(all_tools) reasons.update({tool: f"{guide_reason}." for tool in all_tools}) return ToolPolicy( disabled_tools=frozenset(disabled), hidden_tools=frozenset(hidden), reasons=MappingProxyType(dict(reasons)), mode="guide_only", block_all_tool_calls=True, disable_mcp=True, ) return ToolPolicy( disabled_tools=frozenset(disabled), hidden_tools=frozenset(hidden), reasons=MappingProxyType(dict(reasons)), )