Replace webhook manager datetime.utcnow calls (#1499)

Co-authored-by: ghreprimand <203024559+ghreprimand@users.noreply.github.com>
This commit is contained in:
ghreprimand
2026-06-03 00:14:23 -05:00
committed by GitHub
parent c639daa7a2
commit 6fd52cf317
2 changed files with 75 additions and 4 deletions
+9 -4
View File
@@ -7,7 +7,7 @@ import ipaddress
import json import json
import logging import logging
import re import re
from datetime import datetime from datetime import datetime, timezone
from typing import Optional from typing import Optional
from urllib.parse import urlparse from urllib.parse import urlparse
@@ -37,6 +37,11 @@ _PRIVATE_NETWORKS = [
] ]
def _utcnow() -> datetime:
"""Return naive UTC for existing DB columns while avoiding datetime.utcnow()."""
return datetime.now(timezone.utc).replace(tzinfo=None)
def _ip_is_private(addr: ipaddress._BaseAddress) -> bool: def _ip_is_private(addr: ipaddress._BaseAddress) -> bool:
# If the address is IPv4-mapped IPv6, extract and evaluate the embedded IPv4 # If the address is IPv4-mapped IPv6, extract and evaluate the embedded IPv4
if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped is not None: if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped is not None:
@@ -203,7 +208,7 @@ class WebhookManager:
logger.warning(f"Webhook {webhook_id} has invalid URL, skipping: {e}") logger.warning(f"Webhook {webhook_id} has invalid URL, skipping: {e}")
return return
body = json.dumps({"event": event, "timestamp": datetime.utcnow().isoformat(), "data": payload}) body = json.dumps({"event": event, "timestamp": _utcnow().isoformat(), "data": payload})
headers = { headers = {
"Content-Type": "application/json", "Content-Type": "application/json",
"X-Odysseus-Event": event, "X-Odysseus-Event": event,
@@ -217,7 +222,7 @@ class WebhookManager:
try: try:
resp = await self._client.post(url, content=body, headers=headers) resp = await self._client.post(url, content=body, headers=headers)
db.query(Webhook).filter(Webhook.id == webhook_id).update({ db.query(Webhook).filter(Webhook.id == webhook_id).update({
"last_triggered_at": datetime.utcnow(), "last_triggered_at": _utcnow(),
"last_status_code": resp.status_code, "last_status_code": resp.status_code,
"last_error": None, "last_error": None,
}) })
@@ -226,7 +231,7 @@ class WebhookManager:
logger.warning(f"Webhook delivery failed for {webhook_id}") logger.warning(f"Webhook delivery failed for {webhook_id}")
try: try:
db.query(Webhook).filter(Webhook.id == webhook_id).update({ db.query(Webhook).filter(Webhook.id == webhook_id).update({
"last_triggered_at": datetime.utcnow(), "last_triggered_at": _utcnow(),
"last_status_code": None, "last_status_code": None,
"last_error": sanitize_error(str(e)), "last_error": sanitize_error(str(e)),
}) })
+66
View File
@@ -1,4 +1,7 @@
import sys import sys
import json
from datetime import datetime
# conftest.py stubs src.database with a fake module; webhook_manager imports # conftest.py stubs src.database with a fake module; webhook_manager imports
# from it, so drop the stub here to load the real module under test. # from it, so drop the stub here to load the real module under test.
if "src.database" in sys.modules: if "src.database" in sys.modules:
@@ -26,3 +29,66 @@ def test_webhook_url_ssrf_mitigation():
# A clearly public IP literal must still be accepted. # A clearly public IP literal must still be accepted.
public_url = "http://93.184.216.34/" public_url = "http://93.184.216.34/"
assert validate_webhook_url(public_url) == public_url assert validate_webhook_url(public_url) == public_url
@pytest.mark.asyncio
async def test_webhook_delivery_uses_naive_utc_timestamps(monkeypatch):
import src.webhook_manager as wm
class _Query:
def __init__(self, updates):
self.updates = updates
def filter(self, *_args, **_kwargs):
return self
def update(self, values):
self.updates.append(values)
class _Db:
def __init__(self):
self.updates = []
self.committed = False
self.closed = False
def query(self, _model):
return _Query(self.updates)
def commit(self):
self.committed = True
def rollback(self):
pass
def close(self):
self.closed = True
class _Response:
status_code = 204
class _Client:
def __init__(self):
self.content = ""
async def post(self, _url, content, headers):
self.content = content
assert headers["X-Odysseus-Event"] == "webhook.test"
return _Response()
db = _Db()
client = _Client()
monkeypatch.setattr(wm, "SessionLocal", lambda: db)
manager = wm.WebhookManager()
await manager._client.aclose()
manager._client = client
await manager._deliver("hook-1", "http://93.184.216.34/", None, "webhook.test", {"ok": True})
body = json.loads(client.content)
payload_timestamp = datetime.fromisoformat(body["timestamp"])
assert payload_timestamp.tzinfo is None
assert db.updates[0]["last_triggered_at"].tzinfo is None
assert db.updates[0]["last_status_code"] == 204
assert db.committed is True
assert db.closed is True