Scope document session links by owner (#3005)

This commit is contained in:
Vykos
2026-06-07 12:47:20 +02:00
committed by GitHub
parent 7b4e6c4c1b
commit 06d28e23ac
2 changed files with 165 additions and 21 deletions
+22 -21
View File
@@ -7,7 +7,7 @@ from typing import Dict, Any, List, Optional
from fastapi import APIRouter, HTTPException, Query, Request, UploadFile, File, Form
from sqlalchemy import func
from sqlalchemy import func, or_
from core.database import SessionLocal, Document, DocumentVersion
from core.database import Session as DbSession
from src.auth_helpers import get_current_user
@@ -15,6 +15,15 @@ from src.auth_helpers import get_current_user
logger = logging.getLogger(__name__)
def _get_session_or_404(db, session_id: str, user: Optional[str]):
session = db.query(DbSession).filter(DbSession.id == session_id).first()
if not session:
raise HTTPException(404, "Session not found")
if user and session.owner != user:
raise HTTPException(404, "Session not found")
return session
def _aggregate_language_facets(lang_rows):
"""Sum document counts per display language for the library facet.
@@ -69,17 +78,12 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter:
# the doc is owner-stamped, so it lives in the library on its own.
session = None
if req.session_id:
session = db.query(DbSession).filter(DbSession.id == req.session_id).first()
if not session:
raise HTTPException(404, "Session not found")
# Match the lenient ownership model the rest of the app uses
# (see _owner_filter): only block when an AUTHENTICATED user is
# writing into a DIFFERENT user's session. In single-user /
# unconfigured / localhost-bypass mode the middleware leaves
# current_user unset (None), and those sessions are already
# served freely everywhere else.
if user and session.owner and session.owner != user:
raise HTTPException(403, "Cannot create document in another user's session")
# unconfigured / localhost-bypass mode, falsey users preserve
# the existing lenient path.
session = _get_session_or_404(db, req.session_id, user)
doc_id = str(uuid.uuid4())
ver_id = str(uuid.uuid4())
@@ -171,11 +175,7 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter:
if session_id:
db = SessionLocal()
try:
sess = db.query(DbSession).filter(DbSession.id == session_id).first()
if not sess:
raise HTTPException(404, "Session not found")
if user and sess.owner and sess.owner != user:
raise HTTPException(403, "Cannot import into another user's session")
_get_session_or_404(db, session_id, user)
finally:
db.close()
@@ -359,18 +359,17 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter:
try:
if not user:
raise HTTPException(403, "Authentication required")
session = db.query(DbSession).filter(DbSession.id == session_id).first()
# v2 review HIGH-9: raise 403 explicitly when the caller
# can't see this session, instead of returning [] which the
# UI treats identically to "no docs" and silently masks
# auth failures.
if not session:
raise HTTPException(404, "Session not found")
if user and session.owner and session.owner != user:
raise HTTPException(403, "Access denied")
docs = db.query(Document).filter(
_get_session_or_404(db, session_id, user)
q = db.query(Document).filter(
Document.session_id == session_id
).order_by(Document.created_at.desc()).all()
)
if user:
q = q.filter(or_(Document.owner == user, Document.owner.is_(None)))
docs = q.order_by(Document.created_at.desc()).all()
return [_doc_to_dict(d) for d in docs]
finally:
db.close()
@@ -606,6 +605,8 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter:
doc.language = req.language
if req.session_id is not None:
# Empty string = unlink from session
if req.session_id:
_get_session_or_404(db, req.session_id, user)
doc.session_id = req.session_id if req.session_id else None
if not req.session_id:
# Tab closed / doc detached from its session — drop the
+143
View File
@@ -0,0 +1,143 @@
"""Document session owner-scope regressions.
Route handlers are called directly, matching the pattern used by the existing
document route tests. This keeps coverage on the real closures without spinning
up middleware.
"""
import tempfile
import uuid
from types import SimpleNamespace
from unittest.mock import MagicMock
import pytest
from fastapi import HTTPException
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool
from tests.helpers.import_state import clear_fake_database_modules
clear_fake_database_modules()
import core.database as cdb
import routes.document_routes as droutes
from core.database import Document
from core.database import Session as DbSession
from routes.document_helpers import DocumentPatch
_TMPDB = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
_ENGINE = create_engine(
f"sqlite:///{_TMPDB.name}",
connect_args={"check_same_thread": False},
poolclass=NullPool,
)
cdb.Base.metadata.create_all(_ENGINE)
_TS = sessionmaker(bind=_ENGINE, autoflush=False, autocommit=False)
def _req(user="alice"):
return SimpleNamespace(state=SimpleNamespace(current_user=user))
def _endpoint(method, path):
router = droutes.setup_document_routes(MagicMock(), None)
for route in router.routes:
if getattr(route, "path", None) == path and method in getattr(route, "methods", set()):
return route.endpoint
raise RuntimeError(f"{method} {path} not found")
def _bind_test_db():
previous = droutes.SessionLocal
droutes.SessionLocal = _TS
return previous
def _seed():
alice_session = "alice-" + uuid.uuid4().hex[:8]
bob_session = "bob-" + uuid.uuid4().hex[:8]
alice_doc = str(uuid.uuid4())
bob_doc = str(uuid.uuid4())
legacy_doc = str(uuid.uuid4())
db = _TS()
try:
db.add(DbSession(id=alice_session, owner="alice", name="alice", model="m", endpoint_url="http://x"))
db.add(DbSession(id=bob_session, owner="bob", name="bob", model="m", endpoint_url="http://x"))
db.add(Document(
id=alice_doc,
session_id=alice_session,
title="alice doc",
language="markdown",
current_content="alice body",
version_count=1,
is_active=True,
owner="alice",
))
db.add(Document(
id=bob_doc,
session_id=bob_session,
title="bob doc",
language="markdown",
current_content="bob body",
version_count=1,
is_active=True,
owner="bob",
))
db.add(Document(
id=legacy_doc,
session_id=alice_session,
title="legacy doc",
language="markdown",
current_content="legacy body",
version_count=1,
is_active=True,
owner=None,
))
db.commit()
return alice_session, bob_session, alice_doc, bob_doc, legacy_doc
finally:
db.close()
@pytest.mark.asyncio
async def test_patch_document_rejects_cross_owner_session_link():
previous_session_local = _bind_test_db()
try:
patch_document = _endpoint("PATCH", "/api/document/{doc_id}")
alice_session, bob_session, _alice_doc, bob_doc, _legacy_doc = _seed()
with pytest.raises(HTTPException) as exc:
await patch_document(_req("bob"), bob_doc, DocumentPatch(session_id=alice_session))
assert exc.value.status_code == 404
db = _TS()
try:
assert db.query(Document).filter(Document.id == bob_doc).first().session_id == bob_session
finally:
db.close()
finally:
droutes.SessionLocal = previous_session_local
@pytest.mark.asyncio
async def test_list_documents_filters_foreign_docs_in_visible_session():
previous_session_local = _bind_test_db()
try:
list_documents = _endpoint("GET", "/api/documents/{session_id}")
alice_session, _bob_session, alice_doc, bob_doc, legacy_doc = _seed()
db = _TS()
try:
db.query(Document).filter(Document.id == bob_doc).update({"session_id": alice_session})
db.commit()
finally:
db.close()
rows = await list_documents(_req("alice"), alice_session)
ids = {row["id"] for row in rows}
assert alice_doc in ids
assert legacy_doc in ids
assert bob_doc not in ids
finally:
droutes.SessionLocal = previous_session_local