diff --git a/routes/contacts_routes.py b/routes/contacts_routes.py index 409184fa1..8a90cf473 100644 --- a/routes/contacts_routes.py +++ b/routes/contacts_routes.py @@ -11,14 +11,17 @@ import uuid import json import csv import io +import os import httpx from pathlib import Path from datetime import datetime -from fastapi import APIRouter, Query, Depends, Response +from urllib.parse import urljoin, urlparse, urlunparse + +from fastapi import APIRouter, Query, Depends, Response, HTTPException from typing import List, Dict, Optional -from src.auth_helpers import require_user from core.middleware import require_admin +from src.url_safety import check_outbound_url logger = logging.getLogger(__name__) @@ -53,6 +56,21 @@ def _carddav_configured(cfg: Optional[Dict] = None) -> bool: return bool((cfg.get("url") or "").strip()) +def _validate_carddav_url(url: str) -> str: + cleaned = (url if isinstance(url, str) else "").strip().rstrip("/") + ok, reason = check_outbound_url( + cleaned, + block_private=os.getenv("CARDDAV_BLOCK_PRIVATE_IPS", "false").lower() == "true", + ) + if not ok: + raise ValueError(f"Rejected CardDAV URL: {reason}") + return cleaned + + +def _carddav_base_url(cfg: Dict) -> str: + return _validate_carddav_url(cfg.get("url") or "") + + def _normalize_contact(contact: Dict) -> Dict: emails = [] for e in contact.get("emails") or ([] if not contact.get("email") else [contact.get("email")]): @@ -219,14 +237,18 @@ _contact_cache = {"contacts": [], "fetched_at": None} def _abs_url(href: str) -> str: """Combine a multistatus (an absolute path like /user/contacts/x.vcf) with the configured CardDAV server origin so we - get a fully-qualified URL to PUT/DELETE. If href is already absolute - (http...), return it as-is.""" - from urllib.parse import urlparse, urlunparse - if href.startswith("http://") or href.startswith("https://"): - return href + get a fully-qualified URL to PUT/DELETE. Absolute hrefs are accepted only + for the configured origin; a cross-origin href is treated as a path on the + configured server so a malicious CardDAV response cannot redirect later + writes/deletes to cloud metadata or another host.""" cfg = _get_carddav_config() - p = urlparse(cfg["url"]) - return urlunparse((p.scheme, p.netloc, href, "", "", "")) + base = _carddav_base_url(cfg) + base_p = urlparse(base) + joined = urljoin(base.rstrip("/") + "/", href or "") + joined_p = urlparse(joined) + if (joined_p.scheme, joined_p.netloc) != (base_p.scheme, base_p.netloc): + joined = urlunparse((base_p.scheme, base_p.netloc, joined_p.path or "/", "", joined_p.query, "")) + return _validate_carddav_url(joined) # CardDAV REPORT body — pull every card's etag + raw vCard in ONE request, @@ -297,6 +319,7 @@ def _fetch_contacts(force=False): return contacts try: + cfg["url"] = _carddav_base_url(cfg) auth = None if cfg["username"]: auth = (cfg["username"], cfg["password"]) @@ -353,8 +376,8 @@ def _create_contact(name: str, email: str) -> bool: contact_uid = str(uuid.uuid4()) vcard = _build_vcard(name, email, contact_uid) - url = cfg["url"].rstrip("/") + "/" + contact_uid + ".vcf" try: + url = _carddav_base_url(cfg) + "/" + contact_uid + ".vcf" auth = None if cfg["username"]: auth = (cfg["username"], cfg["password"]) @@ -382,7 +405,7 @@ def _vcard_url(uid: str) -> str: escape the collection and target an arbitrary CardDAV resource.""" from urllib.parse import quote cfg = _get_carddav_config() - return cfg["url"].rstrip("/") + "/" + quote(uid, safe="") + ".vcf" + return _carddav_base_url(cfg) + "/" + quote(uid, safe="") + ".vcf" def _import_vcards(text: str) -> Dict: @@ -413,6 +436,11 @@ def _import_vcards(text: str) -> Dict: if imported: _save_local_contacts(contacts) return {"imported": imported, "failed": 0, "total": len(parsed)} + try: + base_url = _carddav_base_url(cfg) + except ValueError as e: + logger.warning("CardDAV import URL rejected: %s", e) + return {"imported": 0, "failed": 0, "total": 0, "error": str(e)} auth = (cfg["username"], cfg["password"]) if cfg["username"] else None # Split into individual cards. re.split drops the BEGIN line, so we # re-add it. Normalize CRLF. @@ -441,7 +469,7 @@ def _import_vcards(text: str) -> Dict: elif not re.search(r"^VERSION:", block, re.MULTILINE): block = block.replace("BEGIN:VCARD", "BEGIN:VCARD\nVERSION:4.0", 1) vcard = block.replace("\n", "\r\n") + "\r\n" - url = cfg["url"].rstrip("/") + "/" + quote(uid, safe="") + ".vcf" + url = base_url + "/" + quote(uid, safe="") + ".vcf" try: r = httpx.put( url, data=vcard.encode("utf-8"), @@ -601,8 +629,8 @@ def _update_contact(uid: str, name: str, emails: List[str], phones: List[str]) - vcard = _build_vcard(name, "", uid=uid, emails=emails, phones=phones) # Use the real resource href (handles externally-created contacts whose # filename != UID); falls back to the .vcf guess. - url = _resolve_resource_url(uid) try: + url = _resolve_resource_url(uid) auth = (cfg["username"], cfg["password"]) if cfg["username"] else None r = httpx.put( url, @@ -630,8 +658,8 @@ def _delete_contact(uid: str) -> bool: _save_local_contacts(remaining) return True - url = _resolve_resource_url(uid) try: + url = _resolve_resource_url(uid) auth = (cfg["username"], cfg["password"]) if cfg["username"] else None r = httpx.delete(url, auth=auth, timeout=10) if r.status_code in (200, 204): @@ -747,7 +775,13 @@ def setup_contacts_routes(): settings = _load_settings() for key in ("carddav_url", "carddav_username", "carddav_password"): if key in data: - settings[key] = data[key] + if key == "carddav_url" and str(data[key] or "").strip(): + try: + settings[key] = _validate_carddav_url(data[key]) + except ValueError as e: + raise HTTPException(400, str(e)) + else: + settings[key] = data[key] _save_settings(settings) # Force re-fetch _contact_cache["fetched_at"] = None diff --git a/src/caldav_sync.py b/src/caldav_sync.py index 663c0bd59..b139dbbb1 100644 --- a/src/caldav_sync.py +++ b/src/caldav_sync.py @@ -27,6 +27,7 @@ import hashlib import ipaddress import logging import os +import socket import uuid from datetime import date, datetime, timedelta, timezone from urllib.parse import urlparse, urlunparse @@ -50,15 +51,55 @@ def _private_caldav_allowed() -> bool: return os.environ.get("ODYSSEUS_ALLOW_PRIVATE_CALDAV", "0").lower() in {"1", "true", "yes"} +def _validate_caldav_address(addr: ipaddress._BaseAddress) -> None: + if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped is not None: + addr = addr.ipv4_mapped + if ( + addr.is_loopback + or addr.is_link_local + or addr.is_multicast + or addr.is_unspecified + or addr.is_reserved + ): + raise ValueError("CalDAV URL host is not allowed") + if addr.is_private and not _private_caldav_allowed(): + raise ValueError("Private CalDAV IPs require ODYSSEUS_ALLOW_PRIVATE_CALDAV=1") + + def _validate_caldav_ip(host: str) -> None: try: ip = ipaddress.ip_address(host.strip("[]")) except ValueError: return - if ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_unspecified: - raise ValueError("CalDAV URL host is not allowed") - if ip.is_private and not _private_caldav_allowed(): - raise ValueError("Private CalDAV IPs require ODYSSEUS_ALLOW_PRIVATE_CALDAV=1") + _validate_caldav_address(ip) + + +def _resolve_caldav_host_ips(host: str) -> list[ipaddress._BaseAddress]: + addrs: list[ipaddress._BaseAddress] = [] + for family, _, _, _, sockaddr in socket.getaddrinfo(host, None): + if family not in (socket.AF_INET, socket.AF_INET6): + continue + try: + addrs.append(ipaddress.ip_address(sockaddr[0].split("%", 1)[0])) + except ValueError: + continue + return addrs + + +def _validate_caldav_hostname(host: str) -> None: + try: + ipaddress.ip_address(host.strip("[]")) + return + except ValueError: + pass + try: + addrs = _resolve_caldav_host_ips(host) + except OSError: + raise ValueError("CalDAV URL host does not resolve") + if not addrs: + raise ValueError("CalDAV URL host does not resolve") + for addr in addrs: + _validate_caldav_address(addr) def validate_caldav_url(raw_url: str) -> str: @@ -83,6 +124,7 @@ def validate_caldav_url(raw_url: str) -> str: if host in _BLOCKED_HOSTS or host.endswith(".localhost"): raise ValueError("CalDAV URL host is not allowed") _validate_caldav_ip(host) + _validate_caldav_hostname(host) return urlunparse(parsed._replace(fragment="")).rstrip("/") diff --git a/src/caldav_writeback.py b/src/caldav_writeback.py index 1b6d6cc80..e5bc46d0e 100644 --- a/src/caldav_writeback.py +++ b/src/caldav_writeback.py @@ -167,6 +167,12 @@ async def writeback_event(owner: str, calendar_source: str, calendar_id: str, pw = decrypt(cfg.get("password") or "") if not (url and user and pw): return {"skipped": "caldav not configured"} + from src.caldav_sync import validate_caldav_url + try: + url = validate_caldav_url(url) + except ValueError as e: + logger.warning("CalDAV write-back URL rejected: %s", e) + return {"ok": False, "error": str(e)[:200]} result = await asyncio.to_thread(_writeback_blocking, calendar_id, ev, delete, url, user, pw) if not result.get("ok"): logger.warning("CalDAV write-back did not apply: %s", result.get("error") or result) diff --git a/tests/test_caldav_url_hardening.py b/tests/test_caldav_url_hardening.py index 40b1f3485..39de9a9eb 100644 --- a/tests/test_caldav_url_hardening.py +++ b/tests/test_caldav_url_hardening.py @@ -1,4 +1,5 @@ import asyncio +import ipaddress import sys import types from pathlib import Path @@ -8,7 +9,12 @@ import pytest from src import caldav_sync -def test_validate_caldav_url_normalizes_safe_url(): +def test_validate_caldav_url_normalizes_safe_url(monkeypatch): + monkeypatch.setattr( + caldav_sync, + "_resolve_caldav_host_ips", + lambda host: [ipaddress.ip_address("93.184.216.34")], + ) assert ( caldav_sync.validate_caldav_url(" https://calendar.example.com/dav/ ") == "https://calendar.example.com/dav" @@ -42,7 +48,46 @@ def test_validate_caldav_url_blocks_private_ips_unless_explicitly_allowed(monkey assert caldav_sync.validate_caldav_url("http://10.0.0.5:5232/dav") == "http://10.0.0.5:5232/dav" +def test_validate_caldav_url_blocks_dns_to_private(monkeypatch): + monkeypatch.delenv("ODYSSEUS_ALLOW_PRIVATE_CALDAV", raising=False) + monkeypatch.setattr( + caldav_sync, + "_resolve_caldav_host_ips", + lambda host: [ipaddress.ip_address("10.0.0.5")], + ) + + with pytest.raises(ValueError, match="Private CalDAV IPs require"): + caldav_sync.validate_caldav_url("https://calendar.example.com/dav") + + +def test_validate_caldav_url_blocks_dns_to_link_local_even_when_private_allowed(monkeypatch): + monkeypatch.setenv("ODYSSEUS_ALLOW_PRIVATE_CALDAV", "1") + monkeypatch.setattr( + caldav_sync, + "_resolve_caldav_host_ips", + lambda host: [ipaddress.ip_address("169.254.169.254")], + ) + + with pytest.raises(ValueError, match="host is not allowed"): + caldav_sync.validate_caldav_url("https://calendar.example.com/dav") + + +def test_validate_caldav_url_fails_closed_when_hostname_does_not_resolve(monkeypatch): + def _no_dns(host): + raise OSError("no such host") + + monkeypatch.setattr(caldav_sync, "_resolve_caldav_host_ips", _no_dns) + + with pytest.raises(ValueError, match="host does not resolve"): + caldav_sync.validate_caldav_url("https://calendar.example.com/dav") + + def test_sync_caldav_decrypts_stored_password_and_validates_url(monkeypatch): + monkeypatch.setattr( + caldav_sync, + "_resolve_caldav_host_ips", + lambda host: [ipaddress.ip_address("93.184.216.34")], + ) prefs_mod = types.ModuleType("routes.prefs_routes") prefs_mod._load_for_user = lambda owner: { "caldav": { diff --git a/tests/test_caldav_url_nonstring.py b/tests/test_caldav_url_nonstring.py index a9d8f3f58..db50b8c26 100644 --- a/tests/test_caldav_url_nonstring.py +++ b/tests/test_caldav_url_nonstring.py @@ -5,9 +5,13 @@ It did `(raw_url or "").strip()`, so a non-string scalar (e.g. an int from a mis-typed config) reached `.strip()` and raised TypeError instead of the function\'s own ValueError. """ +import ipaddress + import pytest -from src.caldav_sync import validate_caldav_url +from src import caldav_sync + +validate_caldav_url = caldav_sync.validate_caldav_url def test_non_string_raises_valueerror_not_typeerror(): @@ -17,6 +21,11 @@ def test_non_string_raises_valueerror_not_typeerror(): validate_caldav_url(None) -def test_valid_url_passes(): +def test_valid_url_passes(monkeypatch): + monkeypatch.setattr( + caldav_sync, + "_resolve_caldav_host_ips", + lambda host: [ipaddress.ip_address("93.184.216.34")], + ) out = validate_caldav_url("https://dav.example.com/calendars/") assert "example.com" in out diff --git a/tests/test_caldav_writeback.py b/tests/test_caldav_writeback.py index c501ad155..f63671236 100644 --- a/tests/test_caldav_writeback.py +++ b/tests/test_caldav_writeback.py @@ -5,6 +5,9 @@ iCalendar serialization, hash-based remote-calendar discovery, and the create/update/delete orchestration. """ +import asyncio +import sys +import types from datetime import datetime from src.caldav_writeback import ( @@ -123,3 +126,102 @@ def test_push_missing_uid_reports_input_error_before_remote_lookup(): res = push_event([cal], CAL_ID, _ev(uid="")) assert res["ok"] is False and "uid" in res["error"] assert cal._existing.saved is False + + +def test_writeback_validates_saved_url_before_remote_call(monkeypatch): + import src.caldav_sync as sync + import src.caldav_writeback as wb + + prefs_mod = types.ModuleType("routes.prefs_routes") + prefs_mod._load_for_user = lambda owner: { + "caldav": { + "url": " https://dav.example.com/calendars/home/ ", + "username": owner, + "password": "enc:pw", + } + } + secret_mod = types.ModuleType("src.secret_storage") + secret_mod.decrypt = lambda value: "plain-password" + monkeypatch.setitem(sys.modules, "routes.prefs_routes", prefs_mod) + monkeypatch.setitem(sys.modules, "src.secret_storage", secret_mod) + + captured = {} + + def fake_validate(url): + captured["validated_url"] = url + return "https://dav.example.com/calendars/home" + + def fake_writeback_blocking(local_cal_id, ev, delete, url, username, password): + captured.update( + { + "local_cal_id": local_cal_id, + "delete": delete, + "url": url, + "username": username, + "password": password, + } + ) + return {"ok": True} + + async def inline_to_thread(func, *args, **kwargs): + return func(*args, **kwargs) + + monkeypatch.setattr(sync, "validate_caldav_url", fake_validate) + monkeypatch.setattr(wb, "_writeback_blocking", fake_writeback_blocking) + monkeypatch.setattr(wb.asyncio, "to_thread", inline_to_thread) + + result = asyncio.run( + wb.writeback_event("alice", "caldav", "caldav-123", {"uid": "evt-1"}) + ) + + assert result == {"ok": True} + assert captured == { + "validated_url": "https://dav.example.com/calendars/home/", + "local_cal_id": "caldav-123", + "delete": False, + "url": "https://dav.example.com/calendars/home", + "username": "alice", + "password": "plain-password", + } + + +def test_writeback_rejects_unsafe_saved_url_before_remote_call(monkeypatch): + import src.caldav_sync as sync + import src.caldav_writeback as wb + + prefs_mod = types.ModuleType("routes.prefs_routes") + prefs_mod._load_for_user = lambda owner: { + "caldav": { + "url": "http://evil.example/latest/meta-data", + "username": owner, + "password": "enc:pw", + } + } + secret_mod = types.ModuleType("src.secret_storage") + secret_mod.decrypt = lambda value: "plain-password" + monkeypatch.setitem(sys.modules, "routes.prefs_routes", prefs_mod) + monkeypatch.setitem(sys.modules, "src.secret_storage", secret_mod) + + called = False + + def fake_validate(_url): + raise ValueError("CalDAV URL host is not allowed") + + def fake_writeback_blocking(*_args, **_kwargs): + nonlocal called + called = True + return {"ok": True} + + async def inline_to_thread(func, *args, **kwargs): + return func(*args, **kwargs) + + monkeypatch.setattr(sync, "validate_caldav_url", fake_validate) + monkeypatch.setattr(wb, "_writeback_blocking", fake_writeback_blocking) + monkeypatch.setattr(wb.asyncio, "to_thread", inline_to_thread) + + result = asyncio.run( + wb.writeback_event("alice", "caldav", "caldav-123", {"uid": "evt-1"}) + ) + + assert result == {"ok": False, "error": "CalDAV URL host is not allowed"} + assert called is False diff --git a/tests/test_contacts_carddav_security.py b/tests/test_contacts_carddav_security.py new file mode 100644 index 000000000..8a20af08f --- /dev/null +++ b/tests/test_contacts_carddav_security.py @@ -0,0 +1,66 @@ +"""CardDAV outbound URL hardening tests.""" + +import pytest + +import routes.contacts_routes as contacts + + +def test_validate_carddav_url_blocks_metadata_targets(monkeypatch): + monkeypatch.setattr( + contacts, + "check_outbound_url", + lambda url, *, block_private=False: (False, "link-local address blocked"), + ) + + with pytest.raises(ValueError, match="link-local"): + contacts._validate_carddav_url("http://169.254.169.254/latest/meta-data") + + +def test_validate_carddav_url_rejects_non_string(monkeypatch): + monkeypatch.setattr( + contacts, + "check_outbound_url", + lambda url, *, block_private=False: (False, "URL is required"), + ) + + with pytest.raises(ValueError, match="URL is required"): + contacts._validate_carddav_url(12345) + + +def test_abs_url_pins_cross_origin_href_to_configured_carddav_origin(monkeypatch): + monkeypatch.setattr( + contacts, + "_get_carddav_config", + lambda: {"url": "https://dav.example.com/addressbooks/alice", "username": "", "password": ""}, + ) + monkeypatch.setattr( + contacts, + "check_outbound_url", + lambda url, *, block_private=False: (True, "ok"), + ) + + assert ( + contacts._abs_url("http://169.254.169.254/latest/meta-data") + == "https://dav.example.com/latest/meta-data" + ) + + +def test_vcard_url_validates_base_and_quotes_uid(monkeypatch): + seen = [] + monkeypatch.setattr( + contacts, + "_get_carddav_config", + lambda: {"url": "https://dav.example.com/addressbooks/alice/", "username": "", "password": ""}, + ) + + def _safe(url, *, block_private=False): + seen.append((url, block_private)) + return True, "ok" + + monkeypatch.setattr(contacts, "check_outbound_url", _safe) + + assert ( + contacts._vcard_url("uid/../../escape") + == "https://dav.example.com/addressbooks/alice/uid%2F..%2F..%2Fescape.vcf" + ) + assert seen == [("https://dav.example.com/addressbooks/alice", False)]