feat(email): add Google OAuth2 for Google Workspace / .edu IMAP & SMTP (#237)

* feat(email): add Google OAuth2 for Google Workspace / .edu IMAP & SMTP

Google deprecated basic-auth (password) access for Google Workspace
accounts in May 2025. This means any .edu or org Google email account
could no longer connect via IMAP/SMTP with a username + password —
the email feature was silently broken for a large class of users.

This PR adds full OAuth2 (XOAUTH2) support for Google accounts so
Workspace / .edu emails work out of the box.

## What changed

### Backend
- `core/database.py`: add `oauth_provider`, `oauth_access_token`,
  `oauth_refresh_token`, `oauth_token_expiry`, and `display_name`
  columns to `EmailAccount` + idempotent migration
- `routes/email_helpers.py`: XOAUTH2 auth in `_imap_connect()` and
  `_send_smtp_message()`, automatic token refresh, OAuth fields in
  `_get_email_config()`
- `routes/email_routes.py`: OAuth authorize + callback routes,
  `_smtp_ready()` fix, OAuth fields through `_deliver()` closure,
  `display_name` in `From:` header

### Frontend
- `static/js/settings.js`: "Google Workspace / .edu" provider preset,
  "Connect with Google" button, success/error banner, display name field
- `static/js/document.js`: `_accountCanSend()` recognises OAuth accounts
  as SMTP-capable

* security: sign OAuth state, scope callback by owner, fix quotes & logs

Addresses reviewer feedback on the email OAuth2 PR:

- OAuth state is now HMAC-SHA256 signed (keyed with the app secret from
  secret_storage) encoding account_id + owner + a random nonce, and is
  verified with constant-time comparison in the callback before any
  token write. Replaces the bare account_id state, closing the CSRF /
  state-guessing gap.
- Callback extracts the owner from the verified state and re-checks it
  against EmailAccount.owner before writing tokens, matching the
  ownership guards used elsewhere in the email routes. Single-user mode
  (owner == "") still accepts any account, consistent with
  _assert_owns_account.
- Replaced curly/smart quotes in the Name/Email/Display Name input rows
  with plain ASCII so getElementById lookups and event wiring work.
- Stripped account name, SMTP host/user, owner, and raw provider error
  text from send-config and OAuth logs; failures now surface as generic
  error codes in the redirect instead of raw exception strings.

* test(email): add OAuth2 state, _smtp_ready, and XOAUTH2 tests

Move the OAuth state sign/verify helpers out of the setup_email_routes
closure into module-level make_oauth_state/verify_oauth_state in
email_helpers.py so they can be unit-tested, then add tests/test_email_oauth.py:

- signed state round-trips account_id + owner, nonce is unique per call
- tampered account_id, forged signature, and garbage states are rejected
- _smtp_ready treats an OAuth account (no password) as send-capable, and
  still rejects host+user-only accounts with neither password nor OAuth
- _xoauth2_string / _xoauth2_bytes produce the correct SASL XOAUTH2 framing

14 new tests; existing test_security_regressions.py still passes (28).

* refactor(email): single XOAUTH2 frame helper, use RuntimeError

Polish from self-review before merge:

- Collapse the XOAUTH2 framing to one source of truth: _xoauth2_raw()
  returns the unencoded SASL string used by both the SMTP and IMAP auth
  callbacks (each library base64-encodes it), and _xoauth2_bytes() is
  just its .encode(). Removes the unused base64 _xoauth2_string helper
  and the duplicated inline frame in _send_smtp_message.
- Raise RuntimeError (not bare Exception) for the "OAuth token
  unavailable" path, matching the convention used across src/.
- Update tests accordingly.

All 14 OAuth tests + 28 security regressions pass; SMTP/IMAP XOAUTH2
verified live against a real Workspace account.

* tests(email-oauth): cover the security-sensitive OAuth paths before merge

The previous tests only exercised pure helpers (state signing, _smtp_ready,
XOAUTH2 framing). This adds coverage for the actual token-custody and
ownership behaviour, pinning the real route handlers rather than
re-implementations of their logic.

Real OAuth callback route (pulled live from setup_email_routes()):
- missing code -> generic missing_code redirect, no account id / owner in URL
- provider error -> generic google_error redirect, raw error not echoed
- tampered/invalid state -> invalid_state redirect, auth code never leaked
- signed state with owner mismatch -> token write refused (ownership_error),
  DB row left untouched
- signed state with matching owner -> tokens written encrypted, and only to
  the intended account (a second account stays untouched)

Real accounts-list route:
- exposes oauth_provider status but never the access/refresh token values,
  encrypted or otherwise

Token storage / refresh helpers (isolated in-memory SQLite, mocked HTTP):
- refreshed access token stored encrypted; expiry is a timestamp, not a token
- fresh token uses cache (no refresh call); expired token triggers refresh
- refresh HTTP failure returns None silently, no exception or secret surfaced
- missing client credentials short-circuits to None

Password-account regression:
- password IMAP accounts call conn.login(); OAuth accounts call XOAUTH2
  authenticate() and never login()

28 tests pass (14 prior + 14 new).

* fix(email-oauth): drop raw exception text from token-refresh log

Google token refresh failures now log the account id only, matching
the conservative logging used elsewhere on the OAuth path — no raw
provider/exception details surfacing in logs.

* fix(email-oauth): bring OAuth UI parity to the Integrations email form

The Google Workspace / .edu provider preset, Display Name field, and
Connect-with-Google flow were only wired into the Email-tab account
form. The Integrations-tab form (a separate code path for the same
account type) was missing all three, so the OAuth option was invisible
from that entry point. Mirrors the same PROVIDERS entry, OAuth section,
and connect handler so both forms behave identically.

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Alexandre Teixeira <111787685+alteixeira20@users.noreply.github.com>
This commit is contained in:
Hriday Ranka
2026-06-15 12:02:58 -04:00
committed by GitHub
parent 0750486654
commit 270b8570fc
6 changed files with 1013 additions and 26 deletions
+580
View File
@@ -0,0 +1,580 @@
"""Tests for the Google OAuth2 email helpers.
Covers the security-critical surface added for Google Workspace / .edu
IMAP/SMTP support:
- `make_oauth_state` / `verify_oauth_state` — HMAC-signed OAuth state so the
callback can't be CSRF'd or have its account_id/owner tampered with.
- `_smtp_ready` — an OAuth account (no stored password) must still count as
send-capable; a host+user-only account without password or OAuth must not.
- `_xoauth2_raw` / `_xoauth2_bytes` — SASL XOAUTH2 framing for SMTP/IMAP.
- `_refresh_google_token` — token refresh stores result encrypted; failure is
silent (no token/secret in logs or return value).
- `_get_valid_google_token` — uses cached token when fresh; calls refresh when
expired.
- `google_oauth_callback` (real route) — invalid/tampered/missing state and
provider errors return generic redirects with no PII; owner mismatch refuses
the token write; a valid owner writes encrypted tokens only to the intended
account.
- `list_email_accounts` (real route) — exposes OAuth status but never token
values.
- `_imap_connect` — password accounts use login(); OAuth accounts use XOAUTH2.
Route tests pull the live endpoint out of `setup_email_routes()` and call it
directly — they pin the real handler, not a re-implementation. The ASGI app is
not booted; outbound HTTP is mocked and the DB is an isolated in-memory SQLite.
"""
import base64
import json
import time
import unittest.mock as mock
import pytest
# ── OAuth state signing ──────────────────────────────────────────
def test_oauth_state_round_trips_account_and_owner():
from routes.email_helpers import make_oauth_state, verify_oauth_state
state = make_oauth_state("acct-123", "user@example.com")
payload = verify_oauth_state(state)
assert payload is not None
assert payload["a"] == "acct-123"
assert payload["o"] == "user@example.com"
assert payload["n"] # nonce present
def test_oauth_state_nonce_is_unique_per_call():
from routes.email_helpers import make_oauth_state, verify_oauth_state
a = verify_oauth_state(make_oauth_state("acct", "o"))
b = verify_oauth_state(make_oauth_state("acct", "o"))
assert a["n"] != b["n"]
def test_oauth_state_rejects_tampered_account_id():
from routes.email_helpers import make_oauth_state, verify_oauth_state
state = make_oauth_state("acct-123", "user@example.com")
decoded = base64.urlsafe_b64decode(state.encode()).decode()
payload_str, sig = decoded.rsplit("|", 1)
payload = json.loads(payload_str)
payload["a"] = "evil-acct" # attacker swaps the target account
forged = base64.urlsafe_b64encode(
(json.dumps(payload, separators=(",", ":")) + "|" + sig).encode()
).decode()
assert verify_oauth_state(forged) is None
def test_oauth_state_rejects_forged_signature():
from routes.email_helpers import make_oauth_state, verify_oauth_state
state = make_oauth_state("acct-123", "user@example.com")
decoded = base64.urlsafe_b64decode(state.encode()).decode()
payload_str, _ = decoded.rsplit("|", 1)
forged = base64.urlsafe_b64encode((payload_str + "|" + "deadbeef" * 8).encode()).decode()
assert verify_oauth_state(forged) is None
@pytest.mark.parametrize("garbage", ["", "not-base64-at-all", "###", "a|b|c"])
def test_oauth_state_rejects_garbage(garbage):
from routes.email_helpers import verify_oauth_state
assert verify_oauth_state(garbage) is None
# ── _smtp_ready: OAuth accounts have no password but can still send ──
def test_smtp_ready_true_for_oauth_account_without_password():
from routes.email_routes import _smtp_ready
cfg = {
"smtp_host": "smtp.gmail.com",
"smtp_user": "me@nyu.edu",
"smtp_password": "",
"oauth_provider": "google",
}
assert _smtp_ready(cfg) is True
def test_smtp_ready_true_for_password_account():
from routes.email_routes import _smtp_ready
cfg = {
"smtp_host": "smtp.example.com",
"smtp_user": "me@example.com",
"smtp_password": "app-password",
"oauth_provider": "",
}
assert _smtp_ready(cfg) is True
def test_smtp_ready_false_without_password_or_oauth():
from routes.email_routes import _smtp_ready
cfg = {
"smtp_host": "smtp.example.com",
"smtp_user": "me@example.com",
"smtp_password": "",
"oauth_provider": "",
}
assert _smtp_ready(cfg) is False
def test_smtp_ready_false_without_host():
from routes.email_routes import _smtp_ready
cfg = {"smtp_host": "", "smtp_user": "me@x.com", "oauth_provider": "google"}
assert _smtp_ready(cfg) is False
# ── XOAUTH2 SASL framing ─────────────────────────────────────────
def test_xoauth2_raw_is_unencoded_sasl_frame():
from routes.email_helpers import _xoauth2_raw
assert _xoauth2_raw("me@nyu.edu", "tok123") == "user=me@nyu.edu\x01auth=Bearer tok123\x01\x01"
def test_xoauth2_bytes_is_raw_frame_encoded():
from routes.email_helpers import _xoauth2_bytes
assert _xoauth2_bytes("me@nyu.edu", "tok123") == b"user=me@nyu.edu\x01auth=Bearer tok123\x01\x01"
# ── Helpers for in-memory DB fixtures ────────────────────────────
def _make_db():
"""Return (Session, SessionFactory) backed by an isolated in-memory SQLite DB.
Used to test DB-touching helpers without the real database.
The factory lets tests open a fresh session after the helper closes its own.
"""
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from core.database import Base
engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
Base.metadata.create_all(engine)
Factory = sessionmaker(bind=engine)
return Factory(), Factory
def _make_account(session, account_id="acct-1", owner="alice", **kwargs):
"""Insert a minimal EmailAccount row and return it."""
from core.database import EmailAccount
row = EmailAccount(
id=account_id,
owner=owner,
name=kwargs.get("name", "Test"),
from_address=kwargs.get("from_address", "test@example.com"),
imap_host=kwargs.get("imap_host", "imap.gmail.com"),
imap_port=kwargs.get("imap_port", 993),
imap_user=kwargs.get("imap_user", "test@example.com"),
smtp_host=kwargs.get("smtp_host", "smtp.gmail.com"),
smtp_port=kwargs.get("smtp_port", 587),
smtp_user=kwargs.get("smtp_user", "test@example.com"),
)
for k, v in kwargs.items():
if hasattr(row, k):
setattr(row, k, v)
session.add(row)
session.commit()
return row
# ── Token encryption at rest ─────────────────────────────────────
def test_refresh_token_stored_encrypted_not_raw():
"""_refresh_google_token must encrypt the new access token before writing it
to the DB — storing the raw token string would expose credentials at rest."""
from src.secret_storage import encrypt as _enc, decrypt as _dec
from core.database import EmailAccount
raw_token = "ya29.test_access_token_raw"
db, Factory = _make_db()
_make_account(db, account_id="acct-r", owner="bob",
oauth_refresh_token=_enc("refresh-tok-xyz"))
db.close()
fake_resp = mock.MagicMock()
fake_resp.raise_for_status = mock.MagicMock()
fake_resp.json.return_value = {"access_token": raw_token, "expires_in": 3600}
with mock.patch("httpx.post", return_value=fake_resp), \
mock.patch("core.database.SessionLocal", Factory), \
mock.patch("routes.email_helpers.os.environ.get", side_effect=lambda k, d="": {
"GOOGLE_OAUTH_CLIENT_ID": "cid", "GOOGLE_OAUTH_CLIENT_SECRET": "csec"
}.get(k, d)):
from routes.email_helpers import _refresh_google_token
result = _refresh_google_token("acct-r")
verify_db = Factory()
row = verify_db.query(EmailAccount).filter(EmailAccount.id == "acct-r").first()
stored = row.oauth_access_token
verify_db.close()
assert result == raw_token, "function should return the plain access token to callers"
assert stored != raw_token, "raw token must not be stored directly in the DB"
assert _dec(stored) == raw_token, "stored value must decrypt back to the raw token"
def test_refresh_stores_encrypted_expiry_not_token():
"""oauth_token_expiry stores only a timestamp, never the token value."""
from src.secret_storage import encrypt as _enc
from core.database import EmailAccount
db, Factory = _make_db()
_make_account(db, account_id="acct-e", owner="bob",
oauth_refresh_token=_enc("ref-tok"))
db.close()
fake_resp = mock.MagicMock()
fake_resp.raise_for_status = mock.MagicMock()
fake_resp.json.return_value = {"access_token": "ya29.secret", "expires_in": 3600}
with mock.patch("httpx.post", return_value=fake_resp), \
mock.patch("core.database.SessionLocal", Factory), \
mock.patch("routes.email_helpers.os.environ.get", side_effect=lambda k, d="": {
"GOOGLE_OAUTH_CLIENT_ID": "cid", "GOOGLE_OAUTH_CLIENT_SECRET": "csec"
}.get(k, d)):
from routes.email_helpers import _refresh_google_token
_refresh_google_token("acct-e")
verify_db = Factory()
row = verify_db.query(EmailAccount).filter(EmailAccount.id == "acct-e").first()
expiry = row.oauth_token_expiry
verify_db.close()
assert "ya29" not in (expiry or ""), \
"token_expiry must be a timestamp, not the token string"
# ── Real OAuth callback route ─────────────────────────────────────
#
# These pull the actual google_oauth_callback endpoint out of the router and
# invoke it — they pin the real route's behaviour, not a re-implementation, so
# they fail if the ownership/state guards are ever removed or weakened.
def _callback_endpoint():
"""Return the live google_oauth_callback endpoint from the email router."""
from routes.email_routes import setup_email_routes
router = setup_email_routes()
for route in router.routes:
if route.path == "/api/email/oauth/google/callback" and "GET" in getattr(route, "methods", set()):
return route.endpoint
raise AssertionError("google_oauth_callback route not found")
class _FakeRequest:
"""Minimal stand-in for starlette Request — the callback only reads headers."""
headers = {"host": "localhost:7000"}
def _location(resp):
"""Pull the redirect target out of a RedirectResponse."""
return resp.headers["location"]
@pytest.mark.asyncio
async def test_callback_missing_code_returns_generic_error():
"""No `code` query param → generic error redirect, with no account id, owner,
or state echoed back into the URL."""
from routes.email_helpers import make_oauth_state
callback = _callback_endpoint()
state = make_oauth_state("acct-1", "alice")
resp = await callback(code=None, state=state, error=None, request=_FakeRequest())
loc = _location(resp)
assert "email_oauth_error=missing_code" in loc
assert "acct-1" not in loc, "account id must not appear in redirect URL"
assert "alice" not in loc, "owner must not appear in redirect URL"
@pytest.mark.asyncio
async def test_callback_provider_error_returns_generic_error():
"""An `error` from Google → generic error redirect, no raw provider text."""
callback = _callback_endpoint()
resp = await callback(code=None, state=None, error="access_denied", request=_FakeRequest())
loc = _location(resp)
assert "email_oauth_error=google_error" in loc
assert "access_denied" not in loc, "raw provider error must not leak into redirect"
@pytest.mark.asyncio
async def test_callback_tampered_state_returns_generic_error_no_leak():
"""Tampered/invalid state → invalid_state redirect; the auth code and any
token must never appear in the redirect URL."""
callback = _callback_endpoint()
resp = await callback(code="4/secret-auth-code", state="not-a-valid-state",
error=None, request=_FakeRequest())
loc = _location(resp)
assert "email_oauth_error=invalid_state" in loc
assert "4/secret-auth-code" not in loc, "auth code must not leak into redirect"
assert "token" not in loc
@pytest.mark.asyncio
async def test_callback_owner_mismatch_does_not_write_tokens():
"""A signed, valid state whose owner does not match the target account's
owner must NOT write tokens — this blocks one authenticated user from
binding their Google account onto another user's mailbox row.
"""
from routes.email_helpers import make_oauth_state
from core.database import EmailAccount
db, Factory = _make_db()
_make_account(db, account_id="acct-x", owner="alice")
db.close()
# Token-exchange + userinfo would succeed — the point is the ownership gate
# rejects the write *before* trusting them.
token_resp = mock.MagicMock()
token_resp.raise_for_status = mock.MagicMock()
token_resp.json.return_value = {"access_token": "ya29.attacker", "refresh_token": "r", "expires_in": 3600}
userinfo_resp = mock.MagicMock()
userinfo_resp.is_success = True
userinfo_resp.json.return_value = {"email": "bob@evil.com", "name": "Bob"}
# State is genuinely signed, but for owner "bob" — not the row owner "alice".
state = make_oauth_state("acct-x", "bob")
with mock.patch("httpx.post", return_value=token_resp), \
mock.patch("httpx.get", return_value=userinfo_resp), \
mock.patch("core.database.SessionLocal", Factory):
callback = _callback_endpoint()
resp = await callback(code="4/code", state=state, error=None, request=_FakeRequest())
loc = _location(resp)
assert "email_oauth_error=ownership_error" in loc
verify_db = Factory()
row = verify_db.query(EmailAccount).filter(EmailAccount.id == "acct-x").first()
token_after = row.oauth_access_token
verify_db.close()
assert token_after is None, "no token may be written when ownership check fails"
@pytest.mark.asyncio
async def test_callback_valid_owner_writes_encrypted_tokens_to_intended_account():
"""A signed state whose owner matches the target account writes the tokens —
and only to that account, stored encrypted (raw token never persisted)."""
from routes.email_helpers import make_oauth_state
from src.secret_storage import decrypt as _dec
from core.database import EmailAccount
db, Factory = _make_db()
_make_account(db, account_id="acct-v", owner="alice", imap_host="", smtp_host="")
_make_account(db, account_id="acct-other", owner="alice") # must stay untouched
db.close()
raw_access = "ya29.legit_access_token"
raw_refresh = "1//legit_refresh_token"
token_resp = mock.MagicMock()
token_resp.raise_for_status = mock.MagicMock()
token_resp.json.return_value = {"access_token": raw_access, "refresh_token": raw_refresh, "expires_in": 3600}
userinfo_resp = mock.MagicMock()
userinfo_resp.is_success = True
userinfo_resp.json.return_value = {"email": "alice@nyu.edu", "name": "Alice"}
state = make_oauth_state("acct-v", "alice")
with mock.patch("httpx.post", return_value=token_resp), \
mock.patch("httpx.get", return_value=userinfo_resp), \
mock.patch("core.database.SessionLocal", Factory):
callback = _callback_endpoint()
resp = await callback(code="4/code", state=state, error=None, request=_FakeRequest())
assert "email_oauth_success=1" in _location(resp)
verify_db = Factory()
target = verify_db.query(EmailAccount).filter(EmailAccount.id == "acct-v").first()
other = verify_db.query(EmailAccount).filter(EmailAccount.id == "acct-other").first()
verify_db.close()
assert target.oauth_provider == "google"
assert target.oauth_access_token != raw_access, "access token must be stored encrypted"
assert _dec(target.oauth_access_token) == raw_access
assert _dec(target.oauth_refresh_token) == raw_refresh
assert other.oauth_access_token is None, "tokens must only touch the intended account"
# ── Token refresh scenarios ───────────────────────────────────────
def test_get_valid_google_token_uses_cached_when_fresh():
"""_get_valid_google_token must NOT call refresh when the stored token is
still valid (expiry - 60s buffer > now). Refresh is an outbound HTTP call
that should only happen when genuinely needed."""
from src.secret_storage import encrypt as _enc
from routes.email_helpers import _get_valid_google_token
future_expiry = str(int(time.time()) + 7200) # 2 hours from now
cfg = {
"account_id": "acct-fresh",
"oauth_access_token": _enc("ya29.fresh_token"),
"oauth_token_expiry": future_expiry,
}
with mock.patch("routes.email_helpers._refresh_google_token") as mock_refresh:
result = _get_valid_google_token("acct-fresh", cfg)
assert result == "ya29.fresh_token"
mock_refresh.assert_not_called()
def test_get_valid_google_token_refreshes_when_expired():
"""_get_valid_google_token must call refresh when the token is expired."""
from src.secret_storage import encrypt as _enc
from routes.email_helpers import _get_valid_google_token
past_expiry = str(int(time.time()) - 10) # already expired
cfg = {
"account_id": "acct-exp",
"oauth_access_token": _enc("ya29.old_token"),
"oauth_token_expiry": past_expiry,
}
with mock.patch("routes.email_helpers._refresh_google_token", return_value="ya29.new_token") as mock_refresh:
result = _get_valid_google_token("acct-exp", cfg)
mock_refresh.assert_called_once_with("acct-exp")
assert result == "ya29.new_token"
def test_refresh_failure_returns_none_no_secret_raised():
"""When the refresh HTTP call fails, _refresh_google_token must return None
silently. It must not raise an exception or surface token/secret details."""
from src.secret_storage import encrypt as _enc
db, Factory = _make_db()
_make_account(db, account_id="acct-fail", owner="dave",
oauth_refresh_token=_enc("ref-tok"))
db.close()
failing_resp = mock.MagicMock()
failing_resp.raise_for_status.side_effect = Exception("401 Unauthorized")
with mock.patch("httpx.post", return_value=failing_resp), \
mock.patch("core.database.SessionLocal", Factory), \
mock.patch("routes.email_helpers.os.environ.get", side_effect=lambda k, d="": {
"GOOGLE_OAUTH_CLIENT_ID": "cid", "GOOGLE_OAUTH_CLIENT_SECRET": "csec"
}.get(k, d)):
from routes.email_helpers import _refresh_google_token
result = _refresh_google_token("acct-fail")
assert result is None, "failed refresh must return None, not raise"
def test_refresh_without_credentials_returns_none():
"""_refresh_google_token must return None immediately when the OAuth client
credentials are not configured — no DB query, no HTTP call."""
with mock.patch("routes.email_helpers.os.environ.get", return_value=""):
from routes.email_helpers import _refresh_google_token
result = _refresh_google_token("acct-any")
assert result is None
# ── Password-account regression ───────────────────────────────────
def test_imap_connect_uses_login_for_password_accounts():
"""Existing password-auth IMAP accounts must still call conn.login() and
must NOT trigger the XOAUTH2 authenticate path."""
from routes.email_helpers import _imap_connect
mock_conn = mock.MagicMock()
# _imap_connect calls _get_email_config internally — mock it to return our cfg.
cfg = {
"imap_host": "imap.gmail.com",
"imap_port": 993,
"imap_starttls": False,
"imap_user": "me@gmail.com",
"imap_password": "app-password-xyz",
"oauth_provider": "",
"account_id": "acct-pw",
}
with mock.patch("routes.email_helpers._open_imap_connection", return_value=mock_conn), \
mock.patch("routes.email_helpers._get_email_config", return_value=cfg):
_imap_connect("acct-pw", owner="alice")
mock_conn.login.assert_called_once_with("me@gmail.com", "app-password-xyz")
mock_conn.authenticate.assert_not_called()
def test_imap_connect_uses_xoauth2_for_oauth_accounts():
"""OAuth accounts must call conn.authenticate('XOAUTH2', ...) and must NOT
call conn.login() — which would fail without a password."""
from routes.email_helpers import _imap_connect
from src.secret_storage import encrypt as _enc
mock_conn = mock.MagicMock()
future_expiry = str(int(time.time()) + 7200)
cfg = {
"imap_host": "imap.gmail.com",
"imap_port": 993,
"imap_starttls": False,
"imap_user": "me@nyu.edu",
"imap_password": "",
"oauth_provider": "google",
"account_id": "acct-oauth",
"oauth_access_token": _enc("ya29.live_token"),
"oauth_token_expiry": future_expiry,
}
with mock.patch("routes.email_helpers._open_imap_connection", return_value=mock_conn), \
mock.patch("routes.email_helpers._get_email_config", return_value=cfg):
_imap_connect("acct-oauth", owner="alice")
mock_conn.authenticate.assert_called_once()
assert mock_conn.authenticate.call_args[0][0] == "XOAUTH2"
mock_conn.login.assert_not_called()
@pytest.mark.asyncio
async def test_account_list_response_does_not_expose_token_values():
"""The /accounts list route is the client-facing account inventory. It must
expose `oauth_provider` (so the UI can show OAuth status) but never the
access/refresh token values, encrypted or otherwise — only boolean
has_*_password flags and the provider name."""
from routes.email_routes import setup_email_routes
from src.secret_storage import encrypt as _enc
raw_access = "ya29.super_secret_access_token"
raw_refresh = "1//super_secret_refresh_token"
db, Factory = _make_db()
_make_account(db, account_id="acct-list", owner="alice",
oauth_provider="google",
oauth_access_token=_enc(raw_access),
oauth_refresh_token=_enc(raw_refresh))
db.close()
router = setup_email_routes()
list_accounts = None
for route in router.routes:
if route.path == "/api/email/accounts" and "GET" in getattr(route, "methods", set()):
list_accounts = route.endpoint
break
assert list_accounts is not None, "accounts list route not found"
with mock.patch("core.database.SessionLocal", Factory):
result = await list_accounts(owner="alice")
blob = json.dumps(result)
assert raw_access not in blob, "raw access token must not appear in list response"
assert raw_refresh not in blob, "raw refresh token must not appear in list response"
assert _enc(raw_access) not in blob, "encrypted token must not be sent to the client either"
acct = result["accounts"][0]
assert acct["oauth_provider"] == "google" # status is exposed
assert "oauth_access_token" not in acct # token value is not
assert "oauth_refresh_token" not in acct