mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-17 10:15:27 -04:00
fix(webhooks): keep references to in-flight delivery tasks (#3859)
fire() and fire_and_forget() scheduled delivery with bare create_task()/ loop.create_task() and kept no reference. asyncio holds only a weak reference to a task, so the GC could collect a delivery (or the fire() coroutine itself) before it completed, silently dropping the webhook. Track in-flight tasks in a set on the manager via a _spawn_tracked() helper that holds a strong reference for the task's lifetime and discards it on completion (add_done_callback), and route both schedule sites through it. Adds tests/test_webhook_task_refs.py.
This commit is contained in:
committed by
GitHub
parent
1a2bcfcae4
commit
f7a3605b16
+15
-3
@@ -202,6 +202,18 @@ class WebhookManager:
|
|||||||
self._client = httpx.AsyncClient(timeout=10, follow_redirects=False)
|
self._client = httpx.AsyncClient(timeout=10, follow_redirects=False)
|
||||||
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
||||||
self._api_key_manager = api_key_manager
|
self._api_key_manager = api_key_manager
|
||||||
|
# Strong references to in-flight fire-and-forget tasks. asyncio only
|
||||||
|
# keeps weak references to tasks, so without this the GC can collect a
|
||||||
|
# delivery task mid-flight and the webhook is silently never sent.
|
||||||
|
self._bg_tasks: set = set()
|
||||||
|
|
||||||
|
def _spawn_tracked(self, coro):
|
||||||
|
"""Schedule a background task and hold a strong reference until it
|
||||||
|
finishes, so it can't be garbage-collected before delivery completes."""
|
||||||
|
task = asyncio.ensure_future(coro)
|
||||||
|
self._bg_tasks.add(task)
|
||||||
|
task.add_done_callback(self._bg_tasks.discard)
|
||||||
|
return task
|
||||||
|
|
||||||
def set_loop(self, loop: asyncio.AbstractEventLoop):
|
def set_loop(self, loop: asyncio.AbstractEventLoop):
|
||||||
self._loop = loop
|
self._loop = loop
|
||||||
@@ -223,8 +235,8 @@ class WebhookManager:
|
|||||||
if event not in ALLOWED_EVENTS:
|
if event not in ALLOWED_EVENTS:
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
loop = asyncio.get_running_loop()
|
asyncio.get_running_loop()
|
||||||
loop.create_task(self.fire(event, payload))
|
self._spawn_tracked(self.fire(event, payload))
|
||||||
except RuntimeError:
|
except RuntimeError:
|
||||||
# Called from a sync thread (e.g. sync FastAPI route in threadpool)
|
# Called from a sync thread (e.g. sync FastAPI route in threadpool)
|
||||||
if self._loop and self._loop.is_running():
|
if self._loop and self._loop.is_running():
|
||||||
@@ -243,7 +255,7 @@ class WebhookManager:
|
|||||||
|
|
||||||
for wh in matching:
|
for wh in matching:
|
||||||
decrypted_secret = self._decrypt_secret(wh.secret)
|
decrypted_secret = self._decrypt_secret(wh.secret)
|
||||||
asyncio.create_task(self._deliver(wh.id, wh.url, decrypted_secret, event, payload))
|
self._spawn_tracked(self._deliver(wh.id, wh.url, decrypted_secret, event, payload))
|
||||||
|
|
||||||
async def deliver_test(self, webhook_id: str, url: str, encrypted_secret: Optional[str]):
|
async def deliver_test(self, webhook_id: str, url: str, encrypted_secret: Optional[str]):
|
||||||
"""Public method for the test-webhook route."""
|
"""Public method for the test-webhook route."""
|
||||||
|
|||||||
@@ -0,0 +1,55 @@
|
|||||||
|
"""Fire-and-forget webhook tasks must be referenced until they finish.
|
||||||
|
|
||||||
|
asyncio keeps only a weak reference to a bare create_task() result, so a
|
||||||
|
delivery task could be garbage-collected before it ran and the webhook silently
|
||||||
|
dropped. WebhookManager now holds a strong reference for the task's lifetime and
|
||||||
|
releases it on completion.
|
||||||
|
"""
|
||||||
|
import asyncio
|
||||||
|
import sys
|
||||||
|
|
||||||
|
# webhook_manager does `from src.database import SessionLocal, Webhook` at import
|
||||||
|
# time. The shared test harness stubs src.database without Webhook, so ensure the
|
||||||
|
# attribute exists before importing the manager. These tests never touch the DB
|
||||||
|
# (the manager is built via __new__), so a placeholder class is sufficient.
|
||||||
|
_db = sys.modules.get("src.database")
|
||||||
|
if _db is not None and not hasattr(_db, "Webhook"):
|
||||||
|
_db.Webhook = type("Webhook", (), {})
|
||||||
|
|
||||||
|
from src.webhook_manager import WebhookManager # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
def test_spawn_tracked_holds_then_releases_reference():
|
||||||
|
async def run():
|
||||||
|
wm = WebhookManager.__new__(WebhookManager)
|
||||||
|
wm._bg_tasks = set()
|
||||||
|
|
||||||
|
gate = asyncio.Event()
|
||||||
|
|
||||||
|
async def work():
|
||||||
|
await gate.wait()
|
||||||
|
|
||||||
|
task = wm._spawn_tracked(work())
|
||||||
|
# Referenced while in flight (this is what stops GC from collecting it).
|
||||||
|
assert task in wm._bg_tasks
|
||||||
|
gate.set()
|
||||||
|
await task
|
||||||
|
# Reference released once done, so the set does not grow unbounded.
|
||||||
|
assert task not in wm._bg_tasks
|
||||||
|
|
||||||
|
asyncio.run(run())
|
||||||
|
|
||||||
|
|
||||||
|
def test_spawn_tracked_runs_the_coroutine():
|
||||||
|
async def run():
|
||||||
|
wm = WebhookManager.__new__(WebhookManager)
|
||||||
|
wm._bg_tasks = set()
|
||||||
|
ran = []
|
||||||
|
|
||||||
|
async def work():
|
||||||
|
ran.append(True)
|
||||||
|
|
||||||
|
await wm._spawn_tracked(work())
|
||||||
|
assert ran == [True]
|
||||||
|
|
||||||
|
asyncio.run(run())
|
||||||
Reference in New Issue
Block a user