Files
odysseus/tests/test_imap_leak_fixes.py
nubs 932b7f2446 fix(email): close IMAP socket when connect/login fails (#3174) (#3363)
* fix(email): close IMAP socket when connect/login fails (#3174)

_imap_connect opened a live socket via _open_imap_connection and then
called conn.login() with no try/finally, and _open_imap_connection called
conn.starttls() unguarded. When auth fails (e.g. an Office 365 app password
on an MFA-enabled tenant, #3174) or STARTTLS is rejected, the already-open
socket was orphaned. Every IMAP caller funnels through _imap_connect,
including the 30-minute _auto_summarize_poller, so a persistently
misconfigured account leaked one descriptor per pass toward FD exhaustion.

The previously merged leak fixes (#1325/#1330/#1423/#1530) only guard the
post-connect body and monkeypatch _imap_connect to succeed, so this
connect-time path was uncovered. Wrap login() and starttls() so a failure
calls conn.shutdown() (low-level close; logout() can't run pre-auth) before
re-raising. Adds two regression tests that fail without the guard.

* fix(email): guard MCP IMAP+SMTP connect-time leaks too (#3174)

Folds in the sibling connect-time leaks vdmkenny flagged on #3363, so the
whole connect-then-step leak class is closed in one place:

- mcp_servers/email_server.py::_imap_connect — guard starttls() and login();
  close pre-auth with conn.shutdown() before re-raising.
- mcp_servers/email_server.py::_smtp_connect — guard starttls() and login();
  SMTP has no shutdown(), so close with conn.close() (socket close, no QUIT).

Routes SMTP (_send_smtp_message) is already safe via 'with smtplib.SMTP(...)'.
Adds four regression tests (one per guard), verified to fail without the fix.
2026-06-08 21:21:41 +02:00

405 lines
14 KiB
Python

"""Regression tests for IMAP connection leak fixes.
Each test forces an exception after _imap_connect() succeeds and asserts
that conn.logout() is still called exactly once (guaranteed by try/finally).
Functions covered:
- routes/email_helpers.py: _fetch_sender_thread_context, _pre_retrieve_context
- mcp_servers/email_server.py: _list_emails, _read_email, _reply_to_email,
_download_attachment
"""
import imaplib
import os
import sys
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
_TMP = Path(tempfile.mkdtemp(prefix="odysseus-imap-leak-fixes-"))
os.environ.setdefault("DATA_DIR", str(_TMP))
os.environ.setdefault("DATABASE_URL", f"sqlite:///{_TMP / 'app.db'}")
PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
def _make_failing_conn(captured, *, raises_on="select"):
"""Return a mock IMAP connection that raises on the first call to `raises_on`."""
conn = MagicMock()
conn.logout = MagicMock(side_effect=lambda: captured.__setitem__(
"logout_calls", captured.get("logout_calls", 0) + 1
))
def _raise(*a, **kw):
raise RuntimeError("simulated IMAP failure")
getattr(conn, raises_on).side_effect = _raise
return conn
# ── email_helpers ──────────────────────────────────────────────────────────────
def test_fetch_sender_thread_context_logs_out_on_select_failure(monkeypatch):
import routes.email_helpers as helpers
captured = {}
conn = _make_failing_conn(captured, raises_on="select")
monkeypatch.setattr(helpers, "_imap_connect", lambda *a, **kw: conn)
result = helpers._fetch_sender_thread_context("user@example.com")
assert captured.get("logout_calls", 0) == 1, (
f"conn.logout() must be called on select failure. "
f"Got logout_calls={captured.get('logout_calls')}"
)
assert result == "", "Should return empty string on failure"
def test_fetch_sender_thread_context_logs_out_on_connect_failure(monkeypatch):
"""If _imap_connect itself raises, conn is None — no logout, no crash."""
import routes.email_helpers as helpers
def _fail(*a, **kw):
raise ConnectionRefusedError("cannot connect")
monkeypatch.setattr(helpers, "_imap_connect", _fail)
result = helpers._fetch_sender_thread_context("user@example.com")
assert result == "", "Should return empty string when connect fails"
def test_pre_retrieve_context_logs_out_on_search_failure(monkeypatch):
import routes.email_helpers as helpers
captured = {}
conn = MagicMock()
conn.select.return_value = ("OK", [])
conn.logout = MagicMock(side_effect=lambda: captured.__setitem__(
"logout_calls", captured.get("logout_calls", 0) + 1
))
conn.search.side_effect = RuntimeError("simulated search failure")
monkeypatch.setattr(helpers, "_imap_connect", lambda *a, **kw: conn)
# Bypass the known-sender check and term extraction so we reach the IMAP block
monkeypatch.setattr(helpers, "_imap", MagicMock(
return_value=MagicMock(
__enter__=MagicMock(return_value=MagicMock(
select=MagicMock(return_value=("OK", [])),
search=MagicMock(return_value=("OK", [b"1"])),
)),
__exit__=MagicMock(return_value=False),
)
))
# Provide a body with a capitalised term so terms_list is non-empty
snippets, terms = helpers._pre_retrieve_context(
body="Project Alpha update",
sender="Known Sender <known@example.com>",
)
# The function is best-effort and never raises; logout must have been called
assert captured.get("logout_calls", 0) == 1, (
f"ctx_conn.logout() must be called even when search raises. "
f"Got logout_calls={captured.get('logout_calls')}"
)
# ── email_server ───────────────────────────────────────────────────────────────
def test_mcp_list_emails_logs_out_on_select_failure(monkeypatch):
import mcp_servers.email_server as srv
captured = {}
conn = _make_failing_conn(captured, raises_on="select")
monkeypatch.setattr(srv, "_imap_connect", lambda *a, **kw: conn)
try:
srv._list_emails()
except Exception:
pass
assert captured.get("logout_calls", 0) == 1, (
f"conn.logout() must be called after select raises. "
f"Got logout_calls={captured.get('logout_calls')}"
)
def test_mcp_list_emails_logs_out_on_search_failure(monkeypatch):
import mcp_servers.email_server as srv
captured = {}
conn = MagicMock()
conn.select.return_value = ("OK", [])
conn.uid.side_effect = RuntimeError("simulated search failure")
conn.logout = MagicMock(side_effect=lambda: captured.__setitem__(
"logout_calls", captured.get("logout_calls", 0) + 1
))
monkeypatch.setattr(srv, "_imap_connect", lambda *a, **kw: conn)
try:
srv._list_emails()
except Exception:
pass
assert captured.get("logout_calls", 0) == 1, (
f"conn.logout() must be called after uid search raises. "
f"Got logout_calls={captured.get('logout_calls')}"
)
def test_mcp_read_email_logs_out_on_select_failure(monkeypatch):
import mcp_servers.email_server as srv
captured = {}
conn = _make_failing_conn(captured, raises_on="select")
monkeypatch.setattr(srv, "_imap_connect", lambda *a, **kw: conn)
monkeypatch.setattr(srv, "_load_config", lambda *a, **kw: {})
# The exception propagates out of _read_email (no outer catch in this fn);
# what matters is that logout was still called via finally before it did.
try:
srv._read_email(uid="1")
except RuntimeError:
pass
assert captured.get("logout_calls", 0) == 1, (
f"conn.logout() must be called after select raises. "
f"Got logout_calls={captured.get('logout_calls')}"
)
def test_mcp_read_email_logs_out_on_fetch_failure(monkeypatch):
import mcp_servers.email_server as srv
captured = {}
conn = MagicMock()
conn.select.return_value = ("OK", [])
conn.uid.side_effect = RuntimeError("simulated fetch failure")
conn.logout = MagicMock(side_effect=lambda: captured.__setitem__(
"logout_calls", captured.get("logout_calls", 0) + 1
))
monkeypatch.setattr(srv, "_imap_connect", lambda *a, **kw: conn)
monkeypatch.setattr(srv, "_load_config", lambda *a, **kw: {})
try:
srv._read_email(uid="1")
except RuntimeError:
pass
assert captured.get("logout_calls", 0) == 1, (
f"conn.logout() must be called after uid fetch raises. "
f"Got logout_calls={captured.get('logout_calls')}"
)
def test_mcp_reply_to_email_logs_out_on_select_failure(monkeypatch):
import mcp_servers.email_server as srv
captured = {}
conn = _make_failing_conn(captured, raises_on="select")
monkeypatch.setattr(srv, "_imap_connect", lambda *a, **kw: conn)
# Exception propagates; the finally still runs before it does.
try:
srv._reply_to_email(uid="1", body="hi")
except RuntimeError:
pass
assert captured.get("logout_calls", 0) == 1, (
f"conn.logout() must be called after select raises in _reply_to_email. "
f"Got logout_calls={captured.get('logout_calls')}"
)
def test_mcp_download_attachment_logs_out_on_select_failure(monkeypatch):
import mcp_servers.email_server as srv
captured = {}
conn = _make_failing_conn(captured, raises_on="select")
monkeypatch.setattr(srv, "_imap_connect", lambda *a, **kw: conn)
try:
srv._download_attachment(uid="1", index=0)
except RuntimeError:
pass
assert captured.get("logout_calls", 0) == 1, (
f"conn.logout() must be called after select raises in _download_attachment. "
f"Got logout_calls={captured.get('logout_calls')}"
)
# ── connect-time leak: _imap_connect / _open_imap_connection (#3174) ──────────
# The cases above all monkeypatch _imap_connect to *succeed*; these cover the
# gap where the connect itself fails (bad/expired app password, rejected
# STARTTLS) and the already-open socket would otherwise be orphaned.
def test_imap_connect_shuts_down_socket_on_login_failure(monkeypatch):
"""A failed login() must close the already-connected socket, not leak it."""
import routes.email_helpers as helpers
captured = {}
conn = MagicMock()
conn.shutdown = MagicMock(side_effect=lambda: captured.__setitem__(
"shutdown_calls", captured.get("shutdown_calls", 0) + 1
))
conn.login = MagicMock(side_effect=imaplib.IMAP4.error(b"AUTHENTICATE failed."))
monkeypatch.setattr(helpers, "_get_email_config", lambda *a, **kw: {
"imap_host": "imap.example.com",
"imap_port": 993,
"imap_starttls": False,
"imap_user": "user@example.com",
"imap_password": "wrong",
})
monkeypatch.setattr(helpers, "_open_imap_connection", lambda *a, **kw: conn)
raised = False
try:
helpers._imap_connect()
except Exception:
raised = True
assert raised, "login failure must propagate to the caller"
assert captured.get("shutdown_calls", 0) == 1, (
f"conn.shutdown() must be called exactly once when login fails. "
f"Got shutdown_calls={captured.get('shutdown_calls')}"
)
def test_open_imap_connection_shuts_down_on_starttls_failure(monkeypatch):
"""A rejected STARTTLS upgrade must close the open plain socket."""
import routes.email_helpers as helpers
captured = {}
conn = MagicMock()
conn.shutdown = MagicMock(side_effect=lambda: captured.__setitem__(
"shutdown_calls", captured.get("shutdown_calls", 0) + 1
))
conn.starttls = MagicMock(side_effect=RuntimeError("STARTTLS rejected"))
monkeypatch.setattr(helpers.imaplib, "IMAP4", lambda *a, **kw: conn)
raised = False
try:
helpers._open_imap_connection("imap.example.com", 143, starttls=True)
except Exception:
raised = True
assert raised, "starttls failure must propagate to the caller"
assert captured.get("shutdown_calls", 0) == 1, (
f"conn.shutdown() must be called exactly once when STARTTLS fails. "
f"Got shutdown_calls={captured.get('shutdown_calls')}"
)
# ── connect-time leak: mcp_servers/email_server.py (folded in per review #3363) ──
# Same connect-then-step pattern as the routes path. IMAP closes pre-auth with
# shutdown(); SMTP has no shutdown(), so close() (socket close, no QUIT).
def _cfg_imap(ssl=True, starttls=False):
return {
"imap_ssl": ssl, "imap_starttls": starttls,
"imap_host": "imap.example.com", "imap_port": 993,
"imap_user": "user@example.com", "imap_password": "wrong",
}
def test_mcp_imap_connect_shuts_down_on_login_failure(monkeypatch):
import mcp_servers.email_server as srv
captured = {}
conn = MagicMock()
conn.shutdown = MagicMock(side_effect=lambda: captured.__setitem__(
"shutdown_calls", captured.get("shutdown_calls", 0) + 1))
conn.login = MagicMock(side_effect=imaplib.IMAP4.error(b"AUTHENTICATE failed."))
monkeypatch.setattr(srv, "_load_config", lambda *a, **kw: _cfg_imap(ssl=True))
monkeypatch.setattr(srv.imaplib, "IMAP4_SSL", lambda *a, **kw: conn)
raised = False
try:
srv._imap_connect()
except Exception:
raised = True
assert raised, "login failure must propagate"
assert captured.get("shutdown_calls", 0) == 1, (
f"shutdown() must be called once on MCP IMAP login failure. Got {captured.get('shutdown_calls')}")
def test_mcp_imap_connect_shuts_down_on_starttls_failure(monkeypatch):
import mcp_servers.email_server as srv
captured = {}
conn = MagicMock()
conn.shutdown = MagicMock(side_effect=lambda: captured.__setitem__(
"shutdown_calls", captured.get("shutdown_calls", 0) + 1))
conn.starttls = MagicMock(side_effect=RuntimeError("STARTTLS rejected"))
monkeypatch.setattr(srv, "_load_config", lambda *a, **kw: _cfg_imap(ssl=False, starttls=True))
monkeypatch.setattr(srv.imaplib, "IMAP4", lambda *a, **kw: conn)
raised = False
try:
srv._imap_connect()
except Exception:
raised = True
assert raised, "starttls failure must propagate"
assert captured.get("shutdown_calls", 0) == 1, (
f"shutdown() must be called once on MCP IMAP STARTTLS failure. Got {captured.get('shutdown_calls')}")
def _cfg_smtp(security):
return {
"smtp_host": "smtp.example.com",
"smtp_port": 587 if security == "starttls" else 465,
"smtp_security": security, "smtp_user": "user@example.com",
"smtp_password": "wrong", "account_name": "test",
}
def test_mcp_smtp_connect_closes_on_login_failure(monkeypatch):
import mcp_servers.email_server as srv
captured = {}
conn = MagicMock()
conn.close = MagicMock(side_effect=lambda: captured.__setitem__(
"close_calls", captured.get("close_calls", 0) + 1))
conn.login = MagicMock(side_effect=Exception("SMTP auth failed"))
monkeypatch.setattr(srv, "_load_config", lambda *a, **kw: _cfg_smtp("ssl"))
monkeypatch.setattr(srv, "_smtp_ready", lambda cfg: True)
monkeypatch.setattr(srv.smtplib, "SMTP_SSL", lambda *a, **kw: conn)
raised = False
try:
srv._smtp_connect()
except Exception:
raised = True
assert raised, "login failure must propagate"
assert captured.get("close_calls", 0) == 1, (
f"close() must be called once on MCP SMTP login failure. Got {captured.get('close_calls')}")
def test_mcp_smtp_connect_closes_on_starttls_failure(monkeypatch):
import mcp_servers.email_server as srv
captured = {}
conn = MagicMock()
conn.close = MagicMock(side_effect=lambda: captured.__setitem__(
"close_calls", captured.get("close_calls", 0) + 1))
conn.starttls = MagicMock(side_effect=Exception("STARTTLS rejected"))
monkeypatch.setattr(srv, "_load_config", lambda *a, **kw: _cfg_smtp("starttls"))
monkeypatch.setattr(srv, "_smtp_ready", lambda cfg: True)
monkeypatch.setattr(srv.smtplib, "SMTP", lambda *a, **kw: conn)
raised = False
try:
srv._smtp_connect()
except Exception:
raised = True
assert raised, "starttls failure must propagate"
assert captured.get("close_calls", 0) == 1, (
f"close() must be called once on MCP SMTP STARTTLS failure. Got {captured.get('close_calls')}")