fix: quote IMAP mailbox arguments (#2170)

* fix: quote IMAP mailbox arguments

* fix: quote MCP move destinations

---------

Co-authored-by: Kevin <120500656+oooindefatigable@users.noreply.github.com>
This commit is contained in:
ooovenenoso
2026-06-05 10:00:20 -04:00
committed by GitHub
parent 6973c5427c
commit c9d0c6db18
4 changed files with 138 additions and 18 deletions
+22 -15
View File
@@ -38,6 +38,11 @@ def _b(value) -> bytes:
return str(value).encode()
def _q(name: str) -> str:
"""Quote an IMAP mailbox name for commands that take mailbox args."""
return '"' + (name or "").replace("\\", "\\\\").replace('"', '\\"') + '"'
def _uid_fetch_rows(data) -> list:
return [d for d in (data or []) if isinstance(d, bytes) and b"UID " in d]
@@ -419,7 +424,7 @@ def _list_emails(folder="INBOX", max_results=20, unresponded_only=False,
account selects mailbox (None = default).
"""
conn = _imap_connect(account)
select_status, _ = conn.select(folder, readonly=True)
select_status, _ = conn.select(_q(folder), readonly=True)
if select_status != "OK":
conn.logout()
raise ValueError(f"IMAP folder not found: {folder}")
@@ -542,7 +547,7 @@ def _search_emails(query, folders=None, max_results=20, account=None):
try:
for folder in folders:
try:
status, _ = conn.select(folder, readonly=True)
status, _ = conn.select(_q(folder), readonly=True)
if status != "OK":
continue
status, data = conn.uid("SEARCH", None, search_cmd)
@@ -653,7 +658,7 @@ def _read_email(uid=None, message_id=None, folder="INBOX", account=None):
"""Read full email content by UID or message-ID. account = mailbox selector."""
cfg = _load_config(account)
conn = _imap_connect(account)
conn.select(folder, readonly=True)
conn.select(_q(folder), readonly=True)
if message_id and not uid:
status, data = conn.uid("SEARCH", None, f'(HEADER Message-ID "{message_id}")')
@@ -827,7 +832,7 @@ def _send_email(to, subject, body, in_reply_to=None, references=None, cc=None, b
imap = _imap_connect(send_account)
try:
sent_folder = _detect_sent_folder(imap)
append_st, append_data = imap.append(sent_folder, "\\Seen", None, msg.as_bytes())
append_st, append_data = imap.append(_q(sent_folder), "\\Seen", None, msg.as_bytes())
if append_st == "OK" and append_data:
m = re.search(rb"APPENDUID\s+\d+\s+(\d+)", append_data[0] or b"")
if m:
@@ -854,7 +859,7 @@ def _send_email(to, subject, body, in_reply_to=None, references=None, cc=None, b
def _reply_to_email(uid, body, folder="INBOX", reply_all=False, account=None):
"""Reply to an existing email by UID. Threads via In-Reply-To/References."""
conn = _imap_connect(account)
conn.select(folder, readonly=True)
conn.select(_q(folder), readonly=True)
status, msg_data = conn.uid("FETCH", _b(uid), "(BODY.PEEK[])")
conn.logout()
if status != "OK" or not msg_data or not msg_data[0]:
@@ -896,7 +901,7 @@ def _reply_to_email(uid, body, folder="INBOX", reply_all=False, account=None):
def _set_flag(uid, folder, flag, add=True, account=None):
"""Add or remove an IMAP flag (e.g. \\Seen, \\Answered, \\Deleted)."""
conn = _imap_connect(account)
conn.select(folder)
conn.select(_q(folder))
op = "+FLAGS" if add else "-FLAGS"
try:
status, data = conn.uid("STORE", _b(uid), op, flag)
@@ -918,7 +923,7 @@ def _bulk_set_flag(uids, folder, flag, add=True, account=None):
conn = _imap_connect(account)
touched = []
try:
conn.select(folder)
conn.select(_q(folder))
op = "+FLAGS" if add else "-FLAGS"
msg_set = ",".join(str(u) for u in uids)
try:
@@ -945,7 +950,7 @@ def _bulk_move(uids, source_folder, dest_folder, account=None, role: str = ""):
conn = _imap_connect(account)
moved = 0
try:
conn.select(source_folder)
conn.select(_q(source_folder))
dest_folder = _resolve_folder(conn, dest_folder, role or _folder_role_from_name(dest_folder))
msg_set = ",".join(str(u) for u in uids)
try:
@@ -956,10 +961,11 @@ def _bulk_move(uids, source_folder, dest_folder, account=None, role: str = ""):
if not existing:
return 0
moved = len(existing)
status, _ = conn.uid("MOVE", _b(msg_set), dest_folder)
dest_arg = _q(dest_folder)
status, _ = conn.uid("MOVE", _b(msg_set), dest_arg)
if status != "OK":
# Fallback: UID copy + flag-delete + expunge
status, _ = conn.uid("COPY", _b(msg_set), dest_folder)
status, _ = conn.uid("COPY", _b(msg_set), dest_arg)
if status != "OK":
return 0
status, _ = conn.uid("STORE", _b(msg_set), "+FLAGS", "\\Deleted")
@@ -976,7 +982,7 @@ def _search_uids(folder="INBOX", criteria="UNSEEN", account=None):
ALL, ANSWERED). Used to resolve selectors like all_unread → uids."""
conn = _imap_connect(account)
try:
conn.select(folder, readonly=True)
conn.select(_q(folder), readonly=True)
status, data = conn.uid("SEARCH", None, criteria)
if status != "OK" or not data or not data[0]:
return []
@@ -988,7 +994,7 @@ def _search_uids(folder="INBOX", criteria="UNSEEN", account=None):
def _move_message(uid, source_folder, dest_folder, account=None, role: str = ""):
"""Move a message between folders. Tries IMAP MOVE, falls back to copy+delete."""
conn = _imap_connect(account)
conn.select(source_folder)
conn.select(_q(source_folder))
try:
dest_folder = _resolve_folder(conn, dest_folder, role or _folder_role_from_name(dest_folder))
try:
@@ -998,11 +1004,12 @@ def _move_message(uid, source_folder, dest_folder, account=None, role: str = "")
existing = _uid_fetch_rows(data)
if status != "OK" or not existing:
return False
status, _ = conn.uid("MOVE", _b(uid), dest_folder)
dest_arg = _q(dest_folder)
status, _ = conn.uid("MOVE", _b(uid), dest_arg)
if status == "OK":
return True
# Fallback: UID copy + delete
status, _ = conn.uid("COPY", _b(uid), dest_folder)
status, _ = conn.uid("COPY", _b(uid), dest_arg)
if status != "OK":
return False
status, _ = conn.uid("STORE", _b(uid), "+FLAGS", "\\Deleted")
@@ -1032,7 +1039,7 @@ def _archive_email(uid, folder="INBOX", account=None):
def _download_attachment(uid, index, folder="INBOX", account=None):
"""Extract a specific attachment to disk and return its local path."""
conn = _imap_connect(account)
conn.select(folder, readonly=True)
conn.select(_q(folder), readonly=True)
status, msg_data = conn.uid("FETCH", _b(uid), "(BODY.PEEK[])")
conn.logout()
if status != "OK":
+3 -1
View File
@@ -1629,9 +1629,11 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter:
# context (To/Subject/In-Reply-To/References).
try:
from routes.email_routes import _imap, _decode_header
from routes.email_helpers import _q
except Exception:
_imap = None
_decode_header = lambda x: x or ""
_q = lambda x: x or ""
to_addr = ""
from_name = ""
@@ -1641,7 +1643,7 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter:
if _imap:
try:
with _imap(doc.source_email_account_id or None) as conn:
conn.select(doc.source_email_folder, readonly=True)
conn.select(_q(doc.source_email_folder), readonly=True)
status, data = conn.fetch(doc.source_email_uid.encode(), "(RFC822.HEADER)")
if status == "OK" and data and data[0]:
raw_hdr = data[0][1]
+2 -2
View File
@@ -210,7 +210,7 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None
if auto_cal:
for sent_name in ("Sent", "INBOX/Sent", "Sent Items", "[Gmail]/Sent Mail"):
try:
st, _ = conn.select(sent_name, readonly=True)
st, _ = conn.select(_q(sent_name), readonly=True)
if st == "OK":
folders_to_scan.append(sent_name)
break
@@ -1046,7 +1046,7 @@ def _scheduled_poll_once() -> dict:
try:
with _imap(row_account_id, owner=row_owner) as imap:
sent_folder = _detect_sent_folder(imap)
imap.append(sent_folder, "\\Seen", None, outer.as_bytes())
imap.append(_q(sent_folder), "\\Seen", None, outer.as_bytes())
except Exception as e:
logger.warning(f"Failed to append scheduled {sid} to Sent: {e}")
+111
View File
@@ -0,0 +1,111 @@
"""Regression coverage for IMAP mailbox names that contain spaces.
imaplib does not quote mailbox arguments for SELECT/APPEND/MOVE/COPY, so callers
must quote names such as "[Gmail]/All Mail" or "Sent Items" themselves.
"""
from pathlib import Path
import pytest
pytest.importorskip("mcp")
import mcp_servers.email_server as es
class FakeListConn:
def __init__(self):
self.calls = []
def select(self, folder, readonly=False):
self.calls.append(("select", folder, readonly))
return "OK", []
def uid(self, command, *args):
self.calls.append(("uid", command, *args))
if command == "SEARCH":
return "OK", [b""]
return "OK", []
def logout(self):
self.calls.append(("logout",))
class FakeMoveConn:
def __init__(self):
self.calls = []
def list(self):
self.calls.append(("list",))
return "OK", []
def select(self, folder, readonly=False):
self.calls.append(("select", folder, readonly))
return "OK", []
def uid(self, command, *args):
self.calls.append(("uid", command, *args))
if command == "FETCH":
return "OK", [b"1 (UID 123)"]
if command == "MOVE":
return "NO", []
return "OK", []
def expunge(self):
self.calls.append(("expunge",))
def logout(self):
self.calls.append(("logout",))
def test_mcp_list_emails_quotes_spaced_folder_on_select(monkeypatch):
conn = FakeListConn()
monkeypatch.setattr(es, "_imap_connect", lambda account=None: conn)
assert es._list_emails(folder="Sent Items") == []
assert conn.calls[0] == ("select", '"Sent Items"', True)
def test_mcp_quote_helper_handles_spaced_and_quoted_mailboxes():
assert es._q("Sent Items") == '"Sent Items"'
assert es._q('[Gmail]/All Mail') == '"[Gmail]/All Mail"'
assert es._q('Label "Needs Reply"') == '"Label \\"Needs Reply\\""'
def test_known_imap_mailbox_call_sites_are_quoted():
mcp = Path("mcp_servers/email_server.py").read_text()
assert "conn.select(folder" not in mcp
assert "conn.select(source_folder" not in mcp
assert "imap.append(sent_folder" not in mcp
assert 'conn.uid("MOVE", _b(msg_set), dest_folder)' not in mcp
assert 'conn.uid("COPY", _b(msg_set), dest_folder)' not in mcp
assert 'conn.uid("MOVE", _b(uid), dest_folder)' not in mcp
assert 'conn.uid("COPY", _b(uid), dest_folder)' not in mcp
pollers = Path("routes/email_pollers.py").read_text()
assert "conn.select(sent_name" not in pollers
assert "imap.append(sent_folder" not in pollers
document_routes = Path("routes/document_routes.py").read_text()
assert "conn.select(doc.source_email_folder" not in document_routes
def test_mcp_move_message_quotes_destination_for_move_and_fallback_copy(monkeypatch):
conn = FakeMoveConn()
monkeypatch.setattr(es, "_imap_connect", lambda account=None: conn)
assert es._move_message("123", "INBOX", "[Gmail]/All Mail") is True
assert ("uid", "MOVE", b"123", '"[Gmail]/All Mail"') in conn.calls
assert ("uid", "COPY", b"123", '"[Gmail]/All Mail"') in conn.calls
def test_mcp_bulk_move_quotes_destination_for_move_and_fallback_copy(monkeypatch):
conn = FakeMoveConn()
monkeypatch.setattr(es, "_imap_connect", lambda account=None: conn)
assert es._bulk_move(["123"], "INBOX", "[Gmail]/All Mail") == 1
assert ("uid", "MOVE", b"123", '"[Gmail]/All Mail"') in conn.calls
assert ("uid", "COPY", b"123", '"[Gmail]/All Mail"') in conn.calls