fix(webhooks): route public emitters through fire_and_forget (#3964) (#4336)

The three public webhook emitters in chat_helpers and webhook_routes
schedule deliveries via asyncio.create_task(webhook_manager.fire(...)),
which bypasses WebhookManager._bg_tasks. asyncio only holds a weak
reference to the outer task, so the GC can collect it mid-delivery and
the webhook is silently dropped.

Route all three through webhook_manager.fire_and_forget() so the task
is tracked by _spawn_tracked() and the manager owns the full lifecycle.

Adds an AST-level guard test that scans routes/ for direct
asyncio.create_task wrapping webhook_manager.fire(...) to prevent
regressions.
This commit is contained in:
Wei Hong
2026-06-16 06:41:45 +08:00
committed by GitHub
parent dd2e23c9af
commit 2196869c86
3 changed files with 60 additions and 7 deletions
+4 -4
View File
@@ -346,9 +346,9 @@ def add_user_message(sess, chat_handler, preprocessed: PreprocessedMessage, inco
def fire_message_event(request, webhook_manager, session_id: str, sess, message: str, compare_mode: bool = False): def fire_message_event(request, webhook_manager, session_id: str, sess, message: str, compare_mode: bool = False):
"""Fire webhook and event_bus events for a new user message.""" """Fire webhook and event_bus events for a new user message."""
if webhook_manager and not compare_mode: if webhook_manager and not compare_mode:
asyncio.create_task(webhook_manager.fire("chat.message", { webhook_manager.fire_and_forget("chat.message", {
"session_id": session_id, "model": sess.model, "message": message[:2000], "session_id": session_id, "model": sess.model, "message": message[:2000],
})) })
from src.event_bus import fire_event from src.event_bus import fire_event
user = effective_user(request) user = effective_user(request)
fire_event("message_sent", user) fire_event("message_sent", user)
@@ -1120,10 +1120,10 @@ def run_post_response_tasks(
# Webhook # Webhook
if webhook_manager and not compare_mode: if webhook_manager and not compare_mode:
asyncio.create_task(webhook_manager.fire("chat.completed", { webhook_manager.fire_and_forget("chat.completed", {
"session_id": session_id, "model": sess.model, "session_id": session_id, "model": sess.model,
"user_message": message, "response": full_response[:2000], "user_message": message, "response": full_response[:2000],
})) })
# Auto-name # Auto-name
if needs_auto_name(sess.name): if needs_auto_name(sess.name):
+2 -3
View File
@@ -1,6 +1,5 @@
"""Webhook, API Token, and sync chat routes.""" """Webhook, API Token, and sync chat routes."""
import asyncio
import uuid import uuid
import logging import logging
from typing import Optional from typing import Optional
@@ -385,10 +384,10 @@ def setup_webhook_routes(
sess.add_message(ChatMessage("assistant", reply)) sess.add_message(ChatMessage("assistant", reply))
session_manager.save_sessions() session_manager.save_sessions()
asyncio.create_task(webhook_manager.fire("chat.completed", { webhook_manager.fire_and_forget("chat.completed", {
"session_id": session_id, "model": sess.model, "session_id": session_id, "model": sess.model,
"user_message": message[:2000], "response": reply[:2000], "user_message": message[:2000], "response": reply[:2000],
})) })
return {"response": reply, "session_id": session_id, "model": sess.model} return {"response": reply, "session_id": session_id, "model": sess.model}
@@ -0,0 +1,54 @@
"""Guard: every public webhook emitter goes through the manager.
Public emitters in `routes/` must schedule their fire through
`webhook_manager.fire_and_forget(...)` (or `_spawn_tracked`). A bare
`asyncio.create_task(webhook_manager.fire(...))` escapes
`WebhookManager._bg_tasks`, so asyncio only holds a weak reference to the
delivery task and the GC can collect it before it sends — silently dropping
the webhook. Catching this with a scan stops a regression from sneaking
back in via a copy-paste.
"""
import ast
from pathlib import Path
ROUTES_DIR = Path(__file__).resolve().parent.parent / "routes"
# asyncio entry points that wrap a coroutine in a Task the caller must keep
# a reference to; the event loop itself only holds a weak one.
_TASK_SCHEDULERS = frozenset({"create_task", "ensure_future"})


def _is_untracked_scheduler(func: ast.expr) -> bool:
    """Return True if *func* is an asyncio call that spawns an unowned task."""
    if isinstance(func, ast.Attribute):
        # Restricted to the ``asyncio`` module on purpose: a receiver such as
        # a TaskGroup (``tg.create_task``) owns the tasks it creates.
        return (
            func.attr in _TASK_SCHEDULERS
            and isinstance(func.value, ast.Name)
            and func.value.id == "asyncio"
        )
    # ``from asyncio import create_task`` / ``ensure_future`` style.
    return isinstance(func, ast.Name) and func.id in _TASK_SCHEDULERS


def _is_webhook_manager_fire(expr: ast.expr) -> bool:
    """Return True if *expr* is a ``<...>webhook_manager.fire(...)`` call."""
    if not isinstance(expr, ast.Call):
        return False
    target = expr.func
    if not (isinstance(target, ast.Attribute) and target.attr == "fire"):
        return False
    owner = target.value
    if isinstance(owner, ast.Name):
        return owner.id == "webhook_manager"
    # Manager reached through an attribute chain, e.g.
    # ``self.webhook_manager`` or ``request.app.state.webhook_manager``.
    return isinstance(owner, ast.Attribute) and owner.attr == "webhook_manager"


def _untracked_fire_calls(tree: ast.AST) -> list[tuple[int, str]]:
    """Find webhook deliveries scheduled outside the manager's task tracking.

    Flags any ``asyncio.create_task(...)`` / ``asyncio.ensure_future(...)``
    call (or the bare from-imported spelling) whose first positional argument
    is a ``webhook_manager.fire(...)`` call, where the manager is either a
    plain name or the last attribute of a dotted chain.

    Args:
        tree: Parsed module (or any AST subtree) to scan.

    Returns:
        ``(lineno, snippet)`` pairs in ``ast.walk`` order, where ``snippet``
        is the offending call rendered with ``ast.unparse``.
    """
    hits: list[tuple[int, str]] = []
    for node in ast.walk(tree):
        if not isinstance(node, ast.Call):
            continue
        if not _is_untracked_scheduler(node.func):
            continue
        # Only the first positional argument is the coroutine being scheduled.
        if node.args and _is_webhook_manager_fire(node.args[0]):
            hits.append((node.lineno, ast.unparse(node)))
    return hits
def test_no_untracked_webhook_fire_in_routes():
    """Fail if any module under ``ROUTES_DIR`` schedules an untracked webhook fire.

    Every ``*.py`` file below ``ROUTES_DIR`` is parsed and passed to
    ``_untracked_fire_calls``; each hit is reported as
    ``<path relative to ROUTES_DIR's parent>:<lineno>: <snippet>``.
    """
    # If the directory is missing, rglob yields nothing and the guard would
    # pass without having scanned a single file.
    assert ROUTES_DIR.is_dir(), f"routes directory not found: {ROUTES_DIR}"

    offenders: list[str] = []
    # Sorted so the failure message is stable across filesystems.
    for path in sorted(ROUTES_DIR.rglob("*.py")):
        # Explicit UTF-8: the platform default encoding may not decode sources
        # that contain non-ASCII characters.
        source = path.read_text(encoding="utf-8")
        tree = ast.parse(source, filename=str(path))
        location = path.relative_to(ROUTES_DIR.parent)
        for lineno, snippet in _untracked_fire_calls(tree):
            offenders.append(f"{location}:{lineno}: {snippet}")

    assert not offenders, (
        "Public webhook emitters must use webhook_manager.fire_and_forget(...) "
        "so the delivery task is tracked in WebhookManager._bg_tasks. Found "
        "untracked emitter(s):\n " + "\n ".join(offenders)
    )