""" tool_implementations.py Extracted tool implementation functions (do_* and helpers) from agent_tools.py. These handle the actual execution logic for each tool type. """ import asyncio import json import logging import os import re from typing import Any, Dict, List, Optional from src.constants import MAX_READ_CHARS, DEEP_RESEARCH_DIR, VAULT_FILE from src.tool_utils import get_mcp_manager from core.constants import internal_api_base logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Argument parsing # --------------------------------------------------------------------------- def _parse_tool_args(content): """Parse a tool-call argument blob. Accepts either a JSON string or an already-decoded dict. Unwraps the common `{"body": {...}}` envelope that smaller models emit when they read tool descriptions like "Body is JSON: {...}" literally — they pass `body` as a field name rather than treating it as a noun. Returns a dict on success, raises ValueError on bad JSON. """ if isinstance(content, str): try: args = json.loads(content) if content.strip() else {} except (json.JSONDecodeError, TypeError) as e: raise ValueError(str(e)) elif isinstance(content, dict): args = content else: args = {} # Unwrap {"body": {...}} envelope — but only if `body` is the sole key # and points at a dict. We don't want to clobber a legitimate `body` # field on tools where it's a real arg (e.g. send_email body text). if ( isinstance(args, dict) and len(args) == 1 and "body" in args and isinstance(args["body"], dict) and "action" in args["body"] # extra safety: only unwrap if the inner dict looks like a tool call ): args = args["body"] return args # --------------------------------------------------------------------------- # Active document state # --------------------------------------------------------------------------- _active_document_id: Optional[str] = None _active_model: Optional[str] = None def set_active_document(doc_id: Optional[str]): """Set the active document ID for document tool execution.""" global _active_document_id _active_document_id = doc_id def set_active_model(model: Optional[str]): """Set the current model name for version summaries.""" global _active_model _active_model = model def get_active_document(): return _active_document_id def clear_active_document(doc_id: Optional[str] = None) -> bool: """Clear the in-memory active-document pointer. With ``doc_id`` given, only clears when it matches the current pointer, so a different active document is left untouched. Returns True if it was cleared. Called when a document is detached from its session or deleted (its tab is closed): without this, the stale pointer makes the last-resort doc-injection path re-surface a closed document in a later, unrelated chat — even one whose session no longer matches — because an unlinked doc has session_id NULL (#1160). """ global _active_document_id if doc_id is None or _active_document_id == doc_id: _active_document_id = None return True return False def _owned_document_query(query, Document, owner: Optional[str]): if owner is None: # A bare Python `False` is not a valid SQL expression — SQLAlchemy 1.4 # deprecates it and 2.0 raises ArgumentError. Use the SQL `false()` # literal to return zero rows for an unscoped (owner-less) query. from sqlalchemy import false return query.filter(false()) return query.filter(Document.owner == owner) def _get_owned_document(db, Document, doc_id: str, owner: Optional[str], active_only: bool = False): q = db.query(Document).filter(Document.id == doc_id) if active_only: q = q.filter(Document.is_active == True) q = _owned_document_query(q, Document, owner) return q.first() def _most_recent_owned_document(db, Document, owner: Optional[str], active_only: bool = False): q = db.query(Document) if active_only: q = q.filter(Document.is_active == True) q = _owned_document_query(q, Document, owner) return q.order_by(Document.updated_at.desc()).first() # --------------------------------------------------------------------------- # Document tools — create/update/edit/suggest living documents # --------------------------------------------------------------------------- def _sniff_doc_language(text: str) -> str: """Best-effort detect a document's language from its content when the model didn't specify one. Defaults to 'markdown' (prose). Recognizes the common markup/code types the editor supports so e.g. an SVG isn't saved as markdown.""" import json as _json, re as _re2 s = (text or "").strip() if not s: return "markdown" head = s[:600] hl = head.lower() if _looks_like_email_document(s): return "email" # Markup (unambiguous) if " bool: import re as _re title_l = (title or "").strip().lower() if title_l in {"new email", "new mail", "new message"}: return True s = (text or "").lstrip() if "\n---\n" in s and _re.search(r"(?im)^To:\s*", s) and _re.search(r"(?im)^Subject:\s*", s): return True return bool(_re.search(r"(?im)^To:\s*", s) and _re.search(r"(?im)^Subject:\s*", s)) def _coerce_email_document_content(existing: str, incoming: str) -> str: """Keep email docs in the To/Subject/---/body shape even if a model writes only the body or dumps header labels without the separator.""" import re as _re old = existing or "" new = (incoming or "").strip() if "\n---\n" in new: return new header = old.split("\n---\n", 1)[0] if "\n---\n" in old else "To: \nSubject: " if _looks_like_email_document(new): lines = new.splitlines() last_header_idx = -1 header_re = _re.compile(r"^(To|Cc|Bcc|Subject|In-Reply-To|References|X-Source-UID|X-Source-Folder|X-Attachments):", _re.I) for i, line in enumerate(lines): if header_re.match(line.strip()): last_header_idx = i body_lines = lines[last_header_idx + 1:] if last_header_idx >= 0 else lines while body_lines and not body_lines[0].strip(): body_lines.pop(0) body = "\n".join(body_lines).strip() else: body = new return header.rstrip() + "\n---\n" + body async def do_create_document(content_block: str, session_id: Optional[str] = None, owner: Optional[str] = None) -> Dict: """Create a new document. Supports two formats: 1) Line-based: line 1 = title, line 2 (optional) = language, rest = content 2) XML-like tags: ......... Some models mix them — strip any XML-style tags and fall back to line parsing.""" import uuid, re as _re from src.database import SessionLocal, Document, DocumentVersion, Session as DbSession raw = content_block or "" # Known languages the editor understands (match the