fix(auth): centralize password and username validation constants (#4120)

Added PASSWORD_MIN_LENGTH and RESERVED_USERNAMES to src/constants.py as the
single source of truth. Previously PASSWORD_MIN_LENGTH was hardcoded as 8 in
four route handlers and all three JS validation paths; RESERVED_USERNAMES was
an inline frozenset duplicated in core/auth.py, routes/assistant_routes.py,
routes/research_routes.py, and src/task_scheduler.py.

Added GET /api/auth/policy (unauthenticated) so the frontend reads the real
values from the server instead of hardcoding them in JS.

Added missing empty-username guard to /setup and admin POST /users. Both
returned a misleading 500/409 on whitespace-only input. /signup already had the
check; this makes all three consistent.
This commit is contained in:
Karl Jussila
2026-06-16 02:52:15 -05:00
committed by GitHub
parent 2b519bf355
commit ee72d71872
12 changed files with 327 additions and 30 deletions
+3 -2
View File
@@ -16,6 +16,7 @@ from pydantic import BaseModel
from core.database import SessionLocal, CrewMember, ScheduledTask
from src.auth_helpers import get_current_user
from core.auth import RESERVED_USERNAMES
from src.task_scheduler import compute_next_run
@@ -89,11 +90,11 @@ def setup_assistant_routes(task_scheduler) -> APIRouter:
# check-in tasks seeded. Hitting any /assistant route under one of these
# used to seed a full CrewMember + Morning/Midday/Evening tasks under that
# owner, which then double-fired alongside the real user's check-ins.
_SYNTHETIC_OWNERS = frozenset({"internal-tool", "api", "demo", "system", ""})
# RESERVED_USERNAMES covers the same set; the `not owner` guard handles "".
async def _get_or_create(owner: str) -> CrewMember:
"""Return the per-owner assistant CrewMember, creating it on demand."""
if not owner or owner in _SYNTHETIC_OWNERS:
if not owner or owner in RESERVED_USERNAMES:
raise HTTPException(status_code=400, detail=f"Cannot seed assistant for {owner!r}")
db = SessionLocal()
try:
+25 -10
View File
@@ -12,8 +12,8 @@ import re
from pathlib import Path
from core.atomic_io import atomic_write_json, atomic_write_text
from core.auth import AuthManager, SetAdminResult
from src.constants import DEEP_RESEARCH_DIR, MEMORY_FILE, SKILLS_DIR
from core.auth import AuthManager, RESERVED_USERNAMES, SetAdminResult
from src.constants import DEEP_RESEARCH_DIR, MEMORY_FILE, PASSWORD_MIN_LENGTH, SKILLS_DIR
from src.rate_limiter import RateLimiter
from src.settings_scrub import scrub_settings
from src.settings import (
@@ -102,8 +102,12 @@ def setup_auth_routes(auth_manager: AuthManager) -> APIRouter:
raise HTTPException(429, "Too many requests — try again later")
if auth_manager.is_configured:
raise HTTPException(400, "Already configured")
if len(body.password) < 8:
raise HTTPException(400, "Password must be at least 8 characters")
if len(body.password) < PASSWORD_MIN_LENGTH:
raise HTTPException(400, f"Password must be at least {PASSWORD_MIN_LENGTH} characters")
if len(body.username.strip()) < 1:
raise HTTPException(400, "Username is required")
if body.username.lower() in RESERVED_USERNAMES:
raise HTTPException(403, "Username is reserved")
ok = await asyncio.to_thread(auth_manager.setup, body.username, body.password)
if not ok:
raise HTTPException(500, "Setup failed")
@@ -118,10 +122,12 @@ def setup_auth_routes(auth_manager: AuthManager) -> APIRouter:
raise HTTPException(400, "Run setup first")
if not auth_manager.signup_enabled:
raise HTTPException(403, "Registration is disabled. Ask an admin for an account.")
if len(body.password) < 8:
raise HTTPException(400, "Password must be at least 8 characters")
if len(body.password) < PASSWORD_MIN_LENGTH:
raise HTTPException(400, f"Password must be at least {PASSWORD_MIN_LENGTH} characters")
if len(body.username.strip()) < 1:
raise HTTPException(400, "Username is required")
if body.username.lower() in RESERVED_USERNAMES:
raise HTTPException(403, "Username is reserved")
ok = await asyncio.to_thread(auth_manager.create_user, body.username, body.password, is_admin=False)
if not ok:
raise HTTPException(409, "Username already taken")
@@ -184,13 +190,18 @@ def setup_auth_routes(auth_manager: AuthManager) -> APIRouter:
pass
return result
@router.get("/policy")
async def auth_policy():
"""Return public auth policy constants for the frontend."""
return auth_manager.policy()
@router.post("/change-password")
async def change_password(body: ChangePasswordRequest, request: Request):
user = _get_current_user(request)
if not user:
raise HTTPException(401, "Not authenticated")
if len(body.new_password) < 8:
raise HTTPException(400, "Password must be at least 8 characters")
if len(body.new_password) < PASSWORD_MIN_LENGTH:
raise HTTPException(400, f"Password must be at least {PASSWORD_MIN_LENGTH} characters")
current_token = request.cookies.get(SESSION_COOKIE)
ok = await asyncio.to_thread(auth_manager.change_password, user, body.current_password, body.new_password)
if not ok:
@@ -270,8 +281,12 @@ def setup_auth_routes(auth_manager: AuthManager) -> APIRouter:
user = _get_current_user(request)
if not user or not auth_manager.is_admin(user):
raise HTTPException(403, "Admin only")
if len(body.password) < 8:
raise HTTPException(400, "Password must be at least 8 characters")
if len(body.password) < PASSWORD_MIN_LENGTH:
raise HTTPException(400, f"Password must be at least {PASSWORD_MIN_LENGTH} characters")
if len(body.username.strip()) < 1:
raise HTTPException(400, "Username is required")
if body.username.lower() in RESERVED_USERNAMES:
raise HTTPException(403, "Username is reserved")
ok = auth_manager.create_user(body.username, body.password, body.is_admin)
if not ok:
raise HTTPException(409, "Username already taken")
+2 -1
View File
@@ -14,6 +14,7 @@ from fastapi.responses import HTMLResponse, StreamingResponse
from pydantic import BaseModel, Field
from src.endpoint_resolver import resolve_endpoint
from src.auth_helpers import _auth_disabled, get_current_user
from core.auth import RESERVED_USERNAMES
from src.constants import DEEP_RESEARCH_DIR
_SESSION_ID_RE = re.compile(r"^[a-zA-Z0-9-]{1,128}$")
@@ -387,7 +388,7 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
user = require_privilege(request, "can_use_research")
if user == "internal-tool":
tool_owner = (request.headers.get("X-Odysseus-Owner") or "").strip()
if tool_owner and tool_owner not in {"internal-tool", "api", "demo", "system"}:
if tool_owner and tool_owner not in RESERVED_USERNAMES:
auth_mgr = getattr(request.app.state, "auth_manager", None)
if auth_mgr is not None and getattr(auth_mgr, "is_configured", False):
try: