mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-15 17:25:26 -04:00
f78539ba15
validate_caldav_url resolves and vets the initial host, but caldav's niquests session follows 3xx redirects by default, so a validated public URL can be redirected at request time to loopback/link-local/private space, re-opening the SSRF the host check closes. The existing redirect guard only covered the settings test-connection path. Add a shared _build_dav_client helper that pins the session to zero redirects (any 3xx then raises instead of silently following an attacker-chosen Location), and route both the pull (_sync_blocking) and write-back (_writeback_blocking) paths through it. Mirrors the follow_redirects=False already used on the test-connection path. Tests exercise the real DAVClient request path (a 302 toward an internal host is refused, the sink is never contacted; the PROPFIND is asserted to reach the public server first so the check can't pass vacuously), confirm the helper disables redirects on the installed client, guard against a raw DAVClient creeping back in, cover mixed public/internal DNS results in both orderings, and add the resolves-to-no-usable-records fail-closed branch.
178 lines
6.5 KiB
Python
178 lines
6.5 KiB
Python
import asyncio
|
|
import ipaddress
|
|
import sys
|
|
import types
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from src import caldav_sync
|
|
|
|
|
|
def test_validate_caldav_url_normalizes_safe_url(monkeypatch):
|
|
monkeypatch.setattr(
|
|
caldav_sync,
|
|
"_resolve_caldav_host_ips",
|
|
lambda host: [ipaddress.ip_address("93.184.216.34")],
|
|
)
|
|
assert (
|
|
caldav_sync.validate_caldav_url(" https://calendar.example.com/dav/ ")
|
|
== "https://calendar.example.com/dav"
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"url, message",
|
|
[
|
|
("ftp://calendar.example.com/dav", "must start with"),
|
|
("https://alice:secret@calendar.example.com/dav", "credentials"),
|
|
("https://calendar.example.com/dav#frag", "fragments"),
|
|
("http://localhost:5232/dav", "host is not allowed"),
|
|
("http://service.localhost/dav", "host is not allowed"),
|
|
("http://127.0.0.1:5232/dav", "host is not allowed"),
|
|
("http://[::1]:5232/dav", "host is not allowed"),
|
|
("http://169.254.169.254/latest", "host is not allowed"),
|
|
],
|
|
)
|
|
def test_validate_caldav_url_rejects_unsafe_urls(url, message):
|
|
with pytest.raises(ValueError, match=message):
|
|
caldav_sync.validate_caldav_url(url)
|
|
|
|
|
|
def test_validate_caldav_url_blocks_private_ips_unless_explicitly_allowed(monkeypatch):
|
|
monkeypatch.delenv("ODYSSEUS_ALLOW_PRIVATE_CALDAV", raising=False)
|
|
with pytest.raises(ValueError, match="Private CalDAV IPs require"):
|
|
caldav_sync.validate_caldav_url("http://10.0.0.5:5232/dav")
|
|
|
|
monkeypatch.setenv("ODYSSEUS_ALLOW_PRIVATE_CALDAV", "1")
|
|
assert caldav_sync.validate_caldav_url("http://10.0.0.5:5232/dav") == "http://10.0.0.5:5232/dav"
|
|
|
|
|
|
def test_validate_caldav_url_blocks_dns_to_private(monkeypatch):
|
|
monkeypatch.delenv("ODYSSEUS_ALLOW_PRIVATE_CALDAV", raising=False)
|
|
monkeypatch.setattr(
|
|
caldav_sync,
|
|
"_resolve_caldav_host_ips",
|
|
lambda host: [ipaddress.ip_address("10.0.0.5")],
|
|
)
|
|
|
|
with pytest.raises(ValueError, match="Private CalDAV IPs require"):
|
|
caldav_sync.validate_caldav_url("https://calendar.example.com/dav")
|
|
|
|
|
|
def test_validate_caldav_url_blocks_dns_to_link_local_even_when_private_allowed(monkeypatch):
|
|
monkeypatch.setenv("ODYSSEUS_ALLOW_PRIVATE_CALDAV", "1")
|
|
monkeypatch.setattr(
|
|
caldav_sync,
|
|
"_resolve_caldav_host_ips",
|
|
lambda host: [ipaddress.ip_address("169.254.169.254")],
|
|
)
|
|
|
|
with pytest.raises(ValueError, match="host is not allowed"):
|
|
caldav_sync.validate_caldav_url("https://calendar.example.com/dav")
|
|
|
|
|
|
def test_validate_caldav_url_fails_closed_when_hostname_does_not_resolve(monkeypatch):
|
|
def _no_dns(host):
|
|
raise OSError("no such host")
|
|
|
|
monkeypatch.setattr(caldav_sync, "_resolve_caldav_host_ips", _no_dns)
|
|
|
|
with pytest.raises(ValueError, match="host does not resolve"):
|
|
caldav_sync.validate_caldav_url("https://calendar.example.com/dav")
|
|
|
|
|
|
def test_validate_caldav_url_fails_closed_when_host_resolves_to_no_usable_records(monkeypatch):
|
|
# Distinct from the OSError path above: here resolution *succeeds* but yields
|
|
# no usable A/AAAA records (the `if not addrs` branch). Fail closed there too
|
|
# rather than letting an un-vetted host through.
|
|
monkeypatch.setattr(caldav_sync, "_resolve_caldav_host_ips", lambda host: [])
|
|
|
|
with pytest.raises(ValueError, match="host does not resolve"):
|
|
caldav_sync.validate_caldav_url("https://calendar.example.com/dav")
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"addrs",
|
|
[
|
|
["93.184.216.34", "127.0.0.1"], # public first, internal second
|
|
["127.0.0.1", "93.184.216.34"], # internal first, public second
|
|
],
|
|
)
|
|
def test_validate_caldav_url_blocks_mixed_dns_in_any_order(monkeypatch, addrs):
|
|
# A host that resolves to BOTH a public and an internal address must be
|
|
# rejected regardless of record order — every resolved address is checked,
|
|
# so one internal answer is enough to block. Defends DNS round-robin and a
|
|
# rebind that slips an internal A-record alongside a public one.
|
|
monkeypatch.delenv("ODYSSEUS_ALLOW_PRIVATE_CALDAV", raising=False)
|
|
monkeypatch.setattr(
|
|
caldav_sync,
|
|
"_resolve_caldav_host_ips",
|
|
lambda host: [ipaddress.ip_address(a) for a in addrs],
|
|
)
|
|
|
|
with pytest.raises(ValueError, match="host is not allowed"):
|
|
caldav_sync.validate_caldav_url("https://calendar.example.com/dav")
|
|
|
|
|
|
def test_sync_caldav_decrypts_stored_password_and_validates_url(monkeypatch):
|
|
monkeypatch.setattr(
|
|
caldav_sync,
|
|
"_resolve_caldav_host_ips",
|
|
lambda host: [ipaddress.ip_address("93.184.216.34")],
|
|
)
|
|
saved = {}
|
|
prefs_mod = types.ModuleType("routes.prefs_routes")
|
|
prefs_mod._load_for_user = lambda owner: {
|
|
"caldav": {
|
|
"url": " https://calendar.example.com/dav/ ",
|
|
"username": owner,
|
|
"password": "enc:stored",
|
|
}
|
|
}
|
|
prefs_mod._save_for_user = lambda owner, prefs: saved.update({"owner": owner, "prefs": prefs})
|
|
monkeypatch.setitem(sys.modules, "routes.prefs_routes", prefs_mod)
|
|
|
|
secret_mod = types.ModuleType("src.secret_storage")
|
|
secret_mod.decrypt = lambda value: "decrypted-password" if value == "enc:stored" else value
|
|
monkeypatch.setitem(sys.modules, "src.secret_storage", secret_mod)
|
|
|
|
captured = {}
|
|
|
|
def fake_sync_blocking(owner, url, username, password, account_id=""):
|
|
captured.update(
|
|
{
|
|
"owner": owner,
|
|
"url": url,
|
|
"username": username,
|
|
"password": password,
|
|
}
|
|
)
|
|
return {"calendars": 1, "events": 0, "deleted": 0, "errors": []}
|
|
|
|
async def inline_to_thread(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
monkeypatch.setattr(caldav_sync, "_sync_blocking", fake_sync_blocking)
|
|
monkeypatch.setattr(caldav_sync.asyncio, "to_thread", inline_to_thread)
|
|
|
|
result = asyncio.run(caldav_sync.sync_caldav("alice"))
|
|
|
|
assert result["calendars"] == 1
|
|
assert captured == {
|
|
"owner": "alice",
|
|
"url": "https://calendar.example.com/dav",
|
|
"username": "alice",
|
|
"password": "decrypted-password",
|
|
}
|
|
|
|
|
|
def test_calendar_routes_use_hardened_caldav_client_and_secret_storage():
|
|
text = Path("routes/calendar_routes.py").read_text(encoding="utf-8")
|
|
|
|
assert "validate_caldav_url(body.get(\"url\", \"\"))" in text
|
|
assert "encrypt(body[\"password\"])" in text
|
|
assert "pw = decrypt(pw)" in text
|
|
assert "follow_redirects=False, trust_env=False" in text
|
|
assert "Redirects are not followed for CalDAV safety" in text
|