Constrain upload paths to upload root (#2825)

This commit is contained in:
Vykos
2026-06-05 13:15:23 +02:00
committed by GitHub
parent 2a1febdeef
commit 688194113b
2 changed files with 144 additions and 33 deletions
+94
View File
@@ -1,6 +1,7 @@
import asyncio
import builtins
import json
import os
from types import SimpleNamespace
import pytest
@@ -90,6 +91,35 @@ def _guard_cache_open(monkeypatch, cache_path, blocked_modes):
monkeypatch.setattr(builtins, "open", guarded_open)
def _add_upload_row(upload_dir, row):
db_path = upload_dir / "uploads.json"
index = json.loads(db_path.read_text(encoding="utf-8"))
index[f"{row.get('owner')}:{row['id']}"] = row
db_path.write_text(json.dumps(index), encoding="utf-8")
def _add_upload_symlink(upload_dir, file_id, target_path, owner="alice"):
dated = upload_dir / "2026" / "06" / "02"
link_path = dated / file_id
try:
os.symlink(target_path, link_path)
except (AttributeError, NotImplementedError, OSError) as exc:
pytest.skip(f"symlinks unavailable: {exc}")
_add_upload_row(
upload_dir,
{
"id": file_id,
"path": str(link_path),
"mime": "image/png",
"size": target_path.stat().st_size,
"name": "escape.png",
"original_name": "escape.png",
"owner": owner,
},
)
return link_path
def test_download_file_denies_anonymous_when_auth_is_configured(tmp_path, monkeypatch):
handler, alice_id, _bob_id, _upload_dir = _make_upload_store(tmp_path, monkeypatch)
download_file = _upload_endpoints(handler, monkeypatch)["download_file"]
@@ -120,6 +150,7 @@ def test_download_file_allows_same_owner(tmp_path, monkeypatch):
assert response.path.endswith(alice_id)
assert response.media_type == "image/png"
assert response.headers["X-Content-Type-Options"] == "nosniff"
def test_download_file_allows_admin_to_read_other_owner_upload(tmp_path, monkeypatch):
@@ -137,6 +168,44 @@ def test_download_file_allows_admin_to_read_other_owner_upload(tmp_path, monkeyp
assert response.media_type == "image/png"
def test_download_file_rejects_upload_symlink_escape(tmp_path, monkeypatch):
handler, _alice_id, _bob_id, upload_dir = _make_upload_store(tmp_path, monkeypatch)
download_file = _upload_endpoints(handler, monkeypatch)["download_file"]
escape_id = "c" * 32 + ".png"
outside = tmp_path / "outside-upload-root.png"
outside.write_bytes(b"outside upload root")
_add_upload_symlink(upload_dir, escape_id, outside)
with pytest.raises(HTTPException) as exc:
asyncio.run(
download_file(
_Request(user="alice", auth_manager=_AuthManager()),
escape_id,
)
)
assert exc.value.status_code == 403
def test_download_file_keeps_owner_gate_before_path_resolution(tmp_path, monkeypatch):
handler, _alice_id, _bob_id, upload_dir = _make_upload_store(tmp_path, monkeypatch)
download_file = _upload_endpoints(handler, monkeypatch)["download_file"]
bob_escape_id = "d" * 32 + ".png"
outside = tmp_path / "bob-outside-upload-root.png"
outside.write_bytes(b"bob outside upload root")
_add_upload_symlink(upload_dir, bob_escape_id, outside, owner="bob")
with pytest.raises(HTTPException) as exc:
asyncio.run(
download_file(
_Request(user="alice", auth_manager=_AuthManager()),
bob_escape_id,
)
)
assert exc.value.status_code == 404
def test_get_vision_text_denies_cross_owner_before_cache_read(tmp_path, monkeypatch):
handler, _alice_id, bob_id, upload_dir = _make_upload_store(tmp_path, monkeypatch)
get_vision_text = _upload_endpoints(handler, monkeypatch)["get_vision_text"]
@@ -178,6 +247,31 @@ def test_get_vision_text_denies_cross_owner_before_image_analysis(tmp_path, monk
assert exc.value.status_code == 404
def test_get_vision_text_rejects_upload_symlink_escape_before_analysis(tmp_path, monkeypatch):
handler, _alice_id, _bob_id, upload_dir = _make_upload_store(tmp_path, monkeypatch)
get_vision_text = _upload_endpoints(handler, monkeypatch)["get_vision_text"]
escape_id = "e" * 32 + ".png"
outside = tmp_path / "vision-outside-upload-root.png"
outside.write_bytes(b"outside upload root")
_add_upload_symlink(upload_dir, escape_id, outside)
def fail_analysis(_path):
raise AssertionError("upload root gate should run before image analysis")
monkeypatch.setattr("src.document_processor.analyze_image_with_vl", fail_analysis)
with pytest.raises(HTTPException) as exc:
asyncio.run(
get_vision_text(
_Request(user="alice", auth_manager=_AuthManager()),
escape_id,
force=1,
)
)
assert exc.value.status_code == 403
def test_put_vision_text_denies_cross_owner_before_cache_write(tmp_path, monkeypatch):
handler, _alice_id, bob_id, upload_dir = _make_upload_store(tmp_path, monkeypatch)
put_vision_text = _upload_endpoints(handler, monkeypatch)["put_vision_text"]