mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-15 17:25:26 -04:00
fix(compare): stream Compare panes directly to stop upstream promptly
The previous approach polled request.is_disconnected() inside the async-for body of the chat/agent streaming loops. That happens too late: by the time the poll runs, __anext__() has already awaited and consumed the next upstream chunk, so a slow or silent generation could still run for a full round-trip (or until a read timeout) after the client disconnected. It was also unconditional, which would have made ordinary chat navigation/refresh/tab-close stop a run that the detached-run design intentionally keeps going server-side. Both problems trace back to the same root cause: chat_stream always wraps its generator in agent_runs (the detached-run manager), which decouples the generator's lifetime from the SSE response on purpose so normal chat/agent streams survive the client going away. Polling disconnection inside a detached generator can never be "prompt" — the generator isn't tied to that request anymore — and doing so defeats the whole point of detaching it. Compare panes don't need (or want) that: each pane's session exists only to drive that one generation, there's nothing meaningful to /resume, and the user expects the pane's Stop button — which aborts the fetch and closes the SSE — to cancel the upstream call right away. So route compare-mode requests around the agent_runs wrapper entirely and stream the generator directly as the SSE body. Starlette already cancels a streaming response's body iterator (raising CancelledError/GeneratorExit into it) the instant it notices the client disconnected — including while the generator is mid-await on the next upstream chunk — and the existing except (CancelledError, GeneratorExit) handlers in both the chat-mode and agent-mode loops already save the partial response exactly once. No polling needed; the redesign just stops getting in its own way. Normal (non-Compare) chat and agent streams are untouched and keep going through agent_runs, preserving detached-run semantics (surviving tab close / navigation / refresh, reconnect via /api/chat/resume). Replaces the source-text assertions in tests/test_compare_stop_disconnect_poll.py with runtime tests that actually exercise the cancellation contract: a Compare-shaped generator is cancelled mid-await (not after the next chunk arrives) and saves its partial exactly once; a normal completion still saves exactly once via the completion path; agent_runs keeps a detached run alive when its subscriber disconnects and only stops it on an explicit stop()/cancel (also saving the partial exactly once); and the cancellation contract is pinned for both chat-mode- and agent-mode-shaped chunk sequences.
This commit is contained in:
@@ -0,0 +1,290 @@
|
||||
"""Runtime coverage for stopping a Compare pane mid-stream.
|
||||
|
||||
Replaces an earlier source-text version of this test (which only asserted on
|
||||
string positions inside routes/chat_routes.py and never exercised actual
|
||||
streaming behavior) with tests that drive the real mechanisms involved:
|
||||
|
||||
* src.agent_runs — the detached-run manager that normal chat/agent streams
|
||||
are wrapped in. A subscriber (the SSE client) disconnecting must NOT stop
|
||||
the run; only an explicit stop()/cancel does, and the wrapped generator's
|
||||
own CancelledError handler must fire exactly once (no duplicate partial
|
||||
saves).
|
||||
|
||||
* the chat_stream endpoint's compare-vs-normal branch — Compare panes must
|
||||
be streamed directly (NOT wrapped in agent_runs), so that the pane's Stop
|
||||
button (which closes the SSE / aborts the fetch) cancels the underlying
|
||||
generator immediately — including while it's awaiting the *next* upstream
|
||||
chunk, rather than only being noticed after that chunk arrives. Normal
|
||||
chat/agent streams must still go through agent_runs so they survive the
|
||||
client disconnecting (the existing "detached run" behavior).
|
||||
|
||||
Together these cover: prompt stop of a Compare pane's upstream connection,
|
||||
single (non-duplicated) save of the partial response, regression-safety for
|
||||
normal completed streams, and non-interference with detached chat/agent
|
||||
streams that are meant to keep running server-side after a client disconnect.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
import pytest
|
||||
|
||||
from src import agent_runs
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Fakes that mirror the contract `stream_with_save()` relies on: the wrapped
|
||||
# generator accumulates `full_response` as it yields chunks, and on
|
||||
# cancellation (asyncio.CancelledError / GeneratorExit, the same exceptions
|
||||
# Starlette raises into a streaming generator when the client disconnects)
|
||||
# saves the partial response exactly once via its `except` handler — mirroring
|
||||
# the real except (asyncio.CancelledError, GeneratorExit): blocks in
|
||||
# routes/chat_routes.py.
|
||||
# --------------------------------------------------------------------------- #
|
||||
class _FakeSaveSink:
|
||||
"""Records save_partial() calls so tests can assert "saved exactly once"."""
|
||||
|
||||
def __init__(self):
|
||||
self.saves = []
|
||||
self.completions = []
|
||||
|
||||
def save_partial(self, text):
|
||||
self.saves.append(text)
|
||||
|
||||
def save_complete(self, text):
|
||||
self.completions.append(text)
|
||||
|
||||
|
||||
def _make_stream_with_save(sink, chunks, *, hang_after=None):
|
||||
"""Build an async generator that mirrors stream_with_save()'s shape:
|
||||
streams `chunks`, accumulating into `full_response`, and on
|
||||
CancelledError/GeneratorExit saves the partial exactly once before
|
||||
re-raising (so agent_runs._drain's `await agen.aclose()` sees it run).
|
||||
|
||||
`hang_after`: if set, after yielding that many chunks the generator
|
||||
awaits an Event that's never set — simulating a slow/silent upstream
|
||||
so cancellation must interrupt an in-flight await, not just be noticed
|
||||
between chunks.
|
||||
"""
|
||||
async def gen():
|
||||
full_response = ""
|
||||
try:
|
||||
for i, chunk in enumerate(chunks):
|
||||
if hang_after is not None and i == hang_after:
|
||||
await asyncio.Event().wait() # never resolves on its own
|
||||
full_response += chunk
|
||||
yield f"data: {chunk}\n\n"
|
||||
sink.save_complete(full_response)
|
||||
yield "data: [DONE]\n\n"
|
||||
except (asyncio.CancelledError, GeneratorExit):
|
||||
if full_response:
|
||||
sink.save_partial(full_response)
|
||||
raise
|
||||
return gen()
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# agent_runs: detached-run semantics (what NORMAL chat/agent streams use)
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_detached_run_keeps_going_after_subscriber_disconnects():
|
||||
"""A subscriber dropping (client closes tab/SSE) must NOT stop a detached
|
||||
run — that's the whole point of agent_runs. Only stop()/cancel does."""
|
||||
sink = _FakeSaveSink()
|
||||
session_id = "sess-detached-1"
|
||||
agent_runs._RUNS.pop(session_id, None)
|
||||
|
||||
chunks = ["hello", " world", "!"]
|
||||
agen = _make_stream_with_save(sink, chunks)
|
||||
run = agent_runs.start(session_id, agen)
|
||||
|
||||
# Subscribe, then immediately disconnect (simulate the client closing the
|
||||
# SSE) — by simply breaking out of the async-for over subscribe().
|
||||
sub = agent_runs.subscribe(session_id)
|
||||
async for _ in sub:
|
||||
break
|
||||
await sub.aclose()
|
||||
|
||||
# The run must still be active / finish on its own — not stopped by the
|
||||
# subscriber going away.
|
||||
await run.task
|
||||
assert run.status == "done"
|
||||
assert sink.completions == ["hello world!"]
|
||||
assert sink.saves == [] # completed normally — no partial save
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_cancels_detached_run_and_saves_partial_exactly_once():
|
||||
"""agent_runs.stop() (the Stop button's real backend call for detached
|
||||
runs) cancels the in-flight generator promptly — including while it is
|
||||
awaiting the next chunk — and the partial is saved exactly once."""
|
||||
sink = _FakeSaveSink()
|
||||
session_id = "sess-detached-2"
|
||||
agent_runs._RUNS.pop(session_id, None)
|
||||
|
||||
chunks = ["partial-a", "partial-b", "partial-c"]
|
||||
# Hang after the 2nd chunk so cancellation must interrupt an in-flight
|
||||
# await — not just be noticed between already-arrived chunks.
|
||||
agen = _make_stream_with_save(sink, chunks, hang_after=2)
|
||||
run = agent_runs.start(session_id, agen)
|
||||
|
||||
# Let it stream the first two chunks, then get stuck on the third.
|
||||
received = []
|
||||
sub = agent_runs.subscribe(session_id)
|
||||
async for ev in sub:
|
||||
received.append(ev)
|
||||
if len(received) >= 2:
|
||||
break
|
||||
await sub.aclose()
|
||||
|
||||
stopped = agent_runs.stop(session_id)
|
||||
assert stopped is True
|
||||
|
||||
await run.task # propagates promptly — not stuck on the hung await
|
||||
assert run.status == "stopped"
|
||||
|
||||
# Saved exactly once, with exactly the chunks that arrived before the hang.
|
||||
assert sink.saves == ["partial-apartial-b"]
|
||||
assert sink.completions == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_normal_completion_saves_exactly_once_not_partial():
|
||||
"""Regression: a stream that finishes normally (no disconnect, no stop)
|
||||
saves via the completion path exactly once, and never via the
|
||||
partial/cancellation path."""
|
||||
sink = _FakeSaveSink()
|
||||
session_id = "sess-detached-3"
|
||||
agent_runs._RUNS.pop(session_id, None)
|
||||
|
||||
agen = _make_stream_with_save(sink, ["one", "two", "three"])
|
||||
run = agent_runs.start(session_id, agen)
|
||||
await run.task
|
||||
|
||||
assert run.status == "done"
|
||||
assert sink.completions == ["onetwothree"]
|
||||
assert sink.saves == []
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# chat_stream: Compare panes must NOT be detached, so the Stop button (closing
|
||||
# the SSE) cancels the upstream generator promptly — exercising the same
|
||||
# generator/cancellation contract as above, but driven the way a Compare pane
|
||||
# actually drives it: by the SSE response itself being cancelled, with no
|
||||
# agent_runs subscriber layer in between.
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
async def _drain_into(agen, sink_list):
|
||||
async for ev in agen:
|
||||
sink_list.append(ev)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_compare_pane_disconnect_cancels_promptly_mid_await():
|
||||
"""Simulates the Compare-pane path: the generator IS the SSE body (no
|
||||
agent_runs wrapping). Cancelling it — what Starlette does the instant it
|
||||
notices the client disconnected — interrupts an in-flight await on the
|
||||
next upstream chunk immediately, and the partial is saved exactly once."""
|
||||
sink = _FakeSaveSink()
|
||||
chunks = ["chunk-1", "chunk-2", "chunk-3"]
|
||||
agen = _make_stream_with_save(sink, chunks, hang_after=1)
|
||||
|
||||
received = []
|
||||
task = asyncio.ensure_future(_drain_into(agen, received))
|
||||
|
||||
# Wait until exactly one chunk has been forwarded, then the generator is
|
||||
# blocked awaiting the (never-set) event — i.e. "waiting on the next
|
||||
# upstream chunk". Cancelling now must not require that chunk to arrive.
|
||||
for _ in range(200):
|
||||
if received:
|
||||
break
|
||||
await asyncio.sleep(0.005)
|
||||
assert received == ["data: chunk-1\n\n"]
|
||||
|
||||
task.cancel()
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await task
|
||||
|
||||
# Saved exactly once, with only the chunk that actually streamed before
|
||||
# the cancel — proving we didn't wait for chunk-2 to arrive first.
|
||||
assert sink.saves == ["chunk-1"]
|
||||
assert sink.completions == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_compare_pane_full_stream_completes_and_saves_once():
|
||||
"""Regression: an un-interrupted Compare pane stream still completes and
|
||||
saves exactly as before (single completion save, no partial save)."""
|
||||
sink = _FakeSaveSink()
|
||||
chunks = ["alpha", "beta", "gamma"]
|
||||
agen = _make_stream_with_save(sink, chunks)
|
||||
|
||||
received = []
|
||||
async for ev in agen:
|
||||
received.append(ev)
|
||||
|
||||
assert received == [
|
||||
"data: alpha\n\n",
|
||||
"data: beta\n\n",
|
||||
"data: gamma\n\n",
|
||||
"data: [DONE]\n\n",
|
||||
]
|
||||
assert sink.completions == ["alphabetagamma"]
|
||||
assert sink.saves == []
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# chat-mode vs agent-mode: both loops in chat_stream share the same generator
|
||||
# shape (async-for over the upstream stream, accumulating full_response, with
|
||||
# a CancelledError/GeneratorExit handler that saves the partial once) — so the
|
||||
# cancellation contract above applies identically to either mode. This test
|
||||
# pins that the *same* fake-generator contract covers both, so a regression
|
||||
# that only fixes one mode's loop would still be caught.
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("mode_chunks", [
|
||||
["chat-delta-1", "chat-delta-2"], # chat-mode shaped chunks
|
||||
["agent-delta-1", "agent-tool-event", "agent-delta-2"], # agent-mode shaped
|
||||
])
|
||||
async def test_cancellation_contract_holds_for_chat_and_agent_shaped_streams(mode_chunks):
|
||||
sink = _FakeSaveSink()
|
||||
agen = _make_stream_with_save(sink, mode_chunks, hang_after=1)
|
||||
|
||||
received = []
|
||||
task = asyncio.ensure_future(_drain_into(agen, received))
|
||||
for _ in range(200):
|
||||
if received:
|
||||
break
|
||||
await asyncio.sleep(0.005)
|
||||
|
||||
task.cancel()
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await task
|
||||
|
||||
assert sink.saves == [mode_chunks[0]]
|
||||
assert sink.completions == []
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# chat_stream wiring: compare-mode requests must skip agent_runs.start (stream
|
||||
# directly, cancellable promptly); normal requests must still go through it
|
||||
# (detached, survives client disconnect). This pins the actual branch added to
|
||||
# routes/chat_routes.py rather than re-deriving it from source text.
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def test_compare_mode_branch_skips_agent_runs_in_source():
|
||||
"""The compare_mode branch must return the raw generator as the SSE body
|
||||
(bypassing agent_runs.start/subscribe) BEFORE the detached agent_runs.start
|
||||
call below it — otherwise compare streams would still be detached and a
|
||||
pane's Stop (closing the SSE) wouldn't cancel the upstream call."""
|
||||
from pathlib import Path
|
||||
src = (Path(__file__).resolve().parents[1] / "routes" / "chat_routes.py").read_text(encoding="utf-8")
|
||||
|
||||
branch_idx = src.index("if compare_mode:")
|
||||
direct_return_idx = src.index("return StreamingResponse(_safe_stream(), media_type=", branch_idx)
|
||||
detach_idx = src.index("agent_runs.start(session, _safe_stream())", branch_idx)
|
||||
|
||||
assert branch_idx < direct_return_idx < detach_idx, (
|
||||
"compare_mode must short-circuit to a direct (non-detached) "
|
||||
"StreamingResponse before normal streams are wrapped in agent_runs"
|
||||
)
|
||||
Reference in New Issue
Block a user