Enforce owner checks for upload attachments

This commit is contained in:
Duarte Antunes
2026-06-01 08:47:48 +01:00
committed by GitHub
parent 8874a11baf
commit e77d87fa80
6 changed files with 352 additions and 59 deletions
+66 -17
View File
@@ -3,6 +3,8 @@
"""Document routes — CRUD for living documents with version history.""" """Document routes — CRUD for living documents with version history."""
import logging import logging
import os
import re
from typing import Dict, Any, Optional from typing import Dict, Any, Optional
from fastapi import HTTPException from fastapi import HTTPException
@@ -12,6 +14,7 @@ from core.database import Document, DocumentVersion
from core.database import Session as DbSession from core.database import Session as DbSession
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_UPLOAD_ID_RE = re.compile(r"^[0-9a-fA-F]{32}\.[A-Za-z0-9]+$")
# ---- Request schemas ---- # ---- Request schemas ----
@@ -126,40 +129,86 @@ def _slug(name: str) -> str:
_PDF_RENDER_SCALE = 2.0 _PDF_RENDER_SCALE = 2.0
def _locate_upload(upload_dir: str, file_id: str): def _upload_path_inside(upload_dir: str, path: str) -> bool:
base = os.path.realpath(upload_dir)
p = os.path.realpath(path)
try:
return os.path.commonpath([base, p]) == base
except Exception:
return False
def _upload_owner_allowed(
meta: Optional[dict],
user: Optional[str],
auth_manager=None,
allow_admin: bool = True,
) -> bool:
if not user:
return (
not bool(auth_manager and getattr(auth_manager, "is_configured", False))
and not (meta and meta.get("owner") is not None)
)
if allow_admin and auth_manager and hasattr(auth_manager, "is_admin"):
try:
if auth_manager.is_admin(user):
return True
except Exception:
pass
return bool(meta and meta.get("owner") == user)
def _locate_upload(upload_dir: str, file_id: str, owner: Optional[str] = None, auth_manager=None):
"""Find an upload by its filename ID. """Find an upload by its filename ID.
Lookup order: Lookup order:
1. Direct hit at `upload_dir/file_id` (very small deployments). 1. The `uploads.json` index that `UploadHandler.save_upload` maintains,
2. The `uploads.json` index that `UploadHandler.save_upload` maintains — so owner can be verified before a document reads the source file.
maps file_hash → metadata containing the full path. O(1) once loaded. 2. Direct hit at `upload_dir/file_id` (very small deployments).
3. Fallback: `os.walk` the date-bucketed tree. Slow on large stores; 3. Fallback: `os.walk` the date-bucketed tree. Slow on large stores;
only triggers for legacy uploads recorded before the index existed. only allowed after the index owner check passes, or in single-user /
admin-style contexts where no owner is enforced.
`followlinks=False` keeps a stray symlink loop in `data/uploads/` from `followlinks=False` keeps a stray symlink loop in `data/uploads/` from
spinning the walker into infinite recursion. spinning the walker into infinite recursion.
""" """
import os
import json as _json import json as _json
direct = os.path.join(upload_dir, file_id)
if os.path.exists(direct): if not _UPLOAD_ID_RE.fullmatch(file_id or ""):
return direct logger.warning("Rejected invalid upload id in document lookup: %r", file_id)
# O(1) via uploads.json return None
meta = None
try: try:
idx_path = os.path.join(upload_dir, "uploads.json") idx_path = os.path.join(upload_dir, "uploads.json")
if os.path.exists(idx_path): if os.path.exists(idx_path):
with open(idx_path, "r", encoding="utf-8") as f: with open(idx_path, "r", encoding="utf-8") as f:
idx = _json.load(f) idx = _json.load(f)
for meta in (idx.values() if isinstance(idx, dict) else []): for item in (idx.values() if isinstance(idx, dict) else []):
if meta.get("id") == file_id: if isinstance(item, dict) and item.get("id") == file_id:
p = meta.get("path") meta = item
if p and os.path.exists(p): break
return p
except Exception: except Exception:
pass meta = None
if not _upload_owner_allowed(meta, owner, auth_manager):
logger.warning("Upload %s denied for document owner %s", file_id, owner)
return None
if meta:
p = meta.get("path")
if p and os.path.exists(p) and _upload_path_inside(upload_dir, p):
return p
direct = os.path.join(upload_dir, file_id)
if os.path.exists(direct) and _upload_path_inside(upload_dir, direct):
return direct
for root, _dirs, files in os.walk(upload_dir, followlinks=False): for root, _dirs, files in os.walk(upload_dir, followlinks=False):
if file_id in files: if file_id in files:
return os.path.join(root, file_id) p = os.path.join(root, file_id)
if _upload_path_inside(upload_dir, p):
return p
return None return None
+15 -9
View File
@@ -24,6 +24,12 @@ from routes.document_helpers import (
_PDF_RENDER_SCALE, _PDF_RENDER_SCALE,
) )
def _locate_current_user_upload(request: Request, upload_dir: str, upload_id: str, user: Optional[str]):
auth_manager = getattr(getattr(request.app, "state", None), "auth_manager", None)
return _locate_upload(upload_dir, upload_id, owner=user, auth_manager=auth_manager)
def setup_document_routes(session_manager, upload_handler=None) -> APIRouter: def setup_document_routes(session_manager, upload_handler=None) -> APIRouter:
router = APIRouter(tags=["documents"]) router = APIRouter(tags=["documents"])
@@ -160,7 +166,7 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter:
raise HTTPException(500, f"Upload failed: {e}") raise HTTPException(500, f"Upload failed: {e}")
upload_id = meta["id"] upload_id = meta["id"]
pdf_path = _locate_upload(UPLOAD_DIR, upload_id) pdf_path = _locate_current_user_upload(request, UPLOAD_DIR, upload_id, user)
if not pdf_path: if not pdf_path:
raise HTTPException(500, "Saved PDF could not be located") raise HTTPException(500, "Saved PDF could not be located")
@@ -401,7 +407,7 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter:
raise HTTPException(400, "Document is not a PDF — no pdf_source marker found") raise HTTPException(400, "Document is not a PDF — no pdf_source marker found")
upload_id = m.group(1) upload_id = m.group(1)
pdf_path = _locate_upload(UPLOAD_DIR, upload_id) pdf_path = _locate_current_user_upload(request, UPLOAD_DIR, upload_id, user)
if not pdf_path: if not pdf_path:
raise HTTPException(404, "Source PDF could not be located") raise HTTPException(404, "Source PDF could not be located")
@@ -914,7 +920,7 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter:
if not upload_id: if not upload_id:
raise HTTPException(400, "Document is not linked to a source PDF") raise HTTPException(400, "Document is not linked to a source PDF")
pdf_path = _locate_upload(UPLOAD_DIR, upload_id) pdf_path = _locate_current_user_upload(request, UPLOAD_DIR, upload_id, user)
if not pdf_path: if not pdf_path:
raise HTTPException(404, f"Source PDF {upload_id} not found in uploads") raise HTTPException(404, f"Source PDF {upload_id} not found in uploads")
@@ -978,7 +984,7 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter:
upload_id = find_source_upload_id(doc.current_content or "") upload_id = find_source_upload_id(doc.current_content or "")
if not upload_id: if not upload_id:
raise HTTPException(400, "Document is not linked to a source PDF") raise HTTPException(400, "Document is not linked to a source PDF")
pdf_path = _locate_upload(UPLOAD_DIR, upload_id) pdf_path = _locate_current_user_upload(request, UPLOAD_DIR, upload_id, user)
if not pdf_path: if not pdf_path:
raise HTTPException(404, f"Source PDF {upload_id} not found") raise HTTPException(404, f"Source PDF {upload_id} not found")
@@ -1046,7 +1052,7 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter:
upload_id = find_source_upload_id(doc.current_content or "") upload_id = find_source_upload_id(doc.current_content or "")
if not upload_id: if not upload_id:
raise HTTPException(400, "Document is not linked to a source PDF") raise HTTPException(400, "Document is not linked to a source PDF")
pdf_path = _locate_upload(UPLOAD_DIR, upload_id) pdf_path = _locate_current_user_upload(request, UPLOAD_DIR, upload_id, user)
if not pdf_path: if not pdf_path:
raise HTTPException(404, "Source PDF not found") raise HTTPException(404, "Source PDF not found")
finally: finally:
@@ -1101,7 +1107,7 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter:
upload_id = find_source_upload_id(doc.current_content or "") upload_id = find_source_upload_id(doc.current_content or "")
if not upload_id: if not upload_id:
raise HTTPException(400, "Document is not linked to a source PDF") raise HTTPException(400, "Document is not linked to a source PDF")
pdf_path = _locate_upload(UPLOAD_DIR, upload_id) pdf_path = _locate_current_user_upload(request, UPLOAD_DIR, upload_id, user)
if not pdf_path: if not pdf_path:
raise HTTPException(404, "Source PDF not found") raise HTTPException(404, "Source PDF not found")
finally: finally:
@@ -1250,7 +1256,7 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter:
upload_id = find_source_upload_id(doc.current_content or "") upload_id = find_source_upload_id(doc.current_content or "")
if not upload_id: if not upload_id:
raise HTTPException(400, "Document is not linked to a source PDF") raise HTTPException(400, "Document is not linked to a source PDF")
pdf_path = _locate_upload(UPLOAD_DIR, upload_id) pdf_path = _locate_current_user_upload(request, UPLOAD_DIR, upload_id, user)
if not pdf_path: if not pdf_path:
raise HTTPException(404, f"Source PDF {upload_id} not found") raise HTTPException(404, f"Source PDF {upload_id} not found")
@@ -1345,7 +1351,7 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter:
if not upload_id: if not upload_id:
raise HTTPException(400, "Document is not linked to a source PDF") raise HTTPException(400, "Document is not linked to a source PDF")
pdf_path = _locate_upload(UPLOAD_DIR, upload_id) pdf_path = _locate_current_user_upload(request, UPLOAD_DIR, upload_id, user)
if not pdf_path: if not pdf_path:
raise HTTPException(404, f"Source PDF {upload_id} not found in uploads") raise HTTPException(404, f"Source PDF {upload_id} not found in uploads")
@@ -1489,7 +1495,7 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter:
upload_id = find_source_upload_id(doc.current_content or "") upload_id = find_source_upload_id(doc.current_content or "")
if not upload_id: if not upload_id:
raise HTTPException(400, "Document is not linked to a source PDF") raise HTTPException(400, "Document is not linked to a source PDF")
pdf_path = _locate_upload(UPLOAD_DIR, upload_id) pdf_path = _locate_current_user_upload(request, UPLOAD_DIR, upload_id, user)
if not pdf_path: if not pdf_path:
raise HTTPException(404, f"Source PDF {upload_id} not found") raise HTTPException(404, f"Source PDF {upload_id} not found")
+10 -10
View File
@@ -1,7 +1,6 @@
# src/chat_handler.py # src/chat_handler.py
"""Handler for chat endpoint operations.""" """Handler for chat endpoint operations."""
import os import os
import json
import asyncio import asyncio
import logging import logging
from typing import Dict, List, Optional, Any from typing import Dict, List, Optional, Any
@@ -149,23 +148,22 @@ class ChatHandler:
vision_enabled = get_setting("vision_enabled", True) vision_enabled = get_setting("vision_enabled", True)
main_is_vision = is_vision_model(sess.model or "") main_is_vision = is_vision_model(sess.model or "")
# Read uploads DB once and index by id (was read twice + linear-scanned per attachment) # Resolve uploads once with the session owner. Attachment IDs are
# bearer-like references; never trust them without an owner check.
files_by_id: Dict[str, Dict] = {} files_by_id: Dict[str, Dict] = {}
owner = getattr(sess, "owner", None)
if att_ids: if att_ids:
uploads_db_path = os.path.join(UPLOAD_DIR, "uploads.json") for att_id in att_ids:
try: fi = self.upload_handler.resolve_upload(att_id, owner=owner)
with open(uploads_db_path, "r", encoding="utf-8") as f: if fi:
_all_files = json.load(f) files_by_id[att_id] = fi
files_by_id = {fi["id"]: fi for fi in _all_files.values() if "id" in fi}
except (FileNotFoundError, json.JSONDecodeError):
pass
for att_id in att_ids: for att_id in att_ids:
fi = files_by_id.get(att_id) fi = files_by_id.get(att_id)
if fi: if fi:
attachment_meta.append({ attachment_meta.append({
"id": fi["id"], "id": fi["id"],
"name": fi["name"], "name": fi.get("name") or fi.get("original_name") or fi["id"],
"mime": fi.get("mime", ""), "mime": fi.get("mime", ""),
"size": fi.get("size", 0), "size": fi.get("size", 0),
"width": fi.get("width"), "width": fi.get("width"),
@@ -242,6 +240,8 @@ class ChatHandler:
enhanced_message, att_ids, UPLOAD_DIR, self.upload_handler, enhanced_message, att_ids, UPLOAD_DIR, self.upload_handler,
session_id=getattr(sess, "id", None), session_id=getattr(sess, "id", None),
auto_opened_docs=auto_opened_docs, auto_opened_docs=auto_opened_docs,
owner=owner,
resolved_uploads=files_by_id,
) )
# Strip image_url entries for text-only models (VL description is already in the text) # Strip image_url entries for text-only models (VL description is already in the text)
+21 -22
View File
@@ -257,6 +257,8 @@ def build_user_content(
upload_handler, upload_handler,
session_id: str | None = None, session_id: str | None = None,
auto_opened_docs: list[Dict[str, Any]] | None = None, auto_opened_docs: list[Dict[str, Any]] | None = None,
owner: str | None = None,
resolved_uploads: dict[str, Dict[str, Any]] | None = None,
) -> str | List[Dict[str, Any]]: ) -> str | List[Dict[str, Any]]:
"""Build user content with attachments (text, images, audio, documents). """Build user content with attachments (text, images, audio, documents).
@@ -268,33 +270,30 @@ def build_user_content(
""" """
content = [{"type": "text", "text": text}] content = [{"type": "text", "text": text}]
for fid in attachment_ids: for fid in attachment_ids or []:
if not upload_handler.validate_upload_id(fid): upload_info = (resolved_uploads or {}).get(fid)
logger.warning(f"Invalid attachment ID format: {fid}") if upload_info is None and hasattr(upload_handler, "resolve_upload"):
upload_info = upload_handler.resolve_upload(fid, owner=owner)
if upload_info is None:
logger.warning(f"Attachment {fid} not found or not authorized")
continue continue
path = os.path.join(upload_dir, fid) path = upload_info.get("path")
if not (upload_handler.inside_base_dir(path) and os.path.exists(path)): if not path or not os.path.exists(path):
found = False logger.warning(f"Attachment {fid} path is missing")
for root, dirs, files in os.walk(upload_dir): continue
if fid in files and not fid.endswith(".json"): if hasattr(upload_handler, "_inside_upload_dir") and not upload_handler._inside_upload_dir(path):
path = os.path.join(root, fid) logger.warning(f"Attachment {fid} path is outside upload directory: {path}")
if upload_handler.inside_base_dir(path): continue
found = True if not hasattr(upload_handler, "_inside_upload_dir") and not upload_handler.inside_base_dir(path):
logger.info(f"Found attachment {fid} at {path}")
break
if not found:
logger.warning(f"Attachment {fid} not found in upload directories")
continue
if not upload_handler.inside_base_dir(path):
logger.warning(f"Attachment {fid} path is outside base directory: {path}") logger.warning(f"Attachment {fid} path is outside base directory: {path}")
continue continue
_, ext = os.path.splitext(path.lower()) _, ext = os.path.splitext(path.lower())
mime = mimetypes.guess_type(path)[0] or "application/octet-stream" mime = upload_info.get("mime") or mimetypes.guess_type(path)[0] or "application/octet-stream"
display_name = upload_info.get("name") or upload_info.get("original_name") or path
if upload_handler.is_image_file(path, mime): if upload_handler.is_image_file(display_name, mime):
try: try:
with open(path, "rb") as image_file: with open(path, "rb") as image_file:
encoded_string = base64.b64encode(image_file.read()).decode("utf-8") encoded_string = base64.b64encode(image_file.read()).decode("utf-8")
@@ -310,7 +309,7 @@ def build_user_content(
else: else:
content.insert(0, {"type": "text", "text": "[Image attached but could not be processed]"}) content.insert(0, {"type": "text", "text": "[Image attached but could not be processed]"})
elif upload_handler.is_audio_file(path, mime): elif upload_handler.is_audio_file(display_name, mime):
try: try:
with open(path, "rb") as audio_file: with open(path, "rb") as audio_file:
encoded_string = base64.b64encode(audio_file.read()).decode("utf-8") encoded_string = base64.b64encode(audio_file.read()).decode("utf-8")
@@ -326,7 +325,7 @@ def build_user_content(
else: else:
content.insert(0, {"type": "text", "text": "[Audio attached but could not be processed]"}) content.insert(0, {"type": "text", "text": "[Audio attached but could not be processed]"})
elif upload_handler.is_document_file(path, mime): elif upload_handler.is_document_file(display_name, mime):
if mime == "application/pdf": if mime == "application/pdf":
extracted_text = None extracted_text = None
if session_id: if session_id:
+101 -1
View File
@@ -8,7 +8,7 @@ import hashlib
import mimetypes import mimetypes
import threading import threading
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Dict, Any from typing import Dict, Any, Optional
from fastapi import HTTPException, UploadFile from fastapi import HTTPException, UploadFile
def secure_filename(filename: str) -> str: def secure_filename(filename: str) -> str:
"""Sanitize a filename (replaces werkzeug.utils.secure_filename).""" """Sanitize a filename (replaces werkzeug.utils.secure_filename)."""
@@ -226,6 +226,106 @@ class UploadHandler:
pattern = r'^[0-9a-fA-F]{32}\.[A-Za-z0-9]+$' pattern = r'^[0-9a-fA-F]{32}\.[A-Za-z0-9]+$'
return re.fullmatch(pattern, upload_id) is not None return re.fullmatch(pattern, upload_id) is not None
def _inside_upload_dir(self, path: str) -> bool:
"""Check if path is inside the upload directory."""
base = os.path.realpath(self.upload_dir)
p = os.path.realpath(path)
try:
return os.path.commonpath([base, p]) == base
except Exception:
return False
def _load_upload_index(self) -> Dict[str, Any]:
uploads_db_path = os.path.join(self.upload_dir, "uploads.json")
if not os.path.exists(uploads_db_path):
return {}
try:
with open(uploads_db_path, "r") as f:
data = json.load(f)
return data if isinstance(data, dict) else {}
except Exception as e:
logger.warning(f"Failed to read uploads database: {e}")
return {}
def get_upload_info(self, upload_id: str) -> Optional[Dict[str, Any]]:
"""Return the uploads.json metadata row for an upload ID, if present."""
if not self.validate_upload_id(upload_id):
return None
for info in self._load_upload_index().values():
if isinstance(info, dict) and info.get("id") == upload_id:
return dict(info)
return None
def _find_upload_path(self, upload_id: str) -> Optional[str]:
"""Find an upload file by ID while staying inside upload_dir."""
if not self.validate_upload_id(upload_id):
return None
direct = os.path.join(self.upload_dir, upload_id)
if os.path.exists(direct) and self._inside_upload_dir(direct):
return direct
for root, _dirs, files in os.walk(self.upload_dir, followlinks=False):
if upload_id in files:
path = os.path.join(root, upload_id)
if self._inside_upload_dir(path):
return path
return None
def resolve_upload(
self,
upload_id: str,
owner: Optional[str] = None,
auth_manager: Any = None,
allow_admin: bool = True,
) -> Optional[Dict[str, Any]]:
"""Resolve an upload ID to metadata only if the caller may read it.
This is the owner-aware lookup used by internal processors. Public
download routes already perform owner checks; chat/document paths must
do the same before reading file bytes server-side.
"""
if not self.validate_upload_id(upload_id):
logger.warning(f"Invalid upload ID format: {upload_id}")
return None
auth_configured = bool(auth_manager and getattr(auth_manager, "is_configured", False))
if auth_configured and not owner:
return None
info = self.get_upload_info(upload_id) or {}
is_admin = False
if allow_admin and owner and auth_manager and hasattr(auth_manager, "is_admin"):
try:
is_admin = bool(auth_manager.is_admin(owner))
except Exception:
is_admin = False
if owner and not is_admin:
if info.get("owner") != owner:
logger.warning("Upload %s denied for owner %s", upload_id, owner)
return None
if not owner and info.get("owner") is not None:
logger.warning("Upload %s denied without an authenticated owner", upload_id)
return None
path = info.get("path")
if not path or not os.path.exists(path) or not self._inside_upload_dir(path):
path = self._find_upload_path(upload_id)
if not path:
return None
if not self._inside_upload_dir(path):
logger.warning(f"Upload path outside upload directory: {path}")
return None
resolved = dict(info)
resolved.setdefault("id", upload_id)
resolved["path"] = path
resolved.setdefault("name", os.path.basename(path))
resolved.setdefault("original_name", resolved["name"])
resolved.setdefault("mime", mimetypes.guess_type(path)[0] or "application/octet-stream")
return resolved
def cleanup_rate_limits(self): def cleanup_rate_limits(self):
"""Remove stale entries from upload_rate_log.""" """Remove stale entries from upload_rate_log."""
now = time.time() now = time.time()
+139
View File
@@ -168,6 +168,145 @@ def test_path_name_strips_traversal(token, expected):
assert Path(token).name == expected assert Path(token).name == expected
# -- upload owner gates -------------------------------------------------------
def _make_upload_store(tmp_path):
upload_dir = tmp_path / "uploads"
dated = upload_dir / "2026" / "06" / "01"
dated.mkdir(parents=True)
alice_id = "a" * 32 + ".txt"
bob_id = "b" * 32 + ".txt"
alice_path = dated / alice_id
bob_path = dated / bob_id
alice_path.write_text("alice private note", encoding="utf-8")
bob_path.write_text("bob private note", encoding="utf-8")
index = {
"alice:h1": {
"id": alice_id,
"path": str(alice_path),
"mime": "text/plain",
"size": alice_path.stat().st_size,
"name": "alice.txt",
"original_name": "alice.txt",
"owner": "alice",
},
"bob:h2": {
"id": bob_id,
"path": str(bob_path),
"mime": "text/plain",
"size": bob_path.stat().st_size,
"name": "bob.txt",
"original_name": "bob.txt",
"owner": "bob",
},
}
(upload_dir / "uploads.json").write_text(json.dumps(index), encoding="utf-8")
return upload_dir, alice_id, bob_id
def _stub_core_database_for_route_imports(monkeypatch):
from unittest.mock import MagicMock
core_pkg = types.ModuleType("core")
core_pkg.__path__ = []
models = types.ModuleType("core.models")
models.ChatMessage = MagicMock()
db = types.ModuleType("core.database")
for name in (
"SessionLocal",
"Session",
"ChatMessage",
"Document",
"DocumentVersion",
"GalleryImage",
"ModelEndpoint",
):
setattr(db, name, MagicMock())
monkeypatch.setitem(sys.modules, "core", core_pkg)
monkeypatch.setitem(sys.modules, "core.models", models)
monkeypatch.setitem(sys.modules, "core.database", db)
def test_upload_resolver_rejects_cross_owner_upload_ids(tmp_path):
from src.upload_handler import UploadHandler
upload_dir, alice_id, bob_id = _make_upload_store(tmp_path)
handler = UploadHandler(str(tmp_path), str(upload_dir))
assert handler.resolve_upload(alice_id, owner="alice")["id"] == alice_id
assert handler.resolve_upload(bob_id, owner="alice") is None
def test_build_user_content_skips_cross_owner_attachments(tmp_path):
from src.document_processor import build_user_content
from src.upload_handler import UploadHandler
upload_dir, _alice_id, bob_id = _make_upload_store(tmp_path)
handler = UploadHandler(str(tmp_path), str(upload_dir))
content = build_user_content(
"hello",
[bob_id],
str(upload_dir),
handler,
owner="alice",
)
assert content == "hello"
assert "bob private note" not in content
def test_chat_preprocess_does_not_surface_cross_owner_attachment(tmp_path, monkeypatch):
import asyncio
from types import SimpleNamespace
for mod_name in ("src.chat_handler", "routes.chat_helpers"):
sys.modules.pop(mod_name, None)
_stub_core_database_for_route_imports(monkeypatch)
from src.chat_handler import ChatHandler
from src.upload_handler import UploadHandler
from src import settings
upload_dir, _alice_id, bob_id = _make_upload_store(tmp_path)
handler = UploadHandler(str(tmp_path), str(upload_dir))
monkeypatch.setattr("src.chat_handler.UPLOAD_DIR", str(upload_dir))
monkeypatch.setattr(
settings,
"get_setting",
lambda key, default=None: False if key == "vision_enabled" else default,
)
chat_handler = ChatHandler(None, None, None, None, None, handler)
sess = SimpleNamespace(id="s1", owner="alice", model="text-model")
_enhanced, user_content, _text_ctx, _yt, attachment_meta = asyncio.run(
chat_handler.preprocess_message(
"hello",
[bob_id],
sess,
)
)
assert attachment_meta == []
assert user_content == "hello"
for mod_name in ("src.chat_handler", "routes.chat_helpers"):
sys.modules.pop(mod_name, None)
def test_document_upload_lookup_rejects_cross_owner_marker(tmp_path, monkeypatch):
sys.modules.pop("routes.document_helpers", None)
_stub_core_database_for_route_imports(monkeypatch)
from routes.document_helpers import _locate_upload
upload_dir, _alice_id, bob_id = _make_upload_store(tmp_path)
assert _locate_upload(str(upload_dir), bob_id, owner="alice") is None
assert _locate_upload(str(upload_dir), bob_id, owner="bob").endswith(bob_id)
sys.modules.pop("routes.document_helpers", None)
# ── require_user dependency rejects anon callers ──────────────── # ── require_user dependency rejects anon callers ────────────────
def test_require_user_rejects_unauthenticated(monkeypatch): def test_require_user_rejects_unauthenticated(monkeypatch):