mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-15 17:25:26 -04:00
8fe98cf471
* feat(auth): add per-user admin promote/demote toggle Admin-only API and Users-tab control to grant/revoke admin rights; refuses to demote the last admin. * fix(auth): restore pre-admin privilege restrictions on demotion Promoting now stashes the user's privilege map (privileges_before_admin) and demoting restores it instead of resetting to defaults, so a promote/demote round trip can no longer broaden a restricted user's access. Users without a stash (created as admin, or promoted before this fix) still demote to DEFAULT_PRIVILEGES so a born-admin's stored all-True map — including can_use_bash — can't survive demotion. --------- Co-authored-by: K M Merajul Arefin <merajul.arefin@therapservices.net>
318 lines
12 KiB
Python
318 lines
12 KiB
Python
"""Promote/demote users to/from admin (issue #2958).
|
|
|
|
Covers AuthManager.set_admin (the core logic + last-admin lockout guard +
|
|
privilege stash/restore on a real role change + no-op preservation) and the
|
|
PUT /api/auth/users/{username}/admin route's status/envelope mapping.
|
|
"""
|
|
|
|
import asyncio
|
|
import importlib
|
|
import sys
|
|
import types
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
|
|
from fastapi import HTTPException
|
|
|
|
from tests.helpers.import_state import clear_module
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Manager-level: real AuthManager on a temp auth.json (mirrors
|
|
# tests/test_rename_user_case_insensitive.py).
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _real_core_package():
|
|
root = Path(__file__).resolve().parent.parent
|
|
core_path = str(root / "core")
|
|
core = sys.modules.get("core")
|
|
if core is None:
|
|
core = types.ModuleType("core")
|
|
sys.modules["core"] = core
|
|
core.__path__ = [core_path]
|
|
clear_module("core.auth")
|
|
return core
|
|
|
|
|
|
def _fresh_auth_manager(tmp_path):
|
|
"""Return (auth_module, AuthManager) with hashing stubbed for speed."""
|
|
auth_mod = importlib.import_module("core.auth", package=_real_core_package())
|
|
auth_mod._hash_password = lambda password: f"hash:{password}"
|
|
auth_mod._verify_password = lambda password, hashed: hashed == f"hash:{password}"
|
|
mgr = auth_mod.AuthManager(str(tmp_path / "auth.json"))
|
|
return auth_mod, mgr
|
|
|
|
|
|
def test_promote_sets_admin_flag_and_admin_privileges(tmp_path):
|
|
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
|
assert mgr.create_user("admin", "pw-123456", is_admin=True) is True
|
|
assert mgr.create_user("bob", "pw-123456") is True
|
|
|
|
result = mgr.set_admin("bob", True, "admin")
|
|
|
|
assert result is auth_mod.SetAdminResult.OK
|
|
assert mgr.is_admin("bob") is True
|
|
assert mgr.users["bob"]["privileges"] == auth_mod.ADMIN_PRIVILEGES
|
|
|
|
|
|
def test_demote_with_two_admins_resets_to_default_privileges(tmp_path):
|
|
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
|
mgr.create_user("admin", "pw-123456", is_admin=True)
|
|
mgr.create_user("bob", "pw-123456", is_admin=True)
|
|
|
|
result = mgr.set_admin("bob", False, "admin")
|
|
|
|
assert result is auth_mod.SetAdminResult.OK
|
|
assert mgr.is_admin("bob") is False
|
|
assert mgr.users["bob"]["privileges"] == auth_mod.DEFAULT_PRIVILEGES
|
|
|
|
|
|
def test_demote_last_admin_is_blocked(tmp_path):
|
|
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
|
mgr.create_user("admin", "pw-123456", is_admin=True)
|
|
|
|
result = mgr.set_admin("admin", False, "admin")
|
|
|
|
assert result is auth_mod.SetAdminResult.LAST_ADMIN
|
|
assert mgr.is_admin("admin") is True # unchanged
|
|
|
|
|
|
def test_self_demote_allowed_when_another_admin_exists(tmp_path):
|
|
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
|
mgr.create_user("admin", "pw-123456", is_admin=True)
|
|
mgr.create_user("bob", "pw-123456", is_admin=True)
|
|
|
|
result = mgr.set_admin("admin", False, "admin") # admin demotes self
|
|
|
|
assert result is auth_mod.SetAdminResult.OK
|
|
assert mgr.is_admin("admin") is False
|
|
assert mgr.is_admin("bob") is True
|
|
|
|
|
|
def test_cannot_demote_past_the_last_admin_sequentially(tmp_path):
|
|
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
|
mgr.create_user("admin", "pw-123456", is_admin=True)
|
|
mgr.create_user("bob", "pw-123456", is_admin=True)
|
|
|
|
assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
|
|
# Now "admin" is the only admin left — demoting them must be refused.
|
|
assert mgr.set_admin("admin", False, "admin") is auth_mod.SetAdminResult.LAST_ADMIN
|
|
assert mgr.is_admin("admin") is True
|
|
|
|
|
|
def test_non_admin_requester_is_rejected(tmp_path):
|
|
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
|
mgr.create_user("admin", "pw-123456", is_admin=True)
|
|
mgr.create_user("bob", "pw-123456")
|
|
mgr.create_user("carol", "pw-123456")
|
|
|
|
result = mgr.set_admin("carol", True, "bob") # bob is not an admin
|
|
|
|
assert result is auth_mod.SetAdminResult.NOT_AUTHORIZED
|
|
assert mgr.is_admin("carol") is False
|
|
|
|
|
|
def test_unknown_target_user_returns_not_found(tmp_path):
|
|
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
|
mgr.create_user("admin", "pw-123456", is_admin=True)
|
|
|
|
result = mgr.set_admin("ghost", True, "admin")
|
|
|
|
assert result is auth_mod.SetAdminResult.USER_NOT_FOUND
|
|
|
|
|
|
def test_noop_demote_of_regular_user_preserves_custom_privileges(tmp_path):
|
|
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
|
mgr.create_user("admin", "pw-123456", is_admin=True)
|
|
mgr.create_user("bob", "pw-123456")
|
|
# Give bob a non-default privilege; DEFAULT_PRIVILEGES has can_use_bash=False.
|
|
assert mgr.set_privileges("bob", {"can_use_bash": True}) is True
|
|
|
|
result = mgr.set_admin("bob", False, "admin") # already a regular user
|
|
|
|
assert result is auth_mod.SetAdminResult.OK
|
|
# Privileges must NOT have been reset to defaults by the no-op.
|
|
assert mgr.users["bob"]["privileges"]["can_use_bash"] is True
|
|
|
|
|
|
def test_demote_restores_pre_admin_privilege_restrictions(tmp_path):
|
|
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
|
mgr.create_user("admin", "pw-123456", is_admin=True)
|
|
mgr.create_user("bob", "pw-123456")
|
|
# Tighten bob below the defaults before promoting him.
|
|
assert mgr.set_privileges("bob", {
|
|
"can_use_agent": False,
|
|
"can_generate_images": False,
|
|
"max_messages_per_day": 50,
|
|
}) is True
|
|
restricted = mgr.get_privileges("bob")
|
|
|
|
assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
|
|
assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
|
|
|
|
# Demotion must restore the pre-admin policy, not reset to defaults.
|
|
assert mgr.get_privileges("bob") == restricted
|
|
assert mgr.get_privileges("bob")["can_use_agent"] is False
|
|
assert mgr.get_privileges("bob")["max_messages_per_day"] == 50
|
|
|
|
|
|
def test_promote_demote_round_trip_is_stable_and_cleans_up_stash(tmp_path):
|
|
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
|
mgr.create_user("admin", "pw-123456", is_admin=True)
|
|
mgr.create_user("bob", "pw-123456")
|
|
assert mgr.set_privileges("bob", {"can_use_browser": False}) is True
|
|
restricted = mgr.get_privileges("bob")
|
|
|
|
for _ in range(2): # two full promote/demote cycles
|
|
assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
|
|
assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
|
|
|
|
assert mgr.get_privileges("bob") == restricted
|
|
# The stash is promotion-time bookkeeping; it must not linger on the row.
|
|
assert "privileges_before_admin" not in mgr.users["bob"]
|
|
|
|
|
|
def test_redundant_promote_does_not_clobber_stash(tmp_path):
|
|
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
|
mgr.create_user("admin", "pw-123456", is_admin=True)
|
|
mgr.create_user("bob", "pw-123456")
|
|
assert mgr.set_privileges("bob", {"can_use_agent": False}) is True
|
|
restricted = mgr.get_privileges("bob")
|
|
|
|
assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
|
|
# A second promote is a no-op and must not re-stash ADMIN_PRIVILEGES.
|
|
assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
|
|
assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
|
|
|
|
# Demotion must still restore the original pre-admin restrictions.
|
|
assert mgr.get_privileges("bob") == restricted
|
|
assert mgr.get_privileges("bob")["can_use_agent"] is False
|
|
|
|
|
|
def test_pre_admin_privileges_survive_manager_reload(tmp_path):
|
|
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
|
mgr.create_user("admin", "pw-123456", is_admin=True)
|
|
mgr.create_user("bob", "pw-123456")
|
|
assert mgr.set_privileges("bob", {"can_use_research": False}) is True
|
|
assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
|
|
|
|
# Fresh manager on the same auth.json — the stash must round-trip disk.
|
|
mgr2 = auth_mod.AuthManager(str(tmp_path / "auth.json"))
|
|
assert mgr2.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
|
|
assert mgr2.get_privileges("bob")["can_use_research"] is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Route-level: PUT /api/auth/users/{username}/admin (mirrors
|
|
# tests/test_auth_regressions.py). SetAdminResult is read from the route
|
|
# module's own namespace so the route and the test share one enum object.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_ADMIN_ROUTE = "/api/auth/users/{username}/admin"
|
|
|
|
|
|
def _auth_route_endpoint(path, method):
|
|
from routes.auth_routes import setup_auth_routes
|
|
|
|
auth_manager = MagicMock()
|
|
router = setup_auth_routes(auth_manager)
|
|
for route in router.routes:
|
|
if getattr(route, "path", "") == path and method in getattr(route, "methods", set()):
|
|
return auth_manager, route.endpoint
|
|
raise AssertionError(f"{method} {path} route not registered")
|
|
|
|
|
|
def _fake_auth_request(token="session-token"):
|
|
from routes.auth_routes import SESSION_COOKIE
|
|
|
|
req = SimpleNamespace()
|
|
req.cookies = {SESSION_COOKIE: token}
|
|
req.client = SimpleNamespace(host="127.0.0.1")
|
|
return req
|
|
|
|
|
|
def _result_enum():
|
|
import routes.auth_routes as ar
|
|
|
|
return ar.SetAdminResult
|
|
|
|
|
|
def test_route_requires_admin():
|
|
from routes.auth_routes import SetAdminRequest
|
|
|
|
auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
|
|
auth.get_username_for_token.return_value = "bob"
|
|
auth.is_admin.return_value = False
|
|
|
|
with pytest.raises(HTTPException) as exc:
|
|
asyncio.run(target(username="carol", body=SetAdminRequest(is_admin=True),
|
|
request=_fake_auth_request()))
|
|
|
|
assert exc.value.status_code == 403
|
|
auth.set_admin.assert_not_called()
|
|
|
|
|
|
def test_route_last_admin_returns_400():
|
|
from routes.auth_routes import SetAdminRequest
|
|
|
|
R = _result_enum()
|
|
auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
|
|
auth.get_username_for_token.return_value = "admin"
|
|
auth.is_admin.return_value = True
|
|
auth.set_admin.return_value = R.LAST_ADMIN
|
|
|
|
with pytest.raises(HTTPException) as exc:
|
|
asyncio.run(target(username="admin", body=SetAdminRequest(is_admin=False),
|
|
request=_fake_auth_request()))
|
|
|
|
assert exc.value.status_code == 400
|
|
|
|
|
|
def test_route_user_not_found_returns_404():
|
|
from routes.auth_routes import SetAdminRequest
|
|
|
|
R = _result_enum()
|
|
auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
|
|
auth.get_username_for_token.return_value = "admin"
|
|
auth.is_admin.return_value = True
|
|
auth.set_admin.return_value = R.USER_NOT_FOUND
|
|
|
|
with pytest.raises(HTTPException) as exc:
|
|
asyncio.run(target(username="ghost", body=SetAdminRequest(is_admin=True),
|
|
request=_fake_auth_request()))
|
|
|
|
assert exc.value.status_code == 404
|
|
|
|
|
|
def test_route_success_returns_envelope():
|
|
from routes.auth_routes import SetAdminRequest
|
|
|
|
R = _result_enum()
|
|
auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
|
|
auth.get_username_for_token.return_value = "admin"
|
|
auth.is_admin.return_value = True
|
|
auth.set_admin.return_value = R.OK
|
|
|
|
out = asyncio.run(target(username="bob", body=SetAdminRequest(is_admin=True),
|
|
request=_fake_auth_request()))
|
|
|
|
assert out == {"ok": True, "is_admin": True, "self": False}
|
|
|
|
|
|
def test_route_self_flag_true_when_targeting_own_account():
|
|
from routes.auth_routes import SetAdminRequest
|
|
|
|
R = _result_enum()
|
|
auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
|
|
auth.get_username_for_token.return_value = "admin"
|
|
auth.is_admin.return_value = True
|
|
auth.set_admin.return_value = R.OK
|
|
|
|
out = asyncio.run(target(username="Admin", body=SetAdminRequest(is_admin=False),
|
|
request=_fake_auth_request()))
|
|
|
|
assert out == {"ok": True, "is_admin": False, "self": True}
|