Scope email account workflows by owner (#1309)

This commit is contained in:
Vykos
2026-06-02 19:21:02 +02:00
committed by GitHub
parent e73545f64f
commit 1adf21a7e5
4 changed files with 322 additions and 77 deletions
+72 -19
View File
@@ -297,7 +297,8 @@ def _init_scheduled_db():
send_at TEXT NOT NULL, send_at TEXT NOT NULL,
created_at TEXT NOT NULL, created_at TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending', status TEXT NOT NULL DEFAULT 'pending',
error TEXT error TEXT,
owner TEXT DEFAULT ''
) )
""") """)
# Email summary cache (keyed by Message-ID) # Email summary cache (keyed by Message-ID)
@@ -435,6 +436,35 @@ def _init_scheduled_db():
conn.execute("ALTER TABLE scheduled_emails ADD COLUMN account_id TEXT") conn.execute("ALTER TABLE scheduled_emails ADD COLUMN account_id TEXT")
if "odysseus_kind" not in cols: if "odysseus_kind" not in cols:
conn.execute("ALTER TABLE scheduled_emails ADD COLUMN odysseus_kind TEXT") conn.execute("ALTER TABLE scheduled_emails ADD COLUMN odysseus_kind TEXT")
if "owner" not in cols:
conn.execute("ALTER TABLE scheduled_emails ADD COLUMN owner TEXT DEFAULT ''")
conn.execute("CREATE INDEX IF NOT EXISTS ix_scheduled_emails_owner_status ON scheduled_emails(owner, status)")
# Backfill owner on legacy rows from the owning email account so the
# owner-scoped list/cancel routes surface pre-migration scheduled
# sends to the right user (the poller already resolves these by
# account at send time; this aligns the UI with that).
legacy_accounts = conn.execute(
"SELECT DISTINCT account_id FROM scheduled_emails "
"WHERE (owner IS NULL OR owner = '') AND account_id IS NOT NULL AND account_id != ''"
).fetchall()
if legacy_accounts:
try:
from core.database import SessionLocal as _SL, EmailAccount as _EA
_db = _SL()
try:
for (acct_id,) in legacy_accounts:
row = _db.query(_EA.owner).filter(_EA.id == acct_id).first()
acct_owner = (row[0] or "") if row else ""
if acct_owner:
conn.execute(
"UPDATE scheduled_emails SET owner = ? "
"WHERE account_id = ? AND (owner IS NULL OR owner = '')",
(acct_owner, acct_id),
)
finally:
_db.close()
except Exception:
pass
except Exception: except Exception:
pass pass
# Lazy migration: add turns_json to email_boundaries for server-side # Lazy migration: add turns_json to email_boundaries for server-side
@@ -815,10 +845,10 @@ def _detect_spam_folder(conn):
return None return None
def _imap_move(uid, dest, src="INBOX"): def _imap_move(uid, dest, src="INBOX", account_id: str | None = None, owner: str = ""):
"""Move a single IMAP UID from src folder to dest. Returns True on success.""" """Move a single IMAP UID from src folder to dest. Returns True on success."""
try: try:
c = _imap_connect() c = _imap_connect(account_id, owner=owner)
c.select(_q(src)) c.select(_q(src))
status, _ = c.copy(uid, _q(dest)) status, _ = c.copy(uid, _q(dest))
if status != "OK": if status != "OK":
@@ -1021,7 +1051,9 @@ def _fetch_sender_thread_context(sender_addr: str,
exclude_folder: str = "INBOX", exclude_folder: str = "INBOX",
limit: int = 3, limit: int = 3,
max_chars_per_email: int = 1500, max_chars_per_email: int = 1500,
max_attachment_chars: int = 4000) -> str: max_attachment_chars: int = 4000,
account_id: str | None = None,
owner: str = "") -> str:
"""Pull the last N emails from `sender_addr` (across common folders), """Pull the last N emails from `sender_addr` (across common folders),
extract their body snippets + attachment text, and return one formatted extract their body snippets + attachment text, and return one formatted
block ready to be glued into an LLM system prompt as "REFERENCED MATERIAL". block ready to be glued into an LLM system prompt as "REFERENCED MATERIAL".
@@ -1043,7 +1075,7 @@ def _fetch_sender_thread_context(sender_addr: str,
seen_uids.add((exclude_folder or "INBOX", str(exclude_uid))) seen_uids.add((exclude_folder or "INBOX", str(exclude_uid)))
try: try:
conn = _imap_connect() conn = _imap_connect(account_id, owner=owner)
except Exception as e: except Exception as e:
logger.warning(f"sender-thread-context: imap connect failed: {e}") logger.warning(f"sender-thread-context: imap connect failed: {e}")
return "" return ""
@@ -1126,7 +1158,12 @@ def _fetch_sender_thread_context(sender_addr: str,
return "\n\n=====\n\n".join(blocks) return "\n\n=====\n\n".join(blocks)
def _pre_retrieve_context(body: str, sender: str) -> tuple: def _pre_retrieve_context(
body: str,
sender: str,
account_id: str | None = None,
owner: str = "",
) -> tuple:
"""Extract key terms from an incoming email and search past emails + contacts. """Extract key terms from an incoming email and search past emails + contacts.
Returns (context_snippets, terms_list). Best-effort; never raises. Returns (context_snippets, terms_list). Best-effort; never raises.
@@ -1150,21 +1187,37 @@ def _pre_retrieve_context(body: str, sender: str) -> tuple:
# ── Known-sender check: only retrieve context for senders we already # ── Known-sender check: only retrieve context for senders we already
# have a relationship with. New / cold senders get an empty context. # have a relationship with. New / cold senders get an empty context.
sender_addr = email.utils.parseaddr(sender or "")[1].lower() sender_addr = email.utils.parseaddr(sender or "")[1].lower()
is_known = False # The CardDAV address book is global admin data backed by a single
# Radicale instance, so only fold it into reply context for an admin /
# single-user owner. Non-admin owners still get their own (owner-scoped)
# IMAP history below, just not the shared contacts.
try: try:
from routes.contacts_routes import _fetch_contacts from src.tool_security import owner_is_admin_or_single_user
for c in _fetch_contacts() or []: contacts_allowed = owner_is_admin_or_single_user(owner or None)
# Contacts are normalized to plural `emails` lists (see
# contacts_routes._normalize_contact); the old `c.get("email")`
# singular key never exists, so known senders were never matched.
if sender_addr in [(e or "").lower() for e in (c.get("emails") or [])]:
is_known = True
break
except Exception: except Exception:
pass contacts_allowed = not bool(owner)
is_known = False
if contacts_allowed:
try:
from routes.contacts_routes import _fetch_contacts
for c in _fetch_contacts() or []:
# Contacts are normalized to plural `emails` lists, but
# keep the legacy singular key fallback for older data.
contact_emails = []
raw_emails = c.get("emails")
if isinstance(raw_emails, list):
contact_emails.extend(str(e or "") for e in raw_emails)
legacy_email = c.get("email")
if legacy_email:
contact_emails.append(str(legacy_email))
if any((addr or "").strip().lower() == sender_addr for addr in contact_emails):
is_known = True
break
except Exception:
pass
if not is_known and sender_addr: if not is_known and sender_addr:
try: try:
with _imap() as _ck: with _imap(account_id, owner=owner) as _ck:
_ck.select("INBOX", readonly=True) _ck.select("INBOX", readonly=True)
st_known, dk = _ck.search(None, f'(FROM "{sender_addr}")') st_known, dk = _ck.search(None, f'(FROM "{sender_addr}")')
if st_known == "OK" and dk and dk[0]: if st_known == "OK" and dk and dk[0]:
@@ -1202,7 +1255,7 @@ def _pre_retrieve_context(body: str, sender: str) -> tuple:
return context_snippets, terms_list return context_snippets, terms_list
try: try:
ctx_conn = _imap_connect() ctx_conn = _imap_connect(account_id, owner=owner)
for folder in ["INBOX", "Sent", "Archive", "Drafts"]: for folder in ["INBOX", "Sent", "Archive", "Drafts"]:
try: try:
st_sel, _sd = ctx_conn.select(_q(folder), readonly=True) st_sel, _sd = ctx_conn.select(_q(folder), readonly=True)
@@ -1246,7 +1299,7 @@ def _pre_retrieve_context(body: str, sender: str) -> tuple:
try: try:
from routes.contacts_routes import _fetch_contacts from routes.contacts_routes import _fetch_contacts
all_contacts = _fetch_contacts() all_contacts = _fetch_contacts() if contacts_allowed else []
for term in terms_list: for term in terms_list:
t_lower = term.lower() t_lower = term.lower()
matches = [c for c in all_contacts matches = [c for c in all_contacts
+45 -30
View File
@@ -45,6 +45,21 @@ from routes.email_helpers import (
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _owner_for_email_account(account_id: str | None) -> str:
if not account_id:
return ""
try:
from core.database import SessionLocal as _SL, EmailAccount as _EA
db = _SL()
try:
row = db.query(_EA.owner).filter(_EA.id == account_id).first()
return (row[0] or "") if row else ""
finally:
db.close()
except Exception:
return ""
# ── Routes ── # ── Routes ──
async def _emit_progress(progress_cb, message: str): async def _emit_progress(progress_cb, message: str):
@@ -143,25 +158,17 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None
if not auto_sum and not auto_reply and not auto_tag and not auto_spam and not auto_cal: if not auto_sum and not auto_reply and not auto_tag and not auto_spam and not auto_cal:
return "Nothing to do" return "Nothing to do"
# Owner of the account being processed. All calendar reads/writes below are # Owner of the account being processed. All calendar + mailbox reads/writes
# scoped to this user: the multi-account fan-out runs every user's mailbox, # below are scoped to this user: the multi-account fan-out runs every user's
# so an unscoped pass would disclose and mutate other tenants' calendars. # mailbox, so an unscoped pass would disclose/mutate other tenants' data.
_acct_owner = None # One resolution feeds both the mailbox path (account_owner) and upstream's
try: # calendar path (_acct_owner, which expects None rather than "").
from core.database import SessionLocal as _SLo, EmailAccount as _EAo account_owner = _owner_for_email_account(account_id)
_dbo = _SLo() _acct_owner = account_owner or None
try:
if account_id:
_arow = _dbo.query(_EAo).filter(_EAo.id == account_id).first()
_acct_owner = _arow.owner if _arow else None
finally:
_dbo.close()
except Exception:
_acct_owner = None
try: try:
await _emit_progress(progress_cb, "Connecting to mail…") await _emit_progress(progress_cb, "Connecting to mail…")
conn = _imap_connect(account_id) conn = _imap_connect(account_id, owner=account_owner)
from datetime import timedelta as _td from datetime import timedelta as _td
since = (datetime.utcnow() - _td(days=max(1, days_back))).strftime("%d-%b-%Y") since = (datetime.utcnow() - _td(days=max(1, days_back))).strftime("%d-%b-%Y")
# uid_list carries real IMAP UIDs, matching the email UI/read routes. # uid_list carries real IMAP UIDs, matching the email UI/read routes.
@@ -212,7 +219,13 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None
_c = _sql3.connect(SCHEDULED_DB) _c = _sql3.connect(SCHEDULED_DB)
_sum_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_summaries").fetchall()} _sum_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_summaries").fetchall()}
_reply_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_ai_replies").fetchall()} _reply_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_ai_replies").fetchall()}
_tag_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_tags").fetchall()} if (auto_tag or auto_spam) else set() if auto_tag or auto_spam:
if account_owner:
_tag_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_tags WHERE owner=?", (account_owner,)).fetchall()}
else:
_tag_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_tags WHERE owner='' OR owner IS NULL").fetchall()}
else:
_tag_existing = set()
_cal_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_calendar_extractions").fetchall()} if auto_cal else set() _cal_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_calendar_extractions").fetchall()} if auto_cal else set()
# Urgency is handled by the built-in `check_email_urgency` task. Keep # Urgency is handled by the built-in `check_email_urgency` task. Keep
# this legacy poller path disabled so users don't get two independent # this legacy poller path disabled so users don't get two independent
@@ -225,7 +238,7 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None
# this per-iteration was making big inbox scans crawl. Used by the # this per-iteration was making big inbox scans crawl. Used by the
# urgency self-loop check below. # urgency self-loop check below.
try: try:
_self_self_addr = (_get_email_config(account_id).get("from_address") or "").strip().lower() _self_self_addr = (_get_email_config(account_id, owner=account_owner).get("from_address") or "").strip().lower()
except Exception: except Exception:
_self_self_addr = "" _self_self_addr = ""
@@ -233,9 +246,9 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None
if auto_spam and not spam_folder: if auto_spam and not spam_folder:
logger.warning("Auto-spam enabled but no Junk/Spam folder detected — will classify but not move") logger.warning("Auto-spam enabled but no Junk/Spam folder detected — will classify but not move")
url, model, headers = resolve_endpoint("utility") url, model, headers = resolve_endpoint("utility", owner=account_owner)
if not url: if not url:
url, model, headers = resolve_endpoint("default") url, model, headers = resolve_endpoint("default", owner=account_owner)
if not url or not model: if not url or not model:
conn.logout() conn.logout()
return "No model configured" return "No model configured"
@@ -395,8 +408,8 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None
await _emit_progress(progress_cb, f"Drafting reply {processed + 1}/{_max_process} · checked {examined}/{len(uid_list)}") await _emit_progress(progress_cb, f"Drafting reply {processed + 1}/{_max_process} · checked {examined}/{len(uid_list)}")
# Background reply drafting should not make the whole app # Background reply drafting should not make the whole app
# feel busy. Keep it lightweight: no extra IMAP context # feel busy. Keep it lightweight: no extra IMAP context
# mining here; manual AI Reply can still do that when the # mining here; manual AI Reply can still do that (owner-scoped)
# user explicitly asks for a draft on one email. # when the user explicitly asks for a draft on one email.
context_snippets, _terms = [], [] context_snippets, _terms = [], []
sys_prompt = _EMAIL_REPLY_SYS_PROMPT_BASE sys_prompt = _EMAIL_REPLY_SYS_PROMPT_BASE
if att_text: if att_text:
@@ -711,7 +724,7 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None
# Send alert email immediately if critical or high # Send alert email immediately if critical or high
if urgency in ("critical", "high"): if urgency in ("critical", "high"):
try: try:
cfg = _get_email_config(account_id) cfg = _get_email_config(account_id, owner=account_owner)
to_addr = cfg["from_address"] # self-email to_addr = cfg["from_address"] # self-email
# Deep-link to open the original email in Odysseus (if public URL is configured). # Deep-link to open the original email in Odysseus (if public URL is configured).
@@ -846,17 +859,17 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None
moved_to = "" moved_to = ""
if is_spam and auto_spam and spam_folder: if is_spam and auto_spam and spam_folder:
if _imap_move(uid, spam_folder): if _imap_move(uid, spam_folder, account_id=account_id, owner=account_owner):
moved_to = spam_folder moved_to = spam_folder
logger.info(f"Auto-spam moved uid={uid.decode()} to {spam_folder}: {spam_reason}") logger.info(f"Auto-spam moved uid={uid.decode()} to {spam_folder}: {spam_reason}")
_c = _sql3.connect(SCHEDULED_DB) _c = _sql3.connect(SCHEDULED_DB)
_c.execute(""" _c.execute("""
INSERT OR REPLACE INTO email_tags INSERT OR REPLACE INTO email_tags
(message_id, uid, folder, subject, sender, tags, spam_verdict, (message_id, owner, uid, folder, subject, sender, tags, spam_verdict,
spam_reason, moved_to, model_used, created_at) spam_reason, moved_to, model_used, created_at)
VALUES (?, ?, 'INBOX', ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, 'INBOX', ?, ?, ?, ?, ?, ?, ?, ?)
""", (message_id, uid.decode(), subject, sender, """, (message_id, account_owner or "", uid.decode(), subject, sender,
json.dumps(tags), 1 if is_spam else 0, json.dumps(tags), 1 if is_spam else 0,
spam_reason, moved_to, model, datetime.utcnow().isoformat())) spam_reason, moved_to, model, datetime.utcnow().isoformat()))
_c.commit() _c.commit()
@@ -936,8 +949,9 @@ def _scheduled_poll_once() -> dict:
conn = sqlite3.connect(SCHEDULED_DB) conn = sqlite3.connect(SCHEDULED_DB)
cols = [row[1] for row in conn.execute("PRAGMA table_info(scheduled_emails)").fetchall()] cols = [row[1] for row in conn.execute("PRAGMA table_info(scheduled_emails)").fetchall()]
kind_expr = "odysseus_kind" if "odysseus_kind" in cols else "'scheduled' AS odysseus_kind" kind_expr = "odysseus_kind" if "odysseus_kind" in cols else "'scheduled' AS odysseus_kind"
owner_expr = "owner" if "owner" in cols else "'' AS owner"
rows = conn.execute(f""" rows = conn.execute(f"""
SELECT id, to_addr, cc, bcc, subject, body, in_reply_to, references_hdr, attachments, account_id, {kind_expr} SELECT id, to_addr, cc, bcc, subject, body, in_reply_to, references_hdr, attachments, account_id, {kind_expr}, {owner_expr}
FROM scheduled_emails FROM scheduled_emails
WHERE status = 'pending' AND send_at <= ? WHERE status = 'pending' AND send_at <= ?
""", (now_iso,)).fetchall() """, (now_iso,)).fetchall()
@@ -949,7 +963,8 @@ def _scheduled_poll_once() -> dict:
attachments = json.loads(r[8] or "[]") attachments = json.loads(r[8] or "[]")
row_account_id = r[9] if len(r) > 9 else None row_account_id = r[9] if len(r) > 9 else None
odysseus_kind = r[10] if len(r) > 10 else "scheduled" odysseus_kind = r[10] if len(r) > 10 else "scheduled"
cfg = _get_email_config(row_account_id) row_owner = (r[11] if len(r) > 11 else "") or _owner_for_email_account(row_account_id)
cfg = _get_email_config(row_account_id, owner=row_owner)
has_atts = bool(attachments) has_atts = bool(attachments)
if has_atts: if has_atts:
outer = MIMEMultipart("mixed") outer = MIMEMultipart("mixed")
@@ -986,7 +1001,7 @@ def _scheduled_poll_once() -> dict:
# Append to local Sent folder # Append to local Sent folder
try: try:
with _imap() as imap: with _imap(row_account_id, owner=row_owner) as imap:
sent_folder = _detect_sent_folder(imap) sent_folder = _detect_sent_folder(imap)
imap.append(sent_folder, "\\Seen", None, outer.as_bytes()) imap.append(sent_folder, "\\Seen", None, outer.as_bytes())
except Exception as e: except Exception as e:
+51 -28
View File
@@ -90,6 +90,16 @@ def _email_tag_owner_aliases(account_id: str | None, owner: str = "") -> list[st
return out or [""] return out or [""]
def _email_tag_owner_clause(account_id: str | None, owner: str = "") -> tuple[str, list[str]]:
aliases = _email_tag_owner_aliases(account_id, owner)
placeholders = ",".join("?" * len(aliases))
# In configured multi-user mode, do not treat legacy owner='' rows as
# visible to everyone. Single-user/unconfigured mode keeps legacy rows.
if owner:
return f"owner IN ({placeholders})", aliases
return f"(owner IN ({placeholders}) OR owner IS NULL)", aliases
def _record_email_received_events(owner: str, account_id: str | None, folder: str, emails: list[dict]): def _record_email_received_events(owner: str, account_id: str | None, folder: str, emails: list[dict]):
"""Baseline inbox messages, then fire `email_received` for new arrivals.""" """Baseline inbox messages, then fire `email_received` for new arrivals."""
if not owner or (folder or "INBOX").upper() != "INBOX" or not emails: if not owner or (folder or "INBOX").upper() != "INBOX" or not emails:
@@ -645,8 +655,7 @@ def setup_email_routes():
try: try:
import sqlite3 as _sql3t import sqlite3 as _sql3t
_ct = _sql3t.connect(SCHEDULED_DB) _ct = _sql3t.connect(SCHEDULED_DB)
_owner_aliases = _email_tag_owner_aliases(account_id, owner) _owner_clause, _owner_params = _email_tag_owner_clause(account_id, owner)
_owner_ph = ",".join("?" * len(_owner_aliases))
# SECURITY: owner-scope the lookup (review C2/H8). Without # SECURITY: owner-scope the lookup (review C2/H8). Without
# this, user A's `tag:urgent` filter would surface UIDs # this, user A's `tag:urgent` filter would surface UIDs
# written by user B and IMAP would return whatever # written by user B and IMAP would return whatever
@@ -658,8 +667,8 @@ def setup_email_routes():
rows_t = _ct.execute( rows_t = _ct.execute(
"SELECT message_id, uid FROM email_tags " "SELECT message_id, uid FROM email_tags "
"WHERE folder=? AND spam_verdict=1 " "WHERE folder=? AND spam_verdict=1 "
f"AND (owner IN ({_owner_ph}) OR owner IS NULL)", f"AND {_owner_clause}",
(folder, *_owner_aliases), (folder, *_owner_params),
).fetchall() ).fetchall()
for mid, uid in rows_t: for mid, uid in rows_t:
if mid: if mid:
@@ -670,8 +679,8 @@ def setup_email_routes():
rows_t = _ct.execute( rows_t = _ct.execute(
"SELECT message_id, uid, tags FROM email_tags " "SELECT message_id, uid, tags FROM email_tags "
"WHERE folder=? AND tags IS NOT NULL AND tags != '' " "WHERE folder=? AND tags IS NOT NULL AND tags != '' "
f"AND (owner IN ({_owner_ph}) OR owner IS NULL)", f"AND {_owner_clause}",
(folder, *_owner_aliases), (folder, *_owner_params),
).fetchall() ).fetchall()
for r in rows_t: for r in rows_t:
try: try:
@@ -743,12 +752,11 @@ def setup_email_routes():
_uid_strs = [u.decode() for u in uid_list] _uid_strs = [u.decode() for u in uid_list]
if _uid_strs: if _uid_strs:
placeholders = ",".join("?" * len(_uid_strs)) placeholders = ",".join("?" * len(_uid_strs))
_owner_aliases = _email_tag_owner_aliases(account_id, owner) _owner_clause, _owner_params = _email_tag_owner_clause(account_id, owner)
_owner_ph = ",".join("?" * len(_owner_aliases))
rows = _c.execute( rows = _c.execute(
f"SELECT uid, tags, spam_verdict FROM email_tags " f"SELECT uid, tags, spam_verdict FROM email_tags "
f"WHERE folder=? AND (owner IN ({_owner_ph}) OR owner IS NULL) AND uid IN ({placeholders})", f"WHERE folder=? AND {_owner_clause} AND uid IN ({placeholders})",
[folder, *_owner_aliases, *_uid_strs], [folder, *_owner_params, *_uid_strs],
).fetchall() ).fetchall()
for r in rows: for r in rows:
try: try:
@@ -805,14 +813,13 @@ def setup_email_routes():
if header_ids: if header_ids:
import sqlite3 as _sql3m import sqlite3 as _sql3m
_cm = _sql3m.connect(SCHEDULED_DB) _cm = _sql3m.connect(SCHEDULED_DB)
_owner_aliases_m = _email_tag_owner_aliases(account_id, owner) _owner_clause_m, _owner_params_m = _email_tag_owner_clause(account_id, owner)
_owner_ph_m = ",".join("?" * len(_owner_aliases_m))
_mid_ph = ",".join("?" * len(header_ids)) _mid_ph = ",".join("?" * len(header_ids))
rows_m = _cm.execute( rows_m = _cm.execute(
f"SELECT message_id, tags, spam_verdict FROM email_tags " f"SELECT message_id, tags, spam_verdict FROM email_tags "
f"WHERE folder=? AND (owner IN ({_owner_ph_m}) OR owner IS NULL) " f"WHERE folder=? AND {_owner_clause_m} "
f"AND message_id IN ({_mid_ph})", f"AND message_id IN ({_mid_ph})",
[folder, *_owner_aliases_m, *header_ids], [folder, *_owner_params_m, *header_ids],
).fetchall() ).fetchall()
_cm.close() _cm.close()
for mid, tags_raw, spam_raw in rows_m: for mid, tags_raw, spam_raw in rows_m:
@@ -971,10 +978,11 @@ def setup_email_routes():
async def unflag_spam(uid: str, owner: str = Depends(require_owner)): async def unflag_spam(uid: str, owner: str = Depends(require_owner)):
"""User override — mark email as not spam.""" """User override — mark email as not spam."""
try: try:
owner_clause, owner_params = _email_tag_owner_clause(None, owner)
_c = _sql3.connect(SCHEDULED_DB) _c = _sql3.connect(SCHEDULED_DB)
_c.execute( _c.execute(
"UPDATE email_tags SET spam_verdict=0, spam_reason='' WHERE uid=?", f"UPDATE email_tags SET spam_verdict=0, spam_reason='' WHERE uid=? AND {owner_clause}",
(uid,), [uid, *owner_params],
) )
_c.commit() _c.commit()
_c.close() _c.close()
@@ -997,8 +1005,10 @@ def setup_email_routes():
ql = (q or "").strip().lower() ql = (q or "").strip().lower()
try: try:
conn = _sql3.connect(SCHEDULED_DB) conn = _sql3.connect(SCHEDULED_DB)
owner_clause, owner_params = _email_tag_owner_clause(None, owner)
rows = conn.execute( rows = conn.execute(
"SELECT sender FROM email_tags WHERE sender IS NOT NULL AND sender != ''" f"SELECT sender FROM email_tags WHERE sender IS NOT NULL AND sender != '' AND {owner_clause}",
owner_params,
).fetchall() ).fetchall()
conn.close() conn.close()
seen = {} seen = {}
@@ -1969,8 +1979,8 @@ def setup_email_routes():
conn = sqlite3.connect(SCHEDULED_DB) conn = sqlite3.connect(SCHEDULED_DB)
conn.execute(""" conn.execute("""
INSERT INTO scheduled_emails INSERT INTO scheduled_emails
(id, to_addr, cc, bcc, subject, body, in_reply_to, references_hdr, attachments, send_at, created_at, status, account_id, odysseus_kind) (id, to_addr, cc, bcc, subject, body, in_reply_to, references_hdr, attachments, send_at, created_at, status, account_id, odysseus_kind, owner)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending', ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending', ?, ?, ?)
""", ( """, (
sid, sid,
req.get("to", ""), req.get("to", ""),
@@ -1985,6 +1995,7 @@ def setup_email_routes():
datetime.utcnow().isoformat(), datetime.utcnow().isoformat(),
req.get("account_id") or None, req.get("account_id") or None,
req.get("odysseus_kind") or "scheduled", req.get("odysseus_kind") or "scheduled",
owner or "",
)) ))
conn.commit() conn.commit()
conn.close() conn.close()
@@ -2003,9 +2014,9 @@ def setup_email_routes():
rows = conn.execute(""" rows = conn.execute("""
SELECT id, to_addr, cc, subject, send_at, created_at, status, error SELECT id, to_addr, cc, subject, send_at, created_at, status, error
FROM scheduled_emails FROM scheduled_emails
WHERE status IN ('pending', 'failed') WHERE status IN ('pending', 'failed') AND owner = ?
ORDER BY send_at ASC ORDER BY send_at ASC
""").fetchall() """, (owner or "",)).fetchall()
conn.close() conn.close()
return {"scheduled": [ return {"scheduled": [
{ {
@@ -2023,7 +2034,10 @@ def setup_email_routes():
import sqlite3 import sqlite3
try: try:
conn = sqlite3.connect(SCHEDULED_DB) conn = sqlite3.connect(SCHEDULED_DB)
conn.execute("DELETE FROM scheduled_emails WHERE id = ? AND status = 'pending'", (sid,)) conn.execute(
"DELETE FROM scheduled_emails WHERE id = ? AND status = 'pending' AND owner = ?",
(sid, owner or ""),
)
conn.commit() conn.commit()
conn.close() conn.close()
return {"success": True} return {"success": True}
@@ -2035,7 +2049,7 @@ def setup_email_routes():
async def resolve_contact(name: str = Query(..., description="Name to search for"), owner: str = Depends(require_owner)): async def resolve_contact(name: str = Query(..., description="Name to search for"), owner: str = Depends(require_owner)):
"""Search Sent folder for a contact by name. Returns matching email addresses.""" """Search Sent folder for a contact by name. Returns matching email addresses."""
try: try:
with _imap() as conn: with _imap(owner=owner) as conn:
matches = {} matches = {}
for folder in ["Sent", "INBOX", "Drafts"]: for folder in ["Sent", "INBOX", "Drafts"]:
try: try:
@@ -2590,7 +2604,7 @@ def setup_email_routes():
# `api_key` field. # `api_key` field.
from core.database import SessionLocal as _SL, Session as _CS from core.database import SessionLocal as _SL, Session as _CS
_db = _SL() _db = _SL()
sess = _db.query(_CS).filter(_CS.id == session_id).first() sess = _db.query(_CS).filter(_CS.id == session_id, _CS.owner == owner).first()
if sess and sess.endpoint_url: if sess and sess.endpoint_url:
url = sess.endpoint_url url = sess.endpoint_url
# Some sessions stored headers double-encoded (a JSON # Some sessions stored headers double-encoded (a JSON
@@ -2649,9 +2663,10 @@ def setup_email_routes():
# Manual AI Reply should feel immediate. The heavier context mining # Manual AI Reply should feel immediate. The heavier context mining
# can involve multiple IMAP folder searches and attachment parsing; # can involve multiple IMAP folder searches and attachment parsing;
# reserve that for callers that explicitly opt out of fast mode. # reserve that for callers that explicitly opt out of fast mode.
# Owner-scoped so pre-retrieval never crosses tenants.
context_snippets, _terms = ([], []) context_snippets, _terms = ([], [])
if not fast_reply: if not fast_reply:
context_snippets, _terms = _pre_retrieve_context(original_body, to) context_snippets, _terms = _pre_retrieve_context(original_body, to, owner=owner)
# NEW: also pull the last few emails from the original sender + # NEW: also pull the last few emails from the original sender +
# their attachments. The "to" field on this endpoint is the # their attachments. The "to" field on this endpoint is the
@@ -2667,6 +2682,7 @@ def setup_email_routes():
exclude_uid=source_uid, exclude_uid=source_uid,
exclude_folder=source_folder, exclude_folder=source_folder,
limit=3, limit=3,
owner=owner,
) )
except Exception as _e: except Exception as _e:
logger.warning(f"sender-thread-context failed: {_e}") logger.warning(f"sender-thread-context failed: {_e}")
@@ -2728,7 +2744,7 @@ def setup_email_routes():
# Configured fallback chains last. # Configured fallback chains last.
for cand in resolve_utility_fallback_candidates(owner=owner) or []: for cand in resolve_utility_fallback_candidates(owner=owner) or []:
_add(*cand) _add(*cand)
for cand in resolve_chat_fallback_candidates() or []: for cand in resolve_chat_fallback_candidates(owner=owner) or []:
_add(*cand) _add(*cand)
try: try:
reply = await llm_call_async_with_fallback( reply = await llm_call_async_with_fallback(
@@ -2819,9 +2835,12 @@ def setup_email_routes():
import uuid as _uuid import uuid as _uuid
db = SessionLocal() db = SessionLocal()
try: try:
row = db.query(EmailAccount).filter(EmailAccount.is_default == True).first() # noqa: E712 q = db.query(EmailAccount).filter(EmailAccount.is_default == True) # noqa: E712
if owner:
q = q.filter(EmailAccount.owner == owner)
row = q.first()
if row is None: if row is None:
row = EmailAccount(id=_uuid.uuid4().hex, name="Default", is_default=True, enabled=True) row = EmailAccount(id=_uuid.uuid4().hex, owner=owner, name="Default", is_default=True, enabled=True)
db.add(row) db.add(row)
field_map = { field_map = {
"smtp_host": "smtp_host", "smtp_port": "smtp_port", "smtp_user": "smtp_user", "smtp_host": "smtp_host", "smtp_port": "smtp_port", "smtp_user": "smtp_user",
@@ -2843,6 +2862,10 @@ def setup_email_routes():
row.imap_password = _enc(data["imap_password"]) row.imap_password = _enc(data["imap_password"])
if data.get("smtp_password"): if data.get("smtp_password"):
row.smtp_password = _enc(data["smtp_password"]) row.smtp_password = _enc(data["smtp_password"])
clear_q = db.query(EmailAccount).filter(EmailAccount.id != row.id)
if owner:
clear_q = clear_q.filter(EmailAccount.owner == owner)
clear_q.update({EmailAccount.is_default: False})
db.commit() db.commit()
finally: finally:
db.close() db.close()
+154
View File
@@ -0,0 +1,154 @@
import sqlite3
from datetime import datetime, timedelta, timezone
import pytest
def _route_endpoint(router, path: str, method: str):
method = method.upper()
for route in router.routes:
if route.path == path and method in getattr(route, "methods", set()):
return route.endpoint
raise AssertionError(f"route not found: {method} {path}")
def test_email_tag_clause_excludes_legacy_owner_rows_for_authenticated_owner(monkeypatch):
import routes.email_routes as email_routes
monkeypatch.setattr(
email_routes,
"_email_tag_owner_aliases",
lambda account_id, owner="": ["alice", "alice@example.com"],
)
clause, params = email_routes._email_tag_owner_clause("acct-alice", "alice")
assert clause == "owner IN (?,?)"
assert params == ["alice", "alice@example.com"]
assert "owner IS NULL" not in clause
def test_email_tag_clause_keeps_legacy_rows_for_single_user_mode(monkeypatch):
import routes.email_routes as email_routes
monkeypatch.setattr(
email_routes,
"_email_tag_owner_aliases",
lambda account_id, owner="": [""],
)
clause, params = email_routes._email_tag_owner_clause(None, "")
assert clause == "(owner IN (?) OR owner IS NULL)"
assert params == [""]
@pytest.mark.asyncio
async def test_scheduled_email_routes_are_owner_scoped(tmp_path, monkeypatch):
import routes.email_helpers as email_helpers
import routes.email_routes as email_routes
db_path = tmp_path / "scheduled_emails.db"
monkeypatch.setattr(email_helpers, "SCHEDULED_DB", db_path)
monkeypatch.setattr(email_routes, "SCHEDULED_DB", db_path)
email_helpers._init_scheduled_db()
router = email_routes.setup_email_routes()
schedule_email = _route_endpoint(router, "/api/email/schedule", "POST")
list_scheduled = _route_endpoint(router, "/api/email/scheduled", "GET")
cancel_scheduled = _route_endpoint(router, "/api/email/scheduled/{sid}", "DELETE")
send_at = (datetime.now(timezone.utc) + timedelta(days=1)).isoformat()
alice = await schedule_email(
{"to": "a@example.com", "body": "alice body", "send_at": send_at},
owner="alice",
)
bob = await schedule_email(
{"to": "b@example.com", "body": "bob body", "send_at": send_at},
owner="bob",
)
assert alice["success"] is True
assert bob["success"] is True
alice_rows = await list_scheduled(owner="alice")
bob_rows = await list_scheduled(owner="bob")
assert [row["id"] for row in alice_rows["scheduled"]] == [alice["id"]]
assert [row["id"] for row in bob_rows["scheduled"]] == [bob["id"]]
await cancel_scheduled(bob["id"], owner="alice")
bob_rows = await list_scheduled(owner="bob")
assert [row["id"] for row in bob_rows["scheduled"]] == [bob["id"]]
await cancel_scheduled(alice["id"], owner="alice")
alice_rows = await list_scheduled(owner="alice")
assert alice_rows["scheduled"] == []
def test_scheduled_poller_resolves_config_with_row_owner(tmp_path, monkeypatch):
import routes.email_helpers as email_helpers
import routes.email_pollers as email_pollers
db_path = tmp_path / "scheduled_emails.db"
monkeypatch.setattr(email_helpers, "SCHEDULED_DB", db_path)
monkeypatch.setattr(email_pollers, "SCHEDULED_DB", db_path)
email_helpers._init_scheduled_db()
conn = sqlite3.connect(db_path)
conn.execute(
"""
INSERT INTO scheduled_emails
(id, to_addr, subject, body, attachments, send_at, created_at, status, account_id, owner)
VALUES (?, ?, ?, ?, ?, ?, ?, 'pending', ?, ?)
""",
(
"sched-1",
"recipient@example.com",
"Subject",
"Body",
"[]",
"2000-01-01T00:00:00",
"1999-12-31T00:00:00",
"acct-alice",
"alice",
),
)
conn.commit()
conn.close()
calls = []
def fake_get_email_config(account_id=None, owner=""):
calls.append(("config", account_id, owner))
return {
"from_address": "alice@example.com",
"smtp_host": "smtp.example.com",
"smtp_user": "alice@example.com",
"smtp_password": "secret",
}
class FakeImap:
def __init__(self, account_id=None, owner=""):
calls.append(("imap", account_id, owner))
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def append(self, folder, flags, date_time, message):
calls.append(("append", folder))
monkeypatch.setattr(email_pollers, "_get_email_config", fake_get_email_config)
monkeypatch.setattr(email_pollers, "_send_smtp_message", lambda *args, **kwargs: calls.append(("send", args[1], args[2])))
monkeypatch.setattr(email_pollers, "_imap", FakeImap)
monkeypatch.setattr(email_pollers, "_detect_sent_folder", lambda imap: "Sent")
monkeypatch.setattr(email_pollers, "_cleanup_compose_uploads", lambda attachments: calls.append(("cleanup", attachments)))
result = email_pollers._scheduled_poll_once()
assert result == {"sent": ["sched-1"], "failed": []}
assert ("config", "acct-alice", "alice") in calls
assert ("imap", "acct-alice", "alice") in calls