diff --git a/routes/chat_routes.py b/routes/chat_routes.py index 365a9cabd..063c1cba6 100644 --- a/routes/chat_routes.py +++ b/routes/chat_routes.py @@ -1209,11 +1209,30 @@ def setup_chat_routes( finally: _active_streams.pop(session, None) - # Run the stream as a DETACHED background task so it survives the client - # closing the tab / navigating away (true terminal-agent behavior). The - # SSE response just subscribes (replay buffered output + live); dropping - # the SSE only removes a subscriber — the run keeps going and saves the - # assistant message on completion regardless. Reconnect via /api/chat/resume. + # Compare panes are short-lived, single-shot generations whose sessions + # exist only to drive that one pane — there's nothing to "resume" and + # the user expects the pane's Stop button (which aborts the fetch, + # closing this SSE) to promptly cancel the upstream LLM call. Detaching + # them would keep burning upstream tokens/compute after the pane is + # stopped or the comparison is abandoned, and would surface a stale + # "still streaming" /resume target for a session nobody will revisit. + # + # So: stream them directly (no agent_runs wrapping). Starlette cancels + # the underlying async generator (raising CancelledError/GeneratorExit + # inside it) as soon as it notices the client disconnected — which the + # mode-specific except blocks above already handle by saving the + # partial response exactly once. This stops the upstream call promptly + # without waiting on the next streamed chunk. + # + # Normal chat/agent streams keep the DETACHED behavior below: they + # survive the client closing the tab / navigating away (true + # terminal-agent semantics). The SSE response just subscribes (replay + # buffered output + live); dropping the SSE only removes a subscriber — + # the run keeps going and saves the assistant message on completion + # regardless. Reconnect via /api/chat/resume. + if compare_mode: + return StreamingResponse(_safe_stream(), media_type="text/event-stream") + agent_runs.start(session, _safe_stream()) return StreamingResponse(agent_runs.subscribe(session), media_type="text/event-stream") diff --git a/tests/test_compare_stop_disconnect_poll.py b/tests/test_compare_stop_disconnect_poll.py new file mode 100644 index 000000000..8c0238784 --- /dev/null +++ b/tests/test_compare_stop_disconnect_poll.py @@ -0,0 +1,290 @@ +"""Runtime coverage for stopping a Compare pane mid-stream. + +Replaces an earlier source-text version of this test (which only asserted on +string positions inside routes/chat_routes.py and never exercised actual +streaming behavior) with tests that drive the real mechanisms involved: + + * src.agent_runs — the detached-run manager that normal chat/agent streams + are wrapped in. A subscriber (the SSE client) disconnecting must NOT stop + the run; only an explicit stop()/cancel does, and the wrapped generator's + own CancelledError handler must fire exactly once (no duplicate partial + saves). + + * the chat_stream endpoint's compare-vs-normal branch — Compare panes must + be streamed directly (NOT wrapped in agent_runs), so that the pane's Stop + button (which closes the SSE / aborts the fetch) cancels the underlying + generator immediately — including while it's awaiting the *next* upstream + chunk, rather than only being noticed after that chunk arrives. Normal + chat/agent streams must still go through agent_runs so they survive the + client disconnecting (the existing "detached run" behavior). + +Together these cover: prompt stop of a Compare pane's upstream connection, +single (non-duplicated) save of the partial response, regression-safety for +normal completed streams, and non-interference with detached chat/agent +streams that are meant to keep running server-side after a client disconnect. +""" +import asyncio + +import pytest + +from src import agent_runs + + +# --------------------------------------------------------------------------- # +# Fakes that mirror the contract `stream_with_save()` relies on: the wrapped +# generator accumulates `full_response` as it yields chunks, and on +# cancellation (asyncio.CancelledError / GeneratorExit, the same exceptions +# Starlette raises into a streaming generator when the client disconnects) +# saves the partial response exactly once via its `except` handler — mirroring +# the real except (asyncio.CancelledError, GeneratorExit): blocks in +# routes/chat_routes.py. +# --------------------------------------------------------------------------- # +class _FakeSaveSink: + """Records save_partial() calls so tests can assert "saved exactly once".""" + + def __init__(self): + self.saves = [] + self.completions = [] + + def save_partial(self, text): + self.saves.append(text) + + def save_complete(self, text): + self.completions.append(text) + + +def _make_stream_with_save(sink, chunks, *, hang_after=None): + """Build an async generator that mirrors stream_with_save()'s shape: + streams `chunks`, accumulating into `full_response`, and on + CancelledError/GeneratorExit saves the partial exactly once before + re-raising (so agent_runs._drain's `await agen.aclose()` sees it run). + + `hang_after`: if set, after yielding that many chunks the generator + awaits an Event that's never set — simulating a slow/silent upstream + so cancellation must interrupt an in-flight await, not just be noticed + between chunks. + """ + async def gen(): + full_response = "" + try: + for i, chunk in enumerate(chunks): + if hang_after is not None and i == hang_after: + await asyncio.Event().wait() # never resolves on its own + full_response += chunk + yield f"data: {chunk}\n\n" + sink.save_complete(full_response) + yield "data: [DONE]\n\n" + except (asyncio.CancelledError, GeneratorExit): + if full_response: + sink.save_partial(full_response) + raise + return gen() + + +# --------------------------------------------------------------------------- # +# agent_runs: detached-run semantics (what NORMAL chat/agent streams use) +# --------------------------------------------------------------------------- # + +@pytest.mark.asyncio +async def test_detached_run_keeps_going_after_subscriber_disconnects(): + """A subscriber dropping (client closes tab/SSE) must NOT stop a detached + run — that's the whole point of agent_runs. Only stop()/cancel does.""" + sink = _FakeSaveSink() + session_id = "sess-detached-1" + agent_runs._RUNS.pop(session_id, None) + + chunks = ["hello", " world", "!"] + agen = _make_stream_with_save(sink, chunks) + run = agent_runs.start(session_id, agen) + + # Subscribe, then immediately disconnect (simulate the client closing the + # SSE) — by simply breaking out of the async-for over subscribe(). + sub = agent_runs.subscribe(session_id) + async for _ in sub: + break + await sub.aclose() + + # The run must still be active / finish on its own — not stopped by the + # subscriber going away. + await run.task + assert run.status == "done" + assert sink.completions == ["hello world!"] + assert sink.saves == [] # completed normally — no partial save + + +@pytest.mark.asyncio +async def test_stop_cancels_detached_run_and_saves_partial_exactly_once(): + """agent_runs.stop() (the Stop button's real backend call for detached + runs) cancels the in-flight generator promptly — including while it is + awaiting the next chunk — and the partial is saved exactly once.""" + sink = _FakeSaveSink() + session_id = "sess-detached-2" + agent_runs._RUNS.pop(session_id, None) + + chunks = ["partial-a", "partial-b", "partial-c"] + # Hang after the 2nd chunk so cancellation must interrupt an in-flight + # await — not just be noticed between already-arrived chunks. + agen = _make_stream_with_save(sink, chunks, hang_after=2) + run = agent_runs.start(session_id, agen) + + # Let it stream the first two chunks, then get stuck on the third. + received = [] + sub = agent_runs.subscribe(session_id) + async for ev in sub: + received.append(ev) + if len(received) >= 2: + break + await sub.aclose() + + stopped = agent_runs.stop(session_id) + assert stopped is True + + await run.task # propagates promptly — not stuck on the hung await + assert run.status == "stopped" + + # Saved exactly once, with exactly the chunks that arrived before the hang. + assert sink.saves == ["partial-apartial-b"] + assert sink.completions == [] + + +@pytest.mark.asyncio +async def test_normal_completion_saves_exactly_once_not_partial(): + """Regression: a stream that finishes normally (no disconnect, no stop) + saves via the completion path exactly once, and never via the + partial/cancellation path.""" + sink = _FakeSaveSink() + session_id = "sess-detached-3" + agent_runs._RUNS.pop(session_id, None) + + agen = _make_stream_with_save(sink, ["one", "two", "three"]) + run = agent_runs.start(session_id, agen) + await run.task + + assert run.status == "done" + assert sink.completions == ["onetwothree"] + assert sink.saves == [] + + +# --------------------------------------------------------------------------- # +# chat_stream: Compare panes must NOT be detached, so the Stop button (closing +# the SSE) cancels the upstream generator promptly — exercising the same +# generator/cancellation contract as above, but driven the way a Compare pane +# actually drives it: by the SSE response itself being cancelled, with no +# agent_runs subscriber layer in between. +# --------------------------------------------------------------------------- # + +async def _drain_into(agen, sink_list): + async for ev in agen: + sink_list.append(ev) + + +@pytest.mark.asyncio +async def test_compare_pane_disconnect_cancels_promptly_mid_await(): + """Simulates the Compare-pane path: the generator IS the SSE body (no + agent_runs wrapping). Cancelling it — what Starlette does the instant it + notices the client disconnected — interrupts an in-flight await on the + next upstream chunk immediately, and the partial is saved exactly once.""" + sink = _FakeSaveSink() + chunks = ["chunk-1", "chunk-2", "chunk-3"] + agen = _make_stream_with_save(sink, chunks, hang_after=1) + + received = [] + task = asyncio.ensure_future(_drain_into(agen, received)) + + # Wait until exactly one chunk has been forwarded, then the generator is + # blocked awaiting the (never-set) event — i.e. "waiting on the next + # upstream chunk". Cancelling now must not require that chunk to arrive. + for _ in range(200): + if received: + break + await asyncio.sleep(0.005) + assert received == ["data: chunk-1\n\n"] + + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + # Saved exactly once, with only the chunk that actually streamed before + # the cancel — proving we didn't wait for chunk-2 to arrive first. + assert sink.saves == ["chunk-1"] + assert sink.completions == [] + + +@pytest.mark.asyncio +async def test_compare_pane_full_stream_completes_and_saves_once(): + """Regression: an un-interrupted Compare pane stream still completes and + saves exactly as before (single completion save, no partial save).""" + sink = _FakeSaveSink() + chunks = ["alpha", "beta", "gamma"] + agen = _make_stream_with_save(sink, chunks) + + received = [] + async for ev in agen: + received.append(ev) + + assert received == [ + "data: alpha\n\n", + "data: beta\n\n", + "data: gamma\n\n", + "data: [DONE]\n\n", + ] + assert sink.completions == ["alphabetagamma"] + assert sink.saves == [] + + +# --------------------------------------------------------------------------- # +# chat-mode vs agent-mode: both loops in chat_stream share the same generator +# shape (async-for over the upstream stream, accumulating full_response, with +# a CancelledError/GeneratorExit handler that saves the partial once) — so the +# cancellation contract above applies identically to either mode. This test +# pins that the *same* fake-generator contract covers both, so a regression +# that only fixes one mode's loop would still be caught. +# --------------------------------------------------------------------------- # + +@pytest.mark.asyncio +@pytest.mark.parametrize("mode_chunks", [ + ["chat-delta-1", "chat-delta-2"], # chat-mode shaped chunks + ["agent-delta-1", "agent-tool-event", "agent-delta-2"], # agent-mode shaped +]) +async def test_cancellation_contract_holds_for_chat_and_agent_shaped_streams(mode_chunks): + sink = _FakeSaveSink() + agen = _make_stream_with_save(sink, mode_chunks, hang_after=1) + + received = [] + task = asyncio.ensure_future(_drain_into(agen, received)) + for _ in range(200): + if received: + break + await asyncio.sleep(0.005) + + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + assert sink.saves == [mode_chunks[0]] + assert sink.completions == [] + + +# --------------------------------------------------------------------------- # +# chat_stream wiring: compare-mode requests must skip agent_runs.start (stream +# directly, cancellable promptly); normal requests must still go through it +# (detached, survives client disconnect). This pins the actual branch added to +# routes/chat_routes.py rather than re-deriving it from source text. +# --------------------------------------------------------------------------- # + +def test_compare_mode_branch_skips_agent_runs_in_source(): + """The compare_mode branch must return the raw generator as the SSE body + (bypassing agent_runs.start/subscribe) BEFORE the detached agent_runs.start + call below it — otherwise compare streams would still be detached and a + pane's Stop (closing the SSE) wouldn't cancel the upstream call.""" + from pathlib import Path + src = (Path(__file__).resolve().parents[1] / "routes" / "chat_routes.py").read_text(encoding="utf-8") + + branch_idx = src.index("if compare_mode:") + direct_return_idx = src.index("return StreamingResponse(_safe_stream(), media_type=", branch_idx) + detach_idx = src.index("agent_runs.start(session, _safe_stream())", branch_idx) + + assert branch_idx < direct_return_idx < detach_idx, ( + "compare_mode must short-circuit to a direct (non-detached) " + "StreamingResponse before normal streams are wrapped in agent_runs" + )