From 2196869c86c06e6698b786f5eba905e9396c4657 Mon Sep 17 00:00:00 2001 From: Wei Hong <49374928+ChangWeiHong@users.noreply.github.com> Date: Tue, 16 Jun 2026 06:41:45 +0800 Subject: [PATCH] fix(webhooks): route public emitters through fire_and_forget (#3964) (#4336) The three public webhook emitters in chat_helpers and webhook_routes schedule deliveries via asyncio.create_task(webhook_manager.fire(...)), which bypasses WebhookManager._bg_tasks. asyncio only holds a weak reference to the outer task, so the GC can collect it mid-delivery and the webhook is silently dropped. Route all three through webhook_manager.fire_and_forget() so the task is tracked by _spawn_tracked() and the manager owns the full lifecycle. Adds an AST-level guard test that scans routes/ for direct asyncio.create_task wrapping webhook_manager.fire(...) to prevent regressions. --- routes/chat_helpers.py | 8 ++-- routes/webhook_routes.py | 5 +- tests/test_webhook_emitters_use_manager.py | 54 ++++++++++++++++++++++ 3 files changed, 60 insertions(+), 7 deletions(-) create mode 100644 tests/test_webhook_emitters_use_manager.py diff --git a/routes/chat_helpers.py b/routes/chat_helpers.py index c5196551a..cc927eec9 100644 --- a/routes/chat_helpers.py +++ b/routes/chat_helpers.py @@ -346,9 +346,9 @@ def add_user_message(sess, chat_handler, preprocessed: PreprocessedMessage, inco def fire_message_event(request, webhook_manager, session_id: str, sess, message: str, compare_mode: bool = False): """Fire webhook and event_bus events for a new user message.""" if webhook_manager and not compare_mode: - asyncio.create_task(webhook_manager.fire("chat.message", { + webhook_manager.fire_and_forget("chat.message", { "session_id": session_id, "model": sess.model, "message": message[:2000], - })) + }) from src.event_bus import fire_event user = effective_user(request) fire_event("message_sent", user) @@ -1120,10 +1120,10 @@ def run_post_response_tasks( # Webhook if webhook_manager and not compare_mode: - asyncio.create_task(webhook_manager.fire("chat.completed", { + webhook_manager.fire_and_forget("chat.completed", { "session_id": session_id, "model": sess.model, "user_message": message, "response": full_response[:2000], - })) + }) # Auto-name if needs_auto_name(sess.name): diff --git a/routes/webhook_routes.py b/routes/webhook_routes.py index 77902c24b..c9cf856ca 100644 --- a/routes/webhook_routes.py +++ b/routes/webhook_routes.py @@ -1,6 +1,5 @@ """Webhook, API Token, and sync chat routes.""" -import asyncio import uuid import logging from typing import Optional @@ -385,10 +384,10 @@ def setup_webhook_routes( sess.add_message(ChatMessage("assistant", reply)) session_manager.save_sessions() - asyncio.create_task(webhook_manager.fire("chat.completed", { + webhook_manager.fire_and_forget("chat.completed", { "session_id": session_id, "model": sess.model, "user_message": message[:2000], "response": reply[:2000], - })) + }) return {"response": reply, "session_id": session_id, "model": sess.model} diff --git a/tests/test_webhook_emitters_use_manager.py b/tests/test_webhook_emitters_use_manager.py new file mode 100644 index 000000000..4edfa7336 --- /dev/null +++ b/tests/test_webhook_emitters_use_manager.py @@ -0,0 +1,54 @@ +"""Guard: every public webhook emitter goes through the manager. + +Public emitters in `routes/` must schedule their fire through +`webhook_manager.fire_and_forget(...)` (or `_spawn_tracked`). A bare +`asyncio.create_task(webhook_manager.fire(...))` escapes +`WebhookManager._bg_tasks`, so asyncio only holds a weak reference to the +delivery task and the GC can collect it before it sends — silently dropping +the webhook. Catching this with a scan stops a regression from sneaking +back in via a copy-paste. +""" +import ast +from pathlib import Path + +ROUTES_DIR = Path(__file__).resolve().parent.parent / "routes" + + +def _untracked_fire_calls(tree: ast.AST) -> list[tuple[int, str]]: + """Return (lineno, snippet) for any asyncio.create_task(webhook_manager.fire(...)).""" + hits: list[tuple[int, str]] = [] + for node in ast.walk(tree): + if not isinstance(node, ast.Call): + continue + func = node.func + if not (isinstance(func, ast.Attribute) and func.attr == "create_task"): + continue + if not (isinstance(func.value, ast.Name) and func.value.id == "asyncio"): + continue + if not node.args: + continue + inner = node.args[0] + if not isinstance(inner, ast.Call): + continue + inner_func = inner.func + if ( + isinstance(inner_func, ast.Attribute) + and inner_func.attr == "fire" + and isinstance(inner_func.value, ast.Name) + and inner_func.value.id == "webhook_manager" + ): + hits.append((node.lineno, ast.unparse(node))) + return hits + + +def test_no_untracked_webhook_fire_in_routes(): + offenders: list[str] = [] + for path in ROUTES_DIR.rglob("*.py"): + tree = ast.parse(path.read_text(), filename=str(path)) + for lineno, snippet in _untracked_fire_calls(tree): + offenders.append(f"{path.relative_to(ROUTES_DIR.parent)}:{lineno}: {snippet}") + assert not offenders, ( + "Public webhook emitters must use webhook_manager.fire_and_forget(...) " + "so the delivery task is tracked in WebhookManager._bg_tasks. Found " + "untracked emitter(s):\n " + "\n ".join(offenders) + )