mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-16 09:45:24 -04:00
Constrain signature uploads to PNG data (#2844)
This commit is contained in:
+44
-16
@@ -21,10 +21,44 @@ from src.auth_helpers import get_current_user
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_DATA_URL_RE = re.compile(
|
||||
r'^data:image/(?P<fmt>png|jpeg|jpg);base64,(?P<data>.+)$',
|
||||
re.IGNORECASE | re.DOTALL,
|
||||
)
|
||||
_DATA_URL_RE = re.compile(r"^data:image/png;base64,(?P<data>.+)$", re.IGNORECASE | re.DOTALL)
|
||||
_ANY_IMAGE_DATA_URL_RE = re.compile(r"^data:image/[^;]+;base64,", re.IGNORECASE)
|
||||
_PNG_MAGIC = b"\x89PNG\r\n\x1a\n"
|
||||
_MAX_SIGNATURE_BYTES = 2 * 1024 * 1024
|
||||
_MAX_SIGNATURE_B64 = ((_MAX_SIGNATURE_BYTES + 2) // 3) * 4
|
||||
_MAX_SIGNATURE_DIMENSION = 4096
|
||||
|
||||
|
||||
def _normalize_signature_png(raw: str) -> str:
|
||||
raw = (raw or "").strip()
|
||||
m = _DATA_URL_RE.match(raw)
|
||||
if m:
|
||||
b64 = m.group("data")
|
||||
elif _ANY_IMAGE_DATA_URL_RE.match(raw):
|
||||
raise HTTPException(400, "Signature data must be a PNG image")
|
||||
else:
|
||||
b64 = raw
|
||||
if len(b64) > _MAX_SIGNATURE_B64:
|
||||
raise HTTPException(400, "Signature PNG is too large")
|
||||
try:
|
||||
payload = base64.b64decode(b64, validate=True)
|
||||
except Exception:
|
||||
raise HTTPException(400, "Signature data must be base64-encoded PNG bytes")
|
||||
if not payload:
|
||||
raise HTTPException(400, "Signature PNG is empty")
|
||||
if len(payload) > _MAX_SIGNATURE_BYTES:
|
||||
raise HTTPException(400, "Signature PNG is too large")
|
||||
if not payload.startswith(_PNG_MAGIC):
|
||||
raise HTTPException(400, "Signature data must be a PNG image")
|
||||
return base64.b64encode(payload).decode("ascii")
|
||||
|
||||
|
||||
def _signature_dimension(value: Optional[int]) -> Optional[int]:
|
||||
if value is None:
|
||||
return None
|
||||
if not isinstance(value, int) or value < 1 or value > _MAX_SIGNATURE_DIMENSION:
|
||||
raise HTTPException(400, "Signature dimensions are invalid")
|
||||
return value
|
||||
|
||||
|
||||
class SignatureCreate(BaseModel):
|
||||
@@ -67,24 +101,18 @@ def setup_signature_routes() -> APIRouter:
|
||||
@router.post("/api/signatures")
|
||||
async def create_signature(request: Request, req: SignatureCreate) -> Dict[str, Any]:
|
||||
user = get_current_user(request)
|
||||
raw = (req.data or "").strip()
|
||||
m = _DATA_URL_RE.match(raw)
|
||||
b64 = m.group("data") if m else raw
|
||||
try:
|
||||
payload = base64.b64decode(b64, validate=True)
|
||||
if not payload:
|
||||
raise ValueError("empty payload")
|
||||
except Exception:
|
||||
raise HTTPException(400, "Signature data must be base64-encoded PNG bytes")
|
||||
b64 = _normalize_signature_png(req.data)
|
||||
width = _signature_dimension(req.width)
|
||||
height = _signature_dimension(req.height)
|
||||
|
||||
sig = Signature(
|
||||
id=str(uuid.uuid4()),
|
||||
owner=user,
|
||||
name=(req.name or "Signature").strip() or "Signature",
|
||||
data_png=b64,
|
||||
width=req.width,
|
||||
height=req.height,
|
||||
svg=req.svg,
|
||||
width=width,
|
||||
height=height,
|
||||
svg=None,
|
||||
)
|
||||
db = SessionLocal()
|
||||
try:
|
||||
|
||||
@@ -25,7 +25,7 @@ function _esc(s) {
|
||||
|
||||
function _safeSignatureDataUrl(raw) {
|
||||
const value = String(raw || '').trim();
|
||||
return /^data:image\/(?:png|jpe?g);base64,[a-z0-9+/=\s]+$/i.test(value) ? value : '';
|
||||
return /^data:image\/png;base64,[a-z0-9+/=\s]+$/i.test(value) ? value : '';
|
||||
}
|
||||
|
||||
// Last signature the user picked or created in this session. Lets the export
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
import asyncio
|
||||
import base64
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
from fastapi import HTTPException
|
||||
|
||||
from routes import signature_routes
|
||||
|
||||
|
||||
_PNG_BYTES = b"\x89PNG\r\n\x1a\nsignature-bytes"
|
||||
_PNG_B64 = base64.b64encode(_PNG_BYTES).decode("ascii")
|
||||
|
||||
|
||||
class _SignatureRecord:
|
||||
def __init__(self, **kwargs):
|
||||
self.__dict__.update(kwargs)
|
||||
self.created_at = None
|
||||
|
||||
|
||||
class _FakeDb:
|
||||
def __init__(self):
|
||||
self.added = None
|
||||
self.add = MagicMock(side_effect=self._add)
|
||||
self.commit = MagicMock()
|
||||
self.refresh = MagicMock()
|
||||
self.rollback = MagicMock()
|
||||
self.close = MagicMock()
|
||||
|
||||
def _add(self, sig):
|
||||
self.added = sig
|
||||
|
||||
|
||||
def _request(user="alice"):
|
||||
return SimpleNamespace(state=SimpleNamespace(current_user=user))
|
||||
|
||||
|
||||
def _route_endpoint(path, method):
|
||||
router = signature_routes.setup_signature_routes()
|
||||
for route in router.routes:
|
||||
if route.path == path and method in route.methods:
|
||||
return route.endpoint
|
||||
raise AssertionError(f"route not found: {method} {path}")
|
||||
|
||||
|
||||
def test_signature_png_normalization_accepts_data_url_and_raw_base64():
|
||||
data_url = f"data:image/png;base64,{_PNG_B64}"
|
||||
|
||||
assert signature_routes._normalize_signature_png(data_url) == _PNG_B64
|
||||
assert signature_routes._normalize_signature_png(_PNG_B64) == _PNG_B64
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"raw",
|
||||
[
|
||||
"",
|
||||
"not base64!!!",
|
||||
base64.b64encode(b"not a png").decode("ascii"),
|
||||
"data:image/jpeg;base64," + base64.b64encode(b"\xff\xd8jpeg").decode("ascii"),
|
||||
"A" * (signature_routes._MAX_SIGNATURE_B64 + 4),
|
||||
],
|
||||
)
|
||||
def test_signature_png_normalization_rejects_invalid_inputs(raw):
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
signature_routes._normalize_signature_png(raw)
|
||||
|
||||
assert exc.value.status_code == 400
|
||||
|
||||
|
||||
@pytest.mark.parametrize("value", [0, -1, signature_routes._MAX_SIGNATURE_DIMENSION + 1, "20"])
|
||||
def test_signature_dimensions_are_bounded(value):
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
signature_routes._signature_dimension(value)
|
||||
|
||||
assert exc.value.status_code == 400
|
||||
|
||||
|
||||
def test_create_signature_stores_normalized_png_and_drops_svg(monkeypatch):
|
||||
db = _FakeDb()
|
||||
monkeypatch.setattr(signature_routes, "SessionLocal", lambda: db)
|
||||
monkeypatch.setattr(signature_routes, "Signature", _SignatureRecord)
|
||||
create_signature = _route_endpoint("/api/signatures", "POST")
|
||||
|
||||
response = asyncio.run(create_signature(
|
||||
_request(),
|
||||
signature_routes.SignatureCreate(
|
||||
name=" Full signature ",
|
||||
data=f"data:image/png;base64,{_PNG_B64}",
|
||||
width=320,
|
||||
height=80,
|
||||
svg='<svg onload="alert(1)"></svg>',
|
||||
),
|
||||
))
|
||||
|
||||
assert db.added.owner == "alice"
|
||||
assert db.added.name == "Full signature"
|
||||
assert db.added.data_png == _PNG_B64
|
||||
assert db.added.width == 320
|
||||
assert db.added.height == 80
|
||||
assert db.added.svg is None
|
||||
assert response["data_url"] == f"data:image/png;base64,{_PNG_B64}"
|
||||
db.commit.assert_called_once()
|
||||
db.close.assert_called_once()
|
||||
@@ -10,7 +10,7 @@ def test_signature_picker_allows_only_raster_data_urls():
|
||||
src = (_REPO / "static" / "js" / "signature.js").read_text(encoding="utf-8")
|
||||
|
||||
assert "function _safeSignatureDataUrl(raw)" in src
|
||||
assert r"^data:image\/(?:png|jpe?g);base64," in src
|
||||
assert r"^data:image\/png;base64," in src
|
||||
assert '<img src="${_esc(dataUrl)}"/>' in src
|
||||
assert 'dataUrl: s.data_url' not in src
|
||||
|
||||
|
||||
Reference in New Issue
Block a user