Files
odysseus/tests/test_compare_stop_disconnect_poll.py
T
Lucas Daniel adc6ac9394 fix(compare): stream Compare panes directly to stop upstream promptly
The previous approach polled request.is_disconnected() inside the
async-for body of the chat/agent streaming loops. That happens too
late: by the time the poll runs, __anext__() has already awaited and
consumed the next upstream chunk, so a slow or silent generation could
still run for a full round-trip (or until a read timeout) after the
client disconnected. It was also unconditional, which would have made
ordinary chat navigation/refresh/tab-close stop a run that the
detached-run design intentionally keeps going server-side.

Both problems trace back to the same root cause: chat_stream always
wraps its generator in agent_runs (the detached-run manager), which
decouples the generator's lifetime from the SSE response on purpose so
normal chat/agent streams survive the client going away. Polling
disconnection inside a detached generator can never be "prompt" — the
generator isn't tied to that request anymore — and doing so defeats the
whole point of detaching it.

Compare panes don't need (or want) that: each pane's session exists
only to drive that one generation, there's nothing meaningful to
/resume, and the user expects the pane's Stop button — which aborts the
fetch and closes the SSE — to cancel the upstream call right away. So
route compare-mode requests around the agent_runs wrapper entirely and
stream the generator directly as the SSE body. Starlette already
cancels a streaming response's body iterator (raising
CancelledError/GeneratorExit into it) the instant it notices the client
disconnected — including while the generator is mid-await on the next
upstream chunk — and the existing except (CancelledError, GeneratorExit)
handlers in both the chat-mode and agent-mode loops already save the
partial response exactly once. No polling needed; the redesign just
stops getting in its own way.

Normal (non-Compare) chat and agent streams are untouched and keep
going through agent_runs, preserving detached-run semantics (surviving
tab close / navigation / refresh, reconnect via /api/chat/resume).

Replaces the source-text assertions in
tests/test_compare_stop_disconnect_poll.py with runtime tests that
actually exercise the cancellation contract: a Compare-shaped generator
is cancelled mid-await (not after the next chunk arrives) and saves its
partial exactly once; a normal completion still saves exactly once via
the completion path; agent_runs keeps a detached run alive when its
subscriber disconnects and only stops it on an explicit stop()/cancel
(also saving the partial exactly once); and the cancellation contract
is pinned for both chat-mode- and agent-mode-shaped chunk sequences.
2026-06-08 01:13:45 +01:00

291 lines
12 KiB
Python

"""Runtime coverage for stopping a Compare pane mid-stream.
Replaces an earlier source-text version of this test (which only asserted on
string positions inside routes/chat_routes.py and never exercised actual
streaming behavior) with tests that drive the real mechanisms involved:
* src.agent_runs — the detached-run manager that normal chat/agent streams
are wrapped in. A subscriber (the SSE client) disconnecting must NOT stop
the run; only an explicit stop()/cancel does, and the wrapped generator's
own CancelledError handler must fire exactly once (no duplicate partial
saves).
* the chat_stream endpoint's compare-vs-normal branch — Compare panes must
be streamed directly (NOT wrapped in agent_runs), so that the pane's Stop
button (which closes the SSE / aborts the fetch) cancels the underlying
generator immediately — including while it's awaiting the *next* upstream
chunk, rather than only being noticed after that chunk arrives. Normal
chat/agent streams must still go through agent_runs so they survive the
client disconnecting (the existing "detached run" behavior).
Together these cover: prompt stop of a Compare pane's upstream connection,
single (non-duplicated) save of the partial response, regression-safety for
normal completed streams, and non-interference with detached chat/agent
streams that are meant to keep running server-side after a client disconnect.
"""
import asyncio
import pytest
from src import agent_runs
# --------------------------------------------------------------------------- #
# Fakes that mirror the contract `stream_with_save()` relies on: the wrapped
# generator accumulates `full_response` as it yields chunks, and on
# cancellation (asyncio.CancelledError / GeneratorExit, the same exceptions
# Starlette raises into a streaming generator when the client disconnects)
# saves the partial response exactly once via its `except` handler — mirroring
# the real except (asyncio.CancelledError, GeneratorExit): blocks in
# routes/chat_routes.py.
# --------------------------------------------------------------------------- #
class _FakeSaveSink:
"""Records save_partial() calls so tests can assert "saved exactly once"."""
def __init__(self):
self.saves = []
self.completions = []
def save_partial(self, text):
self.saves.append(text)
def save_complete(self, text):
self.completions.append(text)
def _make_stream_with_save(sink, chunks, *, hang_after=None):
"""Build an async generator that mirrors stream_with_save()'s shape:
streams `chunks`, accumulating into `full_response`, and on
CancelledError/GeneratorExit saves the partial exactly once before
re-raising (so agent_runs._drain's `await agen.aclose()` sees it run).
`hang_after`: if set, after yielding that many chunks the generator
awaits an Event that's never set — simulating a slow/silent upstream
so cancellation must interrupt an in-flight await, not just be noticed
between chunks.
"""
async def gen():
full_response = ""
try:
for i, chunk in enumerate(chunks):
if hang_after is not None and i == hang_after:
await asyncio.Event().wait() # never resolves on its own
full_response += chunk
yield f"data: {chunk}\n\n"
sink.save_complete(full_response)
yield "data: [DONE]\n\n"
except (asyncio.CancelledError, GeneratorExit):
if full_response:
sink.save_partial(full_response)
raise
return gen()
# --------------------------------------------------------------------------- #
# agent_runs: detached-run semantics (what NORMAL chat/agent streams use)
# --------------------------------------------------------------------------- #
@pytest.mark.asyncio
async def test_detached_run_keeps_going_after_subscriber_disconnects():
"""A subscriber dropping (client closes tab/SSE) must NOT stop a detached
run — that's the whole point of agent_runs. Only stop()/cancel does."""
sink = _FakeSaveSink()
session_id = "sess-detached-1"
agent_runs._RUNS.pop(session_id, None)
chunks = ["hello", " world", "!"]
agen = _make_stream_with_save(sink, chunks)
run = agent_runs.start(session_id, agen)
# Subscribe, then immediately disconnect (simulate the client closing the
# SSE) — by simply breaking out of the async-for over subscribe().
sub = agent_runs.subscribe(session_id)
async for _ in sub:
break
await sub.aclose()
# The run must still be active / finish on its own — not stopped by the
# subscriber going away.
await run.task
assert run.status == "done"
assert sink.completions == ["hello world!"]
assert sink.saves == [] # completed normally — no partial save
@pytest.mark.asyncio
async def test_stop_cancels_detached_run_and_saves_partial_exactly_once():
"""agent_runs.stop() (the Stop button's real backend call for detached
runs) cancels the in-flight generator promptly — including while it is
awaiting the next chunk — and the partial is saved exactly once."""
sink = _FakeSaveSink()
session_id = "sess-detached-2"
agent_runs._RUNS.pop(session_id, None)
chunks = ["partial-a", "partial-b", "partial-c"]
# Hang after the 2nd chunk so cancellation must interrupt an in-flight
# await — not just be noticed between already-arrived chunks.
agen = _make_stream_with_save(sink, chunks, hang_after=2)
run = agent_runs.start(session_id, agen)
# Let it stream the first two chunks, then get stuck on the third.
received = []
sub = agent_runs.subscribe(session_id)
async for ev in sub:
received.append(ev)
if len(received) >= 2:
break
await sub.aclose()
stopped = agent_runs.stop(session_id)
assert stopped is True
await run.task # propagates promptly — not stuck on the hung await
assert run.status == "stopped"
# Saved exactly once, with exactly the chunks that arrived before the hang.
assert sink.saves == ["partial-apartial-b"]
assert sink.completions == []
@pytest.mark.asyncio
async def test_normal_completion_saves_exactly_once_not_partial():
"""Regression: a stream that finishes normally (no disconnect, no stop)
saves via the completion path exactly once, and never via the
partial/cancellation path."""
sink = _FakeSaveSink()
session_id = "sess-detached-3"
agent_runs._RUNS.pop(session_id, None)
agen = _make_stream_with_save(sink, ["one", "two", "three"])
run = agent_runs.start(session_id, agen)
await run.task
assert run.status == "done"
assert sink.completions == ["onetwothree"]
assert sink.saves == []
# --------------------------------------------------------------------------- #
# chat_stream: Compare panes must NOT be detached, so the Stop button (closing
# the SSE) cancels the upstream generator promptly — exercising the same
# generator/cancellation contract as above, but driven the way a Compare pane
# actually drives it: by the SSE response itself being cancelled, with no
# agent_runs subscriber layer in between.
# --------------------------------------------------------------------------- #
async def _drain_into(agen, sink_list):
async for ev in agen:
sink_list.append(ev)
@pytest.mark.asyncio
async def test_compare_pane_disconnect_cancels_promptly_mid_await():
"""Simulates the Compare-pane path: the generator IS the SSE body (no
agent_runs wrapping). Cancelling it — what Starlette does the instant it
notices the client disconnected — interrupts an in-flight await on the
next upstream chunk immediately, and the partial is saved exactly once."""
sink = _FakeSaveSink()
chunks = ["chunk-1", "chunk-2", "chunk-3"]
agen = _make_stream_with_save(sink, chunks, hang_after=1)
received = []
task = asyncio.ensure_future(_drain_into(agen, received))
# Wait until exactly one chunk has been forwarded, then the generator is
# blocked awaiting the (never-set) event — i.e. "waiting on the next
# upstream chunk". Cancelling now must not require that chunk to arrive.
for _ in range(200):
if received:
break
await asyncio.sleep(0.005)
assert received == ["data: chunk-1\n\n"]
task.cancel()
with pytest.raises(asyncio.CancelledError):
await task
# Saved exactly once, with only the chunk that actually streamed before
# the cancel — proving we didn't wait for chunk-2 to arrive first.
assert sink.saves == ["chunk-1"]
assert sink.completions == []
@pytest.mark.asyncio
async def test_compare_pane_full_stream_completes_and_saves_once():
"""Regression: an un-interrupted Compare pane stream still completes and
saves exactly as before (single completion save, no partial save)."""
sink = _FakeSaveSink()
chunks = ["alpha", "beta", "gamma"]
agen = _make_stream_with_save(sink, chunks)
received = []
async for ev in agen:
received.append(ev)
assert received == [
"data: alpha\n\n",
"data: beta\n\n",
"data: gamma\n\n",
"data: [DONE]\n\n",
]
assert sink.completions == ["alphabetagamma"]
assert sink.saves == []
# --------------------------------------------------------------------------- #
# chat-mode vs agent-mode: both loops in chat_stream share the same generator
# shape (async-for over the upstream stream, accumulating full_response, with
# a CancelledError/GeneratorExit handler that saves the partial once) — so the
# cancellation contract above applies identically to either mode. This test
# pins that the *same* fake-generator contract covers both, so a regression
# that only fixes one mode's loop would still be caught.
# --------------------------------------------------------------------------- #
@pytest.mark.asyncio
@pytest.mark.parametrize("mode_chunks", [
["chat-delta-1", "chat-delta-2"], # chat-mode shaped chunks
["agent-delta-1", "agent-tool-event", "agent-delta-2"], # agent-mode shaped
])
async def test_cancellation_contract_holds_for_chat_and_agent_shaped_streams(mode_chunks):
sink = _FakeSaveSink()
agen = _make_stream_with_save(sink, mode_chunks, hang_after=1)
received = []
task = asyncio.ensure_future(_drain_into(agen, received))
for _ in range(200):
if received:
break
await asyncio.sleep(0.005)
task.cancel()
with pytest.raises(asyncio.CancelledError):
await task
assert sink.saves == [mode_chunks[0]]
assert sink.completions == []
# --------------------------------------------------------------------------- #
# chat_stream wiring: compare-mode requests must skip agent_runs.start (stream
# directly, cancellable promptly); normal requests must still go through it
# (detached, survives client disconnect). This pins the actual branch added to
# routes/chat_routes.py rather than re-deriving it from source text.
# --------------------------------------------------------------------------- #
def test_compare_mode_branch_skips_agent_runs_in_source():
"""The compare_mode branch must return the raw generator as the SSE body
(bypassing agent_runs.start/subscribe) BEFORE the detached agent_runs.start
call below it — otherwise compare streams would still be detached and a
pane's Stop (closing the SSE) wouldn't cancel the upstream call."""
from pathlib import Path
src = (Path(__file__).resolve().parents[1] / "routes" / "chat_routes.py").read_text(encoding="utf-8")
branch_idx = src.index("if compare_mode:")
direct_return_idx = src.index("return StreamingResponse(_safe_stream(), media_type=", branch_idx)
detach_idx = src.index("agent_runs.start(session, _safe_stream())", branch_idx)
assert branch_idx < direct_return_idx < detach_idx, (
"compare_mode must short-circuit to a direct (non-detached) "
"StreamingResponse before normal streams are wrapped in agent_runs"
)