diff --git a/routes/upload_routes.py b/routes/upload_routes.py index 4f55b503d..f348453ac 100644 --- a/routes/upload_routes.py +++ b/routes/upload_routes.py @@ -13,9 +13,43 @@ from src.upload_handler import count_recent_uploads logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/upload", tags=["upload"]) +UPLOAD_RESPONSE_HEADERS = {"X-Content-Type-Options": "nosniff"} def setup_upload_routes(upload_handler): """Setup upload routes with the provided handler""" + + def _upload_root() -> str: + from src.constants import UPLOAD_DIR + return os.path.realpath(getattr(upload_handler, "upload_dir", UPLOAD_DIR)) + + def _path_inside_upload_dir(path: str) -> bool: + try: + return os.path.commonpath([_upload_root(), os.path.realpath(path)]) == _upload_root() + except Exception: + return False + + def _resolve_upload_path(file_id: str) -> str: + from src.constants import UPLOAD_DIR + upload_root = getattr(upload_handler, "upload_dir", UPLOAD_DIR) + direct = os.path.join(upload_root, file_id) + if os.path.lexists(direct): + if not _path_inside_upload_dir(direct): + raise HTTPException(403, "Access denied") + if os.path.isfile(direct): + return direct + raise HTTPException(404, "File not found") + + for root, _dirs, files in os.walk(upload_root, followlinks=False): + if file_id not in files: + continue + path = os.path.join(root, file_id) + if not _path_inside_upload_dir(path): + raise HTTPException(403, "Access denied") + if os.path.isfile(path): + return path + raise HTTPException(404, "File not found") + + raise HTTPException(404, "File not found") @router.post("") async def api_upload(request: Request, files: List[UploadFile] = File(...)): @@ -91,23 +125,11 @@ def setup_upload_routes(upload_handler): client isn't downloading the full-resolution photo just to show it tiny.""" if not upload_handler.validate_upload_id(file_id): raise HTTPException(400, "Invalid file ID") - # Search upload directories for the file - from src.constants import UPLOAD_DIR import mimetypes as _mt - path = os.path.join(UPLOAD_DIR, file_id) - if not os.path.exists(path): - for root, dirs, files in os.walk(UPLOAD_DIR): - if file_id in files: - path = os.path.join(root, file_id) - break - else: - raise HTTPException(404, "File not found") - if not upload_handler.inside_base_dir(path): - raise HTTPException(403, "Access denied") # Look up original filename and owner from uploads.json original_name = file_id info = None - uploads_db = os.path.join(UPLOAD_DIR, "uploads.json") + uploads_db = os.path.join(_upload_root(), "uploads.json") if os.path.exists(uploads_db): with open(uploads_db, encoding="utf-8") as f: db = json.load(f) @@ -123,13 +145,14 @@ def setup_upload_routes(upload_handler): raise HTTPException(403, "Access denied") if file_owner != current_user and not auth_mgr.is_admin(current_user): raise HTTPException(404, "File not found") - mime = _mt.guess_type(path)[0] or "application/octet-stream" + path = _resolve_upload_path(file_id) + mime = (info or {}).get("mime") or _mt.guess_type(path)[0] or "application/octet-stream" from fastapi.responses import FileResponse # Downscaled thumbnail for image previews — generated once and cached. if thumb and mime.startswith("image/"): try: from PIL import Image, ImageOps - thumb_dir = os.path.join(UPLOAD_DIR, ".thumbs") + thumb_dir = os.path.join(_upload_root(), ".thumbs") os.makedirs(thumb_dir, exist_ok=True) thumb_path = os.path.join(thumb_dir, file_id + ".jpg") if (not os.path.exists(thumb_path) @@ -145,17 +168,21 @@ def setup_upload_routes(upload_handler): if im.mode not in ("RGB", "L"): im = im.convert("RGB") im.save(thumb_path, "JPEG", quality=80) - return FileResponse(thumb_path, media_type="image/jpeg") + return FileResponse(thumb_path, media_type="image/jpeg", headers=UPLOAD_RESPONSE_HEADERS) except Exception as e: logger.warning(f"Thumbnail generation failed for {file_id}: {e}") # Fall through to the full image. - return FileResponse(path, media_type=mime, filename=original_name) + return FileResponse( + path, + media_type=mime, + filename=original_name, + headers=UPLOAD_RESPONSE_HEADERS, + ) def _load_upload_info(file_id: str): """Look up the uploads.json record for a file_id, with owner/auth checks.""" - from src.constants import UPLOAD_DIR info = None - uploads_db = os.path.join(UPLOAD_DIR, "uploads.json") + uploads_db = os.path.join(_upload_root(), "uploads.json") if os.path.exists(uploads_db): with open(uploads_db, encoding="utf-8") as f: db = json.load(f) @@ -163,8 +190,7 @@ def setup_upload_routes(upload_handler): return info def _vision_cache_path(file_id: str) -> str: - from src.constants import UPLOAD_DIR - cache_dir = os.path.join(UPLOAD_DIR, ".vision") + cache_dir = os.path.join(_upload_root(), ".vision") os.makedirs(cache_dir, exist_ok=True) return os.path.join(cache_dir, file_id + ".txt") @@ -175,17 +201,6 @@ def setup_upload_routes(upload_handler): subsequent loads are instant. Pass force=1 to recompute.""" if not upload_handler.validate_upload_id(file_id): raise HTTPException(400, "Invalid file ID") - from src.constants import UPLOAD_DIR - path = os.path.join(UPLOAD_DIR, file_id) - if not os.path.exists(path): - for root, dirs, files in os.walk(UPLOAD_DIR): - if file_id in files: - path = os.path.join(root, file_id) - break - else: - raise HTTPException(404, "File not found") - if not upload_handler.inside_base_dir(path): - raise HTTPException(403, "Access denied") info = _load_upload_info(file_id) auth_mgr = getattr(request.app.state, "auth_manager", None) auth_configured = bool(auth_mgr and auth_mgr.is_configured) @@ -196,8 +211,9 @@ def setup_upload_routes(upload_handler): raise HTTPException(403, "Access denied") if file_owner != current_user and not auth_mgr.is_admin(current_user): raise HTTPException(404, "File not found") + path = _resolve_upload_path(file_id) import mimetypes as _mt - mime = _mt.guess_type(path)[0] or "" + mime = (info or {}).get("mime") or _mt.guess_type(path)[0] or "" if not mime.startswith("image/"): raise HTTPException(400, "Not an image") cache_path = _vision_cache_path(file_id) @@ -238,6 +254,7 @@ def setup_upload_routes(upload_handler): raise HTTPException(403, "Access denied") if file_owner != current_user and not auth_mgr.is_admin(current_user): raise HTTPException(404, "File not found") + _resolve_upload_path(file_id) body = await request.json() text = (body or {}).get("text", "") if not isinstance(text, str): diff --git a/tests/test_upload_routes_owner_scope.py b/tests/test_upload_routes_owner_scope.py index 497c58399..a2647f580 100644 --- a/tests/test_upload_routes_owner_scope.py +++ b/tests/test_upload_routes_owner_scope.py @@ -1,6 +1,7 @@ import asyncio import builtins import json +import os from types import SimpleNamespace import pytest @@ -90,6 +91,35 @@ def _guard_cache_open(monkeypatch, cache_path, blocked_modes): monkeypatch.setattr(builtins, "open", guarded_open) +def _add_upload_row(upload_dir, row): + db_path = upload_dir / "uploads.json" + index = json.loads(db_path.read_text(encoding="utf-8")) + index[f"{row.get('owner')}:{row['id']}"] = row + db_path.write_text(json.dumps(index), encoding="utf-8") + + +def _add_upload_symlink(upload_dir, file_id, target_path, owner="alice"): + dated = upload_dir / "2026" / "06" / "02" + link_path = dated / file_id + try: + os.symlink(target_path, link_path) + except (AttributeError, NotImplementedError, OSError) as exc: + pytest.skip(f"symlinks unavailable: {exc}") + _add_upload_row( + upload_dir, + { + "id": file_id, + "path": str(link_path), + "mime": "image/png", + "size": target_path.stat().st_size, + "name": "escape.png", + "original_name": "escape.png", + "owner": owner, + }, + ) + return link_path + + def test_download_file_denies_anonymous_when_auth_is_configured(tmp_path, monkeypatch): handler, alice_id, _bob_id, _upload_dir = _make_upload_store(tmp_path, monkeypatch) download_file = _upload_endpoints(handler, monkeypatch)["download_file"] @@ -120,6 +150,7 @@ def test_download_file_allows_same_owner(tmp_path, monkeypatch): assert response.path.endswith(alice_id) assert response.media_type == "image/png" + assert response.headers["X-Content-Type-Options"] == "nosniff" def test_download_file_allows_admin_to_read_other_owner_upload(tmp_path, monkeypatch): @@ -137,6 +168,44 @@ def test_download_file_allows_admin_to_read_other_owner_upload(tmp_path, monkeyp assert response.media_type == "image/png" +def test_download_file_rejects_upload_symlink_escape(tmp_path, monkeypatch): + handler, _alice_id, _bob_id, upload_dir = _make_upload_store(tmp_path, monkeypatch) + download_file = _upload_endpoints(handler, monkeypatch)["download_file"] + escape_id = "c" * 32 + ".png" + outside = tmp_path / "outside-upload-root.png" + outside.write_bytes(b"outside upload root") + _add_upload_symlink(upload_dir, escape_id, outside) + + with pytest.raises(HTTPException) as exc: + asyncio.run( + download_file( + _Request(user="alice", auth_manager=_AuthManager()), + escape_id, + ) + ) + + assert exc.value.status_code == 403 + + +def test_download_file_keeps_owner_gate_before_path_resolution(tmp_path, monkeypatch): + handler, _alice_id, _bob_id, upload_dir = _make_upload_store(tmp_path, monkeypatch) + download_file = _upload_endpoints(handler, monkeypatch)["download_file"] + bob_escape_id = "d" * 32 + ".png" + outside = tmp_path / "bob-outside-upload-root.png" + outside.write_bytes(b"bob outside upload root") + _add_upload_symlink(upload_dir, bob_escape_id, outside, owner="bob") + + with pytest.raises(HTTPException) as exc: + asyncio.run( + download_file( + _Request(user="alice", auth_manager=_AuthManager()), + bob_escape_id, + ) + ) + + assert exc.value.status_code == 404 + + def test_get_vision_text_denies_cross_owner_before_cache_read(tmp_path, monkeypatch): handler, _alice_id, bob_id, upload_dir = _make_upload_store(tmp_path, monkeypatch) get_vision_text = _upload_endpoints(handler, monkeypatch)["get_vision_text"] @@ -178,6 +247,31 @@ def test_get_vision_text_denies_cross_owner_before_image_analysis(tmp_path, monk assert exc.value.status_code == 404 +def test_get_vision_text_rejects_upload_symlink_escape_before_analysis(tmp_path, monkeypatch): + handler, _alice_id, _bob_id, upload_dir = _make_upload_store(tmp_path, monkeypatch) + get_vision_text = _upload_endpoints(handler, monkeypatch)["get_vision_text"] + escape_id = "e" * 32 + ".png" + outside = tmp_path / "vision-outside-upload-root.png" + outside.write_bytes(b"outside upload root") + _add_upload_symlink(upload_dir, escape_id, outside) + + def fail_analysis(_path): + raise AssertionError("upload root gate should run before image analysis") + + monkeypatch.setattr("src.document_processor.analyze_image_with_vl", fail_analysis) + + with pytest.raises(HTTPException) as exc: + asyncio.run( + get_vision_text( + _Request(user="alice", auth_manager=_AuthManager()), + escape_id, + force=1, + ) + ) + + assert exc.value.status_code == 403 + + def test_put_vision_text_denies_cross_owner_before_cache_write(tmp_path, monkeypatch): handler, _alice_id, bob_id, upload_dir = _make_upload_store(tmp_path, monkeypatch) put_vision_text = _upload_endpoints(handler, monkeypatch)["put_vision_text"]