mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-15 17:25:26 -04:00
fix(email): scope learned sender signatures by owner (#3724)
This commit is contained in:
+54
-14
@@ -304,6 +304,7 @@ OWNER_SCOPED_EMAIL_CACHE_TABLES = {
|
||||
"email_ai_replies",
|
||||
"email_calendar_extractions",
|
||||
"email_urgency_alerts",
|
||||
"sender_signatures",
|
||||
}
|
||||
|
||||
|
||||
@@ -341,6 +342,55 @@ def _ensure_owner_scoped_email_cache_table(conn, table: str, create_sql: str, co
|
||||
_lg.getLogger(__name__).warning(f"{table} owner-migration skipped: {_mig_e}")
|
||||
|
||||
|
||||
def _ensure_sender_signatures_table(conn):
|
||||
"""Create/migrate learned sender signatures to an owner-scoped cache."""
|
||||
create_sql = """
|
||||
CREATE TABLE IF NOT EXISTS sender_signatures (
|
||||
from_address TEXT,
|
||||
owner TEXT DEFAULT '',
|
||||
signature_text TEXT,
|
||||
sample_count INTEGER,
|
||||
last_built_at TEXT NOT NULL,
|
||||
model_used TEXT,
|
||||
source TEXT,
|
||||
PRIMARY KEY (from_address, owner)
|
||||
)
|
||||
"""
|
||||
conn.execute(create_sql)
|
||||
try:
|
||||
info = conn.execute("PRAGMA table_info(sender_signatures)").fetchall()
|
||||
cols = [r[1] for r in info]
|
||||
pk_cols = [r[1] for r in sorted((r for r in info if r[5]), key=lambda r: r[5])]
|
||||
if "owner" in cols and pk_cols == ["from_address", "owner"]:
|
||||
return
|
||||
|
||||
conn.execute("ALTER TABLE sender_signatures RENAME TO sender_signatures__old")
|
||||
conn.execute(create_sql)
|
||||
old_cols = [r[1] for r in conn.execute("PRAGMA table_info(sender_signatures__old)").fetchall()]
|
||||
copy_cols = [
|
||||
c for c in (
|
||||
"from_address",
|
||||
"signature_text",
|
||||
"sample_count",
|
||||
"last_built_at",
|
||||
"model_used",
|
||||
"source",
|
||||
)
|
||||
if c in old_cols
|
||||
]
|
||||
source_owner = "COALESCE(owner, '')" if "owner" in old_cols else "''"
|
||||
conn.execute(
|
||||
f"INSERT OR IGNORE INTO sender_signatures "
|
||||
f"({', '.join([*copy_cols, 'owner'])}) "
|
||||
f"SELECT {', '.join([*copy_cols, source_owner])} "
|
||||
f"FROM sender_signatures__old"
|
||||
)
|
||||
conn.execute("DROP TABLE sender_signatures__old")
|
||||
except Exception as _mig_e:
|
||||
import logging as _lg
|
||||
_lg.getLogger(__name__).warning(f"sender_signatures owner-migration skipped: {_mig_e}")
|
||||
|
||||
|
||||
def attachment_extract_dir(folder: str, uid: str) -> Path:
|
||||
"""Containment-safe extraction directory for an attachment.
|
||||
|
||||
@@ -559,20 +609,10 @@ def _init_scheduled_db():
|
||||
conn.execute("ALTER TABLE email_boundaries ADD COLUMN turns_json TEXT")
|
||||
except Exception:
|
||||
pass
|
||||
# Per-sender signature cache. Populated by `learn_sender_signatures`
|
||||
# action: the LLM extracts the common trailing block across N emails
|
||||
# from each sender; the renderer folds it consistently for every
|
||||
# future email from that address.
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS sender_signatures (
|
||||
from_address TEXT PRIMARY KEY,
|
||||
signature_text TEXT,
|
||||
sample_count INTEGER,
|
||||
last_built_at TEXT NOT NULL,
|
||||
model_used TEXT,
|
||||
source TEXT
|
||||
)
|
||||
""")
|
||||
# Per-sender signature cache. Populated by `learn_sender_signatures`.
|
||||
# Message sender addresses are global, so signatures must be scoped to the
|
||||
# mailbox owner before `/read` returns them to the renderer.
|
||||
_ensure_sender_signatures_table(conn)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
@@ -1247,8 +1247,9 @@ def setup_email_routes():
|
||||
try:
|
||||
if sender_addr:
|
||||
_rs = _c.execute(
|
||||
"SELECT signature_text FROM sender_signatures WHERE from_address = ?",
|
||||
(sender_addr.lower().strip(),),
|
||||
f"SELECT signature_text FROM sender_signatures "
|
||||
f"WHERE from_address = ? AND {owner_clause}",
|
||||
(sender_addr.lower().strip(), *owner_params),
|
||||
).fetchone()
|
||||
if _rs and _rs[0]:
|
||||
cached_sender_sig = _rs[0]
|
||||
|
||||
+10
-7
@@ -809,14 +809,14 @@ async def action_learn_sender_signatures(owner: str, **kwargs) -> Tuple[str, boo
|
||||
import email as _email_mod
|
||||
import asyncio as _aio
|
||||
from datetime import datetime as _dt, timedelta as _td
|
||||
from routes.email_helpers import _imap_connect, SCHEDULED_DB
|
||||
from routes.email_helpers import _email_cache_owner_clause, _imap_connect, SCHEDULED_DB
|
||||
from src.endpoint_resolver import resolve_endpoint
|
||||
from src.llm_core import llm_call_async
|
||||
|
||||
# 1. Pull recent UIDs + From headers cheaply (header-only fetch).
|
||||
def _pull_headers():
|
||||
results = []
|
||||
conn = _imap_connect(None)
|
||||
conn = _imap_connect(None, owner=owner)
|
||||
try:
|
||||
conn.select("INBOX", readonly=True)
|
||||
status, data = conn.search(None, "ALL")
|
||||
@@ -868,9 +868,11 @@ async def action_learn_sender_signatures(owner: str, **kwargs) -> Tuple[str, boo
|
||||
# 3. Eligibility: ≥3 emails AND (no cache OR cache > 30 days old).
|
||||
try:
|
||||
conn = _sql3.connect(SCHEDULED_DB)
|
||||
owner_clause, owner_params = _email_cache_owner_clause(owner)
|
||||
cached = {
|
||||
r[0]: r[1] for r in conn.execute(
|
||||
"SELECT from_address, last_built_at FROM sender_signatures"
|
||||
f"SELECT from_address, last_built_at FROM sender_signatures WHERE {owner_clause}",
|
||||
owner_params,
|
||||
).fetchall()
|
||||
}
|
||||
conn.close()
|
||||
@@ -901,7 +903,7 @@ async def action_learn_sender_signatures(owner: str, **kwargs) -> Tuple[str, boo
|
||||
|
||||
def _fetch_bodies(_msgs):
|
||||
bodies = []
|
||||
conn2 = _imap_connect(None)
|
||||
conn2 = _imap_connect(None, owner=owner)
|
||||
try:
|
||||
conn2.select("INBOX", readonly=True)
|
||||
for mm in _msgs:
|
||||
@@ -978,11 +980,12 @@ async def action_learn_sender_signatures(owner: str, **kwargs) -> Tuple[str, boo
|
||||
|
||||
try:
|
||||
conn = _sql3.connect(SCHEDULED_DB)
|
||||
owner_value = (owner or "").strip()
|
||||
conn.execute(
|
||||
"INSERT OR REPLACE INTO sender_signatures "
|
||||
"(from_address, signature_text, sample_count, last_built_at, model_used, source) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(addr, cached_sig, len(bodies), _dt.utcnow().isoformat(), model, "llm"),
|
||||
"(from_address, owner, signature_text, sample_count, last_built_at, model_used, source) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
(addr, owner_value, cached_sig, len(bodies), _dt.utcnow().isoformat(), model, "llm"),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
@@ -106,6 +106,9 @@ async def test_learn_sender_signatures_resolves_llm_for_task_owner(monkeypatch):
|
||||
from src.builtin_actions import action_learn_sender_signatures
|
||||
|
||||
class FakeImap:
|
||||
def __init__(self, owner=""):
|
||||
self.owner = owner
|
||||
|
||||
def select(self, *_args, **_kwargs):
|
||||
return "OK", []
|
||||
|
||||
@@ -119,13 +122,20 @@ async def test_learn_sender_signatures_resolves_llm_for_task_owner(monkeypatch):
|
||||
return None
|
||||
|
||||
calls, _fallback_calls = _resolver_spy(monkeypatch, utility_result=("", "", {}), default_result=("", "", {}))
|
||||
monkeypatch.setattr(email_helpers, "_imap_connect", lambda _account_id=None: FakeImap())
|
||||
imap_owners = []
|
||||
|
||||
def fake_imap_connect(_account_id=None, owner=""):
|
||||
imap_owners.append(owner)
|
||||
return FakeImap(owner)
|
||||
|
||||
monkeypatch.setattr(email_helpers, "_imap_connect", fake_imap_connect)
|
||||
|
||||
message, ok = await action_learn_sender_signatures("alice")
|
||||
|
||||
assert ok is False
|
||||
assert message == "No LLM endpoint available"
|
||||
assert calls == [("utility", "alice"), ("default", "alice")]
|
||||
assert imap_owners == ["alice"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import sqlite3
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -117,6 +119,71 @@ def test_email_ai_cache_tables_are_owner_scoped_and_migrate_legacy_rows(tmp_path
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_sender_signature_cache_is_owner_scoped_and_migrates_legacy_rows(tmp_path, monkeypatch):
|
||||
import routes.email_helpers as email_helpers
|
||||
|
||||
db_path = tmp_path / "scheduled_emails.db"
|
||||
monkeypatch.setattr(email_helpers, "SCHEDULED_DB", db_path)
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE sender_signatures (
|
||||
from_address TEXT PRIMARY KEY,
|
||||
signature_text TEXT,
|
||||
sample_count INTEGER,
|
||||
last_built_at TEXT NOT NULL,
|
||||
model_used TEXT,
|
||||
source TEXT
|
||||
)
|
||||
"""
|
||||
)
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO sender_signatures
|
||||
(from_address, signature_text, sample_count, last_built_at, model_used, source)
|
||||
VALUES ('writer@example.com', 'legacy sig', 3, '2026-01-01', 'm', 'llm')
|
||||
"""
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
email_helpers._init_scheduled_db()
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
try:
|
||||
info = conn.execute("PRAGMA table_info(sender_signatures)").fetchall()
|
||||
pk_cols = [r[1] for r in sorted((r for r in info if r[5]), key=lambda r: r[5])]
|
||||
assert pk_cols == ["from_address", "owner"]
|
||||
assert conn.execute(
|
||||
"SELECT owner, signature_text FROM sender_signatures WHERE from_address=?",
|
||||
("writer@example.com",),
|
||||
).fetchone() == ("", "legacy sig")
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO sender_signatures
|
||||
(from_address, owner, signature_text, sample_count, last_built_at, model_used, source)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
("writer@example.com", "alice", "alice sig", 3, "2026-01-02", "m", "llm"),
|
||||
)
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO sender_signatures
|
||||
(from_address, owner, signature_text, sample_count, last_built_at, model_used, source)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
("writer@example.com", "bob", "bob sig", 3, "2026-01-03", "m", "llm"),
|
||||
)
|
||||
rows = conn.execute(
|
||||
"SELECT owner, signature_text FROM sender_signatures WHERE from_address=? ORDER BY owner",
|
||||
("writer@example.com",),
|
||||
).fetchall()
|
||||
assert rows == [("", "legacy sig"), ("alice", "alice sig"), ("bob", "bob sig")]
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ai_reply_cache_lookup_is_owner_scoped(tmp_path, monkeypatch):
|
||||
import routes.email_helpers as email_helpers
|
||||
@@ -166,6 +233,136 @@ async def test_ai_reply_cache_lookup_is_owner_scoped(tmp_path, monkeypatch):
|
||||
assert result["model_used"] == "m-b"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sender_signature_read_lookup_is_owner_scoped(tmp_path, monkeypatch):
|
||||
import routes.email_helpers as email_helpers
|
||||
import routes.email_routes as email_routes
|
||||
|
||||
db_path = tmp_path / "scheduled_emails.db"
|
||||
monkeypatch.setattr(email_helpers, "SCHEDULED_DB", db_path)
|
||||
monkeypatch.setattr(email_routes, "SCHEDULED_DB", db_path)
|
||||
email_helpers._init_scheduled_db()
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO sender_signatures
|
||||
(from_address, owner, signature_text, sample_count, last_built_at, model_used, source)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
("writer@example.com", "alice", "alice private sig", 3, "2026-01-01", "m-a", "llm"),
|
||||
)
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO sender_signatures
|
||||
(from_address, owner, signature_text, sample_count, last_built_at, model_used, source)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
("writer@example.com", "bob", "bob private sig", 3, "2026-01-02", "m-b", "llm"),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
raw = (
|
||||
b"From: Writer <writer@example.com>\r\n"
|
||||
b"To: Bob <bob@example.com>\r\n"
|
||||
b"Subject: Hello\r\n"
|
||||
b"Message-ID: <shared@example.com>\r\n"
|
||||
b"Date: Tue, 01 Jan 2026 12:00:00 +0000\r\n"
|
||||
b"Content-Type: text/plain; charset=utf-8\r\n"
|
||||
b"\r\n"
|
||||
b"Body"
|
||||
)
|
||||
|
||||
class FakeImap:
|
||||
def select(self, *_args, **_kwargs):
|
||||
return "OK", []
|
||||
|
||||
def uid(self, command, _uid, query):
|
||||
assert command == "FETCH"
|
||||
assert query == "(BODY.PEEK[])"
|
||||
return "OK", [(b"1 (UID 1 BODY[])", raw)]
|
||||
|
||||
@contextmanager
|
||||
def fake_imap(_account_id=None, owner=""):
|
||||
assert owner == "bob"
|
||||
yield FakeImap()
|
||||
|
||||
monkeypatch.setattr(email_routes, "_imap", fake_imap)
|
||||
router = email_routes.setup_email_routes()
|
||||
read_email = _route_endpoint(router, "/api/email/read/{uid}", "GET")
|
||||
|
||||
result = await read_email("1", folder="INBOX", account_id=None, owner="bob", mark_seen=False)
|
||||
|
||||
assert result["sender_signature"] == "bob private sig"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sender_signature_clear_cache_keeps_other_owner_rows(tmp_path, monkeypatch):
|
||||
import routes.email_helpers as email_helpers
|
||||
import routes.task_routes as task_routes
|
||||
|
||||
db_path = tmp_path / "scheduled_emails.db"
|
||||
monkeypatch.setattr(email_helpers, "SCHEDULED_DB", db_path)
|
||||
email_helpers._init_scheduled_db()
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO sender_signatures
|
||||
(from_address, owner, signature_text, sample_count, last_built_at, model_used, source)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
("writer@example.com", "alice", "alice private sig", 3, "2026-01-01", "m-a", "llm"),
|
||||
)
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO sender_signatures
|
||||
(from_address, owner, signature_text, sample_count, last_built_at, model_used, source)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
("writer@example.com", "bob", "bob private sig", 3, "2026-01-02", "m-b", "llm"),
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
class FakeQuery:
|
||||
def filter(self, *_args):
|
||||
return self
|
||||
|
||||
def first(self):
|
||||
return SimpleNamespace(
|
||||
id="task-1",
|
||||
owner="alice",
|
||||
action="learn_sender_signatures",
|
||||
)
|
||||
|
||||
class FakeDb:
|
||||
def query(self, _model):
|
||||
return FakeQuery()
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
monkeypatch.setattr(task_routes, "SessionLocal", lambda: FakeDb())
|
||||
monkeypatch.setattr(task_routes, "get_current_user", lambda _request: "alice")
|
||||
|
||||
router = task_routes.setup_task_routes(task_scheduler=SimpleNamespace(pop_notifications=lambda owner: []))
|
||||
clear_cache = _route_endpoint(router, "/api/tasks/{task_id}/clear-cache", "POST")
|
||||
|
||||
result = await clear_cache(SimpleNamespace(), "task-1")
|
||||
|
||||
assert result["cleared"]["sender_signatures"] == 1
|
||||
conn = sqlite3.connect(db_path)
|
||||
try:
|
||||
rows = conn.execute(
|
||||
"SELECT owner, signature_text FROM sender_signatures ORDER BY owner",
|
||||
).fetchall()
|
||||
finally:
|
||||
conn.close()
|
||||
assert rows == [("bob", "bob private sig")]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_scheduled_email_routes_are_owner_scoped(tmp_path, monkeypatch):
|
||||
import routes.email_helpers as email_helpers
|
||||
|
||||
Reference in New Issue
Block a user