diff --git a/core/auth.py b/core/auth.py
index 2f9fd4e51..7f085c065 100644
--- a/core/auth.py
+++ b/core/auth.py
@@ -3,6 +3,7 @@ Authentication module — multi-user password hashing, session tokens, config pe
Config stored in data/auth.json. Uses bcrypt directly.
"""
+import enum
import json
import os
import secrets
@@ -83,6 +84,15 @@ def _verify_password(password: str, hashed: str) -> bool:
return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("utf-8"))
+class SetAdminResult(enum.Enum):
+ """Outcome of AuthManager.set_admin, so callers can map each case to a
+ precise response instead of guessing from a bare bool."""
+ OK = "ok"
+ USER_NOT_FOUND = "user_not_found"
+ NOT_AUTHORIZED = "not_authorized" # requester is not an admin
+ LAST_ADMIN = "last_admin" # would remove the last remaining admin
+
+
class AuthManager:
"""Manages multi-user password + session-token auth system."""
@@ -387,6 +397,69 @@ class AuthManager:
logger.info(f"Updated privileges for '{username}': {current}")
return True
+ def set_admin(self, username: str, is_admin: bool,
+ requesting_user: str) -> SetAdminResult:
+ """Promote/demote an existing user to/from admin. Admin only.
+
+ Refuses to remove the last remaining admin so the instance can never
+ be locked out of admin access; self-demotion is allowed as long as
+ another admin remains. Admin status is re-checked live on every
+ request, so unlike delete/rename no session or token revocation is
+ needed — a demoted admin simply fails the next is_admin() gate.
+
+ Promotion stashes the user's current privilege map and demotion
+ restores it, so a temporary admin stint can't silently broaden a
+ user's non-admin access; users without a stash (created as admin,
+ or promoted before stashing existed) demote to DEFAULT_PRIVILEGES.
+
+ Counting admins and flipping the flag happen in one critical section
+ so two concurrent demotions can't race the admin count to zero.
+ """
+ username = (username or "").strip().lower()
+ requesting_user = (requesting_user or "").strip().lower()
+ is_admin = bool(is_admin)
+ with self._config_lock:
+ target = self._config.get("users", {}).get(username)
+ if target is None:
+ return SetAdminResult.USER_NOT_FOUND
+ if not self.users.get(requesting_user, {}).get("is_admin"):
+ return SetAdminResult.NOT_AUTHORIZED
+ currently_admin = bool(target.get("is_admin"))
+ if currently_admin == is_admin:
+ return SetAdminResult.OK # no-op; leave privileges untouched
+ if currently_admin and not is_admin:
+ admin_count = sum(1 for d in self.users.values() if d.get("is_admin"))
+ if admin_count <= 1:
+ return SetAdminResult.LAST_ADMIN
+ # Write order matters for lock-free readers: get_privileges()
+ # reads without _config_lock and trusts is_admin, so the admin
+ # flag must be flipped while the stored map is safe to expose —
+ # before writing admin privileges on promote, after restoring
+ # the pre-admin map on demote.
+ if is_admin:
+ target["is_admin"] = True
+ # Stash the pre-admin map so a later demotion can restore it.
+ # While is_admin is set the stored map is inert: get_privileges
+ # short-circuits to ADMIN_PRIVILEGES and set_privileges refuses
+ # admins, so only set_admin ever touches the stash.
+ target["privileges_before_admin"] = dict(
+ target.get("privileges") or DEFAULT_PRIVILEGES
+ )
+ target["privileges"] = dict(ADMIN_PRIVILEGES)
+ else:
+ # Restore the stashed pre-admin map. Fall back to defaults for
+ # users created as admins (their stored map is ADMIN_PRIVILEGES,
+ # which must not leak past demotion — e.g. can_use_bash) and
+ # for admins promoted before the stash existed.
+ target["privileges"] = dict(
+ target.pop("privileges_before_admin", None)
+ or DEFAULT_PRIVILEGES
+ )
+ target["is_admin"] = False
+ self._save()
+ logger.info("Set is_admin=%s for '%s' (by '%s')", is_admin, username, requesting_user)
+ return SetAdminResult.OK
+
def change_password(self, username: str, current_password: str, new_password: str) -> bool:
username = username.strip().lower()
if username not in self.users:
diff --git a/routes/auth_routes.py b/routes/auth_routes.py
index a9cc8ecb1..6173b0c14 100644
--- a/routes/auth_routes.py
+++ b/routes/auth_routes.py
@@ -12,7 +12,7 @@ import re
from pathlib import Path
from core.atomic_io import atomic_write_json, atomic_write_text
-from core.auth import AuthManager
+from core.auth import AuthManager, SetAdminResult
from src.constants import DEEP_RESEARCH_DIR, MEMORY_FILE, SKILLS_DIR
from src.rate_limiter import RateLimiter
from src.settings_scrub import scrub_settings
@@ -73,6 +73,11 @@ class DeleteUserRequest(BaseModel):
class RenameUserRequest(BaseModel):
username: str
+
+class SetAdminRequest(BaseModel):
+ is_admin: bool
+
+
class SetOpenRegistrationRequest(BaseModel):
enabled: bool
@@ -487,6 +492,31 @@ def setup_auth_routes(auth_manager: AuthManager) -> APIRouter:
invalidator()
return {"ok": True, "username": new_username, "renamed_self": old_username == user}
+ @router.put("/users/{username}/admin")
+ async def set_user_admin(username: str, body: SetAdminRequest, request: Request):
+ """Promote/demote a user to/from admin. Admin only.
+
+ The last remaining admin can't be demoted (no lockout). Self-demotion
+ is allowed while another admin exists; the `self` flag tells the UI to
+ reload the acting user into the normal-user view.
+ """
+ user = _get_current_user(request)
+ if not user or not auth_manager.is_admin(user):
+ raise HTTPException(403, "Admin only")
+ result = auth_manager.set_admin(username, body.is_admin, user)
+ if result is SetAdminResult.USER_NOT_FOUND:
+ raise HTTPException(404, "User not found")
+ if result is SetAdminResult.NOT_AUTHORIZED:
+ raise HTTPException(403, "Admin only")
+ if result is SetAdminResult.LAST_ADMIN:
+ raise HTTPException(400, "Cannot demote the last admin")
+ target = (username or "").strip().lower()
+ return {
+ "ok": True,
+ "is_admin": body.is_admin,
+ "self": target == (user or "").strip().lower(),
+ }
+
@router.post("/signup-toggle", deprecated=True)
async def toggle_signup(request: Request):
"""
diff --git a/static/js/admin.js b/static/js/admin.js
index 2c4288b40..e912c9471 100644
--- a/static/js/admin.js
+++ b/static/js/admin.js
@@ -55,6 +55,7 @@ async function loadUsers() {
+
${u.is_admin ? '' : ``}
${u.is_admin ? '' : ''}
@@ -113,7 +114,7 @@ async function loadUsers() {
// Toggle panel visibility + rotate chevron + load models
let _modelsLoaded = false;
header.addEventListener('click', (e) => {
- if (e.target.closest('.admin-btn-delete, [data-adm-rename-user]')) return;
+ if (e.target.closest('.admin-btn-delete, [data-adm-rename-user], [data-adm-toggle-admin]')) return;
privPanel.classList.toggle('hidden');
const chevron = header.querySelector('.admin-user-chevron');
if (chevron) {
@@ -199,6 +200,42 @@ async function loadUsers() {
});
}
+ // Promote / demote (admin toggle) — present on every row
+ const adminToggleBtn = row.querySelector('[data-adm-toggle-admin]');
+ if (adminToggleBtn) {
+ adminToggleBtn.addEventListener('click', async (e) => {
+ e.stopPropagation();
+ const username = adminToggleBtn.dataset.admToggleAdmin;
+ const makeAdmin = adminToggleBtn.dataset.makeAdmin === '1';
+ const confirmMsg = makeAdmin
+ ? `Grant admin rights to "${username}"? They'll get full access to all settings and users — including the power to demote or remove other admins (you included).`
+ : `Revoke admin rights from "${username}"? They'll lose access to the admin panel.`;
+ if (!await uiModule.styledConfirm(confirmMsg, { confirmText: makeAdmin ? 'Make admin' : 'Revoke admin', danger: !makeAdmin })) return;
+ adminToggleBtn.disabled = true;
+ try {
+ const res = await fetch(`/api/auth/users/${encodeURIComponent(username)}/admin`, {
+ method: 'PUT',
+ credentials: 'same-origin',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({ is_admin: makeAdmin }),
+ });
+ const data = await res.json().catch(() => ({}));
+ if (!res.ok) {
+ uiModule.showError(data.detail || 'Failed to change admin status');
+ adminToggleBtn.disabled = false;
+ return;
+ }
+ // Demoting yourself drops your own admin access — reload into the
+ // normal-user view (mirrors the rename-self reload above).
+ if (data.self) { window.location.reload(); return; }
+ loadUsers();
+ } catch (err) {
+ uiModule.showError('Failed to change admin status');
+ adminToggleBtn.disabled = false;
+ }
+ });
+ }
+
list.appendChild(row);
});
} catch (e) { list.innerHTML = '
Failed to load users
'; }
diff --git a/tests/test_set_admin.py b/tests/test_set_admin.py
new file mode 100644
index 000000000..0d3b97172
--- /dev/null
+++ b/tests/test_set_admin.py
@@ -0,0 +1,317 @@
+"""Promote/demote users to/from admin (issue #2958).
+
+Covers AuthManager.set_admin (the core logic + last-admin lockout guard +
+privilege stash/restore on a real role change + no-op preservation) and the
+PUT /api/auth/users/{username}/admin route's status/envelope mapping.
+"""
+
+import asyncio
+import importlib
+import sys
+import types
+from pathlib import Path
+from types import SimpleNamespace
+from unittest.mock import MagicMock
+
+import pytest
+
+from fastapi import HTTPException
+
+from tests.helpers.import_state import clear_module
+
+
+# ---------------------------------------------------------------------------
+# Manager-level: real AuthManager on a temp auth.json (mirrors
+# tests/test_rename_user_case_insensitive.py).
+# ---------------------------------------------------------------------------
+
+def _real_core_package():
+ root = Path(__file__).resolve().parent.parent
+ core_path = str(root / "core")
+ core = sys.modules.get("core")
+ if core is None:
+ core = types.ModuleType("core")
+ sys.modules["core"] = core
+ core.__path__ = [core_path]
+ clear_module("core.auth")
+ return core
+
+
+def _fresh_auth_manager(tmp_path):
+ """Return (auth_module, AuthManager) with hashing stubbed for speed."""
+ auth_mod = importlib.import_module("core.auth", package=_real_core_package())
+ auth_mod._hash_password = lambda password: f"hash:{password}"
+ auth_mod._verify_password = lambda password, hashed: hashed == f"hash:{password}"
+ mgr = auth_mod.AuthManager(str(tmp_path / "auth.json"))
+ return auth_mod, mgr
+
+
+def test_promote_sets_admin_flag_and_admin_privileges(tmp_path):
+ auth_mod, mgr = _fresh_auth_manager(tmp_path)
+ assert mgr.create_user("admin", "pw-123456", is_admin=True) is True
+ assert mgr.create_user("bob", "pw-123456") is True
+
+ result = mgr.set_admin("bob", True, "admin")
+
+ assert result is auth_mod.SetAdminResult.OK
+ assert mgr.is_admin("bob") is True
+ assert mgr.users["bob"]["privileges"] == auth_mod.ADMIN_PRIVILEGES
+
+
+def test_demote_with_two_admins_resets_to_default_privileges(tmp_path):
+ auth_mod, mgr = _fresh_auth_manager(tmp_path)
+ mgr.create_user("admin", "pw-123456", is_admin=True)
+ mgr.create_user("bob", "pw-123456", is_admin=True)
+
+ result = mgr.set_admin("bob", False, "admin")
+
+ assert result is auth_mod.SetAdminResult.OK
+ assert mgr.is_admin("bob") is False
+ assert mgr.users["bob"]["privileges"] == auth_mod.DEFAULT_PRIVILEGES
+
+
+def test_demote_last_admin_is_blocked(tmp_path):
+ auth_mod, mgr = _fresh_auth_manager(tmp_path)
+ mgr.create_user("admin", "pw-123456", is_admin=True)
+
+ result = mgr.set_admin("admin", False, "admin")
+
+ assert result is auth_mod.SetAdminResult.LAST_ADMIN
+ assert mgr.is_admin("admin") is True # unchanged
+
+
+def test_self_demote_allowed_when_another_admin_exists(tmp_path):
+ auth_mod, mgr = _fresh_auth_manager(tmp_path)
+ mgr.create_user("admin", "pw-123456", is_admin=True)
+ mgr.create_user("bob", "pw-123456", is_admin=True)
+
+ result = mgr.set_admin("admin", False, "admin") # admin demotes self
+
+ assert result is auth_mod.SetAdminResult.OK
+ assert mgr.is_admin("admin") is False
+ assert mgr.is_admin("bob") is True
+
+
+def test_cannot_demote_past_the_last_admin_sequentially(tmp_path):
+ auth_mod, mgr = _fresh_auth_manager(tmp_path)
+ mgr.create_user("admin", "pw-123456", is_admin=True)
+ mgr.create_user("bob", "pw-123456", is_admin=True)
+
+ assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
+ # Now "admin" is the only admin left — demoting them must be refused.
+ assert mgr.set_admin("admin", False, "admin") is auth_mod.SetAdminResult.LAST_ADMIN
+ assert mgr.is_admin("admin") is True
+
+
+def test_non_admin_requester_is_rejected(tmp_path):
+ auth_mod, mgr = _fresh_auth_manager(tmp_path)
+ mgr.create_user("admin", "pw-123456", is_admin=True)
+ mgr.create_user("bob", "pw-123456")
+ mgr.create_user("carol", "pw-123456")
+
+ result = mgr.set_admin("carol", True, "bob") # bob is not an admin
+
+ assert result is auth_mod.SetAdminResult.NOT_AUTHORIZED
+ assert mgr.is_admin("carol") is False
+
+
+def test_unknown_target_user_returns_not_found(tmp_path):
+ auth_mod, mgr = _fresh_auth_manager(tmp_path)
+ mgr.create_user("admin", "pw-123456", is_admin=True)
+
+ result = mgr.set_admin("ghost", True, "admin")
+
+ assert result is auth_mod.SetAdminResult.USER_NOT_FOUND
+
+
+def test_noop_demote_of_regular_user_preserves_custom_privileges(tmp_path):
+ auth_mod, mgr = _fresh_auth_manager(tmp_path)
+ mgr.create_user("admin", "pw-123456", is_admin=True)
+ mgr.create_user("bob", "pw-123456")
+ # Give bob a non-default privilege; DEFAULT_PRIVILEGES has can_use_bash=False.
+ assert mgr.set_privileges("bob", {"can_use_bash": True}) is True
+
+ result = mgr.set_admin("bob", False, "admin") # already a regular user
+
+ assert result is auth_mod.SetAdminResult.OK
+ # Privileges must NOT have been reset to defaults by the no-op.
+ assert mgr.users["bob"]["privileges"]["can_use_bash"] is True
+
+
+def test_demote_restores_pre_admin_privilege_restrictions(tmp_path):
+ auth_mod, mgr = _fresh_auth_manager(tmp_path)
+ mgr.create_user("admin", "pw-123456", is_admin=True)
+ mgr.create_user("bob", "pw-123456")
+ # Tighten bob below the defaults before promoting him.
+ assert mgr.set_privileges("bob", {
+ "can_use_agent": False,
+ "can_generate_images": False,
+ "max_messages_per_day": 50,
+ }) is True
+ restricted = mgr.get_privileges("bob")
+
+ assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
+ assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
+
+ # Demotion must restore the pre-admin policy, not reset to defaults.
+ assert mgr.get_privileges("bob") == restricted
+ assert mgr.get_privileges("bob")["can_use_agent"] is False
+ assert mgr.get_privileges("bob")["max_messages_per_day"] == 50
+
+
+def test_promote_demote_round_trip_is_stable_and_cleans_up_stash(tmp_path):
+ auth_mod, mgr = _fresh_auth_manager(tmp_path)
+ mgr.create_user("admin", "pw-123456", is_admin=True)
+ mgr.create_user("bob", "pw-123456")
+ assert mgr.set_privileges("bob", {"can_use_browser": False}) is True
+ restricted = mgr.get_privileges("bob")
+
+ for _ in range(2): # two full promote/demote cycles
+ assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
+ assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
+
+ assert mgr.get_privileges("bob") == restricted
+ # The stash is promotion-time bookkeeping; it must not linger on the row.
+ assert "privileges_before_admin" not in mgr.users["bob"]
+
+
+def test_redundant_promote_does_not_clobber_stash(tmp_path):
+ auth_mod, mgr = _fresh_auth_manager(tmp_path)
+ mgr.create_user("admin", "pw-123456", is_admin=True)
+ mgr.create_user("bob", "pw-123456")
+ assert mgr.set_privileges("bob", {"can_use_agent": False}) is True
+ restricted = mgr.get_privileges("bob")
+
+ assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
+ # A second promote is a no-op and must not re-stash ADMIN_PRIVILEGES.
+ assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
+ assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
+
+ # Demotion must still restore the original pre-admin restrictions.
+ assert mgr.get_privileges("bob") == restricted
+ assert mgr.get_privileges("bob")["can_use_agent"] is False
+
+
+def test_pre_admin_privileges_survive_manager_reload(tmp_path):
+ auth_mod, mgr = _fresh_auth_manager(tmp_path)
+ mgr.create_user("admin", "pw-123456", is_admin=True)
+ mgr.create_user("bob", "pw-123456")
+ assert mgr.set_privileges("bob", {"can_use_research": False}) is True
+ assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
+
+ # Fresh manager on the same auth.json — the stash must round-trip disk.
+ mgr2 = auth_mod.AuthManager(str(tmp_path / "auth.json"))
+ assert mgr2.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
+ assert mgr2.get_privileges("bob")["can_use_research"] is False
+
+
+# ---------------------------------------------------------------------------
+# Route-level: PUT /api/auth/users/{username}/admin (mirrors
+# tests/test_auth_regressions.py). SetAdminResult is read from the route
+# module's own namespace so the route and the test share one enum object.
+# ---------------------------------------------------------------------------
+
+_ADMIN_ROUTE = "/api/auth/users/{username}/admin"
+
+
+def _auth_route_endpoint(path, method):
+ from routes.auth_routes import setup_auth_routes
+
+ auth_manager = MagicMock()
+ router = setup_auth_routes(auth_manager)
+ for route in router.routes:
+ if getattr(route, "path", "") == path and method in getattr(route, "methods", set()):
+ return auth_manager, route.endpoint
+ raise AssertionError(f"{method} {path} route not registered")
+
+
+def _fake_auth_request(token="session-token"):
+ from routes.auth_routes import SESSION_COOKIE
+
+ req = SimpleNamespace()
+ req.cookies = {SESSION_COOKIE: token}
+ req.client = SimpleNamespace(host="127.0.0.1")
+ return req
+
+
+def _result_enum():
+ import routes.auth_routes as ar
+
+ return ar.SetAdminResult
+
+
+def test_route_requires_admin():
+ from routes.auth_routes import SetAdminRequest
+
+ auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
+ auth.get_username_for_token.return_value = "bob"
+ auth.is_admin.return_value = False
+
+ with pytest.raises(HTTPException) as exc:
+ asyncio.run(target(username="carol", body=SetAdminRequest(is_admin=True),
+ request=_fake_auth_request()))
+
+ assert exc.value.status_code == 403
+ auth.set_admin.assert_not_called()
+
+
+def test_route_last_admin_returns_400():
+ from routes.auth_routes import SetAdminRequest
+
+ R = _result_enum()
+ auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
+ auth.get_username_for_token.return_value = "admin"
+ auth.is_admin.return_value = True
+ auth.set_admin.return_value = R.LAST_ADMIN
+
+ with pytest.raises(HTTPException) as exc:
+ asyncio.run(target(username="admin", body=SetAdminRequest(is_admin=False),
+ request=_fake_auth_request()))
+
+ assert exc.value.status_code == 400
+
+
+def test_route_user_not_found_returns_404():
+ from routes.auth_routes import SetAdminRequest
+
+ R = _result_enum()
+ auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
+ auth.get_username_for_token.return_value = "admin"
+ auth.is_admin.return_value = True
+ auth.set_admin.return_value = R.USER_NOT_FOUND
+
+ with pytest.raises(HTTPException) as exc:
+ asyncio.run(target(username="ghost", body=SetAdminRequest(is_admin=True),
+ request=_fake_auth_request()))
+
+ assert exc.value.status_code == 404
+
+
+def test_route_success_returns_envelope():
+ from routes.auth_routes import SetAdminRequest
+
+ R = _result_enum()
+ auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
+ auth.get_username_for_token.return_value = "admin"
+ auth.is_admin.return_value = True
+ auth.set_admin.return_value = R.OK
+
+ out = asyncio.run(target(username="bob", body=SetAdminRequest(is_admin=True),
+ request=_fake_auth_request()))
+
+ assert out == {"ok": True, "is_admin": True, "self": False}
+
+
+def test_route_self_flag_true_when_targeting_own_account():
+ from routes.auth_routes import SetAdminRequest
+
+ R = _result_enum()
+ auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
+ auth.get_username_for_token.return_value = "admin"
+ auth.is_admin.return_value = True
+ auth.set_admin.return_value = R.OK
+
+ out = asyncio.run(target(username="Admin", body=SetAdminRequest(is_admin=False),
+ request=_fake_auth_request()))
+
+ assert out == {"ok": True, "is_admin": False, "self": True}