Harden DAV outbound URL validation (#2819)

This commit is contained in:
Vykos
2026-06-05 13:22:21 +02:00
committed by GitHub
parent 6d64055328
commit 370ae5d451
7 changed files with 326 additions and 22 deletions
+46 -4
View File
@@ -27,6 +27,7 @@ import hashlib
import ipaddress
import logging
import os
import socket
import uuid
from datetime import date, datetime, timedelta, timezone
from urllib.parse import urlparse, urlunparse
@@ -50,15 +51,55 @@ def _private_caldav_allowed() -> bool:
return os.environ.get("ODYSSEUS_ALLOW_PRIVATE_CALDAV", "0").lower() in {"1", "true", "yes"}
def _validate_caldav_address(addr: ipaddress._BaseAddress) -> None:
if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped is not None:
addr = addr.ipv4_mapped
if (
addr.is_loopback
or addr.is_link_local
or addr.is_multicast
or addr.is_unspecified
or addr.is_reserved
):
raise ValueError("CalDAV URL host is not allowed")
if addr.is_private and not _private_caldav_allowed():
raise ValueError("Private CalDAV IPs require ODYSSEUS_ALLOW_PRIVATE_CALDAV=1")
def _validate_caldav_ip(host: str) -> None:
try:
ip = ipaddress.ip_address(host.strip("[]"))
except ValueError:
return
if ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_unspecified:
raise ValueError("CalDAV URL host is not allowed")
if ip.is_private and not _private_caldav_allowed():
raise ValueError("Private CalDAV IPs require ODYSSEUS_ALLOW_PRIVATE_CALDAV=1")
_validate_caldav_address(ip)
def _resolve_caldav_host_ips(host: str) -> list[ipaddress._BaseAddress]:
addrs: list[ipaddress._BaseAddress] = []
for family, _, _, _, sockaddr in socket.getaddrinfo(host, None):
if family not in (socket.AF_INET, socket.AF_INET6):
continue
try:
addrs.append(ipaddress.ip_address(sockaddr[0].split("%", 1)[0]))
except ValueError:
continue
return addrs
def _validate_caldav_hostname(host: str) -> None:
try:
ipaddress.ip_address(host.strip("[]"))
return
except ValueError:
pass
try:
addrs = _resolve_caldav_host_ips(host)
except OSError:
raise ValueError("CalDAV URL host does not resolve")
if not addrs:
raise ValueError("CalDAV URL host does not resolve")
for addr in addrs:
_validate_caldav_address(addr)
def validate_caldav_url(raw_url: str) -> str:
@@ -83,6 +124,7 @@ def validate_caldav_url(raw_url: str) -> str:
if host in _BLOCKED_HOSTS or host.endswith(".localhost"):
raise ValueError("CalDAV URL host is not allowed")
_validate_caldav_ip(host)
_validate_caldav_hostname(host)
return urlunparse(parsed._replace(fragment="")).rstrip("/")
+6
View File
@@ -167,6 +167,12 @@ async def writeback_event(owner: str, calendar_source: str, calendar_id: str,
pw = decrypt(cfg.get("password") or "")
if not (url and user and pw):
return {"skipped": "caldav not configured"}
from src.caldav_sync import validate_caldav_url
try:
url = validate_caldav_url(url)
except ValueError as e:
logger.warning("CalDAV write-back URL rejected: %s", e)
return {"ok": False, "error": str(e)[:200]}
result = await asyncio.to_thread(_writeback_blocking, calendar_id, ev, delete, url, user, pw)
if not result.get("ok"):
logger.warning("CalDAV write-back did not apply: %s", result.get("error") or result)