odysseus/src/context_compactor.py
Joshua Valderrama 35b4dd2824 fix: session context drifting — messages leaking between chats (#135) (#267)
* docs: add implementation plan for fixing chat context drifting (#135)

* fix: make Session.history immutable + fix {}.history crash

- Session.history now exposes a COPY of the internal _history list
- add_message() replaces history with a fresh copy each time
- get_context_messages() derives from _history directly
- replace_messages() updates both _history and history
- truncate_messages() updates both _history and history
- _persist_message() line 207: fixed {}.history fallback crash
- Added 11 tests for session isolation and edge cases

Addresses #135 root cause #1: shared mutable references

* fix: task scheduler uses SessionManager methods instead of overwriting sessions

- Added ensure_task_session() to SessionManager (checks cache first)
- Task scheduler now uses ensure_task_session() instead of direct dict assignment
- Task scheduler now uses SessionManager.add_message() for message persistence
- Removed direct sess_obj.history.append() that was silently losing data

Addresses #135 root causes #2 and #3

* fix: add age guard to cleanup_empty_sessions — don't delete sessions <1h old

Prevents the cleanup task from deleting sessions that were just created
and haven't received any messages yet (message_count == 0).

Addresses #135 root cause #5
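
A minimal sketch of the age guard described above. The helper name `should_delete_empty_session`, the constant `MIN_EMPTY_SESSION_AGE`, and the `created_at` argument are hypothetical; the real `cleanup_empty_sessions` may structure this check differently.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumed threshold, taken from the "<1h old" wording in the commit message.
MIN_EMPTY_SESSION_AGE = timedelta(hours=1)


def should_delete_empty_session(
    message_count: int,
    created_at: datetime,
    now: Optional[datetime] = None,
) -> bool:
    """Return True only for sessions that are empty AND at least one hour old."""
    if message_count != 0:
        return False  # sessions with messages are never cleanup candidates
    now = now or datetime.now(timezone.utc)
    return now - created_at >= MIN_EMPTY_SESSION_AGE
```

With this guard, a session created seconds ago with `message_count == 0` survives the cleanup pass and can still receive its first message.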

* test: comprehensive session isolation tests (10/10 passing)

* refactor: consolidate _session_manager into singleton pattern

- Added set_session_manager_instance / get_session_manager_instance to core/models
- kept backward-compat aliases (set_session_manager, get_session_manager)
- session_manager.py re-exports the singleton functions
- ai_interaction.set_session_manager now syncs with the core singleton
- context_compactor uses get_session_manager_instance() instead of getattr hack
- app.py initializes the singleton once

Addresses #135 root cause #4: fragile global wiring
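
A rough sketch of the module-level singleton wiring listed above. The function names come from this commit message; the `SessionManager` stand-in class and the choice to return `None` before initialization are assumptions, not the actual `core/models` code.

```python
from typing import Optional


class SessionManager:
    """Hypothetical stand-in for the real SessionManager."""


# Single shared slot; app startup is assumed to fill it exactly once.
_session_manager_instance: Optional[SessionManager] = None


def set_session_manager_instance(manager: SessionManager) -> None:
    """Register the process-wide SessionManager."""
    global _session_manager_instance
    _session_manager_instance = manager


def get_session_manager_instance() -> Optional[SessionManager]:
    """Return the registered manager, or None if none was set (assumed behavior)."""
    return _session_manager_instance


# Backward-compatible aliases so older call sites keep working.
set_session_manager = set_session_manager_instance
get_session_manager = get_session_manager_instance
```

Because every module reads the same slot, a caller such as `context_compactor` no longer needs to reach into another module's globals to find the manager.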

* test: add concurrent session isolation integration tests

Verifies:
- Concurrent add_message to different sessions doesn't cross-contaminate
- Rapid parallel writes maintain isolation
- Read-write concurrent access is safe

All 3 async tests pass, proving the immutable history fix works under concurrency
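
The shape of such an isolation check can be sketched as follows. `InMemorySessions` is a hypothetical store used only for illustration; the real tests exercise `SessionManager.add_message`.

```python
import asyncio
from typing import Dict, List


class InMemorySessions:
    """Hypothetical store: one independent history list per session id."""

    def __init__(self) -> None:
        self._histories: Dict[str, List[str]] = {}

    async def add_message(self, session_id: str, text: str) -> None:
        await asyncio.sleep(0)  # yield so the two writers interleave
        self._histories.setdefault(session_id, []).append(text)

    def history(self, session_id: str) -> List[str]:
        return list(self._histories.get(session_id, []))


async def write_many(store: InMemorySessions, session_id: str, n: int) -> None:
    """Write n messages tagged with the session id they belong to."""
    for i in range(n):
        await store.add_message(session_id, f"{session_id}-{i}")


async def run_isolation_check() -> Dict[str, List[str]]:
    """Run two writers concurrently and return both histories."""
    store = InMemorySessions()
    await asyncio.gather(write_many(store, "a", 50), write_many(store, "b", 50))
    return {"a": store.history("a"), "b": store.history("b")}
```

Each history should contain only messages carrying its own session id; any message with the other id would indicate cross-contamination.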

* fix: pre-import core.models in conftest to prevent test pollution

test_agent_loop.py stubs sys.modules['core.models'] = MagicMock() at
module level during collection. Any test collected after it imports
Session as a MagicMock. Pre-importing core.models in conftest.py
before test_agent_loop.py's module-level code runs prevents this.

* fix: make .history authoritative mutable list, address PR review

Per review feedback: keep .history as the authoritative mutable list so
existing code doing .history.pop(), .history = [...], etc. still works.
Fix the cross-contamination bug by ensuring __post_init__() gives each
Session its OWN unique history list (never shared).

Changes:
- core/models.py: .history IS the authoritative list. _history aliases it.
  Each Session gets its own list in __post_init__.
- core/session_manager.py: add_message() delegates to Session.add_message()
  instead of appending directly — no double-append, single source of truth.
- tests/test_session_manager.py: updated test to reflect that .history
  references see new messages (same list, not a snapshot).
- docs/plans/2026-06-01-fix-chat-context-drifting.md: removed (not for
  shipping — useful design context but too much process/doc to ship).

272 tests pass (the 3 remaining failures are pre-existing and unrelated).
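
A minimal sketch of the "own list per Session" idea, assuming a dataclass with only `id` and `history` fields. The real `Session` in `core/models.py` has more fields and also keeps a `_history` alias, which is omitted here.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Session:
    """Hypothetical, reduced Session used only to illustrate the fix."""

    id: str
    history: Optional[List[Dict[str, Any]]] = None

    def __post_init__(self) -> None:
        # Copy any list that was passed in, so two sessions built from the
        # same list object never share (and never mutate) one history.
        self.history = list(self.history) if self.history is not None else []

    def add_message(self, message: Dict[str, Any]) -> None:
        # .history stays a plain mutable list, so .append()/.pop() still work.
        self.history.append(message)
```

Two sessions constructed from the same source list then diverge as soon as one of them receives a message, which is the isolation property the tests check.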

* Fix session manager message persistence

* Fix session history alias regressions

* Fix session history aliasing and task delivery
2026-06-09 14:12:52 +01:00

449 lines
18 KiB
Python

"""
context_compactor.py
Auto-compacts conversation history when approaching context window limits.
Summarizes older messages via the same LLM, preserving key context.
"""
import json
import logging
from typing import Any, Dict, List, Optional
from src.model_context import get_context_length, estimate_tokens
from src.llm_core import llm_call_async
from src.endpoint_resolver import resolve_endpoint
from core.models import ChatMessage
logger = logging.getLogger(__name__)


def _content_as_text(content: Any) -> str:
    """Flatten a message's content to plain text.

    Handles the three shapes that flow through history: a plain string, a
    multimodal list of content blocks (vision/image attachments), and None
    (assistant turns that carried only native tool_calls persist content as
    None). Returns "" for anything without text so callers can safely slice
    the result.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        return " ".join(
            b.get("text", "") for b in content
            if isinstance(b, dict) and b.get("text")
        )
    return ""


COMPACT_THRESHOLD = 0.85  # Trigger compaction at 85% of context window
SUMMARY_MAX_TOKENS = 1024
SMALL_CONTEXT_LIMIT = 8192  # Models with context <= this get aggressive trimming
# Cursor-style self-summarization prompt — produces structured, dense summaries
SELF_SUMMARY_SYSTEM_PROMPT = """You are summarizing a conversation to preserve context after compaction. Produce a structured summary that lets the conversation continue seamlessly.
Use this format:
## Conversation Summary
**Turns summarized:** {count} | **Compactions so far:** {n}
### User Goal
One sentence describing what the user is trying to accomplish.
### What Was Done
- Bullet points of completed actions, decisions made, and key outputs
- Include specific file paths, function names, variable names, URLs, and config values
- Note any errors encountered and how they were resolved
### Current State
What is the system/code/task state right now? What was the last thing discussed?
### Pending / Next Steps
- What remains to be done
- Any open questions or blockers
### Key Context
- Important constraints, preferences, or decisions that must not be forgotten
- Specific values: model names, ports, paths, credentials references, versions
Keep the summary under 1000 tokens. Be dense — every token should carry information. Do not include pleasantries or meta-commentary."""


def _sanitize_tool_messages(msgs: List[Dict]) -> List[Dict]:
    """Drop orphaned `tool` messages and repair dangling assistant `tool_calls`.

    OpenAI's API requires every `role:"tool"` message to immediately
    follow an assistant message that carries `tool_calls` (or another
    tool message in the same batch). Front-trimming the history can cut
    the assistant `tool_calls` parent while keeping its tool responses,
    which triggers: "messages with role 'tool' must be a response to a
    preceding message with 'tool_calls'". This pass repairs that:
    - drops `tool` messages with no valid preceding tool_calls
    - strips `tool_calls` from assistant messages whose tool responses were
      all trimmed away (some providers reject unanswered tool_calls), and
      drops such a message entirely if it has no text content left
    """
    # Pass 1: drop orphan tool messages.
    cleaned: List[Dict] = []
    in_batch = False  # are we right after an assistant tool_calls (or mid-batch)?
    for m in msgs:
        role = m.get("role")
        if role == "tool":
            if in_batch:
                cleaned.append(m)
            # else: orphan, so drop it
            continue
        if role == "assistant" and m.get("tool_calls"):
            in_batch = True
        else:
            in_batch = False
        cleaned.append(m)
    # Pass 2: repair assistant tool_calls messages that have NO following
    # tool response (dangling). Look one message ahead to see what follows.
    out: List[Dict] = []
    for i, m in enumerate(cleaned):
        if m.get("role") == "assistant" and m.get("tool_calls"):
            nxt = cleaned[i + 1] if i + 1 < len(cleaned) else None
            if not (nxt and nxt.get("role") == "tool"):
                # Dangling tool_calls: keep the message but strip the
                # tool_calls so it's a plain assistant turn (preserves any
                # text content the model produced alongside the calls).
                m = {k: v for k, v in m.items() if k != "tool_calls"}
                if not (m.get("content") or "").strip():
                    continue  # nothing left worth keeping
        out.append(m)
    return out


def _message_text_token_estimate(text: str) -> int:
    if not isinstance(text, str):
        return 4
    return int(len(text) * 0.3) + 4


def _truncate_text_to_token_budget(text: str, token_budget: int) -> str:
    """Trim a too-large current user message instead of dropping it entirely."""
    if token_budget <= 32:
        return "[Current user message omitted: it exceeded the model context window.]"
    if not isinstance(text, str):
        # This helper is typed/used as text downstream, so return an empty
        # string rather than the raw non-string (which would move the crash
        # into the caller that concatenates/measures the result).
        return ""
    # Match src.model_context.estimate_tokens' rough chars * 0.3 estimate.
    max_chars = max(200, int((token_budget - 16) / 0.3))
    if len(text) <= max_chars:
        return text
    notice = (
        "\n\n[Notice: the pasted message was too large for this model's context "
        "window, so Odysseus kept the beginning and end.]"
    )
    keep_chars = max(200, max_chars - len(notice))
    head_len = max(100, int(keep_chars * 0.7))
    tail_len = max(80, keep_chars - head_len)
    return text[:head_len].rstrip() + notice + "\n\n" + text[-tail_len:].lstrip()


def _truncate_tool_call_args(msg: Dict[str, Any], token_budget: int) -> Dict[str, Any]:
    """Shrink oversized assistant ``tool_calls`` arguments to fit ``token_budget``.

    A tool-only turn persists ``content=None`` with its whole payload in
    ``tool_calls[].function.arguments`` (e.g. a large create_document body), which
    the text-content truncation can't reach, so the message could stay over
    budget and the upstream call would 400. Replace each argument string that
    overflows its share of the budget with a small valid-JSON placeholder,
    preserving ``id``/``type``/``function.name`` so tool/result pairing and
    provider validation are unaffected. Returns msg unchanged when there is
    nothing oversized.
    """
    tool_calls = msg.get("tool_calls")
    if not isinstance(tool_calls, list) or not tool_calls:
        return msg
    # Budget left after whatever content survived (estimate_tokens counts tool
    # arguments too, so measure content alone here).
    content_tokens = estimate_tokens([{"role": msg.get("role", "assistant"), "content": msg.get("content")}])
    per_call = max(16, (max(0, token_budget - content_tokens)) // len(tool_calls))
    new_calls = []
    changed = False
    for tc in tool_calls:
        fn = tc.get("function") if isinstance(tc, dict) else None
        args = fn.get("arguments") if isinstance(fn, dict) else None
        if isinstance(args, str) and int(len(args) * 0.3) > per_call:
            new_fn = dict(fn)
            new_fn["arguments"] = json.dumps({"_truncated_for_context": len(args)})
            new_tc = dict(tc)
            new_tc["function"] = new_fn
            new_calls.append(new_tc)
            changed = True
        else:
            new_calls.append(tc)
    if not changed:
        return msg
    out = dict(msg)
    out["tool_calls"] = new_calls
    return out


def _truncate_message_to_token_budget(msg: Dict[str, Any], token_budget: int) -> Dict[str, Any]:
    """Return a copy of msg whose text content (and tool-call args) fit token_budget."""
    out = dict(msg)
    content = out.get("content", "")
    if isinstance(content, str):
        out["content"] = _truncate_text_to_token_budget(content, token_budget)
    elif isinstance(content, list):
        remaining = token_budget
        new_content = []
        for item in content:
            if not isinstance(item, dict) or item.get("type") != "text":
                new_content.append(item)
                continue
            text = item.get("text", "")
            truncated = _truncate_text_to_token_budget(text, remaining)
            cloned = dict(item)
            cloned["text"] = truncated
            new_content.append(cloned)
            remaining -= _message_text_token_estimate(truncated)
        out["content"] = new_content
    # A tool-only turn (content=None) carries its payload in tool_calls args,
    # which the branches above can't shrink; handle it so the message can fit.
    return _truncate_tool_call_args(out, token_budget)


def trim_for_context(messages: List[Dict], context_length: int, reserve_tokens: int = 512) -> List[Dict]:
    """Trim messages to fit within context_length.

    For small-context models, progressively strips:
    1. RAG/memory system messages (keep preset system prompt)
    2. Older conversation turns
    If the current message alone is still too large, it is truncated rather
    than dropped. Reserves space for the response.
    """
    budget = context_length - reserve_tokens
    used = estimate_tokens(messages)
    if used <= budget:
        return messages
    logger.info(f"Trimming messages: {used} tokens > {budget} budget (ctx={context_length})")
    # Separate system messages from conversation.
    # Messages marked _protected (e.g. active document) are never trimmed.
    system_msgs = []
    protected_msgs = []
    convo_msgs = []
    for msg in messages:
        if msg.get("_protected"):
            protected_msgs.append(msg)
        elif msg.get("role") == "system":
            system_msgs.append(msg)
        else:
            convo_msgs.append(msg)
    # Protected messages count toward budget but are never dropped
    protected_tokens = estimate_tokens(protected_msgs)
    budget -= protected_tokens
    # Priority: keep first system msg (preset prompt), drop others (memory, RAG, memo)
    essential_system = system_msgs[:1] if system_msgs else []
    extra_system = system_msgs[1:]
    # Drop all extra system messages first and check whether that is enough
    trimmed = essential_system + convo_msgs
    if estimate_tokens(trimmed) <= budget:
        # Dropping extras was enough, so add back as many as still fit, in order
        result = list(essential_system)
        for msg in extra_system:
            candidate = result + [msg] + convo_msgs
            if estimate_tokens(candidate) <= budget:
                result.append(msg)
            else:
                break
        return _sanitize_tool_messages(result + protected_msgs + convo_msgs)
    # Still too big: truncate the first system message to its first 2000 chars
    if essential_system:
        sys_text = essential_system[0].get("content", "")
        if len(sys_text) > 2000:
            essential_system[0] = {"role": "system", "content": sys_text[:2000] + "\n[System prompt truncated for context limits]"}
    trimmed = essential_system + convo_msgs
    if estimate_tokens(trimmed) <= budget:
        return _sanitize_tool_messages(essential_system + protected_msgs + convo_msgs)
    # Still too big: drop older conversation turns BUT always keep the current
    # user turn. If a pasted message alone exceeds the model context, truncate
    # that message with a visible notice instead of dropping it; otherwise the
    # model appears to "ignore" large pastes because it never receives them.
    # Hermes-style: recent context matters more than old context.
    PROTECT_RECENT = 10
    current_msg = convo_msgs[-1:] if convo_msgs else []
    prior_convo = convo_msgs[:-1] if convo_msgs else []
    if len(prior_convo) >= PROTECT_RECENT:
        old_msgs = prior_convo[:-(PROTECT_RECENT - 1)]
        recent_msgs = prior_convo[-(PROTECT_RECENT - 1):] + current_msg
        while old_msgs and estimate_tokens(essential_system + old_msgs + recent_msgs) > budget:
            old_msgs.pop(0)
        convo_msgs = old_msgs + recent_msgs
    else:
        convo_msgs = prior_convo + current_msg
        while prior_convo and estimate_tokens(essential_system + prior_convo + current_msg) > budget:
            prior_convo.pop(0)
        convo_msgs = prior_convo + current_msg
    # If the current message itself is too large, shrink only that message.
    if current_msg and estimate_tokens(essential_system + protected_msgs + convo_msgs) > budget:
        prefix = essential_system + protected_msgs + convo_msgs[:-1]
        available_for_current = max(64, budget - estimate_tokens(prefix))
        convo_msgs[-1] = _truncate_message_to_token_budget(convo_msgs[-1], available_for_current)
    result = _sanitize_tool_messages(essential_system + protected_msgs + convo_msgs)
    logger.info(f"Trimmed to {estimate_tokens(result)} tokens ({len(result)} messages)")
    return result


async def maybe_compact(
    session,
    endpoint_url: str,
    model: str,
    messages: List[Dict],
    headers: Optional[Dict] = None,
    owner: Optional[str] = None,
) -> tuple:
    """Check context usage and compact if above threshold.

    Returns (messages, context_length, was_compacted).
    """
    context_length = get_context_length(endpoint_url, model)
    used = estimate_tokens(messages)
    pct = (used / context_length) * 100 if context_length else 0
    if pct < COMPACT_THRESHOLD * 100:
        return messages, context_length, False
    logger.info(
        f"Context at {pct:.1f}% ({used}/{context_length} tokens) — compacting"
    )
    # Split into system preface and conversation
    system_msgs = []
    convo_msgs = []
    for msg in messages:
        if msg.get("role") == "system":
            system_msgs.append(msg)
        else:
            convo_msgs.append(msg)
    # Too few turns to be worth summarizing
    if len(convo_msgs) < 4:
        return messages, context_length, False
    # Split conversation: summarize older half, keep recent half
    split_point = len(convo_msgs) // 2
    older = convo_msgs[:split_point]
    recent = convo_msgs[split_point:]
    # Build the text to summarize (each message capped at 2000 characters)
    convo_text = "\n".join(
        f"{msg.get('role', 'user').upper()}: {_content_as_text(msg.get('content'))[:2000]}"
        for msg in older
    )
    # Count prior compactions from existing summary messages. Flatten the
    # content first so None or multimodal list content cannot break the check.
    compaction_count = sum(
        1 for m in system_msgs
        if "[Conversation summary" in _content_as_text(m.get("content"))
    )
    # Use utility model if configured, otherwise fall back to session model
    util_url, util_model, util_headers = resolve_endpoint("utility", owner=owner)
    compact_url = util_url or endpoint_url
    compact_model = util_model or model
    compact_headers = util_headers if util_url else headers
    prompt = SELF_SUMMARY_SYSTEM_PROMPT.replace(
        "{count}", str(len(older))
    ).replace(
        "{n}", str(compaction_count + 1)
    )
    summary_messages = [
        {"role": "system", "content": prompt},
        {"role": "user", "content": convo_text},
    ]
    try:
        summary = await llm_call_async(
            compact_url,
            compact_model,
            summary_messages,
            temperature=0.2,
            max_tokens=SUMMARY_MAX_TOKENS,
            headers=compact_headers,
            timeout=30,
        )
    except Exception as e:
        logger.error(f"Compaction summary failed: {e}")
        # Degrade gracefully: keep the conversation intact rather than
        # silently dropping the older half. was_compacted=False signals the
        # caller nothing was summarized; trim_for_context handles length.
        return messages, context_length, False
    summary_msg = {
        "role": "system",
        "content": f"[Conversation summary — earlier messages were compacted]\n{summary}",
    }
    compacted = system_msgs + [summary_msg] + recent
    # Update session history to match. Pass len(system_msgs) so the
    # recent_history slice in _update_session_history uses the correct
    # offset: session.history INCLUDES the system messages, but
    # split_point is indexed against convo_msgs which does NOT. Without
    # this, the slice drops the leading system message(s).
    _update_session_history(session, split_point, summary, system_msg_count=len(system_msgs))
    new_used = estimate_tokens(compacted)
    logger.info(
        f"Compacted: {used} -> {new_used} tokens "
        f"({len(older)} messages summarized, {len(recent)} kept)"
    )
    return compacted, context_length, True


def _update_session_history(session, split_point: int, summary: str,
                            system_msg_count: int = 0):
    """Update the in-memory session history after compaction.

    `split_point` is the index in `convo_msgs` (system-stripped). The
    in-memory `session.history` includes leading system messages, so the
    actual recent-history slice starts at `system_msg_count + split_point`.
    Prepending `session.history[:system_msg_count]` to the new history
    preserves persona, preset, and RAG system messages that would
    otherwise be dropped.
    """
    if not session or not hasattr(session, "history"):
        return
    effective_split = system_msg_count + split_point
    if effective_split >= len(session.history):
        return
    # Keep the recent messages, prepend summary AND the leading system
    # messages so the system prompt survives compaction.
    system_prefix = list(session.history[:system_msg_count])
    recent_history = session.history[effective_split:]
    summary_msg = ChatMessage(
        role="system",
        content=f"[Conversation summary]\n{summary}",
        metadata={"compacted": True, "summarized_count": split_point},
    )
    new_history = system_prefix + [summary_msg] + recent_history
    try:
        from core.models import get_session_manager_instance
        manager = get_session_manager_instance()
    except Exception:
        manager = None
    if manager and getattr(session, "id", None):
        if manager.replace_messages(session.id, new_history):
            return
    session.history = new_history
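
The offset arithmetic in `_update_session_history` can be checked in isolation. The sketch below uses plain strings in place of `ChatMessage` objects, and the helper name `rebuild_history` is hypothetical; it only mirrors the slicing, not the session-manager persistence.

```python
from typing import List


def rebuild_history(
    history: List[str],
    system_msg_count: int,
    split_point: int,
    summary: str,
) -> List[str]:
    """Mirror the slicing: system prefix + summary + recent messages."""
    # split_point is relative to the system-stripped conversation, so shift
    # it by the number of leading system messages in the full history.
    effective_split = system_msg_count + split_point
    if effective_split >= len(history):
        return history  # nothing to compact
    system_prefix = history[:system_msg_count]
    recent = history[effective_split:]
    return system_prefix + [f"[Conversation summary]\n{summary}"] + recent
```

For a history of one system message followed by four conversation turns and `split_point=2`, the slice starts at index 3, so the system message and the last two turns survive alongside the summary.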