Harden PDF document markers against cross-owner upload access (#445)

Route PDF lookups through UploadHandler.resolve_upload, reject poisoned pdf_source markers on document create/update, and add regression tests.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Duarte Antunes
2026-06-01 14:38:14 +01:00
committed by GitHub
parent b2e8d692a4
commit 448401a0fc
5 changed files with 183 additions and 106 deletions
+61 -73
View File
@@ -5,16 +5,16 @@
import logging
import os
import re
from typing import Dict, Any, Optional
from typing import Any, Dict, Optional
from fastapi import HTTPException
from fastapi import HTTPException, Request
from pydantic import BaseModel
from core.database import Document, DocumentVersion
from core.database import Session as DbSession
from src.upload_handler import UploadHandler
logger = logging.getLogger(__name__)
_UPLOAD_ID_RE = re.compile(r"^[0-9a-fA-F]{32}\.[A-Za-z0-9]+$")
# ---- Request schemas ----
@@ -138,78 +138,66 @@ def _upload_path_inside(upload_dir: str, path: str) -> bool:
return False
def _upload_owner_allowed(
meta: Optional[dict],
user: Optional[str],
def _resolve_user_upload_path(
upload_handler: Any,
upload_id: str,
owner: Optional[str],
auth_manager=None,
allow_admin: bool = True,
) -> bool:
if not user:
return (
not bool(auth_manager and getattr(auth_manager, "is_configured", False))
and not (meta and meta.get("owner") is not None)
) -> Optional[str]:
"""Resolve an upload id to a filesystem path the caller may read."""
if upload_handler is None:
return None
resolved = upload_handler.resolve_upload(
upload_id,
owner=owner,
auth_manager=auth_manager,
)
if not resolved:
return None
path = resolved.get("path")
upload_dir = getattr(upload_handler, "upload_dir", None)
if path and upload_dir and not _upload_path_inside(upload_dir, path):
logger.warning("Upload path outside upload directory: %s", path)
return None
return path
def _locate_upload(
    upload_dir: str,
    file_id: str,
    owner: Optional[str] = None,
    auth_manager=None,
    upload_handler: Any = None,
):
    """Look up an upload by its filename ID through an upload handler.

    The lookup is delegated to `_resolve_user_upload_path`, and its result
    is returned unchanged.

    Args:
        upload_dir: Directory that holds the uploads. Only used to build a
            fallback handler when `upload_handler` is not supplied.
        file_id: Upload identifier to resolve.
        owner: User on whose behalf the upload is resolved, if any.
        auth_manager: Optional auth manager forwarded to the resolver.
        upload_handler: Handler to resolve with. When None, an
            `UploadHandler` is constructed from the parent directory of
            `upload_dir` and `upload_dir` itself.
    """
    handler = upload_handler
    if handler is None:
        # Imported lazily, only on the fallback path.
        from src.upload_handler import UploadHandler

        parent_dir = os.path.dirname(os.path.abspath(upload_dir))
        handler = UploadHandler(parent_dir, upload_dir)
    return _resolve_user_upload_path(handler, file_id, owner, auth_manager)
def _assert_pdf_marker_upload_owned(
    request: Request,
    content: str,
    user: Optional[str],
    upload_handler: Any,
) -> None:
    """Refuse content whose PDF source marker cannot be resolved for `user`.

    The upload id is extracted from `content` with `find_source_upload_id`
    and resolved through `_resolve_user_upload_path` on behalf of `user`.

    Args:
        request: Incoming request; `request.app.state.auth_manager` is read
            when present and forwarded to the resolver.
        content: Document content to inspect. None is treated as "".
        user: User on whose behalf the upload is resolved.
        upload_handler: Handler used for the lookup. When None the check is
            skipped entirely.

    Raises:
        HTTPException: Status 400 when the marker's upload id does not
            resolve to a path for this user.
    """
    # NOTE(review): with no handler the check passes silently — confirm
    # callers always supply one where ownership must be enforced.
    if upload_handler is None:
        return

    from src.pdf_form_doc import find_source_upload_id

    marker_upload_id = find_source_upload_id(content or "")
    if not marker_upload_id:
        # No marker in the content, so there is nothing to verify.
        return

    app_state = getattr(request.app, "state", None)
    auth_manager = getattr(app_state, "auth_manager", None)
    resolved_path = _resolve_user_upload_path(
        upload_handler, marker_upload_id, user, auth_manager
    )
    if resolved_path:
        return
    raise HTTPException(
        400,
        "Document PDF marker references an upload you do not own",
    )
if allow_admin and auth_manager and hasattr(auth_manager, "is_admin"):
try:
if auth_manager.is_admin(user):
return True
except Exception:
pass
return bool(meta and meta.get("owner") == user)
def _locate_upload(upload_dir: str, file_id: str, owner: Optional[str] = None, auth_manager=None):
    """Resolve an upload's filename ID to a path inside `upload_dir`.

    Steps, in order:
      1. Reject ids that do not match `_UPLOAD_ID_RE`.
      2. Look the id up in `upload_dir/uploads.json`. Any failure while
         reading the index is treated as "no entry".
      3. Ask `_upload_owner_allowed` whether `owner` may use the entry
         (or the missing entry); a refusal ends the lookup.
      4. Return the entry's recorded path if it exists and lies inside
         `upload_dir`.
      5. Otherwise try `upload_dir/file_id` directly.
      6. Otherwise walk the tree under `upload_dir` without following
         symlinks and return the first match that lies inside it.

    Returns:
        The located path, or None when the id is invalid, access is
        denied, or no file is found.
    """
    import json as _json

    if not _UPLOAD_ID_RE.fullmatch(file_id or ""):
        logger.warning("Rejected invalid upload id in document lookup: %r", file_id)
        return None

    index_entry = None
    try:
        index_file = os.path.join(upload_dir, "uploads.json")
        if os.path.exists(index_file):
            with open(index_file, "r", encoding="utf-8") as fh:
                index = _json.load(fh)
            records = index.values() if isinstance(index, dict) else []
            index_entry = next(
                (
                    record
                    for record in records
                    if isinstance(record, dict) and record.get("id") == file_id
                ),
                None,
            )
    except Exception:
        # Best effort: an unreadable index counts as "no entry", which the
        # owner check below then decides on.
        index_entry = None

    if not _upload_owner_allowed(index_entry, owner, auth_manager):
        logger.warning("Upload %s denied for document owner %s", file_id, owner)
        return None

    if index_entry:
        recorded = index_entry.get("path")
        if recorded and os.path.exists(recorded) and _upload_path_inside(upload_dir, recorded):
            return recorded

    flat_path = os.path.join(upload_dir, file_id)
    if os.path.exists(flat_path) and _upload_path_inside(upload_dir, flat_path):
        return flat_path

    # followlinks=False keeps the walk from descending through symlinks.
    for folder, _subdirs, names in os.walk(upload_dir, followlinks=False):
        if file_id not in names:
            continue
        candidate = os.path.join(folder, file_id)
        if _upload_path_inside(upload_dir, candidate):
            return candidate
    return None
def _derive_title(content: str) -> str: