feat: add ChatGPT Subscription provider (#2876)

* feat: Add ChatGPT Subscription support and related features

- Introduced a new provider option for ChatGPT Subscription in the endpoint selection UI.
- Implemented OAuth flow for ChatGPT Subscription sign-in, including polling for authorization status.
- Updated admin interface to handle ChatGPT Subscription, including disabling API key input and providing user guidance.
- Enhanced cost tracking logic to differentiate between subscription and non-subscription endpoints.
- Added new slash commands for managing skills, including listing, searching, and invoking skills.
- Implemented caching for skill catalog to optimize performance.
- Updated tests to cover new ChatGPT Subscription functionality and ensure proper endpoint probing.
- Refactored existing code to accommodate new features and improve maintainability.

* refactor: share provider device-flow setup

- reuse one device-flow backend for Copilot and ChatGPT Subscription
- add one frontend device-flow helper for Settings and /setup
- put GitHub Copilot back into Add Models, now as a dropdown option
- make provider selection just select; clicking Add starts sign-in
- stop ChatGPT Subscription setup from opening auth tabs automatically
- make /setup copilot and /setup chatgpt-subscription work from chat
- show ChatGPT Subscription in the /setup suggestions
- show the real error message when setup fails
- add focused tests for the shared flow and setup UI

* feat(chatgpt-subscription): harden credential lifecycle and streamline auth UX

Backend:
- Resolve runtime bearer for provider-auth endpoints at probe time via a
  shared _resolve_probe_key() that delegates to resolve_endpoint_runtime,
  applied across all probe/refresh call sites.
- Skip live completion probes and health pings for discovery-only providers
  (centralized behind _is_discovery_only_provider) — the Codex/Responses API
  has no such endpoints, so status is derived from cached models.
- Never persist the short lived ChatGPT bearer to the plaintext sessions
  table; proactively clear any stale bearer left by an earlier code path.
- Revoke orphaned ProviderAuthSession credentials when the last endpoint
  backing them is deleted (_delete_orphaned_provider_auth), surfaced via
  cleared_provider_auth in the delete response.

Frontend (admin.js):
- Auto-start the device-auth flow on provider selection so the authorization
  panel (code + Authorize) shows immediately instead of behind a "Sign in" click.
- Remove the redundant top button for device auth providers, move retry
  into the panel via an inline "Try again".
- Drop the self-evident hint text and add an execCommand clipboard fallback so
  Copy works in non-secure (HTTP/LAN) contexts.

* fix: harden chatgpt subscription provider

* chore: remove PR media from branch

* Fix chatgpt subscription recovery and token handling

---------

Co-authored-by: 5p00kyy <admin@5p00ky.dev>
This commit is contained in:
stocky789
2026-06-08 18:19:18 +10:00
committed by GitHub
parent ac94885c84
commit 1e0d9b92af
37 changed files with 3425 additions and 485 deletions
+65
View File
@@ -0,0 +1,65 @@
"""Static regressions for Add Models provider device-flow UX."""
from pathlib import Path
_REPO = Path(__file__).resolve().parent.parent
_INDEX = (_REPO / "static" / "index.html").read_text(encoding="utf-8")
_ADMIN = (_REPO / "static" / "js" / "admin.js").read_text(encoding="utf-8")
def _between(src: str, start: str, end: str) -> str:
start_idx = src.index(start)
end_idx = src.index(end, start_idx)
return src[start_idx:end_idx]
def test_copilot_and_chatgpt_subscription_are_dropdown_device_auth_options():
assert 'value="copilot" data-logo="github" data-auth-flow="copilot">GitHub Copilot' in _INDEX
assert 'value="chatgpt-subscription" data-logo="openai" data-auth-flow="chatgpt-subscription">ChatGPT Subscription' in _INDEX
assert 'id="adm-deviceAuthStatus"' in _INDEX
def test_provider_selection_is_inert_and_add_button_starts_device_flow():
change_block = _between(_ADMIN, "provider.addEventListener('change'", "urlInput.addEventListener('input'")
add_block = _between(_ADMIN, "el('adm-epAddBtn').addEventListener('click'", "async function _startProviderDeviceAuth")
assert "_startProviderDeviceAuth" not in change_block
assert "_startProviderDeviceAuth(deviceAuthProvider" in add_block
def test_device_auth_selection_disables_and_dims_api_test_button():
form_block = _between(_ADMIN, "function _setApiFormForProvider()", "function _renderPickerMenu()")
assert "testBtn.disabled = true" in form_block
assert "testBtn.style.opacity = '0.45'" in form_block
assert "testBtn.style.cursor = 'not-allowed'" in form_block
assert "testBtn.disabled = false" in form_block
assert "testBtn.style.opacity = ''" in form_block
assert "testBtn.style.cursor = ''" in form_block
def test_device_auth_keeps_manual_auth_button_without_auto_opening_tab():
auth_block = _between(_ADMIN, "async function _startProviderDeviceAuth", "// Local \"Add\" button")
assert "Authorize with OpenAI" in auth_block
assert "Authorize on GitHub" in auth_block
assert "adm-copilot-panel" in auth_block
assert "adm-device-auth-copy" in auth_block
assert "openWindow: () => {}" in auth_block
assert "A new tab opened" not in auth_block
def test_loud_oauth_copy_and_removed_button_hooks_do_not_return():
forbidden = [
"Click Add to start",
"uses account sign-in",
"Uses ChatGPT/Codex OAuth, not an OpenAI API key.",
"adm-chatgptStatus",
"adm-chatgptConnectBtn",
"adm-copilotConnectBtn",
"adm-copilotStatus",
]
for needle in forbidden:
assert needle not in _INDEX
assert needle not in _ADMIN
+280
View File
@@ -0,0 +1,280 @@
"""DB-backed ChatGPT Subscription endpoint provisioning tests."""
import json
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from core.database import Base, ModelEndpoint, ProviderAuthSession
import routes.chatgpt_subscription_routes as csr
def _mem_db(monkeypatch):
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(bind=engine)
# Match production (core.database SessionLocal is autoflush=False): a pending
# db.delete(ep) is NOT flushed before the orphan-auth reference-count SELECT,
# which is exactly why _delete_orphaned_provider_auth needs exclude_ep_id.
TestSessionLocal = sessionmaker(bind=engine, autoflush=False)
monkeypatch.setattr(csr, "SessionLocal", TestSessionLocal)
return TestSessionLocal
def test_provision_creates_owner_scoped_auth_session_and_endpoint(monkeypatch):
TestSessionLocal = _mem_db(monkeypatch)
monkeypatch.setattr(csr.chatgpt_subscription, "fetch_available_models", lambda token: ["gpt-5.5", "o4-mini"])
res = csr._provision_endpoint({"access_token": "AT", "refresh_token": "RT"}, "alice")
assert res["name"] == "ChatGPT Subscription"
assert res["base_url"] == csr.chatgpt_subscription.DEFAULT_CHATGPT_SUBSCRIPTION_BASE_URL
assert res["models"] == ["gpt-5.5", "o4-mini"]
db = TestSessionLocal()
try:
auth = db.query(ProviderAuthSession).first()
ep = db.query(ModelEndpoint).filter(ModelEndpoint.id == res["id"]).first()
assert auth is not None
assert auth.owner == "alice"
assert auth.provider == csr.chatgpt_subscription.CHATGPT_SUBSCRIPTION_PROVIDER
assert auth.access_token == "AT"
assert auth.refresh_token == "RT"
assert auth.auth_mode == "chatgpt"
assert ep is not None
assert ep.owner == "alice"
assert ep.api_key is None
assert ep.provider_auth_id == auth.id
assert ep.endpoint_kind == "api"
assert ep.model_refresh_mode == "manual"
assert ep.supports_tools is False
assert json.loads(ep.cached_models) == ["gpt-5.5", "o4-mini"]
finally:
db.close()
def test_provision_refreshes_existing_auth_session_and_endpoint(monkeypatch):
TestSessionLocal = _mem_db(monkeypatch)
monkeypatch.setattr(csr.chatgpt_subscription, "fetch_available_models", lambda token: ["gpt-5.5"])
first = csr._provision_endpoint({"access_token": "OLD", "refresh_token": "OLD-RT"}, "bob")
second = csr._provision_endpoint({"access_token": "NEW", "refresh_token": "NEW-RT"}, "bob")
assert first["id"] == second["id"]
db = TestSessionLocal()
try:
auth_rows = db.query(ProviderAuthSession).filter(ProviderAuthSession.owner == "bob").all()
ep_rows = db.query(ModelEndpoint).filter(ModelEndpoint.owner == "bob").all()
assert len(auth_rows) == 1
assert len(ep_rows) == 1
assert auth_rows[0].access_token == "NEW"
assert auth_rows[0].refresh_token == "NEW-RT"
assert ep_rows[0].provider_auth_id == auth_rows[0].id
finally:
db.close()
def test_provision_rejects_missing_tokens(monkeypatch):
_mem_db(monkeypatch)
with pytest.raises(ValueError, match="missing access_token or refresh_token"):
csr._provision_endpoint({"access_token": "AT"}, "alice")
def test_provision_rejects_accounts_without_usable_models(monkeypatch):
_mem_db(monkeypatch)
monkeypatch.setattr(csr.chatgpt_subscription, "fetch_available_models", lambda token: [])
with pytest.raises(ValueError, match="no usable Codex models"):
csr._provision_endpoint({"access_token": "AT", "refresh_token": "RT"}, "alice")
def _add_auth_and_endpoints(db, *, auth_id="auth1", ep_ids=("ep1",)):
db.add(ProviderAuthSession(
id=auth_id, provider=csr.chatgpt_subscription.CHATGPT_SUBSCRIPTION_PROVIDER,
owner="alice", base_url="https://chatgpt.com/backend-api/codex",
refresh_token="RT", auth_mode="chatgpt",
))
for ep_id in ep_ids:
db.add(ModelEndpoint(
id=ep_id, name="ChatGPT Subscription",
base_url="https://chatgpt.com/backend-api/codex",
provider_auth_id=auth_id, owner="alice",
))
db.commit()
def test_delete_orphaned_provider_auth_revokes_when_last_endpoint_removed(monkeypatch):
from routes.model_routes import _delete_orphaned_provider_auth
TestSessionLocal = _mem_db(monkeypatch)
db = TestSessionLocal()
try:
_add_auth_and_endpoints(db, auth_id="auth1", ep_ids=("ep1",))
# Mirror the production delete route: db.delete(ep) is issued (but not yet
# flushed/committed) BEFORE the orphan check runs.
ep1 = db.query(ModelEndpoint).filter(ModelEndpoint.id == "ep1").first()
db.delete(ep1)
# ep1 (its only referencing endpoint) is being deleted, so the auth clears.
assert _delete_orphaned_provider_auth(db, "auth1", exclude_ep_id="ep1") is True
db.commit()
assert db.query(ProviderAuthSession).filter(ProviderAuthSession.id == "auth1").first() is None
finally:
db.close()
def test_delete_orphaned_provider_auth_requires_exclude_ep_id_for_pending_delete(monkeypatch):
from routes.model_routes import _delete_orphaned_provider_auth
TestSessionLocal = _mem_db(monkeypatch)
db = TestSessionLocal()
try:
_add_auth_and_endpoints(db, auth_id="auth1", ep_ids=("ep1",))
ep1 = db.query(ModelEndpoint).filter(ModelEndpoint.id == "ep1").first()
db.delete(ep1)
# Without exclude_ep_id, the un-flushed pending delete leaves ep1 visible
# to the reference-count SELECT (autoflush=False), so the helper must
# conservatively KEEP the auth row. This is the bug exclude_ep_id fixes.
assert _delete_orphaned_provider_auth(db, "auth1") is False
assert db.query(ProviderAuthSession).filter(ProviderAuthSession.id == "auth1").first() is not None
finally:
db.close()
def test_delete_orphaned_provider_auth_keeps_auth_while_another_endpoint_uses_it(monkeypatch):
from routes.model_routes import _delete_orphaned_provider_auth
TestSessionLocal = _mem_db(monkeypatch)
db = TestSessionLocal()
try:
_add_auth_and_endpoints(db, auth_id="auth1", ep_ids=("ep1", "ep2"))
# ep2 still references auth1, so deleting ep1 must NOT revoke it.
assert _delete_orphaned_provider_auth(db, "auth1", exclude_ep_id="ep1") is False
assert db.query(ProviderAuthSession).filter(ProviderAuthSession.id == "auth1").first() is not None
finally:
db.close()
def test_delete_orphaned_provider_auth_noop_without_auth_id(monkeypatch):
from routes.model_routes import _delete_orphaned_provider_auth
TestSessionLocal = _mem_db(monkeypatch)
db = TestSessionLocal()
try:
assert _delete_orphaned_provider_auth(db, None, exclude_ep_id="ep1") is False
finally:
db.close()
def test_delete_orphaned_provider_auth_noop_when_auth_row_missing(monkeypatch):
from routes.model_routes import _delete_orphaned_provider_auth
TestSessionLocal = _mem_db(monkeypatch)
db = TestSessionLocal()
try:
# Endpoint points at an auth_id whose ProviderAuthSession is already gone.
db.add(ModelEndpoint(
id="ep1", name="ChatGPT Subscription",
base_url="https://chatgpt.com/backend-api/codex",
provider_auth_id="ghost", owner="alice",
))
db.commit()
ep1 = db.query(ModelEndpoint).filter(ModelEndpoint.id == "ep1").first()
db.delete(ep1)
# No other endpoint references "ghost" and no auth row exists → no-op, no error.
assert _delete_orphaned_provider_auth(db, "ghost", exclude_ep_id="ep1") is False
finally:
db.close()
def _delete_route(monkeypatch, TestSessionLocal):
"""Resolve the real DELETE /model-endpoints/{ep_id} route, wired to the test DB.
Neutralizes the route's unrelated cleanup side effects (settings/prefs files,
in-memory session manager) so the test stays hermetic and focuses on the
provider-auth revocation wiring.
"""
import routes.model_routes as mr
import routes.prefs_routes as prefs_routes
import src.ai_interaction as ai_interaction
monkeypatch.setattr(mr, "SessionLocal", TestSessionLocal)
monkeypatch.setattr(mr, "require_admin", lambda request: None)
monkeypatch.setattr(mr, "_load_settings", lambda: {})
monkeypatch.setattr(mr, "_save_settings", lambda settings: None)
monkeypatch.setattr(prefs_routes, "_load", lambda: {})
monkeypatch.setattr(prefs_routes, "_save", lambda prefs: None)
monkeypatch.setattr(ai_interaction, "get_session_manager", lambda: None)
router = mr.setup_model_routes(model_discovery=None)
for route in router.routes:
if getattr(route, "path", "") == "/api/model-endpoints/{ep_id}" and "DELETE" in getattr(route, "methods", set()):
return route.endpoint
raise AssertionError("DELETE /api/model-endpoints/{ep_id} not found")
def test_delete_endpoint_route_revokes_orphaned_provider_auth(monkeypatch):
TestSessionLocal = _mem_db(monkeypatch)
db = TestSessionLocal()
try:
_add_auth_and_endpoints(db, auth_id="auth1", ep_ids=("ep1",))
finally:
db.close()
delete_endpoint = _delete_route(monkeypatch, TestSessionLocal)
result = delete_endpoint("ep1", object())
assert result["deleted"] is True
# The last (only) endpoint backed by auth1 is gone, so the route revokes it.
assert result["cleared_provider_auth"] is True
db = TestSessionLocal()
try:
assert db.query(ProviderAuthSession).filter(ProviderAuthSession.id == "auth1").first() is None
assert db.query(ModelEndpoint).filter(ModelEndpoint.id == "ep1").first() is None
finally:
db.close()
def test_delete_endpoint_route_keeps_auth_when_shared(monkeypatch):
TestSessionLocal = _mem_db(monkeypatch)
db = TestSessionLocal()
try:
_add_auth_and_endpoints(db, auth_id="auth1", ep_ids=("ep1", "ep2"))
finally:
db.close()
delete_endpoint = _delete_route(monkeypatch, TestSessionLocal)
result = delete_endpoint("ep1", object())
assert result["deleted"] is True
# ep2 still references auth1, so deleting ep1 must NOT revoke the credentials.
assert result["cleared_provider_auth"] is False
db = TestSessionLocal()
try:
assert db.query(ProviderAuthSession).filter(ProviderAuthSession.id == "auth1").first() is not None
finally:
db.close()
def test_delete_orphaned_provider_auth_revokes_only_after_last_of_several(monkeypatch):
from routes.model_routes import _delete_orphaned_provider_auth
TestSessionLocal = _mem_db(monkeypatch)
db = TestSessionLocal()
try:
_add_auth_and_endpoints(db, auth_id="auth1", ep_ids=("ep1", "ep2"))
# Delete ep1 first: ep2 still references auth1, so the row survives.
ep1 = db.query(ModelEndpoint).filter(ModelEndpoint.id == "ep1").first()
db.delete(ep1)
assert _delete_orphaned_provider_auth(db, "auth1", exclude_ep_id="ep1") is False
db.commit()
assert db.query(ProviderAuthSession).filter(ProviderAuthSession.id == "auth1").first() is not None
# Now delete the last endpoint ep2: the auth row is finally cleared.
ep2 = db.query(ModelEndpoint).filter(ModelEndpoint.id == "ep2").first()
db.delete(ep2)
assert _delete_orphaned_provider_auth(db, "auth1", exclude_ep_id="ep2") is True
db.commit()
assert db.query(ProviderAuthSession).filter(ProviderAuthSession.id == "auth1").first() is None
finally:
db.close()
+138
View File
@@ -0,0 +1,138 @@
"""Shared device-flow route helper regressions."""
import pytest
from fastapi import FastAPI, HTTPException
from fastapi.testclient import TestClient
from routes import device_flow
def _client(monkeypatch, now_ref, start_flow, poll_flow):
store = device_flow.PendingDeviceFlowStore(time_func=lambda: now_ref[0])
router = device_flow.create_device_flow_router(
prefix="/api/test-device",
tags=["test-device"],
store=store,
start_flow=start_flow,
poll_flow=poll_flow,
)
app = FastAPI()
app.include_router(router)
monkeypatch.setattr(device_flow, "require_admin", lambda request: None)
return TestClient(app)
def _start(_request, _form):
return device_flow.DeviceFlowStart(
pending={"secret": "server-only", "owner": "alice"},
response={"user_code": "ABCD-EFGH", "verification_uri": "https://example.test/device"},
interval=5,
expires_in=20,
)
def test_pending_poll_is_throttled_until_interval(monkeypatch):
now = [100.0]
calls = []
def poll(_request, pending):
calls.append(dict(pending))
return device_flow.DeviceFlowPoll.pending()
client = _client(monkeypatch, now, _start, poll)
start = client.post("/api/test-device/device/start").json()
first = client.post("/api/test-device/device/poll", data={"poll_id": start["poll_id"]})
assert first.json() == {"status": "pending"}
assert calls == [{"secret": "server-only", "owner": "alice"}]
second = client.post("/api/test-device/device/poll", data={"poll_id": start["poll_id"]})
assert second.json() == {"status": "pending"}
assert len(calls) == 1
now[0] += 5
third = client.post("/api/test-device/device/poll", data={"poll_id": start["poll_id"]})
assert third.json() == {"status": "pending"}
assert len(calls) == 2
def test_slow_down_updates_poll_interval(monkeypatch):
now = [100.0]
calls = []
def poll(_request, _pending):
calls.append(now[0])
if len(calls) == 1:
return device_flow.DeviceFlowPoll.slow_down(interval=10)
return device_flow.DeviceFlowPoll.authorized({"id": "ep1", "models": ["gpt-4o"]})
client = _client(monkeypatch, now, _start, poll)
poll_id = client.post("/api/test-device/device/start").json()["poll_id"]
assert client.post("/api/test-device/device/poll", data={"poll_id": poll_id}).json() == {"status": "pending"}
now[0] += 9
assert client.post("/api/test-device/device/poll", data={"poll_id": poll_id}).json() == {"status": "pending"}
assert len(calls) == 1
now[0] += 1
assert client.post("/api/test-device/device/poll", data={"poll_id": poll_id}).json() == {
"status": "authorized",
"endpoint": {"id": "ep1", "models": ["gpt-4o"]},
}
def test_authorized_and_failed_polls_remove_pending_session(monkeypatch):
now = [100.0]
outcomes = [
device_flow.DeviceFlowPoll.authorized({"id": "ep1"}),
device_flow.DeviceFlowPoll.failed("access_denied"),
]
def poll(_request, _pending):
return outcomes.pop(0)
client = _client(monkeypatch, now, _start, poll)
first = client.post("/api/test-device/device/start").json()["poll_id"]
second = client.post("/api/test-device/device/start").json()["poll_id"]
assert client.post("/api/test-device/device/poll", data={"poll_id": first}).json()["status"] == "authorized"
assert client.post("/api/test-device/device/poll", data={"poll_id": first}).status_code == 404
assert client.post("/api/test-device/device/poll", data={"poll_id": second}).json() == {
"status": "failed",
"error": "access_denied",
}
assert client.post("/api/test-device/device/poll", data={"poll_id": second}).status_code == 404
def test_cancel_and_expiry_remove_pending_session(monkeypatch):
now = [100.0]
def poll(_request, _pending):
return device_flow.DeviceFlowPoll.pending()
client = _client(monkeypatch, now, _start, poll)
cancelled = client.post("/api/test-device/device/start").json()["poll_id"]
assert client.post("/api/test-device/device/cancel", data={"poll_id": cancelled}).json() == {"status": "cancelled"}
assert client.post("/api/test-device/device/poll", data={"poll_id": cancelled}).status_code == 404
expired = client.post("/api/test-device/device/start").json()["poll_id"]
now[0] += 21
assert client.post("/api/test-device/device/poll", data={"poll_id": expired}).status_code == 404
def test_routes_are_admin_gated(monkeypatch):
now = [100.0]
def poll(_request, _pending):
return device_flow.DeviceFlowPoll.pending()
client = _client(monkeypatch, now, _start, poll)
def deny(_request):
raise HTTPException(403, "admin required")
monkeypatch.setattr(device_flow, "require_admin", deny)
assert client.post("/api/test-device/device/start").status_code == 403
assert client.post("/api/test-device/device/poll", data={"poll_id": "missing"}).status_code == 403
assert client.post("/api/test-device/device/cancel", data={"poll_id": "missing"}).status_code == 403
+92 -23
View File
@@ -25,32 +25,36 @@ from unittest.mock import MagicMock
import httpx
import pytest
from tests.helpers.import_state import clear_fake_endpoint_resolver_modules
from tests.helpers.import_state import clear_fake_endpoint_resolver_modules, preserve_import_state
# Match test_model_routes.py: if another test stubbed src.endpoint_resolver
# during collection, drop the stub so the real URL helpers load here.
clear_fake_endpoint_resolver_modules()
with preserve_import_state("core.database", "src.database", "core.session_manager", "routes.model_routes"):
# Match test_model_routes.py: if another test stubbed src.endpoint_resolver
# during collection, drop the stub so the real URL helpers load here.
clear_fake_endpoint_resolver_modules()
if "core.database" not in sys.modules:
_core_db = types.ModuleType("core.database")
for _name in [
"SessionLocal", "ModelEndpoint", "Session", "ChatMessage", "Document",
"DocumentVersion", "GalleryImage", "GalleryAlbum", "Note",
"CalendarCal", "CalendarEvent", "ScheduledTask", "TaskRun", "McpServer",
]:
setattr(_core_db, _name, MagicMock())
sys.modules["core.database"] = _core_db
if "core.database" not in sys.modules:
_core_db = types.ModuleType("core.database")
for _name in [
"SessionLocal", "ModelEndpoint", "Session", "ChatMessage", "Document",
"DocumentVersion", "GalleryImage", "GalleryAlbum", "Note",
"CalendarCal", "CalendarEvent", "ScheduledTask", "TaskRun", "McpServer",
"ProviderAuthSession", "Base",
]:
setattr(_core_db, _name, MagicMock())
_core_db.utcnow_naive = MagicMock()
sys.modules["core.database"] = _core_db
import routes.model_routes as model_routes
import src.endpoint_resolver as endpoint_resolver
from routes.model_routes import (
_probe_endpoint,
_ping_endpoint,
_probe_single_model,
_classify_endpoint,
_rewrite_loopback_for_docker,
_PROVIDER_CURATED,
)
import routes.model_routes as model_routes
import src.endpoint_resolver as endpoint_resolver
from routes.model_routes import (
_probe_endpoint,
_ping_endpoint,
_probe_single_model,
_resolve_probe_key,
_classify_endpoint,
_rewrite_loopback_for_docker,
_PROVIDER_CURATED,
)
def _patch_resolve(monkeypatch):
@@ -117,6 +121,26 @@ class TestProbeEndpointParsing:
)
assert _probe_endpoint("https://api.example.com/v1") == []
def test_chatgpt_subscription_probe_uses_discovery_only(self, monkeypatch):
_patch_resolve(monkeypatch)
calls = []
def fake_fetch(access_token, timeout=5):
calls.append((access_token, timeout))
return ["gpt-5.5"]
monkeypatch.setattr("src.chatgpt_subscription.fetch_available_models", fake_fetch)
assert _probe_endpoint("https://chatgpt.com/backend-api/codex", "ACCESS", timeout=7) == ["gpt-5.5"]
assert calls == [("ACCESS", 7)]
def test_chatgpt_subscription_probe_without_discovery_returns_empty(self, monkeypatch):
_patch_resolve(monkeypatch)
monkeypatch.setattr("src.chatgpt_subscription.fetch_available_models", lambda access_token, timeout=5: [])
assert _probe_endpoint("https://chatgpt.com/backend-api/codex", "ACCESS") == []
assert _probe_endpoint("https://chatgpt.com/backend-api/codex") == []
# ── _ping_endpoint: reachability classification ──
@@ -321,6 +345,51 @@ class TestProbeSingleModel:
_probe_single_model("https://api.anthropic.com/v1", "sk-ant", "claude-sonnet-4-5", with_tools=True)
assert "input_schema" in captured["payload"]["tools"][0]
def test_chatgpt_subscription_skips_completion_probe(self, monkeypatch):
# This provider speaks the Responses/Codex API. A chat-completions probe
# would 400 and (via the re-probe flow) hide every model, so it must be
# short-circuited as discovery-only without any HTTP call.
_patch_resolve(monkeypatch)
def boom(*args, **kwargs):
raise AssertionError("must not send a completion probe for chatgpt-subscription")
monkeypatch.setattr(model_routes.httpx, "post", boom)
result = _probe_single_model("https://chatgpt.com/backend-api/codex", None, "gpt-5.1-codex")
assert result["status"] == "ok"
assert result.get("skipped") is True
# Pin the full documented return shape — downstream JSON/UI reads latency_ms.
assert result["latency_ms"] == 0
# ── _resolve_probe_key: static key vs provider-auth runtime token ──
class TestResolveProbeKey:
def test_static_endpoint_uses_api_key(self):
ep = types.SimpleNamespace(id="e1", api_key="sk-static", provider_auth_id=None, owner=None)
assert _resolve_probe_key(ep) == "sk-static"
def test_provider_auth_endpoint_resolves_runtime_token(self, monkeypatch):
ep = types.SimpleNamespace(id="e2", api_key=None, provider_auth_id="auth123", owner="alice")
seen = {}
def fake_runtime(endpoint, owner=None):
seen["owner"] = owner
return ("https://chatgpt.com/backend-api/codex", "live-bearer")
monkeypatch.setattr(endpoint_resolver, "resolve_endpoint_runtime", fake_runtime)
assert _resolve_probe_key(ep) == "live-bearer"
assert seen["owner"] == "alice"
def test_provider_auth_resolution_failure_returns_none(self, monkeypatch):
ep = types.SimpleNamespace(id="e3", api_key=None, provider_auth_id="auth123", owner=None)
def boom(endpoint, owner=None):
raise RuntimeError("reauth required")
monkeypatch.setattr(endpoint_resolver, "resolve_endpoint_runtime", boom)
assert _resolve_probe_key(ep) is None
# ── _classify_endpoint: Tailscale CGNAT range ──
+22
View File
@@ -75,6 +75,28 @@ def test_normal_model_payload_keeps_temperature_above_one(monkeypatch):
assert payload["temperature"] == 1.2
def test_chatgpt_subscription_payload_uses_max_output_tokens():
payload = llm_core._build_chatgpt_responses_payload(
"gpt-5.1-codex",
[{"role": "user", "content": "Say OK"}],
temperature=0.2,
max_tokens=37,
)
assert payload["max_output_tokens"] == 37
def test_chatgpt_subscription_payload_omits_empty_max_output_tokens():
payload = llm_core._build_chatgpt_responses_payload(
"gpt-5.1-codex",
[{"role": "user", "content": "Say OK"}],
temperature=0.2,
max_tokens=0,
)
assert "max_output_tokens" not in payload
def _anthropic_payload(temperature):
return llm_core._build_anthropic_payload(
"claude-3-5-sonnet",
+92 -42
View File
@@ -11,49 +11,51 @@ from types import SimpleNamespace
import httpx
import pytest
from tests.helpers.import_state import clear_fake_endpoint_resolver_modules
from tests.helpers.import_state import clear_fake_endpoint_resolver_modules, preserve_import_state
# Other tests stub this module during collection. These helper tests need
# the real URL normalization helpers so Anthropic /v1 handling is covered.
clear_fake_endpoint_resolver_modules()
with preserve_import_state("core.database", "src.database", "core.session_manager", "routes.model_routes"):
# Other tests stub this module during collection. These helper tests need
# the real URL normalization helpers so Anthropic /v1 handling is covered.
clear_fake_endpoint_resolver_modules()
if "core.database" not in sys.modules:
_core_db = types.ModuleType("core.database")
for _name in [
"SessionLocal", "ModelEndpoint", "Session", "ChatMessage", "Document",
"DocumentVersion", "GalleryImage", "GalleryAlbum", "Note",
"CalendarCal", "CalendarEvent", "ScheduledTask", "TaskRun",
"McpServer",
]:
setattr(_core_db, _name, MagicMock())
sys.modules["core.database"] = _core_db
if "core.database" not in sys.modules:
_core_db = types.ModuleType("core.database")
for _name in [
"SessionLocal", "ModelEndpoint", "Session", "ChatMessage", "Document",
"DocumentVersion", "GalleryImage", "GalleryAlbum", "Note",
"CalendarCal", "CalendarEvent", "ScheduledTask", "TaskRun",
"McpServer", "ProviderAuthSession", "Base",
]:
setattr(_core_db, _name, MagicMock())
_core_db.utcnow_naive = MagicMock()
sys.modules["core.database"] = _core_db
import routes.model_routes as model_routes
import src.database as src_database
import src.endpoint_resolver as endpoint_resolver
import src.llm_core as llm_core
from routes.model_routes import (
_match_provider_curated,
_curate_models,
_visible_models,
_normalize_model_ids,
_api_key_fingerprint,
_is_chat_model,
_classify_endpoint,
_effective_endpoint_kind,
_probe_endpoint,
_ping_endpoint,
_parse_model_list,
_normalize_refresh_mode,
_truthy,
_speech_settings_using_endpoint,
_clear_speech_settings_for_endpoint,
_endpoint_settings_using_endpoint,
_clear_endpoint_settings_for_endpoint,
_clear_user_pref_endpoint_refs,
_PROVIDER_CURATED,
)
from src.llm_core import ANTHROPIC_MODELS
import routes.model_routes as model_routes
import src.database as src_database
import src.endpoint_resolver as endpoint_resolver
import src.llm_core as llm_core
from routes.model_routes import (
_match_provider_curated,
_curate_models,
_visible_models,
_normalize_model_ids,
_api_key_fingerprint,
_is_chat_model,
_classify_endpoint,
_effective_endpoint_kind,
_probe_endpoint,
_ping_endpoint,
_parse_model_list,
_normalize_refresh_mode,
_truthy,
_speech_settings_using_endpoint,
_clear_speech_settings_for_endpoint,
_endpoint_settings_using_endpoint,
_clear_endpoint_settings_for_endpoint,
_clear_user_pref_endpoint_refs,
_PROVIDER_CURATED,
)
from src.llm_core import ANTHROPIC_MODELS
# ── speech endpoint settings ──
@@ -687,8 +689,7 @@ class _PinnedFakeRequest:
def _get_route(path, method):
from routes.model_routes import setup_model_routes
router = setup_model_routes(model_discovery=None)
router = model_routes.setup_model_routes(model_discovery=None)
for route in router.routes:
if getattr(route, "path", "") == path and method in getattr(route, "methods", set()):
return route.endpoint
@@ -787,6 +788,55 @@ def test_reprobe_preserves_pinned_models(monkeypatch):
assert json.loads(ep.cached_models) == ["m1"]
def test_reprobe_chatgpt_subscription_does_not_hide_models(monkeypatch):
# The whole point of the _probe_single_model short-circuit is that re-probing
# a chatgpt-subscription endpoint must NOT mark every (un-probeable) model as
# failed and write them all into hidden_models. Assert that end-to-end at the
# route level, with the REAL _probe_single_model doing the skip.
ep = _make_endpoint(
base_url="https://chatgpt.com/backend-api/codex",
api_key=None,
hidden_models=json.dumps(["stale-hidden"]),
)
db = _PinnedFakeDb([ep])
monkeypatch.setattr(model_routes, "SessionLocal", lambda: db)
monkeypatch.setattr(model_routes, "require_admin", lambda request: None)
monkeypatch.setattr(model_routes, "_normalize_base", lambda url: url.rstrip("/"))
monkeypatch.setattr(model_routes, "_probe_endpoint", lambda *a, **k: ["gpt-5.1-codex", "gpt-5.1"])
monkeypatch.setattr(model_routes, "_is_chat_model", lambda m: True)
# Any completion probe would be a bug for this provider.
monkeypatch.setattr(
model_routes.httpx, "post",
lambda *a, **k: (_ for _ in ()).throw(AssertionError("must not probe chatgpt-subscription")),
)
endpoint = _get_route("/api/model-endpoints/{ep_id}/probe", "GET")
response = endpoint("ep1", _PinnedFakeRequest())
chunks = []
async def _drain():
async for chunk in response.body_iterator:
chunks.append(chunk.decode() if isinstance(chunk, bytes) else chunk)
asyncio.run(_drain())
events = []
for chunk in chunks:
for line in chunk.splitlines():
if line.startswith("data: "):
events.append(json.loads(line[len("data: "):]))
done = next(e for e in events if e.get("type") == "probe_done")
results = [e for e in events if e.get("type") == "probe_result"]
# Every model was skipped as ok; none failed → nothing hidden.
assert done["hidden"] == 0
assert done["ok"] == len(results) == 2
assert all(r["status"] == "ok" and r.get("skipped") is True for r in results)
# The stale hidden_models is cleared, not repopulated with every model.
assert ep.hidden_models is None
def test_visible_models_handles_malformed_strings():
# Non-JSON cached/pinned strings are treated as comma/newline lists and
# never raise; a malformed hidden string is normalized too.
+10
View File
@@ -42,6 +42,10 @@ class TestHostMatch:
class TestDetectProviderRealHosts:
def test_chatgpt_subscription_codex_backend(self):
assert llm_core._detect_provider("https://chatgpt.com/backend-api/codex") == "chatgpt-subscription"
assert llm_core._detect_provider("https://chatgpt.com/backend-api/codex/responses") == "chatgpt-subscription"
def test_anthropic(self):
assert llm_core._detect_provider("https://api.anthropic.com") == "anthropic"
@@ -93,6 +97,12 @@ class TestBuildersRejectLookalikeHosts:
def test_real_anthropic_chat(self):
assert build_chat_url("https://api.anthropic.com") == "https://api.anthropic.com/v1/messages"
def test_chatgpt_subscription_chat_uses_responses(self):
assert build_chat_url("https://chatgpt.com/backend-api/codex") == "https://chatgpt.com/backend-api/codex/responses"
def test_chatgpt_subscription_models_uses_no_live_probe(self):
assert build_models_url("https://chatgpt.com/backend-api/codex") is None
def test_lookalike_anthropic_chat_is_openai(self):
assert build_chat_url("https://notanthropic.com") == "https://notanthropic.com/chat/completions"
+157
View File
@@ -0,0 +1,157 @@
"""Node-driven tests for the shared provider device-flow runner."""
import json
import shutil
import subprocess
from pathlib import Path
import pytest
_REPO = Path(__file__).resolve().parent.parent
_HELPER = _REPO / "static" / "js" / "providerDeviceFlow.js"
pytestmark = pytest.mark.skipif(not shutil.which("node"), reason="node not on PATH")
def _run_node(script: str):
proc = subprocess.run(
["node", "--input-type=module"],
input=script,
capture_output=True,
text=True,
cwd=str(_REPO),
timeout=30,
)
assert proc.returncode == 0, proc.stderr
return json.loads(proc.stdout.strip())
def test_copilot_success_uses_complete_verification_uri():
js = f"""
import {{ runProviderDeviceFlow }} from '{_HELPER.as_posix()}';
const calls = [];
const opened = [];
let polls = 0;
const response = (ok, status, payload) => ({{ ok, status, async json() {{ return payload; }} }});
const fetchImpl = async (url) => {{
calls.push(url);
if (url.endsWith('/device/start')) {{
return response(true, 200, {{
poll_id: 'poll-1',
user_code: 'GH-CODE',
verification_uri: 'https://github.com/login/device',
verification_uri_complete: 'https://github.com/login/device?user_code=GH-CODE',
interval: 2,
expires_in: 30,
}});
}}
polls += 1;
return response(true, 200, polls === 1
? {{ status: 'pending' }}
: {{ status: 'authorized', endpoint: {{ id: 'ep1', models: ['gpt-4o'] }} }}
);
}};
const result = await runProviderDeviceFlow('copilot', {{
fetchImpl,
openWindow: (url) => opened.push(url),
sleep: async () => {{}},
now: () => 0,
}});
console.log(JSON.stringify({{ result, calls, opened }}));
"""
out = _run_node(js)
assert out["result"]["status"] == "authorized"
assert out["result"]["endpoint"]["id"] == "ep1"
assert out["opened"] == ["https://github.com/login/device?user_code=GH-CODE"]
assert out["calls"] == ["/api/copilot/device/start", "/api/copilot/device/poll", "/api/copilot/device/poll"]
def test_chatgpt_success_uses_plain_verification_uri():
js = f"""
import {{ runProviderDeviceFlow }} from '{_HELPER.as_posix()}';
const opened = [];
const response = (ok, status, payload) => ({{ ok, status, async json() {{ return payload; }} }});
const fetchImpl = async (url) => {{
if (url.endsWith('/device/start')) {{
return response(true, 200, {{
poll_id: 'poll-1',
user_code: 'OA-CODE',
verification_uri: 'https://auth.openai.com/codex/device',
interval: 2,
expires_in: 30,
}});
}}
return response(true, 200, {{ status: 'authorized', endpoint: {{ id: 'chatgpt', models: ['gpt-5.5'] }} }});
}};
const result = await runProviderDeviceFlow('chatgpt-subscription', {{
fetchImpl,
openWindow: (url) => opened.push(url),
sleep: async () => {{}},
now: () => 0,
}});
console.log(JSON.stringify({{ result, opened }}));
"""
out = _run_node(js)
assert out["result"]["status"] == "authorized"
assert out["opened"] == ["https://auth.openai.com/codex/device"]
def test_start_errors_surface_backend_detail():
js = f"""
import {{ runProviderDeviceFlow }} from '{_HELPER.as_posix()}';
const response = (ok, status, payload) => ({{ ok, status, async json() {{ return payload; }} }});
try {{
await runProviderDeviceFlow('copilot', {{
fetchImpl: async () => response(false, 502, {{ detail: 'GitHub device-code request failed: upstream down' }}),
openWindow: () => {{}},
sleep: async () => {{}},
now: () => 0,
}});
}} catch (err) {{
console.log(JSON.stringify({{ message: err.message }}));
}}
"""
out = _run_node(js)
assert out["message"] == "GitHub device-code request failed: upstream down"
def test_thrown_fetch_errors_are_preserved():
js = f"""
import {{ runProviderDeviceFlow }} from '{_HELPER.as_posix()}';
try {{
await runProviderDeviceFlow('chatgpt-subscription', {{
fetchImpl: async () => {{ throw new Error('network offline'); }},
openWindow: () => {{}},
sleep: async () => {{}},
now: () => 0,
}});
}} catch (err) {{
console.log(JSON.stringify({{ message: err.message }}));
}}
"""
out = _run_node(js)
assert out["message"] == "network offline"
def test_expired_flow_returns_expired_status():
js = f"""
import {{ runProviderDeviceFlow }} from '{_HELPER.as_posix()}';
let currentTime = 0;
const response = (ok, status, payload) => ({{ ok, status, async json() {{ return payload; }} }});
const result = await runProviderDeviceFlow('copilot', {{
fetchImpl: async (url) => url.endsWith('/device/start')
? response(true, 200, {{
poll_id: 'poll-1',
user_code: 'GH-CODE',
verification_uri: 'https://github.com/login/device',
interval: 2,
expires_in: 1,
}})
: response(true, 200, {{ status: 'pending' }}),
openWindow: () => {{}},
sleep: async () => {{ currentTime += 2000; }},
now: () => currentTime,
}});
console.log(JSON.stringify(result));
"""
out = _run_node(js)
assert out == {"status": "expired"}
+27 -1
View File
@@ -24,7 +24,7 @@ _sd = types.ModuleType("src.database")
_sd.ModelEndpoint = MagicMock()
sys.modules.setdefault("src.database", _sd)
from routes.research_routes import _owned_enabled_endpoint # noqa: E402
from routes.research_routes import _owned_enabled_endpoint, _resolve_endpoint_runtime # noqa: E402
class _Predicate:
@@ -129,3 +129,29 @@ def test_null_owner_is_legacy_single_user_noop():
rows = [_ep("ep-x", "bob"), _ep("ep-y", "alice")]
ep = _resolve(rows, None, "ep-x")
assert ep is not None and ep.id == "ep-x"
def test_runtime_resolution_uses_provider_auth_for_chatgpt_subscription(monkeypatch):
ep = SimpleNamespace(
id="ep-chatgpt",
owner="alice",
base_url="https://chatgpt.com/backend-api/codex",
api_key=None,
provider_auth_id="auth-1",
cached_models='["gpt-5.5"]',
hidden_models=None,
)
monkeypatch.setattr(
"src.chatgpt_subscription.resolve_runtime_credentials",
lambda auth_id, owner=None: {
"base_url": "https://chatgpt.com/backend-api/codex",
"api_key": "fresh-access-token",
},
)
url, model, headers = _resolve_endpoint_runtime(ep, owner="alice", model="")
assert url == "https://chatgpt.com/backend-api/codex/responses"
assert model == "gpt-5.5"
assert headers["Authorization"] == "Bearer fresh-access-token"
+215
View File
@@ -0,0 +1,215 @@
"""resolve_session_auth must not persist the ChatGPT Subscription bearer.
The ChatGPT Subscription access token is a short-lived OAuth bearer re-resolved
(and refreshed) on every request. resolve_session_auth() may set it on the
in-memory session for the current request, but it must never write it back into
the sessions table — otherwise the live token sits at rest as
"Authorization: Bearer ...". Only the encrypted refresh token in
ProviderAuthSession is allowed to persist.
"""
import types
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import routes.chat_helpers as chat_helpers
import src.endpoint_resolver as endpoint_resolver
from core.database import Base, ModelEndpoint, Session as DbSession
_CODEX_BASE = "https://chatgpt.com/backend-api/codex"
def _mem_db(monkeypatch):
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(bind=engine)
# Match production SessionLocal (core.database) which is autoflush=False.
TestSessionLocal = sessionmaker(bind=engine, autoflush=False)
monkeypatch.setattr(chat_helpers, "SessionLocal", TestSessionLocal)
return TestSessionLocal
def test_chatgpt_subscription_auth_is_not_written_to_sessions_table(monkeypatch):
TestSessionLocal = _mem_db(monkeypatch)
db = TestSessionLocal()
try:
db.add(ModelEndpoint(
id="ep1", name="ChatGPT Subscription", base_url=_CODEX_BASE,
provider_auth_id="auth1", owner="alice", is_enabled=True, api_key=None,
))
db.add(DbSession(
id="sess1", name="chat", endpoint_url=_CODEX_BASE,
model="gpt-5.1-codex", owner="alice", headers={},
))
db.commit()
finally:
db.close()
# A live access token is resolved at request time.
monkeypatch.setattr(
endpoint_resolver, "resolve_endpoint_runtime",
lambda ep, owner=None: (_CODEX_BASE, "live-access-token"),
)
sess = types.SimpleNamespace(
id="sess1", endpoint_url=_CODEX_BASE, model="gpt-5.1-codex",
owner="alice", headers={},
)
chat_helpers.resolve_session_auth(sess, "sess1", owner="alice")
# In-memory session got request-local auth for this request...
assert any(k.lower() == "authorization" for k in sess.headers)
assert sess.headers["Authorization"] == "Bearer live-access-token"
# ...but the DB row must NOT have the bearer persisted.
db = TestSessionLocal()
try:
row = db.query(DbSession).filter(DbSession.id == "sess1").first()
stored = row.headers or {}
assert not any(k.lower() == "authorization" for k in stored), (
f"ChatGPT bearer leaked into sessions table: {stored}"
)
finally:
db.close()
def test_non_subscription_auth_is_still_persisted_to_sessions_table(monkeypatch):
"""The early-return must be scoped to ChatGPT Subscription only.
Ordinary endpoints rely on resolve_session_auth() persisting the resolved
headers into the sessions table so they aren't re-resolved on every request.
If the is_chatgpt_subscription guard ever widened, this would silently break;
this test pins the persistence path as still reached for normal endpoints.
"""
base = "https://api.example.com/v1"
TestSessionLocal = _mem_db(monkeypatch)
db = TestSessionLocal()
try:
db.add(ModelEndpoint(
id="ep1", name="Generic", base_url=base,
owner="alice", is_enabled=True, api_key="sk-static",
))
db.add(DbSession(
id="sess1", name="chat", endpoint_url=base,
model="gpt-x", owner="alice", headers={},
))
db.commit()
finally:
db.close()
monkeypatch.setattr(
endpoint_resolver, "resolve_endpoint_runtime",
lambda ep, owner=None: (base, "sk-static"),
)
sess = types.SimpleNamespace(
id="sess1", endpoint_url=base, model="gpt-x", owner="alice", headers={},
)
chat_helpers.resolve_session_auth(sess, "sess1", owner="alice")
# In-memory session got auth...
assert any(k.lower() in ("authorization", "x-api-key") for k in sess.headers)
# ...AND it was persisted to the DB row (the normal, non-subscription path).
db = TestSessionLocal()
try:
row = db.query(DbSession).filter(DbSession.id == "sess1").first()
stored = row.headers or {}
assert any(k.lower() in ("authorization", "x-api-key") for k in stored), (
f"non-subscription auth was not persisted: {stored}"
)
finally:
db.close()
def test_chatgpt_subscription_clears_previously_persisted_bearer(monkeypatch):
"""A bearer left at rest by an older code path is stripped on next resolve."""
TestSessionLocal = _mem_db(monkeypatch)
db = TestSessionLocal()
try:
db.add(ModelEndpoint(
id="ep1", name="ChatGPT Subscription", base_url=_CODEX_BASE,
provider_auth_id="auth1", owner="alice", is_enabled=True, api_key=None,
))
# Simulate the leak: a stale bearer already sitting in the sessions table.
db.add(DbSession(
id="sess1", name="chat", endpoint_url=_CODEX_BASE,
model="gpt-5.1-codex", owner="alice",
headers={"Authorization": "Bearer stale-leaked-token"},
))
db.commit()
finally:
db.close()
monkeypatch.setattr(
endpoint_resolver,
"resolve_endpoint_runtime",
lambda ep, owner=None: (_CODEX_BASE, "live-access-token"),
)
sess = types.SimpleNamespace(
id="sess1", endpoint_url=_CODEX_BASE, model="gpt-5.1-codex",
owner="alice", headers={},
)
chat_helpers.resolve_session_auth(sess, "sess1", owner="alice")
# The stale bearer must have been stripped from the DB row.
db = TestSessionLocal()
try:
row = db.query(DbSession).filter(DbSession.id == "sess1").first()
stored = row.headers or {}
assert not any(k.lower() == "authorization" for k in stored), (
f"stale ChatGPT bearer was not cleared: {stored}"
)
finally:
db.close()
def test_chatgpt_subscription_fallback_auth_is_not_written_to_sessions_table(monkeypatch):
"""Fallback endpoint selection must keep the resolved bearer request-local."""
TestSessionLocal = _mem_db(monkeypatch)
db = TestSessionLocal()
try:
db.add(ModelEndpoint(
id="ep1", name="ChatGPT Subscription", base_url=_CODEX_BASE,
provider_auth_id="auth1", owner="alice", is_enabled=True, api_key=None,
cached_models='["gpt-5.1-codex"]',
))
db.add(DbSession(
id="sess1", name="chat", endpoint_url="https://old.example/v1",
model="old-model", owner="alice", headers={},
))
db.commit()
finally:
db.close()
monkeypatch.setattr(
endpoint_resolver,
"resolve_endpoint_runtime",
lambda ep, owner=None: (_CODEX_BASE, "live-access-token"),
)
sess = types.SimpleNamespace(
id="sess1", endpoint_url="https://old.example/v1", model="old-model",
owner="alice", headers={},
)
result = chat_helpers.try_fallback_endpoint(sess, "sess1")
assert result == {
"model": "gpt-5.1-codex",
"endpoint_url": _CODEX_BASE + "/responses",
"endpoint_name": "ChatGPT Subscription",
}
assert sess.headers["Authorization"] == "Bearer live-access-token"
db = TestSessionLocal()
try:
row = db.query(DbSession).filter(DbSession.id == "sess1").first()
assert row.model == "gpt-5.1-codex"
assert row.endpoint_url == _CODEX_BASE + "/responses"
stored = row.headers or {}
assert not any(k.lower() == "authorization" for k in stored), (
f"ChatGPT fallback bearer leaked into sessions table: {stored}"
)
finally:
db.close()
+1 -1
View File
@@ -386,7 +386,7 @@ async def test_build_chat_context_incognito_does_not_duplicate_current_user_mess
monkeypatch.setattr(chat_helpers, "add_user_message", fake_add_user_message)
monkeypatch.setattr(chat_helpers, "load_prefs_for_user", lambda user: {})
monkeypatch.setattr(chat_helpers, "get_current_user", lambda request: "tester")
monkeypatch.setattr(chat_helpers, "normalize_model_id", lambda endpoint_url, model: None)
monkeypatch.setattr(chat_helpers, "normalize_model_id", lambda endpoint_url, model, **kwargs: None)
monkeypatch.setattr(chat_helpers, "maybe_compact", fake_maybe_compact)
monkeypatch.setattr(chat_helpers, "trim_for_context", lambda messages, context_length: messages)
+9
View File
@@ -137,3 +137,12 @@ def test_unauthenticated_caller_rejected(monkeypatch):
with pytest.raises(HTTPException) as exc:
SR._verify_session_owner(req, "sid")
assert exc.value.status_code == 401
def test_auth_disabled_allows_owner_stamped_session(monkeypatch):
monkeypatch.setenv("AUTH_ENABLED", "false")
monkeypatch.setattr(SR, "SessionLocal", _session_local_returning("admin"))
req = _req(api_token=False, current_user=None)
# Single-user/auth-disabled mode should verify existence but not compare owner.
SR._verify_session_owner(req, "sid-owned-by-admin")
+42
View File
@@ -0,0 +1,42 @@
"""Static regressions for `/setup` account sign-in providers."""
from pathlib import Path
_REPO = Path(__file__).resolve().parent.parent
_SLASH = (_REPO / "static" / "js" / "slashCommands.js").read_text(encoding="utf-8")
def _between(src: str, start: str, end: str) -> str:
start_idx = src.index(start)
end_idx = src.index(end, start_idx)
return src[start_idx:end_idx]
def test_setup_guide_lists_account_sign_in_providers():
guide_block = _between(_SLASH, "function _showSetupEndpointChoices", "async function _hasConfiguredModels")
assert 'data-setup-provider="' in _SLASH
assert "provider.key" in _SLASH
assert "'copilot'" in _SLASH
assert "'chatgpt-subscription'" in _SLASH
assert "/setup copilot" in _SLASH
assert "/setup chatgpt-subscription" in _SLASH
def test_clicking_account_sign_in_provider_prefills_setup_command_not_api_key():
click_block = _between(_SLASH, "const providerEl = e.target.closest('.setup-clickable-provider')", "// 3. Check")
assert "providerEl.dataset.setupProvider" in click_block
assert "providerEl.dataset.setupKind === 'device-auth'" in click_block
assert "'/setup ' + providerKey" in click_block
def test_setup_chatgpt_subscription_prints_auth_url_without_auto_opening_tab():
flow_block = _between(_SLASH, "async function _setupProviderDeviceFlow", "async function _cmdSetup")
assert "providerKey === 'chatgpt-subscription'" in flow_block
assert "Open this URL" in flow_block
assert "authUrl" in flow_block
assert 'href="\' + uiModule.esc(authUrl || \'\') + \'"' in flow_block
assert "if (providerKey === 'chatgpt-subscription') return;" in flow_block
+17
View File
@@ -0,0 +1,17 @@
"""Static regressions for slash autocomplete command-group expansion."""
from pathlib import Path
_REPO = Path(__file__).resolve().parent.parent
_AC = (_REPO / "static" / "js" / "slashAutocomplete.js").read_text(encoding="utf-8")
def test_exact_parent_command_expands_subcommands_before_top_level_row_cap():
assert "function _exactCommandGroupItems" in _AC
assert "entry.token.toLowerCase().startsWith(prefix)" in _AC
assert "items = groupItems.slice(0, MAX_VISIBLE);" in _AC
def test_setup_group_has_room_for_chatgpt_subscription_suggestion():
assert "const MAX_VISIBLE = 14;" in _AC