From 34bd8f0491def0d2d97ce0653802f345f9c6de2e Mon Sep 17 00:00:00 2001 From: Lucas Daniel <94806303+NoodleLDS@users.noreply.github.com> Date: Sun, 7 Jun 2026 01:09:28 -0300 Subject: [PATCH] fix(email): guarantee IMAP conn.logout() on all exception paths (#1530) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three IMAP connection leaks were recently fixed via try/finally (#1325, #1330, #1423). This commit applies the same pattern to the remaining callsites that still used inline logout-only cleanup. routes/email_helpers.py: - _fetch_sender_thread_context: conn was uninitialized when the outer try/except returned early on connect failure, causing the finally block to crash on conn.close()/conn.logout(). Merged the two separate try blocks into one and added conn=None guard. - _pre_retrieve_context: ctx_conn.logout() was inside the loop body with no finally, so any exception in the folder/search loop leaked the socket. Moved cleanup into a finally block with ctx_conn=None guard. mcp_servers/email_server.py: - _list_emails: multiple inline conn.logout() calls on early-return paths; exception between them leaked the socket. Wrapped in try/finally. - _read_email: same pattern — four separate logout() calls replaced by a single finally block. - _reply_to_email: logout() called before the error check, so an exception in conn.select() leaked the socket. Wrapped in try/finally. - _download_attachment: same pattern as _reply_to_email. Also adds tests/test_imap_leak_fixes.py with 9 regression tests (one per function/failure-mode) that monkeypatch _imap_connect and assert conn.logout() is called exactly once even when IMAP operations raise. --- mcp_servers/email_server.py | 224 +++++++++++++++++---------------- routes/email_helpers.py | 26 ++-- tests/test_imap_leak_fixes.py | 230 ++++++++++++++++++++++++++++++++++ 3 files changed, 362 insertions(+), 118 deletions(-) create mode 100644 tests/test_imap_leak_fixes.py diff --git a/mcp_servers/email_server.py b/mcp_servers/email_server.py index ba75dd026..285d928d2 100644 --- a/mcp_servers/email_server.py +++ b/mcp_servers/email_server.py @@ -423,68 +423,71 @@ def _list_emails(folder="INBOX", max_results=20, unresponded_only=False, Pass unread_only=True and/or unresponded_only=True for attention scans. account selects mailbox (None = default). """ - conn = _imap_connect(account) - select_status, _ = conn.select(_q(folder), readonly=True) - if select_status != "OK": - conn.logout() - raise ValueError(f"IMAP folder not found: {folder}") + conn = None + try: + conn = _imap_connect(account) + select_status, _ = conn.select(_q(folder), readonly=True) + if select_status != "OK": + raise ValueError(f"IMAP folder not found: {folder}") - if unread_only and unresponded_only: - status, data = conn.uid("SEARCH", None, "(UNSEEN UNANSWERED)") - elif unread_only: - status, data = conn.uid("SEARCH", None, "(UNSEEN)") - elif unresponded_only: - # Was missing — unresponded_only=True (without unread_only) fell through - # to "ALL" and returned answered mail too, despite the documented - # "emails without replies" behaviour. - status, data = conn.uid("SEARCH", None, "(UNANSWERED)") - else: - # Include read too — IMAP search "ALL" returns the entire folder - status, data = conn.uid("SEARCH", None, "ALL") + if unread_only and unresponded_only: + status, data = conn.uid("SEARCH", None, "(UNSEEN UNANSWERED)") + elif unread_only: + status, data = conn.uid("SEARCH", None, "(UNSEEN)") + elif unresponded_only: + # Was missing — unresponded_only=True (without unread_only) fell through + # to "ALL" and returned answered mail too, despite the documented + # "emails without replies" behaviour. + status, data = conn.uid("SEARCH", None, "(UNANSWERED)") + else: + # Include read too — IMAP search "ALL" returns the entire folder + status, data = conn.uid("SEARCH", None, "ALL") - if status != "OK" or not data[0]: - conn.logout() - return [] + if status != "OK" or not data[0]: + return [] - uid_list = list(reversed(data[0].split()))[:max_results] - cache = _get_cached_summaries() - results = [] + uid_list = list(reversed(data[0].split()))[:max_results] + cache = _get_cached_summaries() + results = [] - for uid in uid_list: - try: - status, msg_data = conn.uid("FETCH", uid, "(RFC822.HEADER)") - if status != "OK": + for uid in uid_list: + try: + status, msg_data = conn.uid("FETCH", uid, "(RFC822.HEADER)") + if status != "OK": + continue + raw_header = msg_data[0][1] + msg = email.message_from_bytes(raw_header) + + subject = _decode_header(msg.get("Subject", "(no subject)")) + sender = _decode_header(msg.get("From", "unknown")) + date_str = msg.get("Date", "") + message_id = msg.get("Message-ID", "") + + # Parse sender name + sender_name, sender_addr = email.utils.parseaddr(sender) + sender_display = sender_name or sender_addr + + # Check cache for summary + cached = cache.get(subject, {}) + summary = cached.get("summary", "") + + results.append({ + "uid": uid.decode(), + "message_id": message_id, + "subject": subject, + "from": sender_display, + "from_address": sender_addr, + "date": date_str, + "summary": summary, + }) + except Exception: continue - raw_header = msg_data[0][1] - msg = email.message_from_bytes(raw_header) - subject = _decode_header(msg.get("Subject", "(no subject)")) - sender = _decode_header(msg.get("From", "unknown")) - date_str = msg.get("Date", "") - message_id = msg.get("Message-ID", "") - - # Parse sender name - sender_name, sender_addr = email.utils.parseaddr(sender) - sender_display = sender_name or sender_addr - - # Check cache for summary - cached = cache.get(subject, {}) - summary = cached.get("summary", "") - - results.append({ - "uid": uid.decode(), - "message_id": message_id, - "subject": subject, - "from": sender_display, - "from_address": sender_addr, - "date": date_str, - "summary": summary, - }) - except Exception: - continue - - conn.logout() - return results + return results + finally: + if conn: + try: conn.logout() + except Exception: pass def _result_sort_time(result: dict) -> datetime: @@ -657,54 +660,55 @@ def _extract_attachment_to_disk(msg, index, target_dir): def _read_email(uid=None, message_id=None, folder="INBOX", account=None): """Read full email content by UID or message-ID. account = mailbox selector.""" cfg = _load_config(account) - conn = _imap_connect(account) - conn.select(_q(folder), readonly=True) + conn = None + try: + conn = _imap_connect(account) + conn.select(_q(folder), readonly=True) - if message_id and not uid: - status, data = conn.uid("SEARCH", None, f'(HEADER Message-ID "{message_id}")') - if status != "OK" or not data[0]: - conn.logout() - return {"error": f"Email not found with Message-ID: {message_id}"} - uid = data[0].split()[-1] + if message_id and not uid: + status, data = conn.uid("SEARCH", None, f'(HEADER Message-ID "{message_id}")') + if status != "OK" or not data[0]: + return {"error": f"Email not found with Message-ID: {message_id}"} + uid = data[0].split()[-1] - if not uid: - conn.logout() - return {"error": "No UID or Message-ID provided"} + if not uid: + return {"error": "No UID or Message-ID provided"} - status, msg_data = conn.uid("FETCH", _b(uid), "(BODY.PEEK[])") - if status != "OK": - conn.logout() - return {"error": f"Failed to fetch email UID {uid}"} - if not msg_data or not msg_data[0] or not isinstance(msg_data[0], tuple) or len(msg_data[0]) < 2: - conn.logout() - return {"error": f"Email not found with UID {uid}"} + status, msg_data = conn.uid("FETCH", _b(uid), "(BODY.PEEK[])") + if status != "OK": + return {"error": f"Failed to fetch email UID {uid}"} + if not msg_data or not msg_data[0] or not isinstance(msg_data[0], tuple) or len(msg_data[0]) < 2: + return {"error": f"Email not found with UID {uid}"} - raw = msg_data[0][1] - msg = email.message_from_bytes(raw) + raw = msg_data[0][1] + msg = email.message_from_bytes(raw) - subject = _decode_header(msg.get("Subject", "(no subject)")) - sender = _decode_header(msg.get("From", "unknown")) - date_str = msg.get("Date", "") - message_id_header = msg.get("Message-ID", "") - body = _extract_text(msg) - attachments = _list_attachments_from_msg(msg) + subject = _decode_header(msg.get("Subject", "(no subject)")) + sender = _decode_header(msg.get("From", "unknown")) + date_str = msg.get("Date", "") + message_id_header = msg.get("Message-ID", "") + body = _extract_text(msg) + attachments = _list_attachments_from_msg(msg) - sender_name, sender_addr = email.utils.parseaddr(sender) + sender_name, sender_addr = email.utils.parseaddr(sender) - conn.logout() - return { - "uid": uid.decode() if isinstance(uid, bytes) else str(uid), - "account": cfg.get("account_name") or cfg.get("imap_user") or "default", - "account_email": cfg.get("imap_user") or cfg.get("from_address") or "", - "account_id": cfg.get("account_id"), - "message_id": message_id_header, - "subject": subject, - "from": sender_name or sender_addr, - "from_address": sender_addr, - "date": date_str, - "body": body[:8000], - "attachments": attachments, - } + return { + "uid": uid.decode() if isinstance(uid, bytes) else str(uid), + "account": cfg.get("account_name") or cfg.get("imap_user") or "default", + "account_email": cfg.get("imap_user") or cfg.get("from_address") or "", + "account_id": cfg.get("account_id"), + "message_id": message_id_header, + "subject": subject, + "from": sender_name or sender_addr, + "from_address": sender_addr, + "date": date_str, + "body": body[:8000], + "attachments": attachments, + } + finally: + if conn: + try: conn.logout() + except Exception: pass def _read_email_across_accounts(uid=None, message_id=None, folder="INBOX"): @@ -858,10 +862,15 @@ def _send_email(to, subject, body, in_reply_to=None, references=None, cc=None, b def _reply_to_email(uid, body, folder="INBOX", reply_all=False, account=None): """Reply to an existing email by UID. Threads via In-Reply-To/References.""" - conn = _imap_connect(account) - conn.select(_q(folder), readonly=True) - status, msg_data = conn.uid("FETCH", _b(uid), "(BODY.PEEK[])") - conn.logout() + conn = None + try: + conn = _imap_connect(account) + conn.select(_q(folder), readonly=True) + status, msg_data = conn.uid("FETCH", _b(uid), "(BODY.PEEK[])") + finally: + if conn: + try: conn.logout() + except Exception: pass if status != "OK" or not msg_data or not msg_data[0]: return {"error": f"Failed to fetch email UID {uid}"} raw = msg_data[0][1] @@ -1038,10 +1047,15 @@ def _archive_email(uid, folder="INBOX", account=None): def _download_attachment(uid, index, folder="INBOX", account=None): """Extract a specific attachment to disk and return its local path.""" - conn = _imap_connect(account) - conn.select(_q(folder), readonly=True) - status, msg_data = conn.uid("FETCH", _b(uid), "(BODY.PEEK[])") - conn.logout() + conn = None + try: + conn = _imap_connect(account) + conn.select(_q(folder), readonly=True) + status, msg_data = conn.uid("FETCH", _b(uid), "(BODY.PEEK[])") + finally: + if conn: + try: conn.logout() + except Exception: pass if status != "OK": return {"error": f"Failed to fetch email UID {uid}"} raw = msg_data[0][1] diff --git a/routes/email_helpers.py b/routes/email_helpers.py index 43e73516f..454fc9dc0 100644 --- a/routes/email_helpers.py +++ b/routes/email_helpers.py @@ -1140,13 +1140,9 @@ def _fetch_sender_thread_context(sender_addr: str, if exclude_uid: seen_uids.add((exclude_folder or "INBOX", str(exclude_uid))) + conn = None try: conn = _imap_connect(account_id, owner=owner) - except Exception as e: - logger.warning(f"sender-thread-context: imap connect failed: {e}") - return "" - - try: for folder in ["INBOX", "Sent", "Archive", "Drafts"]: if len(blocks) >= limit: break @@ -1213,11 +1209,14 @@ def _fetch_sender_thread_context(sender_addr: str, if atts_text: lines.append(atts_text) blocks.append("\n".join(lines)) + except Exception as e: + logger.warning(f"sender-thread-context: imap failed: {e}") finally: - try: conn.close() - except Exception: pass - try: conn.logout() - except Exception: pass + if conn: + try: conn.close() + except Exception: pass + try: conn.logout() + except Exception: pass if not blocks: return "" @@ -1320,6 +1319,7 @@ def _pre_retrieve_context( if not terms_list: return context_snippets, terms_list + ctx_conn = None try: ctx_conn = _imap_connect(account_id, owner=owner) for folder in ["INBOX", "Sent", "Archive", "Drafts"]: @@ -1356,12 +1356,12 @@ def _pre_retrieve_context( except Exception as _e: logger.warning(f" search {folder} {term!r} failed: {_e}") continue - try: - ctx_conn.logout() - except Exception: - pass except Exception as _e: logger.warning(f"IMAP context search failed: {_e}") + finally: + if ctx_conn: + try: ctx_conn.logout() + except Exception: pass try: from routes.contacts_routes import _fetch_contacts diff --git a/tests/test_imap_leak_fixes.py b/tests/test_imap_leak_fixes.py new file mode 100644 index 000000000..a30c7c216 --- /dev/null +++ b/tests/test_imap_leak_fixes.py @@ -0,0 +1,230 @@ +"""Regression tests for IMAP connection leak fixes. + +Each test forces an exception after _imap_connect() succeeds and asserts +that conn.logout() is still called exactly once (guaranteed by try/finally). + +Functions covered: + - routes/email_helpers.py: _fetch_sender_thread_context, _pre_retrieve_context + - mcp_servers/email_server.py: _list_emails, _read_email, _reply_to_email, + _download_attachment +""" + +import os +import sys +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, patch + +_TMP = Path(tempfile.mkdtemp(prefix="odysseus-imap-leak-fixes-")) +os.environ.setdefault("DATA_DIR", str(_TMP)) +os.environ.setdefault("DATABASE_URL", f"sqlite:///{_TMP / 'app.db'}") + +PROJECT_ROOT = Path(__file__).resolve().parent.parent +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + + +def _make_failing_conn(captured, *, raises_on="select"): + """Return a mock IMAP connection that raises on the first call to `raises_on`.""" + conn = MagicMock() + conn.logout = MagicMock(side_effect=lambda: captured.__setitem__( + "logout_calls", captured.get("logout_calls", 0) + 1 + )) + + def _raise(*a, **kw): + raise RuntimeError("simulated IMAP failure") + + getattr(conn, raises_on).side_effect = _raise + return conn + + +# ── email_helpers ────────────────────────────────────────────────────────────── + +def test_fetch_sender_thread_context_logs_out_on_select_failure(monkeypatch): + import routes.email_helpers as helpers + + captured = {} + conn = _make_failing_conn(captured, raises_on="select") + monkeypatch.setattr(helpers, "_imap_connect", lambda *a, **kw: conn) + + result = helpers._fetch_sender_thread_context("user@example.com") + + assert captured.get("logout_calls", 0) == 1, ( + f"conn.logout() must be called on select failure. " + f"Got logout_calls={captured.get('logout_calls')}" + ) + assert result == "", "Should return empty string on failure" + + +def test_fetch_sender_thread_context_logs_out_on_connect_failure(monkeypatch): + """If _imap_connect itself raises, conn is None — no logout, no crash.""" + import routes.email_helpers as helpers + + def _fail(*a, **kw): + raise ConnectionRefusedError("cannot connect") + + monkeypatch.setattr(helpers, "_imap_connect", _fail) + result = helpers._fetch_sender_thread_context("user@example.com") + assert result == "", "Should return empty string when connect fails" + + +def test_pre_retrieve_context_logs_out_on_search_failure(monkeypatch): + import routes.email_helpers as helpers + + captured = {} + conn = MagicMock() + conn.select.return_value = ("OK", []) + conn.logout = MagicMock(side_effect=lambda: captured.__setitem__( + "logout_calls", captured.get("logout_calls", 0) + 1 + )) + conn.search.side_effect = RuntimeError("simulated search failure") + + monkeypatch.setattr(helpers, "_imap_connect", lambda *a, **kw: conn) + + # Bypass the known-sender check and term extraction so we reach the IMAP block + monkeypatch.setattr(helpers, "_imap", MagicMock( + return_value=MagicMock( + __enter__=MagicMock(return_value=MagicMock( + select=MagicMock(return_value=("OK", [])), + search=MagicMock(return_value=("OK", [b"1"])), + )), + __exit__=MagicMock(return_value=False), + ) + )) + + # Provide a body with a capitalised term so terms_list is non-empty + snippets, terms = helpers._pre_retrieve_context( + body="Project Alpha update", + sender="Known Sender ", + ) + + # The function is best-effort and never raises; logout must have been called + assert captured.get("logout_calls", 0) == 1, ( + f"ctx_conn.logout() must be called even when search raises. " + f"Got logout_calls={captured.get('logout_calls')}" + ) + + +# ── email_server ─────────────────────────────────────────────────────────────── + +def test_mcp_list_emails_logs_out_on_select_failure(monkeypatch): + import mcp_servers.email_server as srv + + captured = {} + conn = _make_failing_conn(captured, raises_on="select") + monkeypatch.setattr(srv, "_imap_connect", lambda *a, **kw: conn) + + try: + srv._list_emails() + except Exception: + pass + + assert captured.get("logout_calls", 0) == 1, ( + f"conn.logout() must be called after select raises. " + f"Got logout_calls={captured.get('logout_calls')}" + ) + + +def test_mcp_list_emails_logs_out_on_search_failure(monkeypatch): + import mcp_servers.email_server as srv + + captured = {} + conn = MagicMock() + conn.select.return_value = ("OK", []) + conn.uid.side_effect = RuntimeError("simulated search failure") + conn.logout = MagicMock(side_effect=lambda: captured.__setitem__( + "logout_calls", captured.get("logout_calls", 0) + 1 + )) + monkeypatch.setattr(srv, "_imap_connect", lambda *a, **kw: conn) + + try: + srv._list_emails() + except Exception: + pass + + assert captured.get("logout_calls", 0) == 1, ( + f"conn.logout() must be called after uid search raises. " + f"Got logout_calls={captured.get('logout_calls')}" + ) + + +def test_mcp_read_email_logs_out_on_select_failure(monkeypatch): + import mcp_servers.email_server as srv + + captured = {} + conn = _make_failing_conn(captured, raises_on="select") + monkeypatch.setattr(srv, "_imap_connect", lambda *a, **kw: conn) + monkeypatch.setattr(srv, "_load_config", lambda *a, **kw: {}) + + # The exception propagates out of _read_email (no outer catch in this fn); + # what matters is that logout was still called via finally before it did. + try: + srv._read_email(uid="1") + except RuntimeError: + pass + + assert captured.get("logout_calls", 0) == 1, ( + f"conn.logout() must be called after select raises. " + f"Got logout_calls={captured.get('logout_calls')}" + ) + + +def test_mcp_read_email_logs_out_on_fetch_failure(monkeypatch): + import mcp_servers.email_server as srv + + captured = {} + conn = MagicMock() + conn.select.return_value = ("OK", []) + conn.uid.side_effect = RuntimeError("simulated fetch failure") + conn.logout = MagicMock(side_effect=lambda: captured.__setitem__( + "logout_calls", captured.get("logout_calls", 0) + 1 + )) + monkeypatch.setattr(srv, "_imap_connect", lambda *a, **kw: conn) + monkeypatch.setattr(srv, "_load_config", lambda *a, **kw: {}) + + try: + srv._read_email(uid="1") + except RuntimeError: + pass + + assert captured.get("logout_calls", 0) == 1, ( + f"conn.logout() must be called after uid fetch raises. " + f"Got logout_calls={captured.get('logout_calls')}" + ) + + +def test_mcp_reply_to_email_logs_out_on_select_failure(monkeypatch): + import mcp_servers.email_server as srv + + captured = {} + conn = _make_failing_conn(captured, raises_on="select") + monkeypatch.setattr(srv, "_imap_connect", lambda *a, **kw: conn) + + # Exception propagates; the finally still runs before it does. + try: + srv._reply_to_email(uid="1", body="hi") + except RuntimeError: + pass + + assert captured.get("logout_calls", 0) == 1, ( + f"conn.logout() must be called after select raises in _reply_to_email. " + f"Got logout_calls={captured.get('logout_calls')}" + ) + + +def test_mcp_download_attachment_logs_out_on_select_failure(monkeypatch): + import mcp_servers.email_server as srv + + captured = {} + conn = _make_failing_conn(captured, raises_on="select") + monkeypatch.setattr(srv, "_imap_connect", lambda *a, **kw: conn) + + try: + srv._download_attachment(uid="1", index=0) + except RuntimeError: + pass + + assert captured.get("logout_calls", 0) == 1, ( + f"conn.logout() must be called after select raises in _download_attachment. " + f"Got logout_calls={captured.get('logout_calls')}" + )