fix(auth): gate api tokens from user routes (#2992)

This commit is contained in:
Vykos
2026-06-07 12:55:01 +02:00
committed by GitHub
parent 299538ea4e
commit 000932a6d9
3 changed files with 95 additions and 3 deletions
+12 -3
View File
@@ -15,7 +15,7 @@ from typing import Any
from fastapi import APIRouter, BackgroundTasks, Body, HTTPException, Request
from fastapi.responses import StreamingResponse
from src.auth_helpers import require_user
from src.auth_helpers import require_authenticated_request, require_user
from src.tool_implementations import do_manage_notes
@@ -41,7 +41,9 @@ async def _as_owner(request: Request, owner: str, fn, *args, **kwargs):
the scope-gated owner (not the "api" pseudo-user the bearer middleware sets).
Restores the original value when done. Works for sync and async handlers."""
orig = getattr(request.state, "current_user", None)
orig_api_token = getattr(request.state, "api_token", None)
request.state.current_user = owner
request.state.api_token = False
try:
result = fn(*args, **kwargs)
if asyncio.iscoroutine(result):
@@ -49,6 +51,13 @@ async def _as_owner(request: Request, owner: str, fn, *args, **kwargs):
return result
finally:
request.state.current_user = orig
if orig_api_token is None:
try:
delattr(request.state, "api_token")
except AttributeError:
pass
else:
request.state.api_token = orig_api_token
def _scope_owner(request: Request, allowed: set[str]) -> str:
@@ -146,7 +155,7 @@ def setup_codex_routes(
@router.get("/plugin.zip")
def plugin_zip(request: Request):
require_user(request)
require_authenticated_request(request)
root = Path(__file__).resolve().parent.parent / "integrations" / "codex"
if not root.exists():
raise HTTPException(404, "Codex plugin bundle not found")
@@ -762,7 +771,7 @@ def setup_claude_routes() -> APIRouter:
@router.get("/plugin.zip")
def plugin_zip(request: Request):
require_user(request)
require_authenticated_request(request)
# Only ship the skills/ subtree so extracting at ~/.claude/ doesn't dump
# README.md or other bundle metadata into the user's claude config dir.
skills_root = Path(__file__).resolve().parent.parent / "integrations" / "claude" / "skills"
+21
View File
@@ -34,6 +34,24 @@ def effective_user(request: Request) -> Optional[str]:
return get_current_user(request)
def _is_api_token_request(request: Request) -> bool:
"""Return True when middleware authenticated a bearer API token."""
return bool(getattr(request.state, "api_token", False))
def require_authenticated_request(request: Request) -> str:
"""Allow either a browser session or a valid bearer API token.
This is intentionally narrower than :func:`require_user`: use it only for
routes that need authentication but do not read or mutate owner-scoped
user data. Owner-scoped routes should use ``require_user`` for browser
sessions or their own API-token scope/owner gate.
"""
if _is_api_token_request(request):
return effective_user(request) or ""
return require_user(request)
def _auth_disabled() -> bool:
"""True when the operator has explicitly turned off auth via .env.
Mirrors the AUTH_ENABLED parse in app.py / core/middleware.py so the
@@ -60,6 +78,9 @@ def require_user(request: Request) -> str:
Use this on routes that touch user data so middleware misconfig can't
open them up.
"""
if _is_api_token_request(request):
raise HTTPException(403, "API tokens must use a scope-aware API route")
u = get_current_user(request)
if u:
return u
+62
View File
@@ -0,0 +1,62 @@
import asyncio
from pathlib import Path
from types import SimpleNamespace
import pytest
from fastapi import HTTPException
from src import auth_helpers
def _request(*, current_user="api", api_token=True, api_token_owner="alice"):
return SimpleNamespace(
state=SimpleNamespace(
current_user=current_user,
api_token=api_token,
api_token_owner=api_token_owner,
),
app=SimpleNamespace(
state=SimpleNamespace(
auth_manager=SimpleNamespace(is_configured=True),
),
),
client=SimpleNamespace(host="203.0.113.10"),
)
def test_require_user_rejects_api_token_pseudo_user(monkeypatch):
monkeypatch.setenv("AUTH_ENABLED", "true")
req = _request()
with pytest.raises(HTTPException) as exc:
auth_helpers.require_user(req)
assert exc.value.status_code == 403
def test_require_authenticated_request_allows_api_token_owner(monkeypatch):
monkeypatch.setenv("AUTH_ENABLED", "true")
req = _request()
assert auth_helpers.require_authenticated_request(req) == "alice"
def test_codex_as_owner_can_call_nested_user_routes(monkeypatch):
monkeypatch.setenv("AUTH_ENABLED", "true")
from routes.codex_routes import _as_owner
req = _request()
async def nested_handler(request):
return auth_helpers.require_user(request)
assert asyncio.run(_as_owner(req, "alice", nested_handler, req)) == "alice"
assert req.state.current_user == "api"
assert req.state.api_token is True
def test_codex_plugin_downloads_use_general_authenticated_gate():
source = Path("routes/codex_routes.py").read_text(encoding="utf-8")
assert "require_authenticated_request" in source
assert source.count("require_authenticated_request(request)") == 2