From e3ecdd32072e3a4ecd1bb86dfbfc7d069d29a28c Mon Sep 17 00:00:00 2001 From: Michael <52305679+michaelxer@users.noreply.github.com> Date: Sat, 27 Jun 2026 20:09:32 +0700 Subject: [PATCH] fix(security): gate codex cookbook routes behind admin check for cookie sessions (#4554) The Codex cookbook bridge authorized cookie sessions with require_user() only, allowing non-admin accounts to read cookbook task state, server topology, task logs, tmux sessions, and model presets. The stop/adopt routes also execute local or SSH-backed tmux commands. Add _require_cookbook_scope() that enforces require_admin() for cookie-session callers while preserving the existing API-token scope checks. Apply it to all nine /api/codex/cookbook/* routes. Fixes #4542 Co-authored-by: michaelxer --- routes/codex_routes.py | 33 +++++-- tests/test_codex_cookbook_admin_gate.py | 118 ++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 9 deletions(-) create mode 100644 tests/test_codex_cookbook_admin_gate.py diff --git a/routes/codex_routes.py b/routes/codex_routes.py index 22fc7feeb..e3ddaba46 100644 --- a/routes/codex_routes.py +++ b/routes/codex_routes.py @@ -15,6 +15,7 @@ from typing import Any from fastapi import APIRouter, BackgroundTasks, Body, HTTPException, Request from fastapi.responses import StreamingResponse +from core.middleware import require_admin from src.auth_helpers import require_authenticated_request, require_user from src.tool_implementations import do_manage_notes from src.constants import COOKBOOK_STATE_FILE @@ -109,6 +110,20 @@ def _scope_owner_all(request: Request, required: set[str]) -> str: return require_user(request) +def _require_cookbook_scope(request: Request, allowed: set[str]) -> str: + """Authorize a Codex cookbook route. + + For API-token callers, enforce the given scope set. + For cookie-session callers, additionally require admin privileges + because cookbook surfaces expose host topology, task logs, tmux + commands, and model-serving controls. + """ + owner = _scope_owner(request, allowed) + if not getattr(request.state, "api_token", False): + require_admin(request) + return owner + + def _find_endpoint(router: APIRouter | None, method: str, path: str): if router is None: return None @@ -532,14 +547,14 @@ def setup_codex_routes( @router.get("/cookbook/tasks") async def codex_cookbook_tasks(request: Request): - _scope_owner(request, COOKBOOK_READ_SCOPES) + _require_cookbook_scope(request, COOKBOOK_READ_SCOPES) state = _read_cookbook_state() tasks = state.get("tasks") or [] return {"tasks": [_redact_task(t) for t in tasks]} @router.get("/cookbook/servers") async def codex_cookbook_servers(request: Request): - _scope_owner(request, COOKBOOK_READ_SCOPES) + _require_cookbook_scope(request, COOKBOOK_READ_SCOPES) state = _read_cookbook_state() servers = state.get("env", {}).get("servers") or [] # Strip ssh creds / passwords; keep only what's needed to pick a host. @@ -558,7 +573,7 @@ def setup_codex_routes( @router.get("/cookbook/output/{session_id}") async def codex_cookbook_output(request: Request, session_id: str, tail: int = 400): - _scope_owner(request, COOKBOOK_READ_SCOPES) + _require_cookbook_scope(request, COOKBOOK_READ_SCOPES) # Defensive: session_id must be the tmux-style id we issue # (`serve-XXXX` / `cookbook-XXXX` / `queue-XXXX`); anything else # would let the agent run arbitrary `tmux capture-pane` targets. @@ -600,7 +615,7 @@ def setup_codex_routes( @router.post("/cookbook/serve") async def codex_cookbook_serve(request: Request, body: dict[str, Any] = Body(default_factory=dict)): - _scope_owner(request, COOKBOOK_LAUNCH_SCOPES) + _require_cookbook_scope(request, COOKBOOK_LAUNCH_SCOPES) # Wraps /api/model/serve with the SAME validation the UI uses. # _validate_serve_cmd (called inside model_serve) rejects shell # metachars and requires the leading binary to be in the @@ -639,7 +654,7 @@ def setup_codex_routes( @router.post("/cookbook/stop/{session_id}") async def codex_cookbook_stop(request: Request, session_id: str): - _scope_owner(request, COOKBOOK_LAUNCH_SCOPES) + _require_cookbook_scope(request, COOKBOOK_LAUNCH_SCOPES) import re as _re if not _re.fullmatch(r"[a-zA-Z0-9_-]+", session_id): raise HTTPException(400, "Invalid session id") @@ -659,7 +674,7 @@ def setup_codex_routes( """List cached models on a configured server (or local if host is omitted). Mirrors `list_cached_models` from the chat agent so external agents have the same inventory view before deciding what to serve/download.""" - _scope_owner(request, COOKBOOK_READ_SCOPES) + _require_cookbook_scope(request, COOKBOOK_READ_SCOPES) # Hit /api/model/cached internally, with the same modelDirs the chat # agent's list_cached_models would resolve from cookbook state. state = _read_cookbook_state() @@ -721,7 +736,7 @@ def setup_codex_routes( """List saved serve presets (model + host + port + launch cmd). Counterpart to `list_serve_presets`. Use BEFORE composing a `serve` body — the user's saved preset usually has the working cmd already.""" - _scope_owner(request, COOKBOOK_READ_SCOPES) + _require_cookbook_scope(request, COOKBOOK_READ_SCOPES) state = _read_cookbook_state() presets = state.get("presets") or [] out = [] @@ -741,7 +756,7 @@ def setup_codex_routes( async def codex_cookbook_serve_preset(request: Request, name: str): """Launch a saved preset by name. Reuses the working cmd + host the user already saved, avoiding the cmd-allowlist trial-and-error loop.""" - _scope_owner(request, COOKBOOK_LAUNCH_SCOPES) + _require_cookbook_scope(request, COOKBOOK_LAUNCH_SCOPES) import re as _re if not _re.fullmatch(r"[A-Za-z0-9 _.:@\-]+", name): raise HTTPException(400, "Invalid preset name") @@ -793,7 +808,7 @@ def setup_codex_routes( cookbook tracking. Needed when serve_model rejects a cmd and the agent falls back to direct ssh — without adoption the session is invisible to the UI. Body: {tmux_session, model, host?, port?}.""" - _scope_owner(request, COOKBOOK_LAUNCH_SCOPES) + _require_cookbook_scope(request, COOKBOOK_LAUNCH_SCOPES) norm = dict(body or {}) sess = (norm.get("tmux_session") or norm.get("session_id") or "").strip() model = (norm.get("model") or norm.get("repo_id") or "").strip() diff --git a/tests/test_codex_cookbook_admin_gate.py b/tests/test_codex_cookbook_admin_gate.py new file mode 100644 index 000000000..c267283f1 --- /dev/null +++ b/tests/test_codex_cookbook_admin_gate.py @@ -0,0 +1,118 @@ +"""Codex cookbook routes require admin for cookie-session callers. + +Regression test for issue #4542: non-admin users could reach cookbook +routes (tasks, servers, output, stop, adopt, presets, etc.) through +normal cookie sessions because _scope_owner only checked login status, +not admin privileges. + +After the fix, cookie-session callers must be admin; API-token callers +are still governed by scope checks only. +""" +import pytest +from types import SimpleNamespace +from fastapi import HTTPException + +from routes.codex_routes import _require_cookbook_scope + + +COOKBOOK_READ_SCOPES = {"cookbook:read", "cookbook:launch"} +COOKBOOK_LAUNCH_SCOPES = {"cookbook:launch"} + + +def _cookie_request(*, current_user="bob", is_admin=False): + """Simulate a cookie-session request (no api_token).""" + auth_mgr = SimpleNamespace( + is_configured=True, + is_admin=lambda user: is_admin and user == "bob", + ) + return SimpleNamespace( + state=SimpleNamespace( + current_user=current_user, + api_token=False, + ), + app=SimpleNamespace(state=SimpleNamespace(auth_manager=auth_mgr)), + headers={}, + ) + + +def _api_token_request(*, scopes=None, owner="alice"): + """Simulate an API-token request.""" + return SimpleNamespace( + state=SimpleNamespace( + current_user="api", + api_token=True, + api_token_scopes=scopes or [], + api_token_owner=owner, + ), + app=SimpleNamespace(state=SimpleNamespace(auth_manager=None)), + headers={}, + ) + + +class TestCookieSessionAdminGate: + """Non-admin cookie sessions must be rejected; admin sessions allowed.""" + + def test_non_admin_rejected_read(self, monkeypatch): + monkeypatch.setenv("AUTH_ENABLED", "true") + req = _cookie_request(is_admin=False) + with pytest.raises(HTTPException) as exc: + _require_cookbook_scope(req, COOKBOOK_READ_SCOPES) + assert exc.value.status_code == 403 + + def test_non_admin_rejected_launch(self, monkeypatch): + monkeypatch.setenv("AUTH_ENABLED", "true") + req = _cookie_request(is_admin=False) + with pytest.raises(HTTPException) as exc: + _require_cookbook_scope(req, COOKBOOK_LAUNCH_SCOPES) + assert exc.value.status_code == 403 + + def test_admin_allowed_read(self, monkeypatch): + monkeypatch.setenv("AUTH_ENABLED", "true") + req = _cookie_request(is_admin=True) + owner = _require_cookbook_scope(req, COOKBOOK_READ_SCOPES) + assert owner == "bob" + + def test_admin_allowed_launch(self, monkeypatch): + monkeypatch.setenv("AUTH_ENABLED", "true") + req = _cookie_request(is_admin=True) + owner = _require_cookbook_scope(req, COOKBOOK_LAUNCH_SCOPES) + assert owner == "bob" + + +class TestApiTokenScopeGate: + """API-token callers are governed by scope, not admin status.""" + + def test_token_with_scope_allowed(self, monkeypatch): + monkeypatch.setenv("AUTH_ENABLED", "true") + req = _api_token_request(scopes=["cookbook:read"]) + owner = _require_cookbook_scope(req, COOKBOOK_READ_SCOPES) + assert owner == "alice" + + def test_token_missing_scope_rejected(self, monkeypatch): + monkeypatch.setenv("AUTH_ENABLED", "true") + req = _api_token_request(scopes=["unrelated:scope"]) + with pytest.raises(HTTPException) as exc: + _require_cookbook_scope(req, COOKBOOK_READ_SCOPES) + assert exc.value.status_code == 403 + + +class TestSourceCodeGate: + """Static checks: all cookbook routes use _require_cookbook_scope.""" + + def test_no_raw_scope_owner_in_cookbook_routes(self): + from pathlib import Path + source = Path("routes/codex_routes.py").read_text(encoding="utf-8") + # _scope_owner should NOT appear inside cookbook route handlers. + # Find lines between cookbook route defs that still call _scope_owner. + in_cookbook = False + violations = [] + for i, line in enumerate(source.splitlines(), 1): + if "@router." in line and "/cookbook/" in line: + in_cookbook = True + elif "@router." in line and "/cookbook/" not in line: + in_cookbook = False + if in_cookbook and "_scope_owner(request" in line: + violations.append((i, line.strip())) + assert violations == [], ( + f"Cookbook routes still use _scope_owner instead of _require_cookbook_scope: {violations}" + )