diff --git a/src/webhook_manager.py b/src/webhook_manager.py index 267ceaa38..af28fe2a7 100644 --- a/src/webhook_manager.py +++ b/src/webhook_manager.py @@ -202,6 +202,18 @@ class WebhookManager: self._client = httpx.AsyncClient(timeout=10, follow_redirects=False) self._loop: Optional[asyncio.AbstractEventLoop] = None self._api_key_manager = api_key_manager + # Strong references to in-flight fire-and-forget tasks. asyncio only + # keeps weak references to tasks, so without this the GC can collect a + # delivery task mid-flight and the webhook is silently never sent. + self._bg_tasks: set = set() + + def _spawn_tracked(self, coro): + """Schedule a background task and hold a strong reference until it + finishes, so it can't be garbage-collected before delivery completes.""" + task = asyncio.ensure_future(coro) + self._bg_tasks.add(task) + task.add_done_callback(self._bg_tasks.discard) + return task def set_loop(self, loop: asyncio.AbstractEventLoop): self._loop = loop @@ -223,8 +235,8 @@ class WebhookManager: if event not in ALLOWED_EVENTS: return try: - loop = asyncio.get_running_loop() - loop.create_task(self.fire(event, payload)) + asyncio.get_running_loop() + self._spawn_tracked(self.fire(event, payload)) except RuntimeError: # Called from a sync thread (e.g. sync FastAPI route in threadpool) if self._loop and self._loop.is_running(): @@ -243,7 +255,7 @@ class WebhookManager: for wh in matching: decrypted_secret = self._decrypt_secret(wh.secret) - asyncio.create_task(self._deliver(wh.id, wh.url, decrypted_secret, event, payload)) + self._spawn_tracked(self._deliver(wh.id, wh.url, decrypted_secret, event, payload)) async def deliver_test(self, webhook_id: str, url: str, encrypted_secret: Optional[str]): """Public method for the test-webhook route.""" diff --git a/tests/test_webhook_task_refs.py b/tests/test_webhook_task_refs.py new file mode 100644 index 000000000..7b2c63697 --- /dev/null +++ b/tests/test_webhook_task_refs.py @@ -0,0 +1,55 @@ +"""Fire-and-forget webhook tasks must be referenced until they finish. + +asyncio keeps only a weak reference to a bare create_task() result, so a +delivery task could be garbage-collected before it ran and the webhook silently +dropped. WebhookManager now holds a strong reference for the task's lifetime and +releases it on completion. +""" +import asyncio +import sys + +# webhook_manager does `from src.database import SessionLocal, Webhook` at import +# time. The shared test harness stubs src.database without Webhook, so ensure the +# attribute exists before importing the manager. These tests never touch the DB +# (the manager is built via __new__), so a placeholder class is sufficient. +_db = sys.modules.get("src.database") +if _db is not None and not hasattr(_db, "Webhook"): + _db.Webhook = type("Webhook", (), {}) + +from src.webhook_manager import WebhookManager # noqa: E402 + + +def test_spawn_tracked_holds_then_releases_reference(): + async def run(): + wm = WebhookManager.__new__(WebhookManager) + wm._bg_tasks = set() + + gate = asyncio.Event() + + async def work(): + await gate.wait() + + task = wm._spawn_tracked(work()) + # Referenced while in flight (this is what stops GC from collecting it). + assert task in wm._bg_tasks + gate.set() + await task + # Reference released once done, so the set does not grow unbounded. + assert task not in wm._bg_tasks + + asyncio.run(run()) + + +def test_spawn_tracked_runs_the_coroutine(): + async def run(): + wm = WebhookManager.__new__(WebhookManager) + wm._bg_tasks = set() + ran = [] + + async def work(): + ran.append(True) + + await wm._spawn_tracked(work()) + assert ran == [True] + + asyncio.run(run())