diff --git a/routes/api_token_routes.py b/routes/api_token_routes.py index 68d150368..97c576d15 100644 --- a/routes/api_token_routes.py +++ b/routes/api_token_routes.py @@ -155,22 +155,30 @@ def setup_api_token_routes() -> APIRouter: payload = await request.json() except Exception: payload = {} - scope_list = _normalize_scopes(payload.get("scopes")) - scopes_value = ",".join(scope_list) with get_db_session() as db: token = db.query(ApiToken).filter(ApiToken.id == token_id).first() if not token: raise HTTPException(404, "Token not found") if isinstance(payload.get("name"), str) and payload["name"].strip(): token.name = payload["name"].strip()[:MAX_NAME_LEN] - token.scopes = scopes_value + # Only touch scopes when the caller actually sent them. A partial + # update such as a rename ({"name": ...} with no "scopes" key) must + # not silently reset the token to the default scope — that dropped + # every previously granted scope. + if "scopes" in payload: + token.scopes = ",".join(_normalize_scopes(payload.get("scopes"))) db.add(token) + current_scopes = [ + s.strip() + for s in (getattr(token, "scopes", "") or DEFAULT_SCOPES).split(",") + if s.strip() + ] response = { "id": token_id, "name": getattr(token, "name", ""), "owner": getattr(token, "owner", None), "token_prefix": getattr(token, "token_prefix", ""), - "scopes": scope_list, + "scopes": current_scopes, } _invalidate_cache(request) return response diff --git a/tests/test_api_token_routes.py b/tests/test_api_token_routes.py index 611324e69..8c9aaab51 100644 --- a/tests/test_api_token_routes.py +++ b/tests/test_api_token_routes.py @@ -5,6 +5,7 @@ Uses direct endpoint extraction from setup_api_token_routes().routes and fake objects only — no real DB, no network, no external services. """ +import asyncio import contextlib import datetime import secrets as _secrets_mod @@ -292,3 +293,84 @@ def test_delete_missing_token_returns_404_without_invalidating_cache(monkeypatch delete_token(request=req, token_id="missing99") assert exc.value.status_code == 404 invalidator.assert_not_called() + + +# --------------------------------------------------------------------------- +# 6. PATCH /api/tokens/{id} — a partial update must not wipe scopes +# --------------------------------------------------------------------------- + + +def _patch_request(invalidator, body): + """An admin request whose async .json() yields `body`.""" + req = _req("alice", is_admin=True, invalidator=invalidator) + + async def _json(): + return body + + req.json = _json + return req + + +def test_update_token_rename_preserves_scopes(monkeypatch, token_routes_mod): + """Renaming a token (no 'scopes' key in the body) must keep its scopes. + + Previously update_token recomputed scopes from payload.get("scopes"), + which is None on a rename, so _normalize_scopes(None) reset every token to + the default ["chat"] scope — a silent privilege/data loss. + """ + monkeypatch.setenv("AUTH_ENABLED", "true") + mod = token_routes_mod + + token = SimpleNamespace( + id="tok123", name="original", owner="alice", + token_prefix="ody_orig", scopes="email:read,email:draft", is_active=True, + ) + fake_session = MagicMock() + fake_session.query.return_value.filter.return_value.first.return_value = token + monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session)) + + invalidator = MagicMock() + req = _patch_request(invalidator, {"name": "renamed"}) + update_token = _get_handler(mod, "PATCH", "/tokens/{token_id}") + resp = asyncio.run(update_token(request=req, token_id="tok123")) + + assert token.scopes == "email:read,email:draft" # untouched + assert resp["scopes"] == ["email:read", "email:draft"] + assert token.name == "renamed" + invalidator.assert_called_once() + + +def test_update_token_applies_explicit_scopes(monkeypatch, token_routes_mod): + """When the body includes 'scopes', they are normalized and written.""" + monkeypatch.setenv("AUTH_ENABLED", "true") + mod = token_routes_mod + + token = SimpleNamespace( + id="tok123", name="original", owner="alice", + token_prefix="ody_orig", scopes="email:read,email:draft", is_active=True, + ) + fake_session = MagicMock() + fake_session.query.return_value.filter.return_value.first.return_value = token + monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session)) + + req = _patch_request(MagicMock(), {"scopes": ["chat"]}) + update_token = _get_handler(mod, "PATCH", "/tokens/{token_id}") + resp = asyncio.run(update_token(request=req, token_id="tok123")) + + assert token.scopes == "chat" + assert resp["scopes"] == ["chat"] + + +def test_update_missing_token_returns_404(monkeypatch, token_routes_mod): + monkeypatch.setenv("AUTH_ENABLED", "true") + mod = token_routes_mod + + fake_session = MagicMock() + fake_session.query.return_value.filter.return_value.first.return_value = None + monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session)) + + req = _patch_request(MagicMock(), {"name": "x"}) + update_token = _get_handler(mod, "PATCH", "/tokens/{token_id}") + with pytest.raises(HTTPException) as exc: + asyncio.run(update_token(request=req, token_id="missing99")) + assert exc.value.status_code == 404