mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-17 10:15:27 -04:00
Security: sanitize export and gallery filenames
Co-authored-by: RefuseOdd <refuseodd@users.noreply.github.com>
This commit is contained in:
@@ -3,6 +3,9 @@
|
|||||||
import os
|
import os
|
||||||
import hashlib
|
import hashlib
|
||||||
import logging
|
import logging
|
||||||
|
import re
|
||||||
|
import uuid
|
||||||
|
from pathlib import Path
|
||||||
from typing import Dict, Any, Optional
|
from typing import Dict, Any, Optional
|
||||||
|
|
||||||
from fastapi import APIRouter, HTTPException, Query, Request
|
from fastapi import APIRouter, HTTPException, Query, Request
|
||||||
@@ -17,6 +20,14 @@ from routes.gallery_helpers import (
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _sanitize_gallery_filename(filename: str) -> str:
|
||||||
|
"""Return a local filename safe to join under generated_images."""
|
||||||
|
safe_name = re.sub(r"[^A-Za-z0-9._-]", "_", Path(filename or "").name)[:128]
|
||||||
|
if not safe_name or safe_name in {".", ".."}:
|
||||||
|
safe_name = uuid.uuid4().hex[:12]
|
||||||
|
return safe_name
|
||||||
|
|
||||||
def setup_gallery_routes() -> APIRouter:
|
def setup_gallery_routes() -> APIRouter:
|
||||||
router = APIRouter(tags=["gallery"])
|
router = APIRouter(tags=["gallery"])
|
||||||
|
|
||||||
@@ -122,7 +133,7 @@ def setup_gallery_routes() -> APIRouter:
|
|||||||
content = await file.read()
|
content = await file.read()
|
||||||
img_dir = Path("data/generated_images")
|
img_dir = Path("data/generated_images")
|
||||||
img_dir.mkdir(parents=True, exist_ok=True)
|
img_dir.mkdir(parents=True, exist_ok=True)
|
||||||
img_path = img_dir / img.filename
|
img_path = img_dir / _sanitize_gallery_filename(img.filename)
|
||||||
img_path.write_bytes(content)
|
img_path.write_bytes(content)
|
||||||
|
|
||||||
# Refresh dimensions in case the editor resized the canvas.
|
# Refresh dimensions in case the editor resized the canvas.
|
||||||
|
|||||||
@@ -14,6 +14,13 @@ from core.database import Session as DbSession, SessionLocal, Document, GalleryI
|
|||||||
from src.auth_helpers import get_current_user, effective_user
|
from src.auth_helpers import get_current_user, effective_user
|
||||||
|
|
||||||
|
|
||||||
|
def _sanitize_export_filename(name: str) -> str:
|
||||||
|
"""Return a conservative filename safe for Content-Disposition."""
|
||||||
|
name = name or ""
|
||||||
|
name = re.sub(r"[^A-Za-z0-9._-]", "_", name)
|
||||||
|
return name[:128]
|
||||||
|
|
||||||
|
|
||||||
def _verify_session_owner(request: Request, session_id: str):
|
def _verify_session_owner(request: Request, session_id: str):
|
||||||
"""Verify the current user owns the session. Raises 404 if not."""
|
"""Verify the current user owns the session. Raises 404 if not."""
|
||||||
user = effective_user(request)
|
user = effective_user(request)
|
||||||
@@ -558,6 +565,7 @@ def setup_session_routes(session_manager: SessionManager, config: dict, webhook_
|
|||||||
|
|
||||||
safe_name = re.sub(r'[^\w\-_]', '_', session.name)
|
safe_name = re.sub(r'[^\w\-_]', '_', session.name)
|
||||||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||||
|
filename = _sanitize_export_filename(filename)
|
||||||
|
|
||||||
if fmt == "json":
|
if fmt == "json":
|
||||||
import json as _json
|
import json as _json
|
||||||
|
|||||||
@@ -943,6 +943,85 @@ def test_mcp_oauth_page_escapes_reflected_values():
|
|||||||
assert f"{var} = html.escape({var}" in body, var
|
assert f"{var} = html.escape({var}" in body, var
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# -- export/gallery filename hardening ----------------------------------------
|
||||||
|
|
||||||
|
def _install_route_import_stubs(monkeypatch):
|
||||||
|
core_mod = types.ModuleType("core")
|
||||||
|
core_mod.__path__ = []
|
||||||
|
|
||||||
|
db_mod = types.ModuleType("core.database")
|
||||||
|
db_mod.SessionLocal = lambda: None
|
||||||
|
for name in (
|
||||||
|
"Session",
|
||||||
|
"Document",
|
||||||
|
"GalleryImage",
|
||||||
|
"GalleryAlbum",
|
||||||
|
"ModelEndpoint",
|
||||||
|
):
|
||||||
|
setattr(db_mod, name, type(name, (), {}))
|
||||||
|
|
||||||
|
session_manager_mod = types.ModuleType("core.session_manager")
|
||||||
|
session_manager_mod.SessionManager = type("SessionManager", (), {})
|
||||||
|
|
||||||
|
models_mod = types.ModuleType("core.models")
|
||||||
|
models_mod.ChatMessage = type("ChatMessage", (), {})
|
||||||
|
|
||||||
|
monkeypatch.setitem(sys.modules, "core", core_mod)
|
||||||
|
monkeypatch.setitem(sys.modules, "core.database", db_mod)
|
||||||
|
monkeypatch.setitem(sys.modules, "core.session_manager", session_manager_mod)
|
||||||
|
monkeypatch.setitem(sys.modules, "core.models", models_mod)
|
||||||
|
|
||||||
|
|
||||||
|
def _import_session_routes_for_filename(monkeypatch):
|
||||||
|
_install_route_import_stubs(monkeypatch)
|
||||||
|
monkeypatch.delitem(sys.modules, "routes.session_routes", raising=False)
|
||||||
|
from routes import session_routes
|
||||||
|
return session_routes
|
||||||
|
|
||||||
|
|
||||||
|
def _import_gallery_routes_for_filename(monkeypatch):
|
||||||
|
_install_route_import_stubs(monkeypatch)
|
||||||
|
monkeypatch.delitem(sys.modules, "routes.gallery_helpers", raising=False)
|
||||||
|
monkeypatch.delitem(sys.modules, "routes.gallery_routes", raising=False)
|
||||||
|
from routes import gallery_routes
|
||||||
|
return gallery_routes
|
||||||
|
|
||||||
|
|
||||||
|
def test_export_filename_sanitizer_blocks_header_and_path_chars(monkeypatch):
|
||||||
|
mod = _import_session_routes_for_filename(monkeypatch)
|
||||||
|
|
||||||
|
out = mod._sanitize_export_filename('chat.md\r\nX-Test: yes/..\\evil;quote".txt\x00')
|
||||||
|
|
||||||
|
assert out
|
||||||
|
assert len(out) <= 128
|
||||||
|
for ch in '\r\n/\\:\x00;" ':
|
||||||
|
assert ch not in out
|
||||||
|
|
||||||
|
|
||||||
|
def test_export_filename_sanitizer_preserves_safe_names(monkeypatch):
|
||||||
|
mod = _import_session_routes_for_filename(monkeypatch)
|
||||||
|
|
||||||
|
assert mod._sanitize_export_filename("conversation_20260602.md") == "conversation_20260602.md"
|
||||||
|
assert mod._sanitize_export_filename("") == ""
|
||||||
|
|
||||||
|
|
||||||
|
def test_gallery_replace_filename_sanitizer_uses_basename(monkeypatch):
|
||||||
|
mod = _import_gallery_routes_for_filename(monkeypatch)
|
||||||
|
|
||||||
|
out = mod._sanitize_gallery_filename("../../etc/cron.d/evil image.png")
|
||||||
|
|
||||||
|
assert out == "evil_image.png"
|
||||||
|
assert "/" not in out
|
||||||
|
assert "\\" not in out
|
||||||
|
|
||||||
|
|
||||||
|
def test_gallery_replace_filename_sanitizer_falls_back_when_empty(monkeypatch):
|
||||||
|
mod = _import_gallery_routes_for_filename(monkeypatch)
|
||||||
|
monkeypatch.setattr(mod.uuid, "uuid4", lambda: types.SimpleNamespace(hex="abcdef1234567890"))
|
||||||
|
|
||||||
|
assert mod._sanitize_gallery_filename("../") == "abcdef123456"
|
||||||
|
|
||||||
def test_chat_active_document_lookup_is_owner_scoped():
|
def test_chat_active_document_lookup_is_owner_scoped():
|
||||||
"""The explicit `active_doc_id` path in /api/chat_stream must scope the
|
"""The explicit `active_doc_id` path in /api/chat_stream must scope the
|
||||||
document lookup to the caller. Resolving by id alone let any user inject
|
document lookup to the caller. Resolving by id alone let any user inject
|
||||||
|
|||||||
Reference in New Issue
Block a user