fix(email): validate IMAP/SMTP ports instead of crashing with 500 (#4464)

The email-account endpoints coerced user-supplied ports with a bare int(data.get("imap_port") or 993), so a non-numeric port (e.g. "imap") raised ValueError and surfaced as an HTTP 500 in the create, update, and test-config endpoints.

Add a _coerce_port(value, default) -> (port, error) helper and use it in all three endpoints, returning the endpoints standard {"ok": False, "error": ...} response (matching the existing "name required" validation) instead of crashing. A blank or missing port still falls back to the default (993/465).
This commit is contained in:
Victor
2026-06-26 14:32:56 -04:00
committed by GitHub
parent ac05dff73c
commit d4cd6d60f1
2 changed files with 92 additions and 8 deletions
+36 -8
View File
@@ -64,6 +64,21 @@ ODYSSEUS_MAIL_ORIGIN = "odysseus-ui"
EMAIL_READ_ATTACHMENT_VERSION = 2
def _coerce_port(value, default):
"""Coerce a user-supplied port to int.
Returns ``(port, error)``. A missing or blank value yields ``default``; a
non-numeric value yields ``(None, message)`` so callers can return a clean
error instead of letting ``int()`` raise and surface as an HTTP 500.
"""
if value in (None, ""):
return default, None
try:
return int(value), None
except (TypeError, ValueError):
return None, f"Invalid port {value!r}; must be a whole number"
def _email_tag_owner_aliases(account_id: str | None, owner: str = "") -> list[str]:
aliases = [owner or ""]
try:
@@ -3329,6 +3344,12 @@ def setup_email_routes():
name = (data.get("name") or "").strip()
if not name:
return {"ok": False, "error": "name required"}
imap_port, port_err = _coerce_port(data.get("imap_port"), 993)
if port_err:
return {"ok": False, "error": port_err}
smtp_port, port_err = _coerce_port(data.get("smtp_port"), 465)
if port_err:
return {"ok": False, "error": port_err}
db = SessionLocal()
try:
row = EmailAccount(
@@ -3337,13 +3358,13 @@ def setup_email_routes():
is_default=bool(data.get("is_default", False)),
enabled=bool(data.get("enabled", True)),
imap_host=(data.get("imap_host") or "").strip(),
imap_port=int(data.get("imap_port") or 993),
imap_port=imap_port,
imap_user=(data.get("imap_user") or "").strip(),
imap_password=_enc(data.get("imap_password") or ""),
imap_starttls=bool(data.get("imap_starttls", True)),
smtp_host=(data.get("smtp_host") or "").strip(),
smtp_port=int(data.get("smtp_port") or 465),
smtp_security=_smtp_security_mode({"smtp_security": data.get("smtp_security"), "smtp_port": data.get("smtp_port") or 465}),
smtp_port=smtp_port,
smtp_security=_smtp_security_mode({"smtp_security": data.get("smtp_security"), "smtp_port": smtp_port}),
smtp_user=(data.get("smtp_user") or "").strip(),
smtp_password=_enc(data.get("smtp_password") or ""),
from_address=(data.get("from_address") or "").strip(),
@@ -3387,7 +3408,10 @@ def setup_email_routes():
setattr(row, key, (data[key] or "").strip())
for key in ("imap_port", "smtp_port"):
if data.get(key) not in (None, ""):
setattr(row, key, int(data[key]))
port, port_err = _coerce_port(data.get(key), None)
if port_err:
return {"ok": False, "error": port_err}
setattr(row, key, port)
if "smtp_security" in data:
row.smtp_security = _smtp_security_mode({"smtp_security": data.get("smtp_security"), "smtp_port": data.get("smtp_port") or row.smtp_port})
for key in ("imap_starttls", "enabled"):
@@ -3491,12 +3515,14 @@ def setup_email_routes():
smtp_result = None
imap_host = (body.get("imap_host") or "").strip()
imap_port = int(body.get("imap_port") or 993)
imap_port, imap_port_err = _coerce_port(body.get("imap_port"), 993)
imap_user = (body.get("imap_user") or "").strip()
imap_pass = body.get("imap_password") or ""
imap_starttls = bool(body.get("imap_starttls"))
if not (imap_host and imap_user and imap_pass):
if imap_port_err:
imap_result = {"ok": False, "error": imap_port_err}
elif not (imap_host and imap_user and imap_pass):
imap_result = {"ok": False, "error": "Need IMAP host, username, and password"}
else:
# Connection mode resolution:
@@ -3523,8 +3549,10 @@ def setup_email_routes():
imap_result = {"ok": False, "error": _friendly_email_auth_error("IMAP", imap_host, e)}
smtp_host = (body.get("smtp_host") or "").strip()
if smtp_host:
smtp_port = int(body.get("smtp_port") or 465)
smtp_port, smtp_port_err = _coerce_port(body.get("smtp_port"), 465)
if smtp_host and smtp_port_err:
smtp_result = {"ok": False, "error": smtp_port_err}
elif smtp_host:
smtp_security = _smtp_security_mode({"smtp_security": body.get("smtp_security"), "smtp_port": smtp_port})
smtp_user = (body.get("smtp_user") or imap_user).strip()
smtp_pass = body.get("smtp_password") or imap_pass