fix(email): enforce MCP owner boundaries (#4335)

* fix(email): enforce MCP owner boundaries

* fix(email): fail closed for unowned MCP fallback
This commit is contained in:
RaresKeY
2026-06-16 06:31:24 +03:00
committed by GitHub
parent 2f9ae43a58
commit 260ce8ba59
10 changed files with 475 additions and 33 deletions
+89 -12
View File
@@ -23,6 +23,7 @@ import os.path
from pathlib import Path
from datetime import datetime, timedelta
import uuid
from contextvars import ContextVar
from mcp.server import Server
from mcp.server.stdio import stdio_server
@@ -55,6 +56,8 @@ def _uid_fetch_rows(data) -> list:
# flat keys when no DB row matches (legacy single-account behaviour).
_ACCOUNT_CACHE: dict = {} # key = normalized account selector -> config dict
_MCP_OWNER_ARG = "_odysseus_owner"
_CURRENT_OWNER: ContextVar[str | None] = ContextVar("email_mcp_owner", default=None)
def _clean_header_value(value) -> str:
@@ -68,6 +71,45 @@ def _db_path() -> Path:
return Path(APP_DB)
def _current_owner() -> str:
owner = _CURRENT_OWNER.get()
return str(owner or "").strip()
def _account_visible_to_owner(row: dict, owner: str) -> bool:
row_owner = str(row.get("owner") or "").strip()
if row_owner == owner:
return True
if row_owner:
return False
# Legacy ownerless accounts are only visible to a scoped caller when the
# mailbox itself matches the owner, mirroring the HTTP email route fallback.
owner_l = owner.lower()
return owner_l in {
str(row.get("imap_user") or "").strip().lower(),
str(row.get("from_address") or "").strip().lower(),
}
def _filter_accounts_for_owner(rows: list[dict]) -> list[dict]:
owner = _current_owner()
if owner:
return [r for r in rows if _account_visible_to_owner(r, owner)]
owners = {str(r.get("owner") or "").strip() for r in rows if str(r.get("owner") or "").strip()}
if len(owners) > 1:
return []
return rows
def _mcp_owner_required(rows: list[dict] | None = None) -> bool:
if _current_owner():
return False
rows = rows if rows is not None else _read_accounts_from_db()
owners = {str(r.get("owner") or "").strip() for r in rows if str(r.get("owner") or "").strip()}
return len(owners) > 1
def _load_email_writing_style() -> str:
"""Return the existing Settings > Email > Writing Style value."""
try:
@@ -121,9 +163,8 @@ def _default_document_owner() -> str | None:
return None
def _list_accounts_raw() -> list:
"""Return list of dicts from the email_accounts table. Empty list if table
missing or empty. Never raises."""
def _read_accounts_from_db() -> list:
"""Return all enabled email account rows. Empty list if missing. Never raises."""
path = _db_path()
if not path.exists():
return []
@@ -131,9 +172,10 @@ def _list_accounts_raw() -> list:
conn = sqlite3.connect(str(path))
conn.row_factory = sqlite3.Row
columns = {r[1] for r in conn.execute("PRAGMA table_info(email_accounts)").fetchall()}
owner_select = "owner" if "owner" in columns else "NULL AS owner"
smtp_security_select = "smtp_security" if "smtp_security" in columns else "'' AS smtp_security"
rows = conn.execute(f"""
SELECT id, name, is_default, enabled,
SELECT id, {owner_select}, name, is_default, enabled,
imap_host, imap_port, imap_user, imap_password, imap_starttls,
smtp_host, smtp_port, {smtp_security_select}, smtp_user, smtp_password, from_address
FROM email_accounts WHERE enabled = 1
@@ -147,11 +189,15 @@ def _list_accounts_raw() -> list:
return []
def _resolve_account(selector: str | None) -> dict | None:
def _list_accounts_raw() -> list:
"""Return owner-visible email account rows for the active MCP call."""
return _filter_accounts_for_owner(_read_accounts_from_db())
def _resolve_account_from_rows(rows: list[dict], selector: str | None) -> dict | None:
"""Given a selector (None = default, or a name/user/id string), return the
matching row or None. Matching is case-insensitive substring on name +
imap_user + from_address, plus exact id match."""
rows = _list_accounts_raw()
if not rows:
return None
if not selector:
@@ -186,6 +232,10 @@ def _resolve_account(selector: str | None) -> dict | None:
return None
def _resolve_account(selector: str | None) -> dict | None:
return _resolve_account_from_rows(_list_accounts_raw(), selector)
def _load_config(account: str | None = None) -> dict:
"""Return the full config dict for the requested account (or default).
@@ -194,7 +244,7 @@ def _load_config(account: str | None = None) -> dict:
2. env vars + settings.json flat keys (legacy)
3. hardcoded fallbacks (localhost:31143 etc.)
"""
cache_key = (account or "").strip().lower() or "__default__"
cache_key = (_current_owner(), (account or "").strip().lower() or "__default__")
if cache_key in _ACCOUNT_CACHE:
return _ACCOUNT_CACHE[cache_key]
@@ -223,8 +273,11 @@ def _load_config(account: str | None = None) -> dict:
"account_name": None,
}
rows = _list_accounts_raw()
row = _resolve_account(account)
raw_rows = _read_accounts_from_db()
rows = _filter_accounts_for_owner(raw_rows)
row = _resolve_account_from_rows(rows, account)
if _current_owner() and raw_rows and not rows:
raise ValueError("No email account is configured for the authenticated owner")
if account and rows and not row:
available = ", ".join(
f"{r.get('name') or r.get('imap_user')} <{r.get('imap_user') or r.get('from_address') or '?'}>"
@@ -953,7 +1006,7 @@ def _stash_agent_draft(*, to, subject, body, in_reply_to=None, references=None,
now,
account or None,
"agent_draft",
"",
_current_owner(),
))
conn.commit()
conn.close()
@@ -1139,7 +1192,7 @@ def _create_email_draft_document(
doc_id = str(uuid.uuid4())
ver_id = str(uuid.uuid4())
doc_title = (title or subject or "Email draft").strip() or "Email draft"
doc_owner = _default_document_owner()
doc_owner = _current_owner() or _default_document_owner()
db = SessionLocal()
try:
@@ -1925,10 +1978,22 @@ async def list_tools() -> list[Tool]:
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
arguments = dict(arguments) if isinstance(arguments, dict) else {}
owner = str(arguments.pop(_MCP_OWNER_ARG, "") or "").strip()
owner_token = _CURRENT_OWNER.set(owner or None)
try:
all_db_accounts = _read_accounts_from_db()
if _mcp_owner_required(all_db_accounts):
return [TextContent(
type="text",
text="Error: email MCP requires an authenticated owner when multiple email account owners are configured.",
)]
if name == "list_email_accounts":
rows = _list_accounts_raw()
rows = _filter_accounts_for_owner(all_db_accounts)
if not rows:
if all_db_accounts and owner:
return [TextContent(type="text", text="No email accounts configured for this owner.")]
return [TextContent(type="text", text="No email accounts configured. Legacy single-account mode active.")]
lines = [f"Found {len(rows)} email account(s):\n"]
for r in rows:
@@ -2108,6 +2173,16 @@ async def call_tool(name: str, arguments: dict) -> list[TextContent]:
bcc=arguments.get("bcc"),
account=acct,
)
if "error" in result:
return [TextContent(type="text", text=f"Error: {result['error']}")]
if result.get("pending"):
return [TextContent(
type="text",
text=(
f"Draft staged for approval (pending id: {result.get('pending_id')}). "
"Nothing has been sent yet. Review and approve it in Odysseus before delivery."
),
)]
acct_note = f" (from {result['account']})" if result.get("account") else ""
return [TextContent(type="text", text=f"Sent email to {result['to']} with subject '{result['subject']}'{acct_note}.")]
@@ -2283,6 +2358,8 @@ async def call_tool(name: str, arguments: dict) -> list[TextContent]:
except Exception as e:
return [TextContent(type="text", text=f"Error: {e}")]
finally:
_CURRENT_OWNER.reset(owner_token)
# ── Main ──
+4 -1
View File
@@ -310,7 +310,10 @@ def setup_codex_routes(
@router.post("/emails/draft-document")
async def codex_email_draft_document(request: Request, body: dict[str, Any] = Body(default_factory=dict)):
owner = _scope_owner_all(request, {"email:draft", "documents:write"})
owner = _scope_owner(request, EMAIL_DRAFT_SCOPES)
docs_owner = _scope_owner_all(request, DOCS_WRITE_SCOPES)
if docs_owner != owner:
raise HTTPException(403, "API token owner mismatch")
if documents_create_endpoint is None:
raise HTTPException(503, "Documents integration is not available")
from routes.document_routes import DocumentCreate
+3 -5
View File
@@ -2171,12 +2171,10 @@ def setup_email_routes():
try:
conn = sqlite3.connect(SCHEDULED_DB)
conn.row_factory = sqlite3.Row
# The MCP server can't easily set owner, so it stores '' — fall
# back to those rows in addition to the caller's owner.
rows = conn.execute(
"""SELECT id, to_addr, subject, body, created_at, account_id
FROM scheduled_emails
WHERE status = 'agent_draft' AND (owner = ? OR owner = '')
WHERE status = 'agent_draft' AND owner = ?
ORDER BY created_at DESC""",
(owner or "",),
).fetchall()
@@ -2197,7 +2195,7 @@ def setup_email_routes():
cur = conn.execute(
"""UPDATE scheduled_emails
SET status = 'pending', send_at = ?
WHERE id = ? AND status = 'agent_draft' AND (owner = ? OR owner = '')""",
WHERE id = ? AND status = 'agent_draft' AND owner = ?""",
(datetime.utcnow().isoformat(), sid, owner or ""),
)
conn.commit()
@@ -2218,7 +2216,7 @@ def setup_email_routes():
conn = sqlite3.connect(SCHEDULED_DB)
cur = conn.execute(
"""UPDATE scheduled_emails SET status = 'cancelled'
WHERE id = ? AND status = 'agent_draft' AND (owner = ? OR owner = '')""",
WHERE id = ? AND status = 'agent_draft' AND owner = ?""",
(sid, owner or ""),
)
conn.commit()
+26 -5
View File
@@ -323,6 +323,24 @@ _MCP_TOOL_MAP = {
"web_fetch": ("web_fetch", "web_fetch"),
"generate_image": ("image_gen", "generate_image"),
}
_EMAIL_MCP_OWNER_ARG = "_odysseus_owner"
def _parse_qualified_mcp_args(tool: str, content: str) -> tuple[Dict, Optional[str]]:
raw = (content or "").strip()
if not raw:
return {}, None
try:
parsed = json.loads(raw)
except (json.JSONDecodeError, TypeError):
if tool.startswith("mcp__email__"):
return {}, "Email MCP tool arguments must be a JSON object."
return {}, None
if not isinstance(parsed, dict):
if tool.startswith("mcp__email__"):
return {}, "Email MCP tool arguments must be a JSON object."
return {}, None
return parsed, None
def _parse_generate_image(content: str) -> Dict:
@@ -858,12 +876,15 @@ async def _execute_tool_block_impl(
# MCP tool dispatch
mcp = get_mcp_manager()
if mcp:
try:
args = json.loads(content) if content.strip().startswith("{") else {}
except (json.JSONDecodeError, TypeError):
args = {}
desc = f"mcp: {tool}"
result = await mcp.call_tool(tool, args)
args, parse_error = _parse_qualified_mcp_args(tool, content)
if parse_error:
result = {"error": parse_error, "exit_code": 1}
else:
if tool.startswith("mcp__email__") and owner:
args = dict(args)
args[_EMAIL_MCP_OWNER_ARG] = owner
result = await mcp.call_tool(tool, args)
else:
desc = f"mcp: {tool}"
result = {"error": "MCP manager not available", "exit_code": 1}
+10 -7
View File
@@ -1206,23 +1206,26 @@ def function_call_to_tool_block(name: str, arguments: str) -> Optional[ToolBlock
logger.error(f"Failed to parse function call arguments for {name}: {arguments}")
return None
tool_type = _TOOL_NAME_MAP.get(name, name)
_BUILTIN_EMAIL_TOOLS = {"list_email_accounts", "send_email", "list_emails", "read_email", "reply_to_email",
"archive_email", "delete_email", "mark_email_read", "bulk_email", "download_attachment"}
# Some models emit valid JSON that isn't an object (e.g. a bare array
# ["ls -la"], string, or number) as the function arguments. Every branch
# below assumes a dict and calls args.get(...), so a non-dict would raise
# AttributeError and abort the whole agent stream. Coerce to {} instead.
# ["ls -la"], string, or number) as function arguments. Most local tools keep
# the legacy empty-object coercion for stream robustness, but email MCP tools
# must fail closed so a malformed call cannot read the default mailbox.
if not isinstance(args, dict):
if tool_type.startswith("mcp__email__") or name in _BUILTIN_EMAIL_TOOLS:
logger.warning(f"Non-object email function call arguments for {name}: {args!r}; rejecting")
return None
logger.warning(f"Non-object function call arguments for {name}: {args!r}; treating as empty")
args = {}
tool_type = _TOOL_NAME_MAP.get(name, name)
# Allow MCP tools through (namespaced as mcp__serverid__toolname)
if tool_type.startswith("mcp__"):
content = json.dumps(args) if args else "{}"
return ToolBlock(tool_type, content)
# Email tools are implemented as MCP — route them to email
_BUILTIN_EMAIL_TOOLS = {"list_email_accounts", "send_email", "list_emails", "read_email", "reply_to_email",
"archive_email", "delete_email", "mark_email_read", "bulk_email", "download_attachment"}
if name in _BUILTIN_EMAIL_TOOLS:
return ToolBlock(f"mcp__email__{name}", json.dumps(args) if args else "{}")
if tool_type not in TOOL_TAGS:
+49 -3
View File
@@ -10,14 +10,14 @@ the validators the rest of the cookbook routes already apply.
import asyncio
import pytest
from fastapi import HTTPException
from fastapi import APIRouter, HTTPException
from starlette.requests import Request
import routes.codex_routes as codex_routes
def _route_endpoint(path: str, method: str):
router = codex_routes.setup_codex_routes()
def _route_endpoint(path: str, method: str, router=None):
router = router or codex_routes.setup_codex_routes()
for route in router.routes:
if route.path == path and method in route.methods:
return route.endpoint
@@ -40,6 +40,22 @@ def _launch_request() -> Request:
return request
def _codex_request(scopes) -> Request:
request = Request(
{
"type": "http",
"method": "POST",
"path": "/api/codex/emails/draft-document",
"headers": [],
"state": {},
}
)
request.state.api_token = True
request.state.api_token_owner = "alice"
request.state.api_token_scopes = list(scopes)
return request
def test_rejects_remote_host_with_shell_metacharacters():
task = {"remoteHost": "box; rm -rf ~", "sshPort": ""}
with pytest.raises(HTTPException) as exc:
@@ -105,3 +121,33 @@ def test_adopt_rejects_ssh_option_host_before_shell(monkeypatch):
assert exc.value.status_code == 400
assert calls == []
@pytest.mark.asyncio
async def test_email_draft_document_accepts_send_scope_with_document_write():
calls = []
document_router = APIRouter()
@document_router.post("/api/document")
async def create_document(request: Request, req):
calls.append((request.state.current_user, req.title, req.language, req.content))
return {"id": "doc-1", "title": req.title}
router = codex_routes.setup_codex_routes(document_router=document_router)
endpoint = _route_endpoint("/api/codex/emails/draft-document", "POST", router=router)
result = await endpoint(
_codex_request(["email:send", "documents:write"]),
{"to": "recipient@example.com", "subject": "Subject", "body": "Body"},
)
assert result["draft_type"] == "document"
assert result["send_required_confirmation"] is True
assert calls == [
(
"alice",
"Subject",
"email",
"To: recipient@example.com\nSubject: Subject\n---\nBody\n",
)
]
+48
View File
@@ -406,6 +406,54 @@ async def test_scheduled_email_routes_are_owner_scoped(tmp_path, monkeypatch):
assert alice_rows["scheduled"] == []
@pytest.mark.asyncio
async def test_pending_agent_draft_routes_do_not_expose_ownerless_rows(tmp_path, monkeypatch):
import routes.email_helpers as email_helpers
import routes.email_routes as email_routes
db_path = tmp_path / "scheduled_emails.db"
monkeypatch.setattr(email_helpers, "SCHEDULED_DB", db_path)
monkeypatch.setattr(email_routes, "SCHEDULED_DB", db_path)
email_helpers._init_scheduled_db()
conn = sqlite3.connect(db_path)
conn.executemany(
"""
INSERT INTO scheduled_emails
(id, to_addr, subject, body, attachments, send_at, created_at, status, account_id, owner)
VALUES (?, ?, ?, ?, '[]', '9999-12-31T00:00:00', ?, 'agent_draft', ?, ?)
""",
[
("draft-ownerless", "nobody@example.com", "Ownerless", "old", "2026-01-01", "acct-a", ""),
("draft-bob", "bob@example.com", "Bob", "bob body", "2026-01-02", "acct-b", "bob"),
],
)
conn.commit()
conn.close()
router = email_routes.setup_email_routes()
list_pending = _route_endpoint(router, "/api/email/pending", "GET")
approve_pending = _route_endpoint(router, "/api/email/pending/{sid}/approve", "POST")
cancel_pending = _route_endpoint(router, "/api/email/pending/{sid}", "DELETE")
alice_rows = await list_pending(owner="alice")
bob_rows = await list_pending(owner="bob")
assert alice_rows["pending"] == []
assert [row["id"] for row in bob_rows["pending"]] == ["draft-bob"]
assert (await approve_pending("draft-ownerless", owner="alice"))["success"] is False
assert (await cancel_pending("draft-ownerless", owner="bob"))["success"] is False
conn = sqlite3.connect(db_path)
try:
rows = conn.execute(
"SELECT id, status FROM scheduled_emails ORDER BY id",
).fetchall()
finally:
conn.close()
assert rows == [("draft-bob", "agent_draft"), ("draft-ownerless", "agent_draft")]
def test_scheduled_poller_resolves_config_with_row_owner(tmp_path, monkeypatch):
import routes.email_helpers as email_helpers
import routes.email_pollers as email_pollers
@@ -53,6 +53,13 @@ def test_non_object_arguments_do_not_crash(arguments):
assert block.content == ""
@pytest.mark.parametrize("tool_name", ["list_emails", "mcp__email__list_emails"])
def test_email_mcp_non_object_arguments_are_rejected(tool_name):
block = function_call_to_tool_block(tool_name, '["INBOX"]')
assert block is None
def test_edit_document_skips_non_object_edit_items():
block = function_call_to_tool_block(
"edit_document",
@@ -6,6 +6,9 @@ double space after "Re:" on every non-ASCII subject, a spurious space in
"Name <addr>" senders, and violated RFC 2047 6.2 which requires whitespace
between two adjacent encoded-words to be dropped.
"""
import json
import sqlite3
import pytest
pytest.importorskip("mcp")
@@ -13,6 +16,49 @@ pytest.importorskip("mcp")
import mcp_servers.email_server as es
def _init_accounts_db(path):
conn = sqlite3.connect(path)
conn.execute(
"""
CREATE TABLE email_accounts (
id TEXT PRIMARY KEY,
owner TEXT,
name TEXT NOT NULL,
is_default INTEGER NOT NULL DEFAULT 0,
enabled INTEGER NOT NULL DEFAULT 1,
imap_host TEXT,
imap_port INTEGER,
imap_user TEXT,
imap_password TEXT,
imap_starttls INTEGER,
smtp_host TEXT,
smtp_port INTEGER,
smtp_security TEXT,
smtp_user TEXT,
smtp_password TEXT,
from_address TEXT,
created_at TEXT
)
"""
)
conn.executemany(
"""
INSERT INTO email_accounts
(id, owner, name, is_default, enabled, imap_host, imap_port, imap_user,
imap_password, imap_starttls, smtp_host, smtp_port, smtp_security,
smtp_user, smtp_password, from_address, created_at)
VALUES (?, ?, ?, ?, 1, 'imap.example.com', 993, ?, '', 1,
'smtp.example.com', 465, 'ssl', ?, '', ?, ?)
""",
[
("acct-alice", "alice", "Alice Mail", 1, "alice@example.com", "alice@example.com", "alice@example.com", "2026-01-01"),
("acct-bob", "bob", "Bob Mail", 1, "bob@example.com", "bob@example.com", "bob@example.com", "2026-01-02"),
],
)
conn.commit()
conn.close()
def test_prefix_then_encoded_word_single_space():
assert es._decode_header("Re: =?utf-8?b?SsOzc2U=?=") == "Re: J\u00f3se"
@@ -32,3 +78,139 @@ def test_plain_ascii_header_unchanged():
def test_empty_header():
assert es._decode_header("") == ""
@pytest.mark.asyncio
async def test_mcp_email_accounts_are_filtered_by_hidden_owner(tmp_path, monkeypatch):
db_path = tmp_path / "app.db"
_init_accounts_db(db_path)
monkeypatch.setattr(es, "APP_DB", str(db_path))
es._ACCOUNT_CACHE.clear()
out = await es.call_tool("list_email_accounts", {"_odysseus_owner": "alice"})
text = out[0].text
assert "Alice Mail" in text
assert "Bob Mail" not in text
@pytest.mark.asyncio
async def test_mcp_email_requires_owner_when_multiple_account_owners_exist(tmp_path, monkeypatch):
db_path = tmp_path / "app.db"
_init_accounts_db(db_path)
monkeypatch.setattr(es, "APP_DB", str(db_path))
es._ACCOUNT_CACHE.clear()
out = await es.call_tool("list_email_accounts", {})
assert "requires an authenticated owner" in out[0].text
def test_mcp_email_scoped_owner_without_visible_account_skips_legacy_fallback(tmp_path, monkeypatch):
db_path = tmp_path / "app.db"
settings_path = tmp_path / "settings.json"
_init_accounts_db(db_path)
settings_path.write_text(
json.dumps(
{
"imap_host": "legacy-imap.example.com",
"imap_user": "legacy@example.com",
"imap_password": "legacy-secret",
"smtp_host": "legacy-smtp.example.com",
"smtp_user": "legacy@example.com",
"smtp_password": "legacy-secret",
"from_address": "legacy@example.com",
}
),
encoding="utf-8",
)
monkeypatch.setattr(es, "APP_DB", str(db_path))
monkeypatch.setattr(es, "_SETTINGS_FILE", str(settings_path))
es._ACCOUNT_CACHE.clear()
token = es._CURRENT_OWNER.set("charlie")
try:
with pytest.raises(ValueError, match="No email account is configured"):
es._load_config()
finally:
es._CURRENT_OWNER.reset(token)
es._ACCOUNT_CACHE.clear()
@pytest.mark.asyncio
async def test_mcp_send_email_stages_owner_scoped_pending_draft(tmp_path, monkeypatch):
import src.constants as constants
db_path = tmp_path / "scheduled_emails.db"
monkeypatch.setattr(constants, "SCHEDULED_EMAILS_DB", str(db_path))
monkeypatch.setattr(es, "_read_agent_email_confirm_setting", lambda: True)
out = await es.call_tool(
"send_email",
{
"to": "recipient@example.com",
"subject": "Review",
"body": "Please review.",
"_odysseus_owner": "alice",
},
)
assert "Draft staged for approval" in out[0].text
assert "Nothing has been sent yet" in out[0].text
conn = sqlite3.connect(db_path)
try:
row = conn.execute(
"SELECT owner, status, to_addr, subject FROM scheduled_emails"
).fetchone()
finally:
conn.close()
assert row == ("alice", "agent_draft", "recipient@example.com", "Review")
@pytest.mark.asyncio
async def test_mcp_draft_email_document_uses_hidden_owner(monkeypatch):
import core.database as db_mod
saved = []
class FakeDocument:
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
class FakeDocumentVersion:
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
class FakeDb:
def add(self, obj):
saved.append(obj)
def commit(self):
pass
def close(self):
pass
monkeypatch.setattr(db_mod, "Document", FakeDocument)
monkeypatch.setattr(db_mod, "DocumentVersion", FakeDocumentVersion)
monkeypatch.setattr(db_mod, "SessionLocal", lambda: FakeDb())
monkeypatch.setattr(
es,
"_load_config",
lambda account=None: {"account_name": "Alice Mail", "account_id": "acct-alice"},
)
out = await es.call_tool(
"draft_email",
{
"to": "recipient@example.com",
"subject": "Draft subject",
"body": "Draft body",
"_odysseus_owner": "alice",
},
)
assert "Created Odysseus email draft" in out[0].text
docs = [obj for obj in saved if isinstance(obj, FakeDocument)]
assert len(docs) == 1
assert docs[0].owner == "alice"
+57
View File
@@ -626,6 +626,63 @@ async def test_public_agent_policy_blocks_sensitive_tools(monkeypatch):
assert "restricted to admin users" in result["error"]
@pytest.mark.asyncio
async def test_email_mcp_non_object_args_fail_before_dispatch(monkeypatch):
import src.tool_execution as tool_execution
from src.tool_execution import execute_tool_block
class FakeMcp:
def __init__(self):
self.calls = []
async def call_tool(self, name, args):
self.calls.append((name, args))
return {"output": "called", "exit_code": 0}
fake = FakeMcp()
monkeypatch.setattr(tool_execution, "_owner_is_admin", lambda owner: True)
monkeypatch.setattr(tool_execution, "get_mcp_manager", lambda: fake)
desc, result = await execute_tool_block(
SimpleNamespace(tool_type="mcp__email__list_emails", content='["INBOX"]'),
owner="alice",
)
assert desc == "mcp: mcp__email__list_emails"
assert result["exit_code"] == 1
assert "JSON object" in result["error"]
assert fake.calls == []
@pytest.mark.asyncio
async def test_email_mcp_dispatch_includes_hidden_owner(monkeypatch):
import src.tool_execution as tool_execution
from src.tool_execution import execute_tool_block
class FakeMcp:
def __init__(self):
self.calls = []
async def call_tool(self, name, args):
self.calls.append((name, args))
return {"output": "called", "exit_code": 0}
fake = FakeMcp()
monkeypatch.setattr(tool_execution, "_owner_is_admin", lambda owner: True)
monkeypatch.setattr(tool_execution, "get_mcp_manager", lambda: fake)
desc, result = await execute_tool_block(
SimpleNamespace(tool_type="mcp__email__list_emails", content='{"folder":"INBOX"}'),
owner="alice",
)
assert desc == "mcp: mcp__email__list_emails"
assert result["exit_code"] == 0
assert fake.calls == [
("mcp__email__list_emails", {"folder": "INBOX", "_odysseus_owner": "alice"}),
]
def test_public_agent_policy_hides_sensitive_tools(monkeypatch):
auth_mod = _install_core_auth_stub(monkeypatch)
from src.tool_security import blocked_tools_for_owner