From a7b03398b6e3dd1989010c9bc3582cb00d5480ab Mon Sep 17 00:00:00 2001 From: Ashvin <76151462+ashvinctrl@users.noreply.github.com> Date: Thu, 11 Jun 2026 19:04:44 +0530 Subject: [PATCH] fix(tokens): owner check on update and delete routes (#3899) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PATCH and DELETE /api/tokens/{id} both called require_admin but never checked that the token belonged to the requesting admin. Any admin could rename, re-scope, or delete another admin's token by ID. create_token already stamps owner on every token — update and delete just never read it. Fixed by comparing token.owner against get_current_user(request) after the 404 guard, same pattern the rest of the auth routes use. Check is skipped when current_user is falsy (AUTH_ENABLED=false / single-user mode). Fixes #3898 --- routes/api_token_routes.py | 11 +++- tests/test_api_token_routes.py | 102 ++++++++++++++++++++++++++++++++- 2 files changed, 109 insertions(+), 4 deletions(-) diff --git a/routes/api_token_routes.py b/routes/api_token_routes.py index 6f8ac2fc9..475c6502d 100644 --- a/routes/api_token_routes.py +++ b/routes/api_token_routes.py @@ -154,6 +154,7 @@ def setup_api_token_routes() -> APIRouter: @router.patch("/tokens/{token_id}") async def update_token(request: Request, token_id: str): require_admin(request) + current_user = get_current_user(request) try: payload = await request.json() except Exception: @@ -162,6 +163,8 @@ def setup_api_token_routes() -> APIRouter: token = db.query(ApiToken).filter(ApiToken.id == token_id).first() if not token: raise HTTPException(404, "Token not found") + if current_user and token.owner != current_user: + raise HTTPException(403, "Not your token") if isinstance(payload.get("name"), str) and payload["name"].strip(): token.name = payload["name"].strip()[:MAX_NAME_LEN] # Only touch scopes when the caller actually sent them. A partial @@ -189,10 +192,14 @@ def setup_api_token_routes() -> APIRouter: @router.delete("/tokens/{token_id}") def delete_token(request: Request, token_id: str): require_admin(request) + current_user = get_current_user(request) with get_db_session() as db: - deleted = db.query(ApiToken).filter(ApiToken.id == token_id).delete() - if not deleted: + token = db.query(ApiToken).filter(ApiToken.id == token_id).first() + if not token: raise HTTPException(404, "Token not found") + if current_user and token.owner != current_user: + raise HTTPException(403, "Not your token") + db.delete(token) _invalidate_cache(request) return {"status": "deleted"} diff --git a/tests/test_api_token_routes.py b/tests/test_api_token_routes.py index 8443fdafe..cd7eb5709 100644 --- a/tests/test_api_token_routes.py +++ b/tests/test_api_token_routes.py @@ -287,8 +287,9 @@ def test_delete_token_deletes_and_invalidates_cache(monkeypatch, token_routes_mo monkeypatch.setattr(mod, "get_current_user", lambda req: req.state.current_user) monkeypatch.setattr(mod, "ApiToken", MagicMock()) + fake_token = SimpleNamespace(id="abcd1234", owner="alice", name="test") fake_session = MagicMock() - fake_session.query.return_value.filter.return_value.delete.return_value = 1 + fake_session.query.return_value.filter.return_value.first.return_value = fake_token monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session)) invalidator = MagicMock() @@ -297,6 +298,7 @@ def test_delete_token_deletes_and_invalidates_cache(monkeypatch, token_routes_mo resp = delete_token(request=req, token_id="abcd1234") assert resp == {"status": "deleted"} + fake_session.delete.assert_called_once_with(fake_token) invalidator.assert_called_once() @@ -312,7 +314,7 @@ def test_delete_missing_token_returns_404_without_invalidating_cache(monkeypatch monkeypatch.setattr(mod, "ApiToken", MagicMock()) fake_session = MagicMock() - fake_session.query.return_value.filter.return_value.delete.return_value = 0 + fake_session.query.return_value.filter.return_value.first.return_value = None monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session)) invalidator = MagicMock() @@ -404,3 +406,99 @@ def test_update_missing_token_returns_404(monkeypatch, token_routes_mod): with pytest.raises(HTTPException) as exc: asyncio.run(update_token(request=req, token_id="missing99")) assert exc.value.status_code == 404 + + +# --------------------------------------------------------------------------- +# 7. Owner check — update/delete reject a different admin's token with 403 +# --------------------------------------------------------------------------- + + +def _bob_patch_request(invalidator, body): + """An admin request from bob whose async .json() yields `body`.""" + req = _req("bob", is_admin=True, invalidator=invalidator) + + async def _json(): + return body + + req.json = _json + return req + + +def test_update_token_rejects_non_owner(monkeypatch, token_routes_mod): + monkeypatch.setenv("AUTH_ENABLED", "true") + mod = token_routes_mod + monkeypatch.setattr(mod, "get_current_user", lambda req: req.state.current_user) + + token = SimpleNamespace( + id="tok123", name="alice-token", owner="alice", + token_prefix="ody_alic", scopes="chat", is_active=True, + ) + fake_session = MagicMock() + fake_session.query.return_value.filter.return_value.first.return_value = token + monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session)) + + req = _bob_patch_request(MagicMock(), {"name": "hijacked"}) + update_token = _get_handler(mod, "PATCH", "/tokens/{token_id}") + with pytest.raises(HTTPException) as exc: + asyncio.run(update_token(request=req, token_id="tok123")) + assert exc.value.status_code == 403 + assert token.name == "alice-token" + + +def test_delete_token_rejects_non_owner(monkeypatch, token_routes_mod): + monkeypatch.setenv("AUTH_ENABLED", "true") + mod = token_routes_mod + monkeypatch.setattr(mod, "get_current_user", lambda req: req.state.current_user) + monkeypatch.setattr(mod, "ApiToken", MagicMock()) + + fake_token = SimpleNamespace(id="tok123", owner="alice", name="alice-token") + fake_session = MagicMock() + fake_session.query.return_value.filter.return_value.first.return_value = fake_token + monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session)) + + invalidator = MagicMock() + req = _req("bob", is_admin=True, invalidator=invalidator) + delete_token = _get_handler(mod, "DELETE", "/tokens/{token_id}") + with pytest.raises(HTTPException) as exc: + delete_token(request=req, token_id="tok123") + assert exc.value.status_code == 403 + fake_session.delete.assert_not_called() + invalidator.assert_not_called() + + +def test_update_token_owner_check_skipped_when_auth_disabled(monkeypatch, token_routes_mod): + monkeypatch.setenv("AUTH_ENABLED", "false") + mod = token_routes_mod + monkeypatch.setattr(mod, "get_current_user", lambda req: None) + + token = SimpleNamespace( + id="tok123", name="original", owner="alice", + token_prefix="ody_alic", scopes="chat", is_active=True, + ) + fake_session = MagicMock() + fake_session.query.return_value.filter.return_value.first.return_value = token + monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session)) + + req = _bob_patch_request(MagicMock(), {"name": "renamed-in-single-user"}) + update_token = _get_handler(mod, "PATCH", "/tokens/{token_id}") + resp = asyncio.run(update_token(request=req, token_id="tok123")) + assert resp["name"] == "renamed-in-single-user" + + +def test_delete_token_owner_check_skipped_when_auth_disabled(monkeypatch, token_routes_mod): + monkeypatch.setenv("AUTH_ENABLED", "false") + mod = token_routes_mod + monkeypatch.setattr(mod, "get_current_user", lambda req: None) + monkeypatch.setattr(mod, "ApiToken", MagicMock()) + + fake_token = SimpleNamespace(id="tok123", owner="alice", name="alice-token") + fake_session = MagicMock() + fake_session.query.return_value.filter.return_value.first.return_value = fake_token + monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session)) + + invalidator = MagicMock() + req = _req("", is_admin=True, invalidator=invalidator) + delete_token = _get_handler(mod, "DELETE", "/tokens/{token_id}") + resp = delete_token(request=req, token_id="tok123") + assert resp == {"status": "deleted"} + fake_session.delete.assert_called_once_with(fake_token)