Files
odysseus/tests/test_set_admin.py
Merajul Arefin 8fe98cf471 feat(auth): add per-user admin promote/demote toggle (#3078)
* feat(auth): add per-user admin promote/demote toggle

Admin-only API and Users-tab control to grant/revoke admin rights; refuses to demote the last admin.

* fix(auth): restore pre-admin privilege restrictions on demotion

Promoting now stashes the user's privilege map (privileges_before_admin)
and demoting restores it instead of resetting to defaults, so a
promote/demote round trip can no longer broaden a restricted user's
access. Users without a stash (created as admin, or promoted before this
fix) still demote to DEFAULT_PRIVILEGES so a born-admin's stored all-True
map — including can_use_bash — can't survive demotion.

---------

Co-authored-by: K M Merajul Arefin <merajul.arefin@therapservices.net>
2026-06-15 10:44:27 +00:00

318 lines
12 KiB
Python

"""Promote/demote users to/from admin (issue #2958).
Covers AuthManager.set_admin (the core logic + last-admin lockout guard +
privilege stash/restore on a real role change + no-op preservation) and the
PUT /api/auth/users/{username}/admin route's status/envelope mapping.
"""
import asyncio
import importlib
import sys
import types
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock
import pytest
from fastapi import HTTPException
from tests.helpers.import_state import clear_module
# ---------------------------------------------------------------------------
# Manager-level: real AuthManager on a temp auth.json (mirrors
# tests/test_rename_user_case_insensitive.py).
# ---------------------------------------------------------------------------
def _real_core_package():
root = Path(__file__).resolve().parent.parent
core_path = str(root / "core")
core = sys.modules.get("core")
if core is None:
core = types.ModuleType("core")
sys.modules["core"] = core
core.__path__ = [core_path]
clear_module("core.auth")
return core
def _fresh_auth_manager(tmp_path):
"""Return (auth_module, AuthManager) with hashing stubbed for speed."""
auth_mod = importlib.import_module("core.auth", package=_real_core_package())
auth_mod._hash_password = lambda password: f"hash:{password}"
auth_mod._verify_password = lambda password, hashed: hashed == f"hash:{password}"
mgr = auth_mod.AuthManager(str(tmp_path / "auth.json"))
return auth_mod, mgr
def test_promote_sets_admin_flag_and_admin_privileges(tmp_path):
auth_mod, mgr = _fresh_auth_manager(tmp_path)
assert mgr.create_user("admin", "pw-123456", is_admin=True) is True
assert mgr.create_user("bob", "pw-123456") is True
result = mgr.set_admin("bob", True, "admin")
assert result is auth_mod.SetAdminResult.OK
assert mgr.is_admin("bob") is True
assert mgr.users["bob"]["privileges"] == auth_mod.ADMIN_PRIVILEGES
def test_demote_with_two_admins_resets_to_default_privileges(tmp_path):
auth_mod, mgr = _fresh_auth_manager(tmp_path)
mgr.create_user("admin", "pw-123456", is_admin=True)
mgr.create_user("bob", "pw-123456", is_admin=True)
result = mgr.set_admin("bob", False, "admin")
assert result is auth_mod.SetAdminResult.OK
assert mgr.is_admin("bob") is False
assert mgr.users["bob"]["privileges"] == auth_mod.DEFAULT_PRIVILEGES
def test_demote_last_admin_is_blocked(tmp_path):
auth_mod, mgr = _fresh_auth_manager(tmp_path)
mgr.create_user("admin", "pw-123456", is_admin=True)
result = mgr.set_admin("admin", False, "admin")
assert result is auth_mod.SetAdminResult.LAST_ADMIN
assert mgr.is_admin("admin") is True # unchanged
def test_self_demote_allowed_when_another_admin_exists(tmp_path):
auth_mod, mgr = _fresh_auth_manager(tmp_path)
mgr.create_user("admin", "pw-123456", is_admin=True)
mgr.create_user("bob", "pw-123456", is_admin=True)
result = mgr.set_admin("admin", False, "admin") # admin demotes self
assert result is auth_mod.SetAdminResult.OK
assert mgr.is_admin("admin") is False
assert mgr.is_admin("bob") is True
def test_cannot_demote_past_the_last_admin_sequentially(tmp_path):
auth_mod, mgr = _fresh_auth_manager(tmp_path)
mgr.create_user("admin", "pw-123456", is_admin=True)
mgr.create_user("bob", "pw-123456", is_admin=True)
assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
# Now "admin" is the only admin left — demoting them must be refused.
assert mgr.set_admin("admin", False, "admin") is auth_mod.SetAdminResult.LAST_ADMIN
assert mgr.is_admin("admin") is True
def test_non_admin_requester_is_rejected(tmp_path):
auth_mod, mgr = _fresh_auth_manager(tmp_path)
mgr.create_user("admin", "pw-123456", is_admin=True)
mgr.create_user("bob", "pw-123456")
mgr.create_user("carol", "pw-123456")
result = mgr.set_admin("carol", True, "bob") # bob is not an admin
assert result is auth_mod.SetAdminResult.NOT_AUTHORIZED
assert mgr.is_admin("carol") is False
def test_unknown_target_user_returns_not_found(tmp_path):
auth_mod, mgr = _fresh_auth_manager(tmp_path)
mgr.create_user("admin", "pw-123456", is_admin=True)
result = mgr.set_admin("ghost", True, "admin")
assert result is auth_mod.SetAdminResult.USER_NOT_FOUND
def test_noop_demote_of_regular_user_preserves_custom_privileges(tmp_path):
auth_mod, mgr = _fresh_auth_manager(tmp_path)
mgr.create_user("admin", "pw-123456", is_admin=True)
mgr.create_user("bob", "pw-123456")
# Give bob a non-default privilege; DEFAULT_PRIVILEGES has can_use_bash=False.
assert mgr.set_privileges("bob", {"can_use_bash": True}) is True
result = mgr.set_admin("bob", False, "admin") # already a regular user
assert result is auth_mod.SetAdminResult.OK
# Privileges must NOT have been reset to defaults by the no-op.
assert mgr.users["bob"]["privileges"]["can_use_bash"] is True
def test_demote_restores_pre_admin_privilege_restrictions(tmp_path):
auth_mod, mgr = _fresh_auth_manager(tmp_path)
mgr.create_user("admin", "pw-123456", is_admin=True)
mgr.create_user("bob", "pw-123456")
# Tighten bob below the defaults before promoting him.
assert mgr.set_privileges("bob", {
"can_use_agent": False,
"can_generate_images": False,
"max_messages_per_day": 50,
}) is True
restricted = mgr.get_privileges("bob")
assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
# Demotion must restore the pre-admin policy, not reset to defaults.
assert mgr.get_privileges("bob") == restricted
assert mgr.get_privileges("bob")["can_use_agent"] is False
assert mgr.get_privileges("bob")["max_messages_per_day"] == 50
def test_promote_demote_round_trip_is_stable_and_cleans_up_stash(tmp_path):
auth_mod, mgr = _fresh_auth_manager(tmp_path)
mgr.create_user("admin", "pw-123456", is_admin=True)
mgr.create_user("bob", "pw-123456")
assert mgr.set_privileges("bob", {"can_use_browser": False}) is True
restricted = mgr.get_privileges("bob")
for _ in range(2): # two full promote/demote cycles
assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
assert mgr.get_privileges("bob") == restricted
# The stash is promotion-time bookkeeping; it must not linger on the row.
assert "privileges_before_admin" not in mgr.users["bob"]
def test_redundant_promote_does_not_clobber_stash(tmp_path):
auth_mod, mgr = _fresh_auth_manager(tmp_path)
mgr.create_user("admin", "pw-123456", is_admin=True)
mgr.create_user("bob", "pw-123456")
assert mgr.set_privileges("bob", {"can_use_agent": False}) is True
restricted = mgr.get_privileges("bob")
assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
# A second promote is a no-op and must not re-stash ADMIN_PRIVILEGES.
assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
# Demotion must still restore the original pre-admin restrictions.
assert mgr.get_privileges("bob") == restricted
assert mgr.get_privileges("bob")["can_use_agent"] is False
def test_pre_admin_privileges_survive_manager_reload(tmp_path):
auth_mod, mgr = _fresh_auth_manager(tmp_path)
mgr.create_user("admin", "pw-123456", is_admin=True)
mgr.create_user("bob", "pw-123456")
assert mgr.set_privileges("bob", {"can_use_research": False}) is True
assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
# Fresh manager on the same auth.json — the stash must round-trip disk.
mgr2 = auth_mod.AuthManager(str(tmp_path / "auth.json"))
assert mgr2.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
assert mgr2.get_privileges("bob")["can_use_research"] is False
# ---------------------------------------------------------------------------
# Route-level: PUT /api/auth/users/{username}/admin (mirrors
# tests/test_auth_regressions.py). SetAdminResult is read from the route
# module's own namespace so the route and the test share one enum object.
# ---------------------------------------------------------------------------
_ADMIN_ROUTE = "/api/auth/users/{username}/admin"
def _auth_route_endpoint(path, method):
from routes.auth_routes import setup_auth_routes
auth_manager = MagicMock()
router = setup_auth_routes(auth_manager)
for route in router.routes:
if getattr(route, "path", "") == path and method in getattr(route, "methods", set()):
return auth_manager, route.endpoint
raise AssertionError(f"{method} {path} route not registered")
def _fake_auth_request(token="session-token"):
from routes.auth_routes import SESSION_COOKIE
req = SimpleNamespace()
req.cookies = {SESSION_COOKIE: token}
req.client = SimpleNamespace(host="127.0.0.1")
return req
def _result_enum():
import routes.auth_routes as ar
return ar.SetAdminResult
def test_route_requires_admin():
from routes.auth_routes import SetAdminRequest
auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
auth.get_username_for_token.return_value = "bob"
auth.is_admin.return_value = False
with pytest.raises(HTTPException) as exc:
asyncio.run(target(username="carol", body=SetAdminRequest(is_admin=True),
request=_fake_auth_request()))
assert exc.value.status_code == 403
auth.set_admin.assert_not_called()
def test_route_last_admin_returns_400():
from routes.auth_routes import SetAdminRequest
R = _result_enum()
auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
auth.get_username_for_token.return_value = "admin"
auth.is_admin.return_value = True
auth.set_admin.return_value = R.LAST_ADMIN
with pytest.raises(HTTPException) as exc:
asyncio.run(target(username="admin", body=SetAdminRequest(is_admin=False),
request=_fake_auth_request()))
assert exc.value.status_code == 400
def test_route_user_not_found_returns_404():
from routes.auth_routes import SetAdminRequest
R = _result_enum()
auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
auth.get_username_for_token.return_value = "admin"
auth.is_admin.return_value = True
auth.set_admin.return_value = R.USER_NOT_FOUND
with pytest.raises(HTTPException) as exc:
asyncio.run(target(username="ghost", body=SetAdminRequest(is_admin=True),
request=_fake_auth_request()))
assert exc.value.status_code == 404
def test_route_success_returns_envelope():
from routes.auth_routes import SetAdminRequest
R = _result_enum()
auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
auth.get_username_for_token.return_value = "admin"
auth.is_admin.return_value = True
auth.set_admin.return_value = R.OK
out = asyncio.run(target(username="bob", body=SetAdminRequest(is_admin=True),
request=_fake_auth_request()))
assert out == {"ok": True, "is_admin": True, "self": False}
def test_route_self_flag_true_when_targeting_own_account():
from routes.auth_routes import SetAdminRequest
R = _result_enum()
auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
auth.get_username_for_token.return_value = "admin"
auth.is_admin.return_value = True
auth.set_admin.return_value = R.OK
out = asyncio.run(target(username="Admin", body=SetAdminRequest(is_admin=False),
request=_fake_auth_request()))
assert out == {"ok": True, "is_admin": False, "self": True}