mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-17 18:25:26 -04:00
feat(auth): add per-user admin promote/demote toggle (#3078)
* feat(auth): add per-user admin promote/demote toggle Admin-only API and Users-tab control to grant/revoke admin rights; refuses to demote the last admin. * fix(auth): restore pre-admin privilege restrictions on demotion Promoting now stashes the user's privilege map (privileges_before_admin) and demoting restores it instead of resetting to defaults, so a promote/demote round trip can no longer broaden a restricted user's access. Users without a stash (created as admin, or promoted before this fix) still demote to DEFAULT_PRIVILEGES so a born-admin's stored all-True map — including can_use_bash — can't survive demotion. --------- Co-authored-by: K M Merajul Arefin <merajul.arefin@therapservices.net>
This commit is contained in:
@@ -3,6 +3,7 @@ Authentication module — multi-user password hashing, session tokens, config pe
|
||||
Config stored in data/auth.json. Uses bcrypt directly.
|
||||
"""
|
||||
|
||||
import enum
|
||||
import json
|
||||
import os
|
||||
import secrets
|
||||
@@ -83,6 +84,15 @@ def _verify_password(password: str, hashed: str) -> bool:
|
||||
return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("utf-8"))
|
||||
|
||||
|
||||
class SetAdminResult(enum.Enum):
|
||||
"""Outcome of AuthManager.set_admin, so callers can map each case to a
|
||||
precise response instead of guessing from a bare bool."""
|
||||
OK = "ok"
|
||||
USER_NOT_FOUND = "user_not_found"
|
||||
NOT_AUTHORIZED = "not_authorized" # requester is not an admin
|
||||
LAST_ADMIN = "last_admin" # would remove the last remaining admin
|
||||
|
||||
|
||||
class AuthManager:
|
||||
"""Manages multi-user password + session-token auth system."""
|
||||
|
||||
@@ -387,6 +397,69 @@ class AuthManager:
|
||||
logger.info(f"Updated privileges for '{username}': {current}")
|
||||
return True
|
||||
|
||||
def set_admin(self, username: str, is_admin: bool,
|
||||
requesting_user: str) -> SetAdminResult:
|
||||
"""Promote/demote an existing user to/from admin. Admin only.
|
||||
|
||||
Refuses to remove the last remaining admin so the instance can never
|
||||
be locked out of admin access; self-demotion is allowed as long as
|
||||
another admin remains. Admin status is re-checked live on every
|
||||
request, so unlike delete/rename no session or token revocation is
|
||||
needed — a demoted admin simply fails the next is_admin() gate.
|
||||
|
||||
Promotion stashes the user's current privilege map and demotion
|
||||
restores it, so a temporary admin stint can't silently broaden a
|
||||
user's non-admin access; users without a stash (created as admin,
|
||||
or promoted before stashing existed) demote to DEFAULT_PRIVILEGES.
|
||||
|
||||
Counting admins and flipping the flag happen in one critical section
|
||||
so two concurrent demotions can't race the admin count to zero.
|
||||
"""
|
||||
username = (username or "").strip().lower()
|
||||
requesting_user = (requesting_user or "").strip().lower()
|
||||
is_admin = bool(is_admin)
|
||||
with self._config_lock:
|
||||
target = self._config.get("users", {}).get(username)
|
||||
if target is None:
|
||||
return SetAdminResult.USER_NOT_FOUND
|
||||
if not self.users.get(requesting_user, {}).get("is_admin"):
|
||||
return SetAdminResult.NOT_AUTHORIZED
|
||||
currently_admin = bool(target.get("is_admin"))
|
||||
if currently_admin == is_admin:
|
||||
return SetAdminResult.OK # no-op; leave privileges untouched
|
||||
if currently_admin and not is_admin:
|
||||
admin_count = sum(1 for d in self.users.values() if d.get("is_admin"))
|
||||
if admin_count <= 1:
|
||||
return SetAdminResult.LAST_ADMIN
|
||||
# Write order matters for lock-free readers: get_privileges()
|
||||
# reads without _config_lock and trusts is_admin, so the admin
|
||||
# flag must be flipped while the stored map is safe to expose —
|
||||
# before writing admin privileges on promote, after restoring
|
||||
# the pre-admin map on demote.
|
||||
if is_admin:
|
||||
target["is_admin"] = True
|
||||
# Stash the pre-admin map so a later demotion can restore it.
|
||||
# While is_admin is set the stored map is inert: get_privileges
|
||||
# short-circuits to ADMIN_PRIVILEGES and set_privileges refuses
|
||||
# admins, so only set_admin ever touches the stash.
|
||||
target["privileges_before_admin"] = dict(
|
||||
target.get("privileges") or DEFAULT_PRIVILEGES
|
||||
)
|
||||
target["privileges"] = dict(ADMIN_PRIVILEGES)
|
||||
else:
|
||||
# Restore the stashed pre-admin map. Fall back to defaults for
|
||||
# users created as admins (their stored map is ADMIN_PRIVILEGES,
|
||||
# which must not leak past demotion — e.g. can_use_bash) and
|
||||
# for admins promoted before the stash existed.
|
||||
target["privileges"] = dict(
|
||||
target.pop("privileges_before_admin", None)
|
||||
or DEFAULT_PRIVILEGES
|
||||
)
|
||||
target["is_admin"] = False
|
||||
self._save()
|
||||
logger.info("Set is_admin=%s for '%s' (by '%s')", is_admin, username, requesting_user)
|
||||
return SetAdminResult.OK
|
||||
|
||||
def change_password(self, username: str, current_password: str, new_password: str) -> bool:
|
||||
username = username.strip().lower()
|
||||
if username not in self.users:
|
||||
|
||||
+31
-1
@@ -12,7 +12,7 @@ import re
|
||||
from pathlib import Path
|
||||
|
||||
from core.atomic_io import atomic_write_json, atomic_write_text
|
||||
from core.auth import AuthManager
|
||||
from core.auth import AuthManager, SetAdminResult
|
||||
from src.constants import DEEP_RESEARCH_DIR, MEMORY_FILE, SKILLS_DIR
|
||||
from src.rate_limiter import RateLimiter
|
||||
from src.settings_scrub import scrub_settings
|
||||
@@ -73,6 +73,11 @@ class DeleteUserRequest(BaseModel):
|
||||
class RenameUserRequest(BaseModel):
|
||||
username: str
|
||||
|
||||
|
||||
class SetAdminRequest(BaseModel):
|
||||
is_admin: bool
|
||||
|
||||
|
||||
class SetOpenRegistrationRequest(BaseModel):
|
||||
enabled: bool
|
||||
|
||||
@@ -487,6 +492,31 @@ def setup_auth_routes(auth_manager: AuthManager) -> APIRouter:
|
||||
invalidator()
|
||||
return {"ok": True, "username": new_username, "renamed_self": old_username == user}
|
||||
|
||||
@router.put("/users/{username}/admin")
|
||||
async def set_user_admin(username: str, body: SetAdminRequest, request: Request):
|
||||
"""Promote/demote a user to/from admin. Admin only.
|
||||
|
||||
The last remaining admin can't be demoted (no lockout). Self-demotion
|
||||
is allowed while another admin exists; the `self` flag tells the UI to
|
||||
reload the acting user into the normal-user view.
|
||||
"""
|
||||
user = _get_current_user(request)
|
||||
if not user or not auth_manager.is_admin(user):
|
||||
raise HTTPException(403, "Admin only")
|
||||
result = auth_manager.set_admin(username, body.is_admin, user)
|
||||
if result is SetAdminResult.USER_NOT_FOUND:
|
||||
raise HTTPException(404, "User not found")
|
||||
if result is SetAdminResult.NOT_AUTHORIZED:
|
||||
raise HTTPException(403, "Admin only")
|
||||
if result is SetAdminResult.LAST_ADMIN:
|
||||
raise HTTPException(400, "Cannot demote the last admin")
|
||||
target = (username or "").strip().lower()
|
||||
return {
|
||||
"ok": True,
|
||||
"is_admin": body.is_admin,
|
||||
"self": target == (user or "").strip().lower(),
|
||||
}
|
||||
|
||||
@router.post("/signup-toggle", deprecated=True)
|
||||
async def toggle_signup(request: Request):
|
||||
"""
|
||||
|
||||
+38
-1
@@ -55,6 +55,7 @@ async function loadUsers() {
|
||||
</div>
|
||||
</div>
|
||||
<div style="display:flex;gap:8px;align-items:center;">
|
||||
<button class="admin-btn-sm" data-adm-toggle-admin="${esc(u.username)}" data-make-admin="${u.is_admin ? '0' : '1'}" style="font-size:11px;">${u.is_admin ? 'Revoke admin' : 'Make admin'}</button>
|
||||
<button class="admin-btn-sm" data-adm-rename-user="${esc(u.username)}" style="font-size:11px;">Rename</button>
|
||||
${u.is_admin ? '' : `<button class="admin-btn-delete" data-adm-del-user="${esc(u.username)}" style="font-size:11px;">Remove</button>`}
|
||||
${u.is_admin ? '' : '<svg class="admin-user-chevron" width="12" height="12" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2.5" stroke-linecap="round" stroke-linejoin="round" style="opacity:0.3;transition:transform 0.2s,opacity 0.2s;"><polyline points="6 9 12 15 18 9"/></svg>'}
|
||||
@@ -113,7 +114,7 @@ async function loadUsers() {
|
||||
// Toggle panel visibility + rotate chevron + load models
|
||||
let _modelsLoaded = false;
|
||||
header.addEventListener('click', (e) => {
|
||||
if (e.target.closest('.admin-btn-delete, [data-adm-rename-user]')) return;
|
||||
if (e.target.closest('.admin-btn-delete, [data-adm-rename-user], [data-adm-toggle-admin]')) return;
|
||||
privPanel.classList.toggle('hidden');
|
||||
const chevron = header.querySelector('.admin-user-chevron');
|
||||
if (chevron) {
|
||||
@@ -199,6 +200,42 @@ async function loadUsers() {
|
||||
});
|
||||
}
|
||||
|
||||
// Promote / demote (admin toggle) — present on every row
|
||||
const adminToggleBtn = row.querySelector('[data-adm-toggle-admin]');
|
||||
if (adminToggleBtn) {
|
||||
adminToggleBtn.addEventListener('click', async (e) => {
|
||||
e.stopPropagation();
|
||||
const username = adminToggleBtn.dataset.admToggleAdmin;
|
||||
const makeAdmin = adminToggleBtn.dataset.makeAdmin === '1';
|
||||
const confirmMsg = makeAdmin
|
||||
? `Grant admin rights to "${username}"? They'll get full access to all settings and users — including the power to demote or remove other admins (you included).`
|
||||
: `Revoke admin rights from "${username}"? They'll lose access to the admin panel.`;
|
||||
if (!await uiModule.styledConfirm(confirmMsg, { confirmText: makeAdmin ? 'Make admin' : 'Revoke admin', danger: !makeAdmin })) return;
|
||||
adminToggleBtn.disabled = true;
|
||||
try {
|
||||
const res = await fetch(`/api/auth/users/${encodeURIComponent(username)}/admin`, {
|
||||
method: 'PUT',
|
||||
credentials: 'same-origin',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ is_admin: makeAdmin }),
|
||||
});
|
||||
const data = await res.json().catch(() => ({}));
|
||||
if (!res.ok) {
|
||||
uiModule.showError(data.detail || 'Failed to change admin status');
|
||||
adminToggleBtn.disabled = false;
|
||||
return;
|
||||
}
|
||||
// Demoting yourself drops your own admin access — reload into the
|
||||
// normal-user view (mirrors the rename-self reload above).
|
||||
if (data.self) { window.location.reload(); return; }
|
||||
loadUsers();
|
||||
} catch (err) {
|
||||
uiModule.showError('Failed to change admin status');
|
||||
adminToggleBtn.disabled = false;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
list.appendChild(row);
|
||||
});
|
||||
} catch (e) { list.innerHTML = '<div class="admin-error">Failed to load users</div>'; }
|
||||
|
||||
@@ -0,0 +1,317 @@
|
||||
"""Promote/demote users to/from admin (issue #2958).
|
||||
|
||||
Covers AuthManager.set_admin (the core logic + last-admin lockout guard +
|
||||
privilege stash/restore on a real role change + no-op preservation) and the
|
||||
PUT /api/auth/users/{username}/admin route's status/envelope mapping.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import importlib
|
||||
import sys
|
||||
import types
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from fastapi import HTTPException
|
||||
|
||||
from tests.helpers.import_state import clear_module
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Manager-level: real AuthManager on a temp auth.json (mirrors
|
||||
# tests/test_rename_user_case_insensitive.py).
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _real_core_package():
|
||||
root = Path(__file__).resolve().parent.parent
|
||||
core_path = str(root / "core")
|
||||
core = sys.modules.get("core")
|
||||
if core is None:
|
||||
core = types.ModuleType("core")
|
||||
sys.modules["core"] = core
|
||||
core.__path__ = [core_path]
|
||||
clear_module("core.auth")
|
||||
return core
|
||||
|
||||
|
||||
def _fresh_auth_manager(tmp_path):
|
||||
"""Return (auth_module, AuthManager) with hashing stubbed for speed."""
|
||||
auth_mod = importlib.import_module("core.auth", package=_real_core_package())
|
||||
auth_mod._hash_password = lambda password: f"hash:{password}"
|
||||
auth_mod._verify_password = lambda password, hashed: hashed == f"hash:{password}"
|
||||
mgr = auth_mod.AuthManager(str(tmp_path / "auth.json"))
|
||||
return auth_mod, mgr
|
||||
|
||||
|
||||
def test_promote_sets_admin_flag_and_admin_privileges(tmp_path):
|
||||
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
||||
assert mgr.create_user("admin", "pw-123456", is_admin=True) is True
|
||||
assert mgr.create_user("bob", "pw-123456") is True
|
||||
|
||||
result = mgr.set_admin("bob", True, "admin")
|
||||
|
||||
assert result is auth_mod.SetAdminResult.OK
|
||||
assert mgr.is_admin("bob") is True
|
||||
assert mgr.users["bob"]["privileges"] == auth_mod.ADMIN_PRIVILEGES
|
||||
|
||||
|
||||
def test_demote_with_two_admins_resets_to_default_privileges(tmp_path):
|
||||
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
||||
mgr.create_user("admin", "pw-123456", is_admin=True)
|
||||
mgr.create_user("bob", "pw-123456", is_admin=True)
|
||||
|
||||
result = mgr.set_admin("bob", False, "admin")
|
||||
|
||||
assert result is auth_mod.SetAdminResult.OK
|
||||
assert mgr.is_admin("bob") is False
|
||||
assert mgr.users["bob"]["privileges"] == auth_mod.DEFAULT_PRIVILEGES
|
||||
|
||||
|
||||
def test_demote_last_admin_is_blocked(tmp_path):
|
||||
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
||||
mgr.create_user("admin", "pw-123456", is_admin=True)
|
||||
|
||||
result = mgr.set_admin("admin", False, "admin")
|
||||
|
||||
assert result is auth_mod.SetAdminResult.LAST_ADMIN
|
||||
assert mgr.is_admin("admin") is True # unchanged
|
||||
|
||||
|
||||
def test_self_demote_allowed_when_another_admin_exists(tmp_path):
|
||||
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
||||
mgr.create_user("admin", "pw-123456", is_admin=True)
|
||||
mgr.create_user("bob", "pw-123456", is_admin=True)
|
||||
|
||||
result = mgr.set_admin("admin", False, "admin") # admin demotes self
|
||||
|
||||
assert result is auth_mod.SetAdminResult.OK
|
||||
assert mgr.is_admin("admin") is False
|
||||
assert mgr.is_admin("bob") is True
|
||||
|
||||
|
||||
def test_cannot_demote_past_the_last_admin_sequentially(tmp_path):
|
||||
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
||||
mgr.create_user("admin", "pw-123456", is_admin=True)
|
||||
mgr.create_user("bob", "pw-123456", is_admin=True)
|
||||
|
||||
assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
|
||||
# Now "admin" is the only admin left — demoting them must be refused.
|
||||
assert mgr.set_admin("admin", False, "admin") is auth_mod.SetAdminResult.LAST_ADMIN
|
||||
assert mgr.is_admin("admin") is True
|
||||
|
||||
|
||||
def test_non_admin_requester_is_rejected(tmp_path):
|
||||
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
||||
mgr.create_user("admin", "pw-123456", is_admin=True)
|
||||
mgr.create_user("bob", "pw-123456")
|
||||
mgr.create_user("carol", "pw-123456")
|
||||
|
||||
result = mgr.set_admin("carol", True, "bob") # bob is not an admin
|
||||
|
||||
assert result is auth_mod.SetAdminResult.NOT_AUTHORIZED
|
||||
assert mgr.is_admin("carol") is False
|
||||
|
||||
|
||||
def test_unknown_target_user_returns_not_found(tmp_path):
|
||||
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
||||
mgr.create_user("admin", "pw-123456", is_admin=True)
|
||||
|
||||
result = mgr.set_admin("ghost", True, "admin")
|
||||
|
||||
assert result is auth_mod.SetAdminResult.USER_NOT_FOUND
|
||||
|
||||
|
||||
def test_noop_demote_of_regular_user_preserves_custom_privileges(tmp_path):
|
||||
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
||||
mgr.create_user("admin", "pw-123456", is_admin=True)
|
||||
mgr.create_user("bob", "pw-123456")
|
||||
# Give bob a non-default privilege; DEFAULT_PRIVILEGES has can_use_bash=False.
|
||||
assert mgr.set_privileges("bob", {"can_use_bash": True}) is True
|
||||
|
||||
result = mgr.set_admin("bob", False, "admin") # already a regular user
|
||||
|
||||
assert result is auth_mod.SetAdminResult.OK
|
||||
# Privileges must NOT have been reset to defaults by the no-op.
|
||||
assert mgr.users["bob"]["privileges"]["can_use_bash"] is True
|
||||
|
||||
|
||||
def test_demote_restores_pre_admin_privilege_restrictions(tmp_path):
|
||||
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
||||
mgr.create_user("admin", "pw-123456", is_admin=True)
|
||||
mgr.create_user("bob", "pw-123456")
|
||||
# Tighten bob below the defaults before promoting him.
|
||||
assert mgr.set_privileges("bob", {
|
||||
"can_use_agent": False,
|
||||
"can_generate_images": False,
|
||||
"max_messages_per_day": 50,
|
||||
}) is True
|
||||
restricted = mgr.get_privileges("bob")
|
||||
|
||||
assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
|
||||
assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
|
||||
|
||||
# Demotion must restore the pre-admin policy, not reset to defaults.
|
||||
assert mgr.get_privileges("bob") == restricted
|
||||
assert mgr.get_privileges("bob")["can_use_agent"] is False
|
||||
assert mgr.get_privileges("bob")["max_messages_per_day"] == 50
|
||||
|
||||
|
||||
def test_promote_demote_round_trip_is_stable_and_cleans_up_stash(tmp_path):
|
||||
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
||||
mgr.create_user("admin", "pw-123456", is_admin=True)
|
||||
mgr.create_user("bob", "pw-123456")
|
||||
assert mgr.set_privileges("bob", {"can_use_browser": False}) is True
|
||||
restricted = mgr.get_privileges("bob")
|
||||
|
||||
for _ in range(2): # two full promote/demote cycles
|
||||
assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
|
||||
assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
|
||||
|
||||
assert mgr.get_privileges("bob") == restricted
|
||||
# The stash is promotion-time bookkeeping; it must not linger on the row.
|
||||
assert "privileges_before_admin" not in mgr.users["bob"]
|
||||
|
||||
|
||||
def test_redundant_promote_does_not_clobber_stash(tmp_path):
|
||||
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
||||
mgr.create_user("admin", "pw-123456", is_admin=True)
|
||||
mgr.create_user("bob", "pw-123456")
|
||||
assert mgr.set_privileges("bob", {"can_use_agent": False}) is True
|
||||
restricted = mgr.get_privileges("bob")
|
||||
|
||||
assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
|
||||
# A second promote is a no-op and must not re-stash ADMIN_PRIVILEGES.
|
||||
assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
|
||||
assert mgr.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
|
||||
|
||||
# Demotion must still restore the original pre-admin restrictions.
|
||||
assert mgr.get_privileges("bob") == restricted
|
||||
assert mgr.get_privileges("bob")["can_use_agent"] is False
|
||||
|
||||
|
||||
def test_pre_admin_privileges_survive_manager_reload(tmp_path):
|
||||
auth_mod, mgr = _fresh_auth_manager(tmp_path)
|
||||
mgr.create_user("admin", "pw-123456", is_admin=True)
|
||||
mgr.create_user("bob", "pw-123456")
|
||||
assert mgr.set_privileges("bob", {"can_use_research": False}) is True
|
||||
assert mgr.set_admin("bob", True, "admin") is auth_mod.SetAdminResult.OK
|
||||
|
||||
# Fresh manager on the same auth.json — the stash must round-trip disk.
|
||||
mgr2 = auth_mod.AuthManager(str(tmp_path / "auth.json"))
|
||||
assert mgr2.set_admin("bob", False, "admin") is auth_mod.SetAdminResult.OK
|
||||
assert mgr2.get_privileges("bob")["can_use_research"] is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Route-level: PUT /api/auth/users/{username}/admin (mirrors
|
||||
# tests/test_auth_regressions.py). SetAdminResult is read from the route
|
||||
# module's own namespace so the route and the test share one enum object.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_ADMIN_ROUTE = "/api/auth/users/{username}/admin"
|
||||
|
||||
|
||||
def _auth_route_endpoint(path, method):
|
||||
from routes.auth_routes import setup_auth_routes
|
||||
|
||||
auth_manager = MagicMock()
|
||||
router = setup_auth_routes(auth_manager)
|
||||
for route in router.routes:
|
||||
if getattr(route, "path", "") == path and method in getattr(route, "methods", set()):
|
||||
return auth_manager, route.endpoint
|
||||
raise AssertionError(f"{method} {path} route not registered")
|
||||
|
||||
|
||||
def _fake_auth_request(token="session-token"):
|
||||
from routes.auth_routes import SESSION_COOKIE
|
||||
|
||||
req = SimpleNamespace()
|
||||
req.cookies = {SESSION_COOKIE: token}
|
||||
req.client = SimpleNamespace(host="127.0.0.1")
|
||||
return req
|
||||
|
||||
|
||||
def _result_enum():
|
||||
import routes.auth_routes as ar
|
||||
|
||||
return ar.SetAdminResult
|
||||
|
||||
|
||||
def test_route_requires_admin():
|
||||
from routes.auth_routes import SetAdminRequest
|
||||
|
||||
auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
|
||||
auth.get_username_for_token.return_value = "bob"
|
||||
auth.is_admin.return_value = False
|
||||
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
asyncio.run(target(username="carol", body=SetAdminRequest(is_admin=True),
|
||||
request=_fake_auth_request()))
|
||||
|
||||
assert exc.value.status_code == 403
|
||||
auth.set_admin.assert_not_called()
|
||||
|
||||
|
||||
def test_route_last_admin_returns_400():
|
||||
from routes.auth_routes import SetAdminRequest
|
||||
|
||||
R = _result_enum()
|
||||
auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
|
||||
auth.get_username_for_token.return_value = "admin"
|
||||
auth.is_admin.return_value = True
|
||||
auth.set_admin.return_value = R.LAST_ADMIN
|
||||
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
asyncio.run(target(username="admin", body=SetAdminRequest(is_admin=False),
|
||||
request=_fake_auth_request()))
|
||||
|
||||
assert exc.value.status_code == 400
|
||||
|
||||
|
||||
def test_route_user_not_found_returns_404():
|
||||
from routes.auth_routes import SetAdminRequest
|
||||
|
||||
R = _result_enum()
|
||||
auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
|
||||
auth.get_username_for_token.return_value = "admin"
|
||||
auth.is_admin.return_value = True
|
||||
auth.set_admin.return_value = R.USER_NOT_FOUND
|
||||
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
asyncio.run(target(username="ghost", body=SetAdminRequest(is_admin=True),
|
||||
request=_fake_auth_request()))
|
||||
|
||||
assert exc.value.status_code == 404
|
||||
|
||||
|
||||
def test_route_success_returns_envelope():
|
||||
from routes.auth_routes import SetAdminRequest
|
||||
|
||||
R = _result_enum()
|
||||
auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
|
||||
auth.get_username_for_token.return_value = "admin"
|
||||
auth.is_admin.return_value = True
|
||||
auth.set_admin.return_value = R.OK
|
||||
|
||||
out = asyncio.run(target(username="bob", body=SetAdminRequest(is_admin=True),
|
||||
request=_fake_auth_request()))
|
||||
|
||||
assert out == {"ok": True, "is_admin": True, "self": False}
|
||||
|
||||
|
||||
def test_route_self_flag_true_when_targeting_own_account():
|
||||
from routes.auth_routes import SetAdminRequest
|
||||
|
||||
R = _result_enum()
|
||||
auth, target = _auth_route_endpoint(_ADMIN_ROUTE, "PUT")
|
||||
auth.get_username_for_token.return_value = "admin"
|
||||
auth.is_admin.return_value = True
|
||||
auth.set_admin.return_value = R.OK
|
||||
|
||||
out = asyncio.run(target(username="Admin", body=SetAdminRequest(is_admin=False),
|
||||
request=_fake_auth_request()))
|
||||
|
||||
assert out == {"ok": True, "is_admin": False, "self": True}
|
||||
Reference in New Issue
Block a user