mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-15 17:25:26 -04:00
fix(auth): drop reserved usernames loaded from auth config (#3727)
This commit is contained in:
@@ -56,7 +56,7 @@ from core.constants import (
|
||||
)
|
||||
from core.database import SessionLocal, ApiToken
|
||||
from core.middleware import SecurityHeadersMiddleware, is_cors_preflight
|
||||
from core.auth import AuthManager
|
||||
from core.auth import AuthManager, normalize_known_username
|
||||
from core.exceptions import (
|
||||
SessionNotFoundError, InvalidFileUploadError,
|
||||
LLMServiceError, WebSearchError,
|
||||
@@ -228,8 +228,16 @@ if AUTH_ENABLED:
|
||||
try:
|
||||
rows = db.query(ApiToken).filter(ApiToken.is_active == True).all()
|
||||
for r in rows:
|
||||
owner_key = normalize_known_username(auth_manager.users, getattr(r, "owner", None))
|
||||
if not owner_key:
|
||||
logger.warning(
|
||||
"Ignoring active API token '%s' for unknown auth user '%s'",
|
||||
getattr(r, "id", ""),
|
||||
getattr(r, "owner", None),
|
||||
)
|
||||
continue
|
||||
scopes = [s.strip() for s in (getattr(r, "scopes", "") or "chat").split(",") if s.strip()]
|
||||
new_map[r.token_prefix].append((r.id, r.token_hash, getattr(r, "owner", None), scopes))
|
||||
new_map[r.token_prefix].append((r.id, r.token_hash, owner_key, scopes))
|
||||
finally:
|
||||
db.close()
|
||||
_token_cache.clear()
|
||||
|
||||
+40
-1
@@ -67,6 +67,14 @@ TOKEN_TTL = 60 * 60 * 24 * 7 # 7 days
|
||||
RESERVED_USERNAMES = frozenset({"internal-tool", "api", "demo", "system"})
|
||||
|
||||
|
||||
def normalize_known_username(users: Dict[str, Any], username: str | None) -> Optional[str]:
|
||||
"""Return a normalized username only when it exists in the auth user map."""
|
||||
key = str(username or "").strip().lower()
|
||||
if not key or key not in users:
|
||||
return None
|
||||
return key
|
||||
|
||||
|
||||
def _hash_password(password: str) -> str:
|
||||
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
|
||||
|
||||
@@ -96,6 +104,7 @@ class AuthManager:
|
||||
self._load()
|
||||
self._load_sessions()
|
||||
self._migrate_single_user()
|
||||
self._drop_reserved_loaded_users()
|
||||
self._migrate_legacy_admin_role()
|
||||
|
||||
def _load(self):
|
||||
@@ -148,7 +157,13 @@ class AuthManager:
|
||||
def _migrate_single_user(self):
|
||||
"""Migrate old single-user format to multi-user format."""
|
||||
if "password_hash" in self._config and "users" not in self._config:
|
||||
old_user = self._config.get("username", "admin")
|
||||
old_user = str(self._config.get("username", "admin") or "admin").strip().lower()
|
||||
if old_user in RESERVED_USERNAMES:
|
||||
logger.warning(
|
||||
"Migrating legacy single-user reserved username '%s' to 'admin'",
|
||||
old_user,
|
||||
)
|
||||
old_user = "admin"
|
||||
old_hash = self._config["password_hash"]
|
||||
self._config = {
|
||||
"users": {
|
||||
@@ -162,6 +177,30 @@ class AuthManager:
|
||||
self._save()
|
||||
logger.info(f"Migrated single-user auth to multi-user (admin: {old_user})")
|
||||
|
||||
def _drop_reserved_loaded_users(self):
|
||||
"""Fail closed for legacy/manual auth rows that collide with sentinels."""
|
||||
users = self._config.get("users")
|
||||
if not isinstance(users, dict):
|
||||
return
|
||||
normalized = {}
|
||||
removed = []
|
||||
for username, data in users.items():
|
||||
key = str(username or "").strip().lower()
|
||||
if not key:
|
||||
continue
|
||||
if key in RESERVED_USERNAMES:
|
||||
removed.append(key)
|
||||
continue
|
||||
normalized[key] = data
|
||||
if removed or normalized != users:
|
||||
self._config["users"] = normalized
|
||||
self._save()
|
||||
if removed:
|
||||
logger.warning(
|
||||
"Removed reserved username(s) from auth config: %s",
|
||||
", ".join(sorted(set(removed))),
|
||||
)
|
||||
|
||||
def _migrate_legacy_admin_role(self):
|
||||
"""Normalize setup.py's old role='admin' marker to is_admin=True."""
|
||||
changed = False
|
||||
|
||||
@@ -58,6 +58,62 @@ def test_rename_into_reserved_username_is_blocked(tmp_path):
|
||||
assert "bob" in mgr.users
|
||||
|
||||
|
||||
def test_legacy_reserved_username_is_removed_on_load(tmp_path):
|
||||
auth_path = tmp_path / "auth.json"
|
||||
auth_path.write_text(
|
||||
'{"users": {"internal-tool": {"password_hash": "unused", "is_admin": false}, '
|
||||
'"admin": {"password_hash": "unused", "is_admin": true}}}',
|
||||
encoding="utf-8",
|
||||
)
|
||||
mgr = _fresh_auth_manager(tmp_path)
|
||||
|
||||
assert "internal-tool" not in mgr.users
|
||||
assert "admin" in mgr.users
|
||||
assert "internal-tool" not in auth_path.read_text(encoding="utf-8")
|
||||
|
||||
|
||||
def test_legacy_reserved_username_session_cannot_authenticate(tmp_path):
|
||||
auth_path = tmp_path / "auth.json"
|
||||
sessions_path = tmp_path / "sessions.json"
|
||||
auth_path.write_text(
|
||||
'{"users": {"internal-tool": {"password_hash": "unused", "is_admin": false}}}',
|
||||
encoding="utf-8",
|
||||
)
|
||||
sessions_path.write_text(
|
||||
'{"tok": {"username": "internal-tool", "expiry": 9999999999}}',
|
||||
encoding="utf-8",
|
||||
)
|
||||
mgr = _fresh_auth_manager(tmp_path)
|
||||
|
||||
assert mgr.validate_token("tok") is False
|
||||
assert mgr.get_username_for_token("tok") is None
|
||||
|
||||
|
||||
def test_legacy_reserved_single_user_migrates_to_admin(tmp_path):
|
||||
auth_path = tmp_path / "auth.json"
|
||||
auth_path.write_text(
|
||||
'{"username": "internal-tool", "password_hash": "unused"}',
|
||||
encoding="utf-8",
|
||||
)
|
||||
mgr = _fresh_auth_manager(tmp_path)
|
||||
|
||||
assert "internal-tool" not in mgr.users
|
||||
assert "admin" in mgr.users
|
||||
assert mgr.is_admin("admin") is True
|
||||
|
||||
|
||||
def test_token_cache_owner_normalization_requires_current_user():
|
||||
clear_module("core.auth")
|
||||
from core.auth import normalize_known_username
|
||||
|
||||
users = {"alice": {}, "admin": {}}
|
||||
|
||||
assert normalize_known_username(users, " Alice ") == "alice"
|
||||
assert normalize_known_username(users, "internal-tool") is None
|
||||
assert normalize_known_username(users, "api") is None
|
||||
assert normalize_known_username(users, "") is None
|
||||
|
||||
|
||||
def test_normal_usernames_still_allowed(tmp_path):
|
||||
mgr = _fresh_auth_manager(tmp_path)
|
||||
assert mgr.create_user("alice", "pw-123456") is True
|
||||
|
||||
Reference in New Issue
Block a user