diff --git a/routes/codex_routes.py b/routes/codex_routes.py index 9898daed2..c641c3915 100644 --- a/routes/codex_routes.py +++ b/routes/codex_routes.py @@ -15,7 +15,7 @@ from typing import Any from fastapi import APIRouter, BackgroundTasks, Body, HTTPException, Request from fastapi.responses import StreamingResponse -from src.auth_helpers import require_user +from src.auth_helpers import require_authenticated_request, require_user from src.tool_implementations import do_manage_notes @@ -41,7 +41,9 @@ async def _as_owner(request: Request, owner: str, fn, *args, **kwargs): the scope-gated owner (not the "api" pseudo-user the bearer middleware sets). Restores the original value when done. Works for sync and async handlers.""" orig = getattr(request.state, "current_user", None) + orig_api_token = getattr(request.state, "api_token", None) request.state.current_user = owner + request.state.api_token = False try: result = fn(*args, **kwargs) if asyncio.iscoroutine(result): @@ -49,6 +51,13 @@ async def _as_owner(request: Request, owner: str, fn, *args, **kwargs): return result finally: request.state.current_user = orig + if orig_api_token is None: + try: + delattr(request.state, "api_token") + except AttributeError: + pass + else: + request.state.api_token = orig_api_token def _scope_owner(request: Request, allowed: set[str]) -> str: @@ -146,7 +155,7 @@ def setup_codex_routes( @router.get("/plugin.zip") def plugin_zip(request: Request): - require_user(request) + require_authenticated_request(request) root = Path(__file__).resolve().parent.parent / "integrations" / "codex" if not root.exists(): raise HTTPException(404, "Codex plugin bundle not found") @@ -762,7 +771,7 @@ def setup_claude_routes() -> APIRouter: @router.get("/plugin.zip") def plugin_zip(request: Request): - require_user(request) + require_authenticated_request(request) # Only ship the skills/ subtree so extracting at ~/.claude/ doesn't dump # README.md or other bundle metadata into the user's claude config dir. skills_root = Path(__file__).resolve().parent.parent / "integrations" / "claude" / "skills" diff --git a/src/auth_helpers.py b/src/auth_helpers.py index afe46c74e..49f3f01be 100644 --- a/src/auth_helpers.py +++ b/src/auth_helpers.py @@ -34,6 +34,24 @@ def effective_user(request: Request) -> Optional[str]: return get_current_user(request) +def _is_api_token_request(request: Request) -> bool: + """Return True when middleware authenticated a bearer API token.""" + return bool(getattr(request.state, "api_token", False)) + + +def require_authenticated_request(request: Request) -> str: + """Allow either a browser session or a valid bearer API token. + + This is intentionally narrower than :func:`require_user`: use it only for + routes that need authentication but do not read or mutate owner-scoped + user data. Owner-scoped routes should use ``require_user`` for browser + sessions or their own API-token scope/owner gate. + """ + if _is_api_token_request(request): + return effective_user(request) or "" + return require_user(request) + + def _auth_disabled() -> bool: """True when the operator has explicitly turned off auth via .env. Mirrors the AUTH_ENABLED parse in app.py / core/middleware.py so the @@ -60,6 +78,9 @@ def require_user(request: Request) -> str: Use this on routes that touch user data so middleware misconfig can't open them up. """ + if _is_api_token_request(request): + raise HTTPException(403, "API tokens must use a scope-aware API route") + u = get_current_user(request) if u: return u diff --git a/tests/test_api_token_user_route_gate.py b/tests/test_api_token_user_route_gate.py new file mode 100644 index 000000000..1b74049e6 --- /dev/null +++ b/tests/test_api_token_user_route_gate.py @@ -0,0 +1,62 @@ +import asyncio +from pathlib import Path +from types import SimpleNamespace + +import pytest +from fastapi import HTTPException + +from src import auth_helpers + + +def _request(*, current_user="api", api_token=True, api_token_owner="alice"): + return SimpleNamespace( + state=SimpleNamespace( + current_user=current_user, + api_token=api_token, + api_token_owner=api_token_owner, + ), + app=SimpleNamespace( + state=SimpleNamespace( + auth_manager=SimpleNamespace(is_configured=True), + ), + ), + client=SimpleNamespace(host="203.0.113.10"), + ) + + +def test_require_user_rejects_api_token_pseudo_user(monkeypatch): + monkeypatch.setenv("AUTH_ENABLED", "true") + req = _request() + + with pytest.raises(HTTPException) as exc: + auth_helpers.require_user(req) + + assert exc.value.status_code == 403 + + +def test_require_authenticated_request_allows_api_token_owner(monkeypatch): + monkeypatch.setenv("AUTH_ENABLED", "true") + req = _request() + + assert auth_helpers.require_authenticated_request(req) == "alice" + + +def test_codex_as_owner_can_call_nested_user_routes(monkeypatch): + monkeypatch.setenv("AUTH_ENABLED", "true") + from routes.codex_routes import _as_owner + + req = _request() + + async def nested_handler(request): + return auth_helpers.require_user(request) + + assert asyncio.run(_as_owner(req, "alice", nested_handler, req)) == "alice" + assert req.state.current_user == "api" + assert req.state.api_token is True + + +def test_codex_plugin_downloads_use_general_authenticated_gate(): + source = Path("routes/codex_routes.py").read_text(encoding="utf-8") + + assert "require_authenticated_request" in source + assert source.count("require_authenticated_request(request)") == 2