fix(security): gate codex cookbook routes behind admin check for cookie sessions (#4554)

The Codex cookbook bridge authorized cookie sessions with require_user()
only, allowing non-admin accounts to read cookbook task state, server
topology, task logs, tmux sessions, and model presets. The stop/adopt
routes also execute local or SSH-backed tmux commands.

Add _require_cookbook_scope() that enforces require_admin() for
cookie-session callers while preserving the existing API-token scope
checks. Apply it to all nine /api/codex/cookbook/* routes.

Fixes #4542

Co-authored-by: michaelxer <michaelxer@users.noreply.github.com>
This commit is contained in:
Michael
2026-06-27 20:09:32 +07:00
committed by GitHub
parent 8888819d74
commit e3ecdd3207
2 changed files with 142 additions and 9 deletions
+24 -9
View File
@@ -15,6 +15,7 @@ from typing import Any
from fastapi import APIRouter, BackgroundTasks, Body, HTTPException, Request from fastapi import APIRouter, BackgroundTasks, Body, HTTPException, Request
from fastapi.responses import StreamingResponse from fastapi.responses import StreamingResponse
from core.middleware import require_admin
from src.auth_helpers import require_authenticated_request, require_user from src.auth_helpers import require_authenticated_request, require_user
from src.tool_implementations import do_manage_notes from src.tool_implementations import do_manage_notes
from src.constants import COOKBOOK_STATE_FILE from src.constants import COOKBOOK_STATE_FILE
@@ -109,6 +110,20 @@ def _scope_owner_all(request: Request, required: set[str]) -> str:
return require_user(request) return require_user(request)
def _require_cookbook_scope(request: Request, allowed: set[str]) -> str:
"""Authorize a Codex cookbook route.
For API-token callers, enforce the given scope set.
For cookie-session callers, additionally require admin privileges
because cookbook surfaces expose host topology, task logs, tmux
commands, and model-serving controls.
"""
owner = _scope_owner(request, allowed)
if not getattr(request.state, "api_token", False):
require_admin(request)
return owner
def _find_endpoint(router: APIRouter | None, method: str, path: str): def _find_endpoint(router: APIRouter | None, method: str, path: str):
if router is None: if router is None:
return None return None
@@ -532,14 +547,14 @@ def setup_codex_routes(
@router.get("/cookbook/tasks") @router.get("/cookbook/tasks")
async def codex_cookbook_tasks(request: Request): async def codex_cookbook_tasks(request: Request):
_scope_owner(request, COOKBOOK_READ_SCOPES) _require_cookbook_scope(request, COOKBOOK_READ_SCOPES)
state = _read_cookbook_state() state = _read_cookbook_state()
tasks = state.get("tasks") or [] tasks = state.get("tasks") or []
return {"tasks": [_redact_task(t) for t in tasks]} return {"tasks": [_redact_task(t) for t in tasks]}
@router.get("/cookbook/servers") @router.get("/cookbook/servers")
async def codex_cookbook_servers(request: Request): async def codex_cookbook_servers(request: Request):
_scope_owner(request, COOKBOOK_READ_SCOPES) _require_cookbook_scope(request, COOKBOOK_READ_SCOPES)
state = _read_cookbook_state() state = _read_cookbook_state()
servers = state.get("env", {}).get("servers") or [] servers = state.get("env", {}).get("servers") or []
# Strip ssh creds / passwords; keep only what's needed to pick a host. # Strip ssh creds / passwords; keep only what's needed to pick a host.
@@ -558,7 +573,7 @@ def setup_codex_routes(
@router.get("/cookbook/output/{session_id}") @router.get("/cookbook/output/{session_id}")
async def codex_cookbook_output(request: Request, session_id: str, tail: int = 400): async def codex_cookbook_output(request: Request, session_id: str, tail: int = 400):
_scope_owner(request, COOKBOOK_READ_SCOPES) _require_cookbook_scope(request, COOKBOOK_READ_SCOPES)
# Defensive: session_id must be the tmux-style id we issue # Defensive: session_id must be the tmux-style id we issue
# (`serve-XXXX` / `cookbook-XXXX` / `queue-XXXX`); anything else # (`serve-XXXX` / `cookbook-XXXX` / `queue-XXXX`); anything else
# would let the agent run arbitrary `tmux capture-pane` targets. # would let the agent run arbitrary `tmux capture-pane` targets.
@@ -600,7 +615,7 @@ def setup_codex_routes(
@router.post("/cookbook/serve") @router.post("/cookbook/serve")
async def codex_cookbook_serve(request: Request, body: dict[str, Any] = Body(default_factory=dict)): async def codex_cookbook_serve(request: Request, body: dict[str, Any] = Body(default_factory=dict)):
_scope_owner(request, COOKBOOK_LAUNCH_SCOPES) _require_cookbook_scope(request, COOKBOOK_LAUNCH_SCOPES)
# Wraps /api/model/serve with the SAME validation the UI uses. # Wraps /api/model/serve with the SAME validation the UI uses.
# _validate_serve_cmd (called inside model_serve) rejects shell # _validate_serve_cmd (called inside model_serve) rejects shell
# metachars and requires the leading binary to be in the # metachars and requires the leading binary to be in the
@@ -639,7 +654,7 @@ def setup_codex_routes(
@router.post("/cookbook/stop/{session_id}") @router.post("/cookbook/stop/{session_id}")
async def codex_cookbook_stop(request: Request, session_id: str): async def codex_cookbook_stop(request: Request, session_id: str):
_scope_owner(request, COOKBOOK_LAUNCH_SCOPES) _require_cookbook_scope(request, COOKBOOK_LAUNCH_SCOPES)
import re as _re import re as _re
if not _re.fullmatch(r"[a-zA-Z0-9_-]+", session_id): if not _re.fullmatch(r"[a-zA-Z0-9_-]+", session_id):
raise HTTPException(400, "Invalid session id") raise HTTPException(400, "Invalid session id")
@@ -659,7 +674,7 @@ def setup_codex_routes(
"""List cached models on a configured server (or local if host is omitted). """List cached models on a configured server (or local if host is omitted).
Mirrors `list_cached_models` from the chat agent so external agents have Mirrors `list_cached_models` from the chat agent so external agents have
the same inventory view before deciding what to serve/download.""" the same inventory view before deciding what to serve/download."""
_scope_owner(request, COOKBOOK_READ_SCOPES) _require_cookbook_scope(request, COOKBOOK_READ_SCOPES)
# Hit /api/model/cached internally, with the same modelDirs the chat # Hit /api/model/cached internally, with the same modelDirs the chat
# agent's list_cached_models would resolve from cookbook state. # agent's list_cached_models would resolve from cookbook state.
state = _read_cookbook_state() state = _read_cookbook_state()
@@ -721,7 +736,7 @@ def setup_codex_routes(
"""List saved serve presets (model + host + port + launch cmd). """List saved serve presets (model + host + port + launch cmd).
Counterpart to `list_serve_presets`. Use BEFORE composing a `serve` Counterpart to `list_serve_presets`. Use BEFORE composing a `serve`
body — the user's saved preset usually has the working cmd already.""" body — the user's saved preset usually has the working cmd already."""
_scope_owner(request, COOKBOOK_READ_SCOPES) _require_cookbook_scope(request, COOKBOOK_READ_SCOPES)
state = _read_cookbook_state() state = _read_cookbook_state()
presets = state.get("presets") or [] presets = state.get("presets") or []
out = [] out = []
@@ -741,7 +756,7 @@ def setup_codex_routes(
async def codex_cookbook_serve_preset(request: Request, name: str): async def codex_cookbook_serve_preset(request: Request, name: str):
"""Launch a saved preset by name. Reuses the working cmd + host the """Launch a saved preset by name. Reuses the working cmd + host the
user already saved, avoiding the cmd-allowlist trial-and-error loop.""" user already saved, avoiding the cmd-allowlist trial-and-error loop."""
_scope_owner(request, COOKBOOK_LAUNCH_SCOPES) _require_cookbook_scope(request, COOKBOOK_LAUNCH_SCOPES)
import re as _re import re as _re
if not _re.fullmatch(r"[A-Za-z0-9 _.:@\-]+", name): if not _re.fullmatch(r"[A-Za-z0-9 _.:@\-]+", name):
raise HTTPException(400, "Invalid preset name") raise HTTPException(400, "Invalid preset name")
@@ -793,7 +808,7 @@ def setup_codex_routes(
cookbook tracking. Needed when serve_model rejects a cmd and the cookbook tracking. Needed when serve_model rejects a cmd and the
agent falls back to direct ssh — without adoption the session is agent falls back to direct ssh — without adoption the session is
invisible to the UI. Body: {tmux_session, model, host?, port?}.""" invisible to the UI. Body: {tmux_session, model, host?, port?}."""
_scope_owner(request, COOKBOOK_LAUNCH_SCOPES) _require_cookbook_scope(request, COOKBOOK_LAUNCH_SCOPES)
norm = dict(body or {}) norm = dict(body or {})
sess = (norm.get("tmux_session") or norm.get("session_id") or "").strip() sess = (norm.get("tmux_session") or norm.get("session_id") or "").strip()
model = (norm.get("model") or norm.get("repo_id") or "").strip() model = (norm.get("model") or norm.get("repo_id") or "").strip()
+118
View File
@@ -0,0 +1,118 @@
"""Codex cookbook routes require admin for cookie-session callers.
Regression test for issue #4542: non-admin users could reach cookbook
routes (tasks, servers, output, stop, adopt, presets, etc.) through
normal cookie sessions because _scope_owner only checked login status,
not admin privileges.
After the fix, cookie-session callers must be admin; API-token callers
are still governed by scope checks only.
"""
import pytest
from types import SimpleNamespace
from fastapi import HTTPException
from routes.codex_routes import _require_cookbook_scope
COOKBOOK_READ_SCOPES = {"cookbook:read", "cookbook:launch"}
COOKBOOK_LAUNCH_SCOPES = {"cookbook:launch"}
def _cookie_request(*, current_user="bob", is_admin=False):
"""Simulate a cookie-session request (no api_token)."""
auth_mgr = SimpleNamespace(
is_configured=True,
is_admin=lambda user: is_admin and user == "bob",
)
return SimpleNamespace(
state=SimpleNamespace(
current_user=current_user,
api_token=False,
),
app=SimpleNamespace(state=SimpleNamespace(auth_manager=auth_mgr)),
headers={},
)
def _api_token_request(*, scopes=None, owner="alice"):
"""Simulate an API-token request."""
return SimpleNamespace(
state=SimpleNamespace(
current_user="api",
api_token=True,
api_token_scopes=scopes or [],
api_token_owner=owner,
),
app=SimpleNamespace(state=SimpleNamespace(auth_manager=None)),
headers={},
)
class TestCookieSessionAdminGate:
"""Non-admin cookie sessions must be rejected; admin sessions allowed."""
def test_non_admin_rejected_read(self, monkeypatch):
monkeypatch.setenv("AUTH_ENABLED", "true")
req = _cookie_request(is_admin=False)
with pytest.raises(HTTPException) as exc:
_require_cookbook_scope(req, COOKBOOK_READ_SCOPES)
assert exc.value.status_code == 403
def test_non_admin_rejected_launch(self, monkeypatch):
monkeypatch.setenv("AUTH_ENABLED", "true")
req = _cookie_request(is_admin=False)
with pytest.raises(HTTPException) as exc:
_require_cookbook_scope(req, COOKBOOK_LAUNCH_SCOPES)
assert exc.value.status_code == 403
def test_admin_allowed_read(self, monkeypatch):
monkeypatch.setenv("AUTH_ENABLED", "true")
req = _cookie_request(is_admin=True)
owner = _require_cookbook_scope(req, COOKBOOK_READ_SCOPES)
assert owner == "bob"
def test_admin_allowed_launch(self, monkeypatch):
monkeypatch.setenv("AUTH_ENABLED", "true")
req = _cookie_request(is_admin=True)
owner = _require_cookbook_scope(req, COOKBOOK_LAUNCH_SCOPES)
assert owner == "bob"
class TestApiTokenScopeGate:
"""API-token callers are governed by scope, not admin status."""
def test_token_with_scope_allowed(self, monkeypatch):
monkeypatch.setenv("AUTH_ENABLED", "true")
req = _api_token_request(scopes=["cookbook:read"])
owner = _require_cookbook_scope(req, COOKBOOK_READ_SCOPES)
assert owner == "alice"
def test_token_missing_scope_rejected(self, monkeypatch):
monkeypatch.setenv("AUTH_ENABLED", "true")
req = _api_token_request(scopes=["unrelated:scope"])
with pytest.raises(HTTPException) as exc:
_require_cookbook_scope(req, COOKBOOK_READ_SCOPES)
assert exc.value.status_code == 403
class TestSourceCodeGate:
"""Static checks: all cookbook routes use _require_cookbook_scope."""
def test_no_raw_scope_owner_in_cookbook_routes(self):
from pathlib import Path
source = Path("routes/codex_routes.py").read_text(encoding="utf-8")
# _scope_owner should NOT appear inside cookbook route handlers.
# Find lines between cookbook route defs that still call _scope_owner.
in_cookbook = False
violations = []
for i, line in enumerate(source.splitlines(), 1):
if "@router." in line and "/cookbook/" in line:
in_cookbook = True
elif "@router." in line and "/cookbook/" not in line:
in_cookbook = False
if in_cookbook and "_scope_owner(request" in line:
violations.append((i, line.strip()))
assert violations == [], (
f"Cookbook routes still use _scope_owner instead of _require_cookbook_scope: {violations}"
)