refactor(auth): centralize the internal-tool pseudo-username into a constant (#4333)

The in-process tool loopback stamps current_user = "internal-tool" and
require_admin grants admin to that sentinel; it is also a reserved username.
That security-sensitive string was hand-typed in ~7 places (stamp, admin gate,
RESERVED_USERNAMES, and standalone admin-equivalent checks in note/research/
shell/task routes), where a typo silently breaks an auth gate.

Add INTERNAL_TOOL_USER in core/middleware.py next to INTERNAL_TOOL_TOKEN/
INTERNAL_TOOL_HEADER and use it at every such site. A typo is now an
ImportError, not a silent mismatch. auth.py importing middleware is acyclic
(middleware imports no app modules). Behaviour is unchanged.

The multi-sentinel sets bundling internal-tool with api/demo/system
(assistant_routes, task_scheduler, research_routes) are a separate reserved-set
dedup, left for a follow-up.

Closes #4332
This commit is contained in:
Kenny Van de Maele
2026-06-16 13:13:00 +02:00
committed by GitHub
parent bf56010aad
commit a2261c38c1
7 changed files with 15 additions and 8 deletions
+2 -2
View File
@@ -318,7 +318,7 @@ if AUTH_ENABLED:
# (no admin cookie available in that context). Restricted to
# loopback clients + matching token to keep it locked down.
try:
from core.middleware import INTERNAL_TOOL_HEADER, INTERNAL_TOOL_TOKEN as _ITT
from core.middleware import INTERNAL_TOOL_HEADER, INTERNAL_TOOL_TOKEN as _ITT, INTERNAL_TOOL_USER
_hdr = request.headers.get(INTERNAL_TOOL_HEADER)
if _hdr and secrets.compare_digest(_hdr, _ITT) and _is_trusted_loopback(request):
# Impersonation: when the agent's loopback call sets
@@ -330,7 +330,7 @@ if AUTH_ENABLED:
if _impersonate and _impersonate in getattr(_auth_mgr, "users", {}):
request.state.current_user = _impersonate
else:
request.state.current_user = "internal-tool"
request.state.current_user = INTERNAL_TOOL_USER
request.state.api_token = False
return await call_next(request)
except Exception as _e:
+2 -1
View File
@@ -20,6 +20,7 @@ logger = logging.getLogger(__name__)
from core.atomic_io import atomic_write_json as _atomic_write_json # noqa: E402
from core.middleware import INTERNAL_TOOL_USER # noqa: E402
DEFAULT_PRIVILEGES = {
"can_use_agent": True,
@@ -65,7 +66,7 @@ TOKEN_TTL = 60 * 60 * 24 * 7 # 7 days
# of those names would be denied an assistant and inconsistently owner-scoped.
# Refuse to create or rename into any of them so the sentinels can't be
# impersonated. (Keep this in sync with that synthetic-owner set.)
RESERVED_USERNAMES = frozenset({"internal-tool", "api", "demo", "system"})
RESERVED_USERNAMES = frozenset({INTERNAL_TOOL_USER, "api", "demo", "system"})
def normalize_known_username(users: Dict[str, Any], username: str | None) -> Optional[str]:
+3 -1
View File
@@ -15,6 +15,8 @@ from starlette.responses import Response
# same value from this module. Never persisted or exposed externally.
INTERNAL_TOOL_TOKEN = os.environ.get("ODYSSEUS_INTERNAL_TOKEN") or secrets.token_hex(32)
INTERNAL_TOOL_HEADER = "X-Odysseus-Internal-Token"
# Pseudo-username on in-process tool-loopback requests; require_admin trusts it and it is reserved.
INTERNAL_TOOL_USER = "internal-tool"
def is_cors_preflight(method: str, headers) -> bool:
@@ -39,7 +41,7 @@ def require_admin(request: Request):
hdr = request.headers.get(INTERNAL_TOOL_HEADER)
if hdr and secrets.compare_digest(hdr, INTERNAL_TOOL_TOKEN):
return
if getattr(request.state, "current_user", None) == "internal-tool":
if getattr(request.state, "current_user", None) == INTERNAL_TOOL_USER:
return
except Exception:
pass
+2 -1
View File
@@ -10,6 +10,7 @@ from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel
from core.database import SessionLocal, Note
from core.middleware import INTERNAL_TOOL_USER
from src.auth_helpers import require_user
from src.constants import DATA_DIR
from sqlalchemy.orm.attributes import flag_modified
@@ -582,7 +583,7 @@ def setup_note_routes(task_scheduler=None):
return require_user(request) or None
def _is_admin_or_single_user(request: Request, user: str | None) -> bool:
if user == "internal-tool":
if user == INTERNAL_TOOL_USER:
return True
if not user:
# require_user() already admitted this request, which only happens
+2 -1
View File
@@ -12,6 +12,7 @@ from typing import Optional
from fastapi import APIRouter, HTTPException, Query, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from pydantic import BaseModel, Field
from core.middleware import INTERNAL_TOOL_USER
from src.endpoint_resolver import resolve_endpoint
from src.auth_helpers import _auth_disabled, get_current_user
from core.auth import RESERVED_USERNAMES
@@ -386,7 +387,7 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
"""Launch a research job from the dedicated panel."""
from src.auth_helpers import require_privilege
user = require_privilege(request, "can_use_research")
if user == "internal-tool":
if user == INTERNAL_TOOL_USER:
tool_owner = (request.headers.get("X-Odysseus-Owner") or "").strip()
if tool_owner and tool_owner not in RESERVED_USERNAMES:
auth_mgr = getattr(request.app.state, "auth_manager", None)
+2 -1
View File
@@ -15,6 +15,7 @@ from collections import namedtuple
from pathlib import Path
from typing import Dict, Any
from core.platform_compat import IS_APPLE_SILICON, which_tool
from core.middleware import INTERNAL_TOOL_USER
from src.optional_deps import prepare_optional_dependency_import
# POSIX-only: `pty`/`fcntl` transitively import `termios`, which does NOT exist
@@ -55,7 +56,7 @@ def _require_admin(request: Request):
# In-process tool loopback. The AuthMiddleware already validated the
# internal token + loopback client before setting this marker, so
# honour it here as admin-equivalent.
if user == "internal-tool":
if user == INTERNAL_TOOL_USER:
return
if not user or user == "api":
raise HTTPException(403, "Admin only")
+2 -1
View File
@@ -11,6 +11,7 @@ from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel
from core.database import SessionLocal, ScheduledTask, TaskRun
from core.middleware import INTERNAL_TOOL_USER
from core.constants import internal_api_base
from src.auth_helpers import get_current_user
from src.constants import DATA_DIR, EMAIL_URGENCY_CACHE_DIR
@@ -427,7 +428,7 @@ def setup_task_routes(task_scheduler) -> APIRouter:
# In-process tool-loopback marker — AuthMiddleware validated
# the internal token + loopback client before stamping this,
# so treat as admin-equivalent.
if user == "internal-tool":
if user == INTERNAL_TOOL_USER:
return True
try:
from core.auth import AuthManager