fix(tokens): owner check on update and delete routes (#3899)

PATCH and DELETE /api/tokens/{id} both called require_admin but never
checked that the token belonged to the requesting admin. Any admin could
rename, re-scope, or delete another admin's token by ID.

create_token already stamps owner on every token — update and delete
just never read it. Fixed by comparing token.owner against
get_current_user(request) after the 404 guard, same pattern the rest of
the auth routes use. Check is skipped when current_user is falsy
(AUTH_ENABLED=false / single-user mode).

Fixes #3898
This commit is contained in:
Ashvin
2026-06-11 19:04:44 +05:30
committed by GitHub
parent 4f48cfa9ae
commit a7b03398b6
2 changed files with 109 additions and 4 deletions
+9 -2
View File
@@ -154,6 +154,7 @@ def setup_api_token_routes() -> APIRouter:
@router.patch("/tokens/{token_id}")
async def update_token(request: Request, token_id: str):
require_admin(request)
current_user = get_current_user(request)
try:
payload = await request.json()
except Exception:
@@ -162,6 +163,8 @@ def setup_api_token_routes() -> APIRouter:
token = db.query(ApiToken).filter(ApiToken.id == token_id).first()
if not token:
raise HTTPException(404, "Token not found")
if current_user and token.owner != current_user:
raise HTTPException(403, "Not your token")
if isinstance(payload.get("name"), str) and payload["name"].strip():
token.name = payload["name"].strip()[:MAX_NAME_LEN]
# Only touch scopes when the caller actually sent them. A partial
@@ -189,10 +192,14 @@ def setup_api_token_routes() -> APIRouter:
@router.delete("/tokens/{token_id}")
def delete_token(request: Request, token_id: str):
require_admin(request)
current_user = get_current_user(request)
with get_db_session() as db:
deleted = db.query(ApiToken).filter(ApiToken.id == token_id).delete()
if not deleted:
token = db.query(ApiToken).filter(ApiToken.id == token_id).first()
if not token:
raise HTTPException(404, "Token not found")
if current_user and token.owner != current_user:
raise HTTPException(403, "Not your token")
db.delete(token)
_invalidate_cache(request)
return {"status": "deleted"}
+100 -2
View File
@@ -287,8 +287,9 @@ def test_delete_token_deletes_and_invalidates_cache(monkeypatch, token_routes_mo
monkeypatch.setattr(mod, "get_current_user", lambda req: req.state.current_user)
monkeypatch.setattr(mod, "ApiToken", MagicMock())
fake_token = SimpleNamespace(id="abcd1234", owner="alice", name="test")
fake_session = MagicMock()
fake_session.query.return_value.filter.return_value.delete.return_value = 1
fake_session.query.return_value.filter.return_value.first.return_value = fake_token
monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session))
invalidator = MagicMock()
@@ -297,6 +298,7 @@ def test_delete_token_deletes_and_invalidates_cache(monkeypatch, token_routes_mo
resp = delete_token(request=req, token_id="abcd1234")
assert resp == {"status": "deleted"}
fake_session.delete.assert_called_once_with(fake_token)
invalidator.assert_called_once()
@@ -312,7 +314,7 @@ def test_delete_missing_token_returns_404_without_invalidating_cache(monkeypatch
monkeypatch.setattr(mod, "ApiToken", MagicMock())
fake_session = MagicMock()
fake_session.query.return_value.filter.return_value.delete.return_value = 0
fake_session.query.return_value.filter.return_value.first.return_value = None
monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session))
invalidator = MagicMock()
@@ -404,3 +406,99 @@ def test_update_missing_token_returns_404(monkeypatch, token_routes_mod):
with pytest.raises(HTTPException) as exc:
asyncio.run(update_token(request=req, token_id="missing99"))
assert exc.value.status_code == 404
# ---------------------------------------------------------------------------
# 7. Owner check — update/delete reject a different admin's token with 403
# ---------------------------------------------------------------------------
def _bob_patch_request(invalidator, body):
"""An admin request from bob whose async .json() yields `body`."""
req = _req("bob", is_admin=True, invalidator=invalidator)
async def _json():
return body
req.json = _json
return req
def test_update_token_rejects_non_owner(monkeypatch, token_routes_mod):
monkeypatch.setenv("AUTH_ENABLED", "true")
mod = token_routes_mod
monkeypatch.setattr(mod, "get_current_user", lambda req: req.state.current_user)
token = SimpleNamespace(
id="tok123", name="alice-token", owner="alice",
token_prefix="ody_alic", scopes="chat", is_active=True,
)
fake_session = MagicMock()
fake_session.query.return_value.filter.return_value.first.return_value = token
monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session))
req = _bob_patch_request(MagicMock(), {"name": "hijacked"})
update_token = _get_handler(mod, "PATCH", "/tokens/{token_id}")
with pytest.raises(HTTPException) as exc:
asyncio.run(update_token(request=req, token_id="tok123"))
assert exc.value.status_code == 403
assert token.name == "alice-token"
def test_delete_token_rejects_non_owner(monkeypatch, token_routes_mod):
monkeypatch.setenv("AUTH_ENABLED", "true")
mod = token_routes_mod
monkeypatch.setattr(mod, "get_current_user", lambda req: req.state.current_user)
monkeypatch.setattr(mod, "ApiToken", MagicMock())
fake_token = SimpleNamespace(id="tok123", owner="alice", name="alice-token")
fake_session = MagicMock()
fake_session.query.return_value.filter.return_value.first.return_value = fake_token
monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session))
invalidator = MagicMock()
req = _req("bob", is_admin=True, invalidator=invalidator)
delete_token = _get_handler(mod, "DELETE", "/tokens/{token_id}")
with pytest.raises(HTTPException) as exc:
delete_token(request=req, token_id="tok123")
assert exc.value.status_code == 403
fake_session.delete.assert_not_called()
invalidator.assert_not_called()
def test_update_token_owner_check_skipped_when_auth_disabled(monkeypatch, token_routes_mod):
monkeypatch.setenv("AUTH_ENABLED", "false")
mod = token_routes_mod
monkeypatch.setattr(mod, "get_current_user", lambda req: None)
token = SimpleNamespace(
id="tok123", name="original", owner="alice",
token_prefix="ody_alic", scopes="chat", is_active=True,
)
fake_session = MagicMock()
fake_session.query.return_value.filter.return_value.first.return_value = token
monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session))
req = _bob_patch_request(MagicMock(), {"name": "renamed-in-single-user"})
update_token = _get_handler(mod, "PATCH", "/tokens/{token_id}")
resp = asyncio.run(update_token(request=req, token_id="tok123"))
assert resp["name"] == "renamed-in-single-user"
def test_delete_token_owner_check_skipped_when_auth_disabled(monkeypatch, token_routes_mod):
monkeypatch.setenv("AUTH_ENABLED", "false")
mod = token_routes_mod
monkeypatch.setattr(mod, "get_current_user", lambda req: None)
monkeypatch.setattr(mod, "ApiToken", MagicMock())
fake_token = SimpleNamespace(id="tok123", owner="alice", name="alice-token")
fake_session = MagicMock()
fake_session.query.return_value.filter.return_value.first.return_value = fake_token
monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session))
invalidator = MagicMock()
req = _req("", is_admin=True, invalidator=invalidator)
delete_token = _get_handler(mod, "DELETE", "/tokens/{token_id}")
resp = delete_token(request=req, token_id="tok123")
assert resp == {"status": "deleted"}
fake_session.delete.assert_called_once_with(fake_token)