feat(search): unify session transcript search (#2877)

This commit is contained in:
Nicholai
2026-06-05 18:08:31 -06:00
committed by GitHub
parent c2017fa089
commit 463713c2c6
8 changed files with 750 additions and 84 deletions
+68
View File
@@ -1627,11 +1627,79 @@ def init_db():
_migrate_add_calendar_is_utc()
_migrate_add_calendar_origin()
_migrate_add_calendar_account_id()
_migrate_chat_messages_fts()
_migrate_encrypt_email_passwords()
_migrate_encrypt_signatures()
_migrate_encrypt_endpoint_keys()
def _migrate_chat_messages_fts():
    """Create and backfill the session transcript FTS index for SQLite.

    The migration is a no-op unless ``DATABASE_URL`` starts with ``sqlite``
    and does not point at ``:memory:``. It first probes for FTS5 support by
    creating and dropping a throwaway virtual table in the ``temp`` schema,
    then creates the ``chat_messages_fts`` virtual table, the insert/delete/
    update triggers on ``chat_messages`` that maintain it, and finally
    indexes every ``chat_messages`` row that is not in the index yet.

    This is best-effort: every failure is logged as a warning and swallowed,
    so the function never raises.
    """
    # Local import, as the neighbouring migration helpers do, so the function
    # does not depend on a module-level ``sqlite3`` import being present.
    import sqlite3

    log = logging.getLogger(__name__)
    if not DATABASE_URL.startswith("sqlite"):
        return
    db_path = DATABASE_URL.replace("sqlite:///", "")
    if db_path == ":memory:":
        return
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        try:
            conn.execute("CREATE VIRTUAL TABLE IF NOT EXISTS temp._odysseus_fts5_probe USING fts5(content)")
            conn.execute("DROP TABLE IF EXISTS temp._odysseus_fts5_probe")
        except Exception as e:
            log.warning("chat_messages FTS migration skipped; FTS5 unavailable: %s", e)
            return
        conn.executescript(
            """
            CREATE VIRTUAL TABLE IF NOT EXISTS chat_messages_fts USING fts5(
                content,
                message_id UNINDEXED,
                session_id UNINDEXED,
                role UNINDEXED
            );
            CREATE TRIGGER IF NOT EXISTS chat_messages_fts_ai
            AFTER INSERT ON chat_messages BEGIN
                INSERT INTO chat_messages_fts(content, message_id, session_id, role)
                VALUES (COALESCE(new.content, ''), new.id, new.session_id, new.role);
            END;
            CREATE TRIGGER IF NOT EXISTS chat_messages_fts_ad
            AFTER DELETE ON chat_messages BEGIN
                DELETE FROM chat_messages_fts WHERE message_id = old.id;
            END;
            CREATE TRIGGER IF NOT EXISTS chat_messages_fts_au
            AFTER UPDATE ON chat_messages BEGIN
                DELETE FROM chat_messages_fts WHERE message_id = old.id;
                INSERT INTO chat_messages_fts(content, message_id, session_id, role)
                VALUES (COALESCE(new.content, ''), new.id, new.session_id, new.role);
            END;
            """
        )
        # Backfill rows that are not indexed yet. A non-correlated NOT IN lets
        # SQLite collect the indexed ids once; a correlated lookup against the
        # UNINDEXED message_id column would rescan the whole FTS table for
        # every chat message on every startup.
        conn.execute(
            """
            INSERT INTO chat_messages_fts(content, message_id, session_id, role)
            SELECT COALESCE(cm.content, ''), cm.id, cm.session_id, cm.role
            FROM chat_messages cm
            WHERE cm.id NOT IN (
                SELECT message_id FROM chat_messages_fts
                WHERE message_id IS NOT NULL
            )
            """
        )
        conn.commit()
    except Exception as e:
        log.warning("chat_messages FTS migration failed: %s", e)
    finally:
        # conn stays None when connect() itself failed; nothing to close then.
        if conn is not None:
            try:
                conn.close()
            except sqlite3.Error as e:
                log.debug("chat_messages FTS migration: closing connection failed: %s", e)
def _migrate_add_email_smtp_security():
"""Add explicit SMTP security mode for Proton Bridge/custom local SMTP."""
import sqlite3
+10 -38
View File
@@ -20,6 +20,7 @@ from src import agent_runs
from src.model_context import estimate_tokens
from src.chat_helpers import coerce_message_and_session
from src.endpoint_resolver import normalize_base as _normalize_base, build_chat_url
from src.session_search import search_session_messages
from src.prompt_security import untrusted_context_message
from core.exceptions import SessionNotFoundError
from src.auth_helpers import get_current_user
@@ -1208,45 +1209,16 @@ def setup_chat_routes(
return []
_user = get_current_user(request)
query_term = q.strip()
db = SessionLocal()
try:
base_q = (
db.query(DBChatMessage, DBSession.name)
.join(DBSession, DBChatMessage.session_id == DBSession.id)
.filter(
DBSession.archived == False,
DBChatMessage.content.ilike(f"%{query_term}%"),
DBChatMessage.role.in_(["user", "assistant"]),
)
return [
result.to_dict()
for result in search_session_messages(
q,
limit=limit,
owner=_user,
restrict_owner=_user is not None,
include_legacy_owner=False,
)
if _user:
base_q = base_q.filter(DBSession.owner == _user)
rows = base_q.order_by(DBChatMessage.timestamp.desc()).limit(limit).all()
results = []
for msg, session_name in rows:
content = msg.content or ""
lower_content = content.lower()
idx = lower_content.find(query_term.lower())
if idx == -1:
snippet = content[:120]
else:
start = max(0, idx - 50)
end = min(len(content), idx + len(query_term) + 50)
snippet = ("..." if start > 0 else "") + content[start:end] + ("..." if end < len(content) else "")
results.append({
"session_id": msg.session_id,
"session_name": session_name or "Untitled",
"role": msg.role,
"content_snippet": snippet,
"timestamp": msg.timestamp.isoformat() if msg.timestamp else None,
})
return results
finally:
db.close()
]
# ------------------------------------------------------------------ #
# POST /api/rewrite — lightweight rewrite of last AI message (no tools)
+1 -1
View File
@@ -332,7 +332,7 @@ If the user asks for a reminder/alarm before the event, pass `reminder_minutes`
"create_session": "- ```create_session``` — Create a new chat. Line 1 = chat name, line 2 = model name. Use for background/parallel work.",
"list_sessions": "- ```list_sessions``` — List chats sorted MOST-RECENT FIRST (the UI calls them 'chats') with clickable chat-title links. Output includes a relative \"last active\" timestamp per row, so the first row is the user's most recent chat. Content = optional filter keyword (matches chat name). When answering, preserve the `[title](#session-id)` links exactly; do not convert them into plain text.",
"send_to_session": "- ```send_to_session``` — Send a message to another session. Line 1 = session_id, rest = message. Use for orchestrating work across sessions.",
"search_chats": "- ```search_chats``` — Search across all chat history. Use when user asks 'did we discuss X?' or 'find the conversation about Y'.",
"search_chats": "- ```search_chats``` — Search past session transcripts for direct conversation evidence. Use when user asks 'did we discuss X?', 'find the conversation about Y', or when prior chat context is more appropriate than persistent memory.",
"pipeline": "- ```pipeline``` — Run a multi-step AI pipeline. Args (JSON) with ordered steps, each specifying a model and prompt. Use for complex workflows.",
"ui_control": "- ```ui_control``` — Control the UI: toggle tools on/off, OPEN PANELS, open email reply drafts, switch models, change themes. Commands: `toggle <name> on/off` (names: bash/shell, web/search, research, incognito, document_editor/documents), `open_panel <name>` (panels: documents, gallery, email, sessions, notes, memories/brain, skills, settings, cookbook), `open_email_reply <uid> <folder> <reply|reply-all|ai-reply>` (opens an email compose document, does NOT send), `set_mode agent/chat`, `switch_model <name>`, `set_theme <preset>`, `create_theme <name> <bg> <fg> <panel> <border> <accent>` (optional key=val for advanced colors AND background effects: bgPattern=<none|dots|synapse|rain|constellations|perlin-flow|petals|sparkles|embers>, bgEffectColor=#RRGGBB, bgEffectIntensity=<num>, bgEffectSize=<num>, frosted=true|false). \"open documents\" / \"open library\" / \"show gallery\" / \"open inbox\" / \"open notes\" / \"open cookbook\" all map to `open_panel <name>`. Theme presets: dark, light, midnight, paper, cyberpunk, retrowave, forest, ocean, ume, copper, terminal, organs, lavender, gpt, claude, cute.",
"ask_user": "- ```ask_user``` — Ask the user a multiple-choice question when the task is genuinely ambiguous and the answer changes what you do next (pick an approach, confirm an assumption, choose a target). Args (JSON): {\"question\": \"...\", \"options\": [{\"label\": \"...\", \"description\": \"...\"?}, ...], \"multi\": false?}. 2-6 options. The user gets clickable buttons; calling this ENDS your turn and their choice comes back as your next message. Prefer sensible defaults — only ask when you truly can't proceed well without their input.",
+355
View File
@@ -0,0 +1,355 @@
"""Shared session transcript search for UI and agent tools."""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Iterable
from sqlalchemy import text
from core.database import ChatMessage as DBChatMessage
from core.database import Session as DBSession
from core.database import SessionLocal
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Message roles that transcript search matches against and that the
# surrounding-context lookups return.
SEARCH_ROLES = ("user", "assistant")
@dataclass(frozen=True)
class SessionSearchResult:
    """A single transcript search hit together with its neighbouring messages.

    ``content`` carries the full message text, while ``content_snippet`` is
    the short excerpt. ``to_dict`` exposes the snippet but not ``content``.
    """

    message_id: str
    session_id: str
    session_name: str
    role: str
    content: str
    content_snippet: str
    timestamp: str | None
    context_before: list[dict[str, Any]]
    context_after: list[dict[str, Any]]

    def to_dict(self) -> dict[str, Any]:
        """Return the hit as a plain dict containing every field except ``content``."""
        exported = (
            "message_id",
            "session_id",
            "session_name",
            "role",
            "content_snippet",
            "timestamp",
            "context_before",
            "context_after",
        )
        return {field_name: getattr(self, field_name) for field_name in exported}
def _iso(value: datetime | None) -> str | None:
return value.isoformat() if value else None
def _message_to_context(msg: DBChatMessage) -> dict[str, Any]:
    """Flatten a chat message into the dict shape used for context entries.

    A missing ``content`` becomes an empty string and the timestamp is
    rendered through ``_iso``.
    """
    return dict(
        message_id=msg.id,
        role=msg.role,
        content=msg.content or "",
        timestamp=_iso(msg.timestamp),
    )
def _escape_like(value: str) -> str:
return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
def _snippet(content: str, query: str, radius: int = 60) -> str:
content = content or ""
query = query or ""
if not query:
return content[: radius * 2]
idx = content.lower().find(query.lower())
if idx == -1:
return content[: radius * 2]
start = max(0, idx - radius)
end = min(len(content), idx + len(query) + radius)
return ("..." if start > 0 else "") + content[start:end] + ("..." if end < len(content) else "")
def _sanitize_fts_query(query: str) -> str | None:
"""Convert free text into a conservative FTS5 MATCH query.
User input can contain FTS5 operators or punctuation that raises
sqlite3.OperationalError. For transcript search we do not need advanced
syntax in v1, so keep only words and balanced quoted phrases.
"""
parts: list[str] = []
for match in re.finditer(r'"([^"]+)"|[\w][\w._-]*', query, flags=re.UNICODE):
phrase = match.group(1)
if phrase is not None:
phrase = phrase.strip()
if phrase:
parts.append('"' + phrase.replace('"', '""') + '"')
continue
token = match.group(0).strip("._-")
if not token:
continue
if any(ch in token for ch in "._-"):
parts.append('"' + token.replace('"', '""') + '"')
else:
parts.append(token)
if not parts:
return None
return " ".join(parts)
def _is_sqlite_session(db) -> bool:
try:
bind = db.get_bind()
return getattr(getattr(bind, "dialect", None), "name", None) == "sqlite"
except Exception:
return False
def _has_fts_table(db) -> bool:
    """Check ``sqlite_master`` for a table named ``chat_messages_fts``.

    Returns ``False`` for non-SQLite sessions and when the lookup itself
    fails (the failure is logged at debug level).
    """
    if _is_sqlite_session(db):
        try:
            probe = text("SELECT 1 FROM sqlite_master WHERE type='table' AND name='chat_messages_fts' LIMIT 1")
            return db.execute(probe).first() is not None
        except Exception as e:
            logger.debug("chat_messages_fts availability check failed: %s", e)
    return False
def _owner_filter(query, owner: str | None, include_legacy_owner: bool):
    """Restrict ``query`` to the sessions visible to ``owner``.

    With ``owner=None`` only sessions whose owner is NULL are kept. Otherwise
    the owner's own sessions are kept, plus NULL-owner sessions when
    ``include_legacy_owner`` is true.
    """
    legacy_rows = DBSession.owner.is_(None)
    if owner is None:
        return query.filter(legacy_rows)
    owned_rows = DBSession.owner == owner
    if include_legacy_owner:
        return query.filter(owned_rows | legacy_rows)
    return query.filter(owned_rows)
def _context_for_message(db, msg: DBChatMessage, count: int) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
if count <= 0 or not msg.timestamp:
return [], []
before_rows = (
db.query(DBChatMessage)
.filter(
DBChatMessage.session_id == msg.session_id,
DBChatMessage.role.in_(SEARCH_ROLES),
DBChatMessage.timestamp < msg.timestamp,
)
.order_by(DBChatMessage.timestamp.desc())
.limit(count)
.all()
)
after_rows = (
db.query(DBChatMessage)
.filter(
DBChatMessage.session_id == msg.session_id,
DBChatMessage.role.in_(SEARCH_ROLES),
DBChatMessage.timestamp > msg.timestamp,
)
.order_by(DBChatMessage.timestamp.asc())
.limit(count)
.all()
)
before = [_message_to_context(row) for row in reversed(before_rows)]
after = [_message_to_context(row) for row in after_rows]
return before, after
def _rows_to_results(db, rows: Iterable[tuple[DBChatMessage, str, str]], query: str, context_messages: int) -> list[SessionSearchResult]:
results: list[SessionSearchResult] = []
for msg, session_name, snippet in rows:
before, after = _context_for_message(db, msg, context_messages)
content = msg.content or ""
results.append(
SessionSearchResult(
message_id=msg.id,
session_id=msg.session_id,
session_name=session_name or "Untitled",
role=msg.role,
content=content,
content_snippet=snippet or _snippet(content, query),
timestamp=_iso(msg.timestamp),
context_before=before,
context_after=after,
)
)
return results
def _search_like(
    db,
    query: str,
    limit: int,
    owner: str | None,
    include_archived: bool,
    context_messages: int,
    restrict_owner: bool,
    include_legacy_owner: bool,
) -> list[SessionSearchResult]:
    """Substring search over message content using a case-insensitive LIKE.

    LIKE wildcards in ``query`` are escaped so they match literally. Results
    are limited to ``SEARCH_ROLES``, optionally filtered by archive state and
    owner, and returned newest first, capped at ``limit``.
    """
    pattern = f"%{_escape_like(query)}%"
    stmt = db.query(DBChatMessage, DBSession.name).join(
        DBSession, DBChatMessage.session_id == DBSession.id
    )
    stmt = stmt.filter(
        DBChatMessage.content.ilike(pattern, escape="\\"),
        DBChatMessage.role.in_(SEARCH_ROLES),
    )
    if not include_archived:
        stmt = stmt.filter(DBSession.archived == False)  # noqa: E712 - SQL comparison
    if restrict_owner:
        stmt = _owner_filter(stmt, owner, include_legacy_owner)
    newest_first = stmt.order_by(DBChatMessage.timestamp.desc()).limit(limit).all()
    shaped = (
        (msg, session_name, _snippet(msg.content or "", query))
        for msg, session_name in newest_first
    )
    return _rows_to_results(db, shaped, query, context_messages)
def _search_fts(
    db,
    query: str,
    limit: int,
    owner: str | None,
    include_archived: bool,
    context_messages: int,
    restrict_owner: bool,
    include_legacy_owner: bool,
) -> list[SessionSearchResult] | None:
    """Search transcripts through the ``chat_messages_fts`` FTS5 index.

    Hits are ranked by ``bm25`` and then by newest timestamp, and carry the
    snippet produced by FTS5. Archive and owner scoping follow the same rules
    as the LIKE search.

    Returns:
        The ranked results, or ``None`` when FTS cannot contribute: the query
        sanitizes to nothing, the FTS table is missing, the FTS statement
        fails (logged at debug level), or it returns no hits.
    """
    fts_query = _sanitize_fts_query(query)
    if not fts_query or not _has_fts_table(db):
        return None

    archived_clause = "" if include_archived else "AND s.archived = 0"
    if not restrict_owner:
        owner_clause = ""
    elif owner is None:
        owner_clause = "AND s.owner IS NULL"
    elif not include_legacy_owner:
        owner_clause = "AND s.owner = :owner"
    else:
        owner_clause = "AND (s.owner = :owner OR s.owner IS NULL)"

    params: dict[str, Any] = {"fts_query": fts_query, "limit": limit}
    if restrict_owner and owner is not None:
        params["owner"] = owner

    # Only the fixed clause fragments chosen above are interpolated into the
    # statement; every user-supplied value is passed as a bound parameter.
    sql = text(
        f"""
        SELECT
            m.id AS message_id,
            snippet(chat_messages_fts, 0, '', '', '...', 24) AS content_snippet
        FROM chat_messages_fts
        JOIN chat_messages m ON m.id = chat_messages_fts.message_id
        JOIN sessions s ON s.id = m.session_id
        WHERE chat_messages_fts MATCH :fts_query
        {archived_clause}
        {owner_clause}
        AND m.role IN ('user', 'assistant')
        ORDER BY bm25(chat_messages_fts), m.timestamp DESC
        LIMIT :limit
        """
    )
    try:
        hits = db.execute(sql, params).fetchall()
    except Exception as e:
        logger.debug("FTS session search failed; falling back to LIKE: %s", e)
        return None
    if not hits:
        return None

    # Keep hits in rank order. If the index holds the same message twice, the
    # best-ranked occurrence (the first one) wins.
    snippets: dict[Any, str] = {}
    for hit in hits:
        snippets.setdefault(hit[0], hit[1] or "")

    # Load all matched messages in one query instead of one query per hit.
    loaded = (
        db.query(DBChatMessage, DBSession.name)
        .join(DBSession, DBChatMessage.session_id == DBSession.id)
        .filter(DBChatMessage.id.in_(list(snippets)))
        .all()
    )
    by_id = {msg.id: (msg, session_name) for msg, session_name in loaded}
    rows = [
        (*by_id[message_id], snippet)
        for message_id, snippet in snippets.items()
        if message_id in by_id
    ]
    return _rows_to_results(db, rows, query, context_messages)
def search_session_messages(
    query: str,
    limit: int = 20,
    owner: str | None = None,
    include_archived: bool = False,
    context_messages: int = 1,
    restrict_owner: bool = True,
    include_legacy_owner: bool = True,
    db=None,
) -> list[SessionSearchResult]:
    """Search session transcripts using FTS5 when available.

    `owner=None` is deliberately treated as legacy/null-owner scope rather
    than global access.

    ``limit`` is clamped to 1..100 and ``context_messages`` to 0..3. When the
    FTS search yields results they are listed first, followed by LIKE
    substring hits not already present, up to ``limit`` entries; otherwise
    the LIKE results are returned on their own. A session is opened (and
    closed again) only when ``db`` is not supplied.
    """
    query = (query or "").strip()
    if not query:
        return []
    limit = max(1, min(int(limit or 20), 100))
    context_messages = max(0, min(int(context_messages or 0), 3))
    search_args = (
        query,
        limit,
        owner,
        include_archived,
        context_messages,
        restrict_owner,
        include_legacy_owner,
    )

    owns_db = db is None
    if owns_db:
        db = SessionLocal()
    try:
        fts_results = _search_fts(db, *search_args)
        like_results = _search_like(db, *search_args)
        if fts_results is None:
            return like_results

        # Ranked FTS hits first, then substring-only hits, without repeats.
        merged: list[SessionSearchResult] = []
        seen: set[str] = set()
        for result in fts_results + like_results:
            if result.message_id not in seen:
                seen.add(result.message_id)
                merged.append(result)
                if len(merged) >= limit:
                    break
        return merged
    finally:
        if owns_db:
            db.close()
+16 -43
View File
@@ -548,7 +548,7 @@ async def do_suggest_document(content: str, doc_id: str = None, owner: Optional[
# ---------------------------------------------------------------------------
async def do_search_chats(query: str, limit: int = 20, owner: str | None = None) -> Dict:
"""Search past chat messages for the calling user's sessions only.
"""Search past session transcripts for the calling user's sessions only.
Without an owner filter this used to leak EVERY user's chat history
into the agent's `search_chats` results (v2 review HIGH-11). The
@@ -556,63 +556,36 @@ async def do_search_chats(query: str, limit: int = 20, owner: str | None = None)
through; legacy callers without owner pass through as before but
will only see legacy/null-owner rows.
"""
from src.database import SessionLocal, ChatMessage as DBChatMessage, Session as DBSession
# Escape LIKE wildcards in the user-supplied query so a stray % or _
# doesn't widen the match (and to keep the response deterministic).
safe_q = query.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
db = SessionLocal()
try:
q = (
db.query(DBChatMessage, DBSession.id, DBSession.name)
.join(DBSession, DBChatMessage.session_id == DBSession.id)
.filter(
DBSession.archived == False,
DBChatMessage.content.ilike(f"%{safe_q}%", escape="\\"),
DBChatMessage.role.in_(["user", "assistant"]),
)
)
if owner is not None:
# Restrict to this user's sessions plus legacy null-owner
# rows (so single-user upgrades keep seeing their own data).
q = q.filter((DBSession.owner == owner) | (DBSession.owner.is_(None)))
rows = q.order_by(DBChatMessage.timestamp.desc()).limit(limit).all()
from src.session_search import search_session_messages
if not rows:
results = search_session_messages(query, limit=limit, owner=owner)
if not results:
return {"results": f"No chats found matching \"{query}\"."}
# Group by session to avoid duplicate links
seen_sessions = {}
for msg, session_id, session_name in rows:
if session_id not in seen_sessions:
content = msg.content or ""
lower_content = content.lower()
idx = lower_content.find(query.lower())
if idx == -1:
snippet = content[:150]
else:
start = max(0, idx - 60)
end = min(len(content), idx + len(query) + 60)
snippet = ("..." if start > 0 else "") + content[start:end] + ("..." if end < len(content) else "")
seen_sessions[session_id] = {
"name": session_name or "Untitled",
"snippet": snippet,
"role": msg.role,
"timestamp": msg.timestamp.isoformat() if msg.timestamp else None,
}
for result in results:
if result.session_id not in seen_sessions:
seen_sessions[result.session_id] = result
lines = [f"Found {len(seen_sessions)} session(s) matching \"{query}\":\n"]
for sid, info in seen_sessions.items():
lines.append(f"- **{info['name']}** (#{sid})")
for sid, result in seen_sessions.items():
lines.append(f"- **{result.session_name}** (#{sid})")
lines.append(f" Link: [Open chat](#{sid})")
lines.append(f" > {info['snippet']}")
lines.append(f" Match ({result.role}): {result.content_snippet}")
if result.context_before:
before = result.context_before[-1]
lines.append(f" Before ({before['role']}): {before['content'][:180]}")
if result.context_after:
after = result.context_after[0]
lines.append(f" After ({after['role']}): {after['content'][:180]}")
lines.append("")
return {"results": "\n".join(lines)}
except Exception as e:
logger.error(f"search_chats failed: {e}")
return {"error": str(e), "exit_code": 1}
finally:
db.close()
# ---------------------------------------------------------------------------
+1 -1
View File
@@ -115,7 +115,7 @@ BUILTIN_TOOL_DESCRIPTIONS: Dict[str, str] = {
"create_session": "Create a new chat with a name and model.",
"list_sessions": "List all chats with their metadata (the UI calls these 'chats'). Use for 'list my chats', 'rename all my chats' (list first, then manage_session to rename each).",
"send_to_session": "Send a message to another chat. Cross-chat communication.",
"search_chats": "Search through chat history across all sessions.",
"search_chats": "Search past session transcripts across chats.",
"ask_user": "Ask the user a multiple-choice question to get a decision or clarification. Use this when the task is genuinely ambiguous and the answer changes what you do next — pick between approaches, confirm an assumption, choose among options — instead of guessing. Provide a clear `question` and 2-6 `options` (each with a short `label`, optional `description`). Calling this ENDS your turn: the user sees clickable buttons and their choice arrives as your next message. Don't use it for things you can decide from context or sensible defaults, or for irreversible-action confirmation if a dedicated flow exists.",
"update_plan": "Write back to the ACTIVE PLAN while executing an approved plan: mark steps done or revise them. After finishing a step call this with the full checklist and that step marked done; when the user asks to change the plan call it with the revised checklist. Always pass the COMPLETE markdown checklist (`- [ ]` / `- [x]`), not a diff. The user's docked plan window updates live. No effect when there is no active plan.",
"ui_control": "Control the UI and toggle tools on/off. Use this to turn off / turn on / disable / enable individual tools and features: shell (bash), search (web), research, browser, documents, incognito. Open panels (documents library, gallery, email inbox, sessions, notes, memories/brain, skills, settings, cookbook) via `open_panel <name>`. Use `open_email_reply <uid> <folder> reply` to open an email reply draft document without sending. Also switches between chat/agent modes, changes the current model, and applies/creates themes.",
+1 -1
View File
@@ -258,7 +258,7 @@ FUNCTION_TOOL_SCHEMAS = [
"type": "function",
"function": {
"name": "search_chats",
"description": "Search the user's past chat conversations by keyword. Use when the user asks about previous chats, past conversations, or wants to find a discussion they had before. Returns matching sessions with clickable links.",
"description": "Search the user's past session transcripts by keyword. Use when the user asks about previous chats, past conversations, or when direct transcript evidence is better than persistent memory. Returns matching sessions with clickable links and nearby context.",
"parameters": {
"type": "object",
"properties": {
+298
View File
@@ -0,0 +1,298 @@
from datetime import datetime, timedelta
import asyncio
import sqlite3
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from core.database import Base
from core.database import ChatMessage as DbChatMessage
from core.database import Session as DbSession
from src.session_search import SessionSearchResult, search_session_messages
def _db(with_fts=True):
    """Return a session on a fresh in-memory SQLite database.

    The ORM schema is created from ``Base.metadata``; with ``with_fts`` the
    ``chat_messages_fts`` FTS5 table is created as well.
    """
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    session_factory = sessionmaker(bind=engine)
    session = session_factory()
    if not with_fts:
        return session
    session.connection().exec_driver_sql(
        """
        CREATE VIRTUAL TABLE chat_messages_fts USING fts5(
            content,
            message_id UNINDEXED,
            session_id UNINDEXED,
            role UNINDEXED
        )
        """
    )
    return session
def _add_session(db, sid, owner="alice", archived=False, name=None):
    """Stage a session row on ``db``; the name defaults to the session id."""
    session_row = DbSession(
        id=sid,
        name=name or sid,
        endpoint_url="http://example.test",
        model="test-model",
        owner=owner,
        archived=archived,
        message_count=0,
    )
    db.add(session_row)
def _add_message(db, sid, mid, role, content, when):
    """Stage a chat message and, when the FTS table exists, index it by hand."""
    message = DbChatMessage(id=mid, session_id=sid, role=role, content=content, timestamp=when)
    db.add(message)
    if not _has_fts(db):
        return
    db.connection().exec_driver_sql(
        "INSERT INTO chat_messages_fts(content, message_id, session_id, role) VALUES (?, ?, ?, ?)",
        (content, mid, sid, role),
    )
def _has_fts(db):
    """Return True when ``sqlite_master`` lists a ``chat_messages_fts`` table."""
    probe = "SELECT 1 FROM sqlite_master WHERE type='table' AND name='chat_messages_fts'"
    row = db.connection().exec_driver_sql(probe).first()
    return row is not None
def test_session_search_uses_fts_and_returns_context():
    """A hit carries its session name, a snippet and one neighbour on each side."""
    db = _db(with_fts=True)
    try:
        t0 = datetime(2026, 1, 1, 12, 0, 0)
        _add_session(db, "s1", owner="alice", name="Jazz planning")
        transcript = [
            ("m1", "user", "Before context about music"),
            ("m2", "assistant", "We talked about modal jazz theory"),
            ("m3", "user", "After context about tasks"),
        ]
        for minute, (mid, role, body) in enumerate(transcript):
            _add_message(db, "s1", mid, role, body, t0 + timedelta(minutes=minute))
        db.commit()

        hits = search_session_messages("modal jazz", owner="alice", db=db)

        assert [hit.message_id for hit in hits] == ["m2"]
        top = hits[0]
        assert top.session_name == "Jazz planning"
        assert top.context_before[0]["message_id"] == "m1"
        assert top.context_after[0]["message_id"] == "m3"
        assert "modal" in top.content_snippet.lower()
    finally:
        db.close()
def test_session_search_escapes_like_wildcards_in_fallback():
    """Without FTS, ``_`` in the query matches only a literal underscore."""
    db = _db(with_fts=False)
    try:
        t0 = datetime(2026, 1, 1, 12, 0, 0)
        _add_session(db, "s1", owner="alice")
        _add_message(db, "s1", "literal", "user", "The literal token is foo_bar.", t0)
        _add_message(
            db,
            "s1",
            "wild",
            "user",
            "The wildcard-looking token is fooXbar.",
            t0 + timedelta(minutes=1),
        )
        db.commit()

        hits = search_session_messages("foo_bar", owner="alice", db=db)

        assert [hit.message_id for hit in hits] == ["literal"]
    finally:
        db.close()
def test_session_search_owner_scope_includes_legacy_and_excludes_other_users():
    """By default an owner sees their own and null-owner sessions, not other users'."""
    db = _db(with_fts=True)
    try:
        t0 = datetime(2026, 1, 1, 12, 0, 0)
        owners = {"alice": "alice", "legacy": None, "bob": "bob"}
        for sid, session_owner in owners.items():
            _add_session(db, sid, owner=session_owner)
        for minute, sid in enumerate(owners):
            _add_message(db, sid, f"m-{sid}", "user", "shared recall target", t0 + timedelta(minutes=minute))
        db.commit()

        hits = search_session_messages("shared recall target", owner="alice", db=db)

        assert {hit.message_id for hit in hits} == {"m-alice", "m-legacy"}
    finally:
        db.close()
def test_session_search_can_exclude_legacy_rows_for_authenticated_ui_scope():
    """``include_legacy_owner=False`` hides null-owner sessions from an owner."""
    db = _db(with_fts=True)
    try:
        t0 = datetime(2026, 1, 1, 12, 0, 0)
        _add_session(db, "alice", owner="alice")
        _add_session(db, "legacy", owner=None)
        _add_message(db, "alice", "m-alice", "user", "exact owner target", t0)
        _add_message(db, "legacy", "m-legacy", "user", "exact owner target", t0 + timedelta(minutes=1))
        db.commit()

        search_kwargs = {"owner": "alice", "include_legacy_owner": False, "db": db}
        hits = search_session_messages("exact owner target", **search_kwargs)

        assert [hit.message_id for hit in hits] == ["m-alice"]
    finally:
        db.close()
def test_session_search_ownerless_call_only_sees_legacy_rows():
    """``owner=None`` is scoped to null-owner sessions, not to everything."""
    db = _db(with_fts=True)
    try:
        t0 = datetime(2026, 1, 1, 12, 0, 0)
        _add_session(db, "alice", owner="alice")
        _add_session(db, "legacy", owner=None)
        _add_message(db, "alice", "m-alice", "user", "ownerless search target", t0)
        _add_message(db, "legacy", "m-legacy", "user", "ownerless search target", t0 + timedelta(minutes=1))
        db.commit()

        hits = search_session_messages("ownerless search target", owner=None, db=db)

        assert [hit.message_id for hit in hits] == ["m-legacy"]
    finally:
        db.close()
def test_session_search_falls_back_to_like_when_fts_has_no_substring_hits():
    """A query that is only a substring of a word is still found via LIKE."""
    db = _db(with_fts=True)
    try:
        t0 = datetime(2026, 1, 1, 12, 0, 0)
        _add_session(db, "s1", owner="alice")
        _add_message(db, "s1", "m1", "user", "We discussed customidentifier routing.", t0)
        db.commit()

        hits = search_session_messages("identifier", owner="alice", db=db)

        assert [hit.message_id for hit in hits] == ["m1"]
        assert "identifier" in hits[0].content_snippet
    finally:
        db.close()
def test_session_search_merges_like_substring_hits_with_fts_hits():
    """Whole-word FTS hits and substring-only LIKE hits are returned together."""
    db = _db(with_fts=True)
    try:
        t0 = datetime(2026, 1, 1, 12, 0, 0)
        _add_session(db, "s1", owner="alice")
        _add_message(db, "s1", "m-token", "user", "The identifier token is standalone.", t0)
        _add_message(
            db,
            "s1",
            "m-substring",
            "assistant",
            "We also discussed customidentifier routing.",
            t0 + timedelta(minutes=1),
        )
        db.commit()

        hits = search_session_messages("identifier", owner="alice", db=db)

        assert {hit.message_id for hit in hits} == {"m-token", "m-substring"}
    finally:
        db.close()
def test_session_search_can_preserve_unrestricted_no_auth_route_scope():
    """``restrict_owner=False`` returns hits regardless of who owns the session."""
    db = _db(with_fts=True)
    try:
        t0 = datetime(2026, 1, 1, 12, 0, 0)
        _add_session(db, "owned", owner="admin")
        _add_session(db, "legacy", owner=None)
        _add_message(db, "owned", "m-owned", "user", "no auth search target", t0)
        _add_message(db, "legacy", "m-legacy", "user", "no auth search target", t0 + timedelta(minutes=1))
        db.commit()

        search_kwargs = {"owner": None, "restrict_owner": False, "db": db}
        hits = search_session_messages("no auth search target", **search_kwargs)

        assert {hit.message_id for hit in hits} == {"m-owned", "m-legacy"}
    finally:
        db.close()
def test_session_search_excludes_archived_by_default():
    """Messages in archived sessions are left out unless explicitly requested."""
    db = _db(with_fts=True)
    try:
        t0 = datetime(2026, 1, 1, 12, 0, 0)
        _add_session(db, "active", owner="alice")
        _add_session(db, "archived", owner="alice", archived=True)
        _add_message(db, "active", "m-active", "user", "archive filter target", t0)
        _add_message(db, "archived", "m-archived", "user", "archive filter target", t0 + timedelta(minutes=1))
        db.commit()

        hits = search_session_messages("archive filter target", owner="alice", db=db)

        assert [hit.message_id for hit in hits] == ["m-active"]
    finally:
        db.close()
def test_chat_messages_fts_migration_backfills_and_tracks_inserts(tmp_path, monkeypatch):
    """The migration indexes existing rows and its insert trigger indexes new ones."""
    from core import database as cdb

    db_path = tmp_path / "app.db"
    # Seed a minimal chat_messages table with one row that predates the index.
    seed = sqlite3.connect(db_path)
    seed.executescript(
        """
        CREATE TABLE chat_messages (
            id TEXT PRIMARY KEY,
            session_id TEXT NOT NULL,
            role TEXT NOT NULL,
            content TEXT NOT NULL
        );
        INSERT INTO chat_messages(id, session_id, role, content)
        VALUES ('m1', 's1', 'user', 'backfilled transcript search');
        """
    )
    seed.close()

    monkeypatch.setattr(cdb, "DATABASE_URL", f"sqlite:///{db_path}")
    cdb._migrate_chat_messages_fts()

    check = sqlite3.connect(db_path)
    try:
        backfilled_ids = check.execute(
            "SELECT message_id FROM chat_messages_fts WHERE chat_messages_fts MATCH 'backfilled'"
        ).fetchall()
        assert backfilled_ids == [("m1",)]

        new_row = ("m2", "s1", "assistant", "triggered transcript search")
        check.execute(
            "INSERT INTO chat_messages(id, session_id, role, content) VALUES (?, ?, ?, ?)",
            new_row,
        )
        triggered_ids = check.execute(
            "SELECT message_id FROM chat_messages_fts WHERE chat_messages_fts MATCH 'triggered'"
        ).fetchall()
        assert triggered_ids == [("m2",)]
    finally:
        check.close()
def test_search_chats_formats_shared_results(monkeypatch):
    """``do_search_chats`` renders the match plus its before/after context lines."""
    from src import session_search
    from src.tool_implementations import do_search_chats

    canned_hit = SessionSearchResult(
        message_id="m2",
        session_id="s1",
        session_name="Design notes",
        role="assistant",
        content="We discussed session search.",
        content_snippet="We discussed session search.",
        timestamp="2026-01-01T12:00:00",
        context_before=[{"message_id": "m1", "role": "user", "content": "Can you find old chats?", "timestamp": None}],
        context_after=[{"message_id": "m3", "role": "user", "content": "That helps.", "timestamp": None}],
    )

    def fake_search(query, limit=20, owner=None, include_archived=False, context_messages=1, db=None):
        return [canned_hit]

    monkeypatch.setattr(session_search, "search_session_messages", fake_search)

    rendered = asyncio.run(do_search_chats("session search", owner="alice"))["results"]

    expected_fragments = (
        "Design notes",
        "Match (assistant): We discussed session search.",
        "Before (user): Can you find old chats?",
        "After (user): That helps.",
    )
    for fragment in expected_fragments:
        assert fragment in rendered