From ee6cfbd25a597d4ece1aac09554464e13970ce6e Mon Sep 17 00:00:00 2001 From: RaresKeY <158580472+RaresKeY@users.noreply.github.com> Date: Wed, 10 Jun 2026 17:31:26 +0300 Subject: [PATCH] fix(auth): drop reserved usernames loaded from auth config (#3727) --- app.py | 12 +++- core/auth.py | 41 +++++++++++++- ...test_reserved_username_admin_escalation.py | 56 +++++++++++++++++++ 3 files changed, 106 insertions(+), 3 deletions(-) diff --git a/app.py b/app.py index cfd73e83f..7cec8b0f1 100644 --- a/app.py +++ b/app.py @@ -56,7 +56,7 @@ from core.constants import ( ) from core.database import SessionLocal, ApiToken from core.middleware import SecurityHeadersMiddleware, is_cors_preflight -from core.auth import AuthManager +from core.auth import AuthManager, normalize_known_username from core.exceptions import ( SessionNotFoundError, InvalidFileUploadError, LLMServiceError, WebSearchError, @@ -228,8 +228,16 @@ if AUTH_ENABLED: try: rows = db.query(ApiToken).filter(ApiToken.is_active == True).all() for r in rows: + owner_key = normalize_known_username(auth_manager.users, getattr(r, "owner", None)) + if not owner_key: + logger.warning( + "Ignoring active API token '%s' for unknown auth user '%s'", + getattr(r, "id", ""), + getattr(r, "owner", None), + ) + continue scopes = [s.strip() for s in (getattr(r, "scopes", "") or "chat").split(",") if s.strip()] - new_map[r.token_prefix].append((r.id, r.token_hash, getattr(r, "owner", None), scopes)) + new_map[r.token_prefix].append((r.id, r.token_hash, owner_key, scopes)) finally: db.close() _token_cache.clear() diff --git a/core/auth.py b/core/auth.py index 11f38cd5f..2f9fd4e51 100644 --- a/core/auth.py +++ b/core/auth.py @@ -67,6 +67,14 @@ TOKEN_TTL = 60 * 60 * 24 * 7 # 7 days RESERVED_USERNAMES = frozenset({"internal-tool", "api", "demo", "system"}) +def normalize_known_username(users: Dict[str, Any], username: str | None) -> Optional[str]: + """Return a normalized username only when it exists in the auth user map.""" + key = str(username or "").strip().lower() + if not key or key not in users: + return None + return key + + def _hash_password(password: str) -> str: return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") @@ -96,6 +104,7 @@ class AuthManager: self._load() self._load_sessions() self._migrate_single_user() + self._drop_reserved_loaded_users() self._migrate_legacy_admin_role() def _load(self): @@ -148,7 +157,13 @@ class AuthManager: def _migrate_single_user(self): """Migrate old single-user format to multi-user format.""" if "password_hash" in self._config and "users" not in self._config: - old_user = self._config.get("username", "admin") + old_user = str(self._config.get("username", "admin") or "admin").strip().lower() + if old_user in RESERVED_USERNAMES: + logger.warning( + "Migrating legacy single-user reserved username '%s' to 'admin'", + old_user, + ) + old_user = "admin" old_hash = self._config["password_hash"] self._config = { "users": { @@ -162,6 +177,30 @@ class AuthManager: self._save() logger.info(f"Migrated single-user auth to multi-user (admin: {old_user})") + def _drop_reserved_loaded_users(self): + """Fail closed for legacy/manual auth rows that collide with sentinels.""" + users = self._config.get("users") + if not isinstance(users, dict): + return + normalized = {} + removed = [] + for username, data in users.items(): + key = str(username or "").strip().lower() + if not key: + continue + if key in RESERVED_USERNAMES: + removed.append(key) + continue + normalized[key] = data + if removed or normalized != users: + self._config["users"] = normalized + self._save() + if removed: + logger.warning( + "Removed reserved username(s) from auth config: %s", + ", ".join(sorted(set(removed))), + ) + def _migrate_legacy_admin_role(self): """Normalize setup.py's old role='admin' marker to is_admin=True.""" changed = False diff --git a/tests/test_reserved_username_admin_escalation.py b/tests/test_reserved_username_admin_escalation.py index 29c423774..fff1aea78 100644 --- a/tests/test_reserved_username_admin_escalation.py +++ b/tests/test_reserved_username_admin_escalation.py @@ -58,6 +58,62 @@ def test_rename_into_reserved_username_is_blocked(tmp_path): assert "bob" in mgr.users +def test_legacy_reserved_username_is_removed_on_load(tmp_path): + auth_path = tmp_path / "auth.json" + auth_path.write_text( + '{"users": {"internal-tool": {"password_hash": "unused", "is_admin": false}, ' + '"admin": {"password_hash": "unused", "is_admin": true}}}', + encoding="utf-8", + ) + mgr = _fresh_auth_manager(tmp_path) + + assert "internal-tool" not in mgr.users + assert "admin" in mgr.users + assert "internal-tool" not in auth_path.read_text(encoding="utf-8") + + +def test_legacy_reserved_username_session_cannot_authenticate(tmp_path): + auth_path = tmp_path / "auth.json" + sessions_path = tmp_path / "sessions.json" + auth_path.write_text( + '{"users": {"internal-tool": {"password_hash": "unused", "is_admin": false}}}', + encoding="utf-8", + ) + sessions_path.write_text( + '{"tok": {"username": "internal-tool", "expiry": 9999999999}}', + encoding="utf-8", + ) + mgr = _fresh_auth_manager(tmp_path) + + assert mgr.validate_token("tok") is False + assert mgr.get_username_for_token("tok") is None + + +def test_legacy_reserved_single_user_migrates_to_admin(tmp_path): + auth_path = tmp_path / "auth.json" + auth_path.write_text( + '{"username": "internal-tool", "password_hash": "unused"}', + encoding="utf-8", + ) + mgr = _fresh_auth_manager(tmp_path) + + assert "internal-tool" not in mgr.users + assert "admin" in mgr.users + assert mgr.is_admin("admin") is True + + +def test_token_cache_owner_normalization_requires_current_user(): + clear_module("core.auth") + from core.auth import normalize_known_username + + users = {"alice": {}, "admin": {}} + + assert normalize_known_username(users, " Alice ") == "alice" + assert normalize_known_username(users, "internal-tool") is None + assert normalize_known_username(users, "api") is None + assert normalize_known_username(users, "") is None + + def test_normal_usernames_still_allowed(tmp_path): mgr = _fresh_auth_manager(tmp_path) assert mgr.create_user("alice", "pw-123456") is True