From 8fe98cf471d30c9a2f2bde340121c197e6645f67 Mon Sep 17 00:00:00 2001 From: Merajul Arefin Date: Mon, 15 Jun 2026 16:44:27 +0600 Subject: [PATCH] feat(auth): add per-user admin promote/demote toggle (#3078) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(auth): add per-user admin promote/demote toggle Admin-only API and Users-tab control to grant/revoke admin rights; refuses to demote the last admin. * fix(auth): restore pre-admin privilege restrictions on demotion Promoting now stashes the user's privilege map (privileges_before_admin) and demoting restores it instead of resetting to defaults, so a promote/demote round trip can no longer broaden a restricted user's access. Users without a stash (created as admin, or promoted before this fix) still demote to DEFAULT_PRIVILEGES so a born-admin's stored all-True map — including can_use_bash — can't survive demotion. --------- Co-authored-by: K M Merajul Arefin --- core/auth.py | 73 +++++++++ routes/auth_routes.py | 32 +++- static/js/admin.js | 39 ++++- tests/test_set_admin.py | 317 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 459 insertions(+), 2 deletions(-) create mode 100644 tests/test_set_admin.py diff --git a/core/auth.py b/core/auth.py index 2f9fd4e51..7f085c065 100644 --- a/core/auth.py +++ b/core/auth.py @@ -3,6 +3,7 @@ Authentication module — multi-user password hashing, session tokens, config pe Config stored in data/auth.json. Uses bcrypt directly. """ +import enum import json import os import secrets @@ -83,6 +84,15 @@ def _verify_password(password: str, hashed: str) -> bool: return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("utf-8")) +class SetAdminResult(enum.Enum): + """Outcome of AuthManager.set_admin, so callers can map each case to a + precise response instead of guessing from a bare bool.""" + OK = "ok" + USER_NOT_FOUND = "user_not_found" + NOT_AUTHORIZED = "not_authorized" # requester is not an admin + LAST_ADMIN = "last_admin" # would remove the last remaining admin + + class AuthManager: """Manages multi-user password + session-token auth system.""" @@ -387,6 +397,69 @@ class AuthManager: logger.info(f"Updated privileges for '{username}': {current}") return True + def set_admin(self, username: str, is_admin: bool, + requesting_user: str) -> SetAdminResult: + """Promote/demote an existing user to/from admin. Admin only. + + Refuses to remove the last remaining admin so the instance can never + be locked out of admin access; self-demotion is allowed as long as + another admin remains. Admin status is re-checked live on every + request, so unlike delete/rename no session or token revocation is + needed — a demoted admin simply fails the next is_admin() gate. + + Promotion stashes the user's current privilege map and demotion + restores it, so a temporary admin stint can't silently broaden a + user's non-admin access; users without a stash (created as admin, + or promoted before stashing existed) demote to DEFAULT_PRIVILEGES. + + Counting admins and flipping the flag happen in one critical section + so two concurrent demotions can't race the admin count to zero. + """ + username = (username or "").strip().lower() + requesting_user = (requesting_user or "").strip().lower() + is_admin = bool(is_admin) + with self._config_lock: + target = self._config.get("users", {}).get(username) + if target is None: + return SetAdminResult.USER_NOT_FOUND + if not self.users.get(requesting_user, {}).get("is_admin"): + return SetAdminResult.NOT_AUTHORIZED + currently_admin = bool(target.get("is_admin")) + if currently_admin == is_admin: + return SetAdminResult.OK # no-op; leave privileges untouched + if currently_admin and not is_admin: + admin_count = sum(1 for d in self.users.values() if d.get("is_admin")) + if admin_count <= 1: + return SetAdminResult.LAST_ADMIN + # Write order matters for lock-free readers: get_privileges() + # reads without _config_lock and trusts is_admin, so the admin + # flag must be flipped while the stored map is safe to expose — + # before writing admin privileges on promote, after restoring + # the pre-admin map on demote. + if is_admin: + target["is_admin"] = True + # Stash the pre-admin map so a later demotion can restore it. + # While is_admin is set the stored map is inert: get_privileges + # short-circuits to ADMIN_PRIVILEGES and set_privileges refuses + # admins, so only set_admin ever touches the stash. + target["privileges_before_admin"] = dict( + target.get("privileges") or DEFAULT_PRIVILEGES + ) + target["privileges"] = dict(ADMIN_PRIVILEGES) + else: + # Restore the stashed pre-admin map. Fall back to defaults for + # users created as admins (their stored map is ADMIN_PRIVILEGES, + # which must not leak past demotion — e.g. can_use_bash) and + # for admins promoted before the stash existed. + target["privileges"] = dict( + target.pop("privileges_before_admin", None) + or DEFAULT_PRIVILEGES + ) + target["is_admin"] = False + self._save() + logger.info("Set is_admin=%s for '%s' (by '%s')", is_admin, username, requesting_user) + return SetAdminResult.OK + def change_password(self, username: str, current_password: str, new_password: str) -> bool: username = username.strip().lower() if username not in self.users: diff --git a/routes/auth_routes.py b/routes/auth_routes.py index a9cc8ecb1..6173b0c14 100644 --- a/routes/auth_routes.py +++ b/routes/auth_routes.py @@ -12,7 +12,7 @@ import re from pathlib import Path from core.atomic_io import atomic_write_json, atomic_write_text -from core.auth import AuthManager +from core.auth import AuthManager, SetAdminResult from src.constants import DEEP_RESEARCH_DIR, MEMORY_FILE, SKILLS_DIR from src.rate_limiter import RateLimiter from src.settings_scrub import scrub_settings @@ -73,6 +73,11 @@ class DeleteUserRequest(BaseModel): class RenameUserRequest(BaseModel): username: str + +class SetAdminRequest(BaseModel): + is_admin: bool + + class SetOpenRegistrationRequest(BaseModel): enabled: bool @@ -487,6 +492,31 @@ def setup_auth_routes(auth_manager: AuthManager) -> APIRouter: invalidator() return {"ok": True, "username": new_username, "renamed_self": old_username == user} + @router.put("/users/{username}/admin") + async def set_user_admin(username: str, body: SetAdminRequest, request: Request): + """Promote/demote a user to/from admin. Admin only. + + The last remaining admin can't be demoted (no lockout). Self-demotion + is allowed while another admin exists; the `self` flag tells the UI to + reload the acting user into the normal-user view. + """ + user = _get_current_user(request) + if not user or not auth_manager.is_admin(user): + raise HTTPException(403, "Admin only") + result = auth_manager.set_admin(username, body.is_admin, user) + if result is SetAdminResult.USER_NOT_FOUND: + raise HTTPException(404, "User not found") + if result is SetAdminResult.NOT_AUTHORIZED: + raise HTTPException(403, "Admin only") + if result is SetAdminResult.LAST_ADMIN: + raise HTTPException(400, "Cannot demote the last admin") + target = (username or "").strip().lower() + return { + "ok": True, + "is_admin": body.is_admin, + "self": target == (user or "").strip().lower(), + } + @router.post("/signup-toggle", deprecated=True) async def toggle_signup(request: Request): """ diff --git a/static/js/admin.js b/static/js/admin.js index 2c4288b40..e912c9471 100644 --- a/static/js/admin.js +++ b/static/js/admin.js @@ -55,6 +55,7 @@ async function loadUsers() {
+ ${u.is_admin ? '' : ``} ${u.is_admin ? '' : ''} @@ -113,7 +114,7 @@ async function loadUsers() { // Toggle panel visibility + rotate chevron + load models let _modelsLoaded = false; header.addEventListener('click', (e) => { - if (e.target.closest('.admin-btn-delete, [data-adm-rename-user]')) return; + if (e.target.closest('.admin-btn-delete, [data-adm-rename-user], [data-adm-toggle-admin]')) return; privPanel.classList.toggle('hidden'); const chevron = header.querySelector('.admin-user-chevron'); if (chevron) { @@ -199,6 +200,42 @@ async function loadUsers() { }); } + // Promote / demote (admin toggle) — present on every row + const adminToggleBtn = row.querySelector('[data-adm-toggle-admin]'); + if (adminToggleBtn) { + adminToggleBtn.addEventListener('click', async (e) => { + e.stopPropagation(); + const username = adminToggleBtn.dataset.admToggleAdmin; + const makeAdmin = adminToggleBtn.dataset.makeAdmin === '1'; + const confirmMsg = makeAdmin + ? `Grant admin rights to "${username}"? They'll get full access to all settings and users — including the power to demote or remove other admins (you included).` + : `Revoke admin rights from "${username}"? They'll lose access to the admin panel.`; + if (!await uiModule.styledConfirm(confirmMsg, { confirmText: makeAdmin ? 'Make admin' : 'Revoke admin', danger: !makeAdmin })) return; + adminToggleBtn.disabled = true; + try { + const res = await fetch(`/api/auth/users/${encodeURIComponent(username)}/admin`, { + method: 'PUT', + credentials: 'same-origin', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ is_admin: makeAdmin }), + }); + const data = await res.json().catch(() => ({})); + if (!res.ok) { + uiModule.showError(data.detail || 'Failed to change admin status'); + adminToggleBtn.disabled = false; + return; + } + // Demoting yourself drops your own admin access — reload into the + // normal-user view (mirrors the rename-self reload above). + if (data.self) { window.location.reload(); return; } + loadUsers(); + } catch (err) { + uiModule.showError('Failed to change admin status'); + adminToggleBtn.disabled = false; + } + }); + } + list.appendChild(row); }); } catch (e) { list.innerHTML = '
Failed to load users
'; } diff --git a/tests/test_set_admin.py b/tests/test_set_admin.py new file mode 100644 index 000000000..0d3b97172 --- /dev/null +++ b/tests/test_set_admin.py @@ -0,0 +1,317 @@ +"""Promote/demote users to/from admin (issue #2958). + +Covers AuthManager.set_admin (the core logic + last-admin lockout guard + +privilege stash/restore on a real role change + no-op preservation) and the +PUT /api/auth/users/{username}/admin route's status/envelope mapping. +""" + +import asyncio +import importlib +import sys +import types +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + +from fastapi import HTTPException + +from tests.helpers.import_state import clear_module + + +# --------------------------------------------------------------------------- +# Manager-level: real AuthManager on a temp auth.json (mirrors +# tests/test_rename_user_case_insensitive.py). +# --------------------------------------------------------------------------- + +def _real_core_package(): + root = Path(__file__).resolve().parent.parent + core_path = str(root / "core") + core = sys.modules.get("core") + if core is None: + core = types.ModuleType("core") + sys.modules["core"] = core + core.__path__ = [core_path] + clear_module("core.auth") + return core + + +def _fresh_auth_manager(tmp_path): + """Return (auth_module, AuthManager) with hashing stubbed for speed.""" + auth_mod = importlib.import_module("core.auth", package=_real_core_package()) + auth_mod._hash_password = lambda password: f"hash:{password}" + auth_mod._verify_password = lambda password, hashed: hashed == f"hash:{password}" + mgr = auth_mod.AuthManager(str(tmp_path / "auth.json")) + return auth_mod, mgr + + +def test_promote_sets_admin_flag_and_admin_privileges(tmp_path): + auth_mod, mgr = _fresh_auth_manager(tmp_path) + assert mgr.create_user("admin", "pw-123456", is_admin=True) is True + assert mgr.create_user("bob", "pw-123456") is True + + result = mgr.set_admin("bob", True, "admin") + + assert result is auth_mod.SetAdminResult.OK + assert mgr.is_admin("bob") is True + assert mgr.users["bob"]["privileges"] == auth_mod.ADMIN_PRIVILEGES + + +def test_demote_with_two_admins_resets_to_default_privileges(tmp_path): + auth_mod, mgr = _fresh_auth_manager(tmp_path) + mgr.create_user("admin", "pw-123456", is_admin=True) + mgr.create_user("bob", "pw-123456", is_admin=True) + + result = mgr.set_admin("bob", False, "admin") + + assert result is auth_mod.SetAdminResult.OK + assert mgr.is_admin("bob") is False + assert mgr.users["bob"]["privileges"] == auth_mod.DEFAULT_PRIVILEGES + + +def test_demote_last_admin_is_blocked(tmp_path): + auth_mod, mgr = _fresh_auth_manager(tmp_path) + mgr.create_user("admin", "pw-123456", is_admin=True) + + result = mgr.set_admin("admin", False, "admin") + + assert result is auth_mod.SetAdminResult.LAST_ADMIN + assert mgr.is_admin("admin") is True # unchanged + + +def test_self_demote_allowed_when_another_admin_exists(tmp_path): + auth_mod, mgr = _fresh_auth_manager(tmp_path) + mgr.create_user("admin", "pw-123456", is_admin=True) + mgr.create_user("bob", "pw-123456", is_admin=True) + + result = mgr.set_admin("admin", False, "admin") # admin demotes self + + assert result is auth_mod.SetAdminResult.OK + assert mgr.is_admin("admin") is False + assert mgr.is_admin("bob") is True + + +def test_cannot_demote_past_the_last_admin_sequentially(tmp_path): + auth_mod, mgr = _fresh_auth_manager(tmp_path) + mgr.create_user("admin", "pw-123456", is_admin=True) + mgr.create_user("bob", "pw-123456", is_admin=True) + + assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK + # Now "admin" is the only admin left — demoting them must be refused. + assert mgr.set_admin("admin", False, "admin") is auth_mod.SetAdminResult.LAST_ADMIN + assert mgr.is_admin("admin") is True + + +def test_non_admin_requester_is_rejected(tmp_path): + auth_mod, mgr = _fresh_auth_manager(tmp_path) + mgr.create_user("admin", "pw-123456", is_admin=True) + mgr.create_user("bob", "pw-123456") + mgr.create_user("carol", "pw-123456") + + result = mgr.set_admin("carol", True, "bob") # bob is not an admin + + assert result is auth_mod.SetAdminResult.NOT_AUTHORIZED + assert mgr.is_admin("carol") is False + + +def test_unknown_target_user_returns_not_found(tmp_path): + auth_mod, mgr = _fresh_auth_manager(tmp_path) + mgr.create_user("admin", "pw-123456", is_admin=True) + + result = mgr.set_admin("ghost", True, "admin") + + assert result is auth_mod.SetAdminResult.USER_NOT_FOUND + + +def test_noop_demote_of_regular_user_preserves_custom_privileges(tmp_path): + auth_mod, mgr = _fresh_auth_manager(tmp_path) + mgr.create_user("admin", "pw-123456", is_admin=True) + mgr.create_user("bob", "pw-123456") + # Give bob a non-default privilege; DEFAULT_PRIVILEGES has can_use_bash=False. + assert mgr.set_privileges("bob", {"can_use_bash": True}) is True + + result = mgr.set_admin("bob", False, "admin") # already a regular user + + assert result is auth_mod.SetAdminResult.OK + # Privileges must NOT have been reset to defaults by the no-op. + assert mgr.users["bob"]["privileges"]["can_use_bash"] is True + + +def test_demote_restores_pre_admin_privilege_restrictions(tmp_path): + auth_mod, mgr = _fresh_auth_manager(tmp_path) + mgr.create_user("admin", "pw-123456", is_admin=True) + mgr.create_user("bob", "pw-123456") + # Tighten bob below the defaults before promoting him. + assert mgr.set_privileges("bob", { + "can_use_agent": False, + "can_generate_images": False, + "max_messages_per_day": 50, + }) is True + restricted = mgr.get_privileges("bob") + + assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK + assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK + + # Demotion must restore the pre-admin policy, not reset to defaults. + assert mgr.get_privileges("bob") == restricted + assert mgr.get_privileges("bob")["can_use_agent"] is False + assert mgr.get_privileges("bob")["max_messages_per_day"] == 50 + + +def test_promote_demote_round_trip_is_stable_and_cleans_up_stash(tmp_path): + auth_mod, mgr = _fresh_auth_manager(tmp_path) + mgr.create_user("admin", "pw-123456", is_admin=True) + mgr.create_user("bob", "pw-123456") + assert mgr.set_privileges("bob", {"can_use_browser": False}) is True + restricted = mgr.get_privileges("bob") + + for _ in range(2): # two full promote/demote cycles + assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK + assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK + + assert mgr.get_privileges("bob") == restricted + # The stash is promotion-time bookkeeping; it must not linger on the row. + assert "privileges_before_admin" not in mgr.users["bob"] + + +def test_redundant_promote_does_not_clobber_stash(tmp_path): + auth_mod, mgr = _fresh_auth_manager(tmp_path) + mgr.create_user("admin", "pw-123456", is_admin=True) + mgr.create_user("bob", "pw-123456") + assert mgr.set_privileges("bob", {"can_use_agent": False}) is True + restricted = mgr.get_privileges("bob") + + assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK + # A second promote is a no-op and must not re-stash ADMIN_PRIVILEGES. + assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK + assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK + + # Demotion must still restore the original pre-admin restrictions. + assert mgr.get_privileges("bob") == restricted + assert mgr.get_privileges("bob")["can_use_agent"] is False + + +def test_pre_admin_privileges_survive_manager_reload(tmp_path): + auth_mod, mgr = _fresh_auth_manager(tmp_path) + mgr.create_user("admin", "pw-123456", is_admin=True) + mgr.create_user("bob", "pw-123456") + assert mgr.set_privileges("bob", {"can_use_research": False}) is True + assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK + + # Fresh manager on the same auth.json — the stash must round-trip disk. + mgr2 = auth_mod.AuthManager(str(tmp_path / "auth.json")) + assert mgr2.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK + assert mgr2.get_privileges("bob")["can_use_research"] is False + + +# --------------------------------------------------------------------------- +# Route-level: PUT /api/auth/users/{username}/admin (mirrors +# tests/test_auth_regressions.py). SetAdminResult is read from the route +# module's own namespace so the route and the test share one enum object. +# --------------------------------------------------------------------------- + +_ADMIN_ROUTE = "/api/auth/users/{username}/admin" + + +def _auth_route_endpoint(path, method): + from routes.auth_routes import setup_auth_routes + + auth_manager = MagicMock() + router = setup_auth_routes(auth_manager) + for route in router.routes: + if getattr(route, "path", "") == path and method in getattr(route, "methods", set()): + return auth_manager, route.endpoint + raise AssertionError(f"{method} {path} route not registered") + + +def _fake_auth_request(token="session-token"): + from routes.auth_routes import SESSION_COOKIE + + req = SimpleNamespace() + req.cookies = {SESSION_COOKIE: token} + req.client = SimpleNamespace(host="127.0.0.1") + return req + + +def _result_enum(): + import routes.auth_routes as ar + + return ar.SetAdminResult + + +def test_route_requires_admin(): + from routes.auth_routes import SetAdminRequest + + auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT") + auth.get_username_for_token.return_value = "bob" + auth.is_admin.return_value = False + + with pytest.raises(HTTPException) as exc: + asyncio.run(target(username="carol", body=SetAdminRequest(is_admin=True), + request=_fake_auth_request())) + + assert exc.value.status_code == 403 + auth.set_admin.assert_not_called() + + +def test_route_last_admin_returns_400(): + from routes.auth_routes import SetAdminRequest + + R = _result_enum() + auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT") + auth.get_username_for_token.return_value = "admin" + auth.is_admin.return_value = True + auth.set_admin.return_value = R.LAST_ADMIN + + with pytest.raises(HTTPException) as exc: + asyncio.run(target(username="admin", body=SetAdminRequest(is_admin=False), + request=_fake_auth_request())) + + assert exc.value.status_code == 400 + + +def test_route_user_not_found_returns_404(): + from routes.auth_routes import SetAdminRequest + + R = _result_enum() + auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT") + auth.get_username_for_token.return_value = "admin" + auth.is_admin.return_value = True + auth.set_admin.return_value = R.USER_NOT_FOUND + + with pytest.raises(HTTPException) as exc: + asyncio.run(target(username="ghost", body=SetAdminRequest(is_admin=True), + request=_fake_auth_request())) + + assert exc.value.status_code == 404 + + +def test_route_success_returns_envelope(): + from routes.auth_routes import SetAdminRequest + + R = _result_enum() + auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT") + auth.get_username_for_token.return_value = "admin" + auth.is_admin.return_value = True + auth.set_admin.return_value = R.OK + + out = asyncio.run(target(username="bob", body=SetAdminRequest(is_admin=True), + request=_fake_auth_request())) + + assert out == {"ok": True, "is_admin": True, "self": False} + + +def test_route_self_flag_true_when_targeting_own_account(): + from routes.auth_routes import SetAdminRequest + + R = _result_enum() + auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT") + auth.get_username_for_token.return_value = "admin" + auth.is_admin.return_value = True + auth.set_admin.return_value = R.OK + + out = asyncio.run(target(username="Admin", body=SetAdminRequest(is_admin=False), + request=_fake_auth_request())) + + assert out == {"ok": True, "is_admin": False, "self": True}