"""ChatGPT subscription / Codex backend OAuth helpers. This provider is intentionally separate from OpenAI API-key endpoints. It uses OpenAI account OAuth device authorization, stores refresh tokens server-side, and resolves a fresh bearer token at request time. """ from __future__ import annotations import base64 import json import os import threading import time from typing import Any, Dict, Optional import httpx from fastapi import HTTPException from core.database import ProviderAuthSession, SessionLocal, utcnow_naive DEFAULT_CHATGPT_SUBSCRIPTION_BASE_URL = ( os.getenv("CHATGPT_SUBSCRIPTION_BASE_URL", "").strip().rstrip("/") or "https://chatgpt.com/backend-api/codex" ) CHATGPT_SUBSCRIPTION_PROVIDER = "chatgpt-subscription" CHATGPT_OAUTH_CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann" CHATGPT_OAUTH_TOKEN_URL = "https://auth.openai.com/oauth/token" CHATGPT_OAUTH_ISSUER = "https://auth.openai.com" CHATGPT_OAUTH_REDIRECT_URI = f"{CHATGPT_OAUTH_ISSUER}/deviceauth/callback" CHATGPT_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120 _AUTH_REFRESH_LOCKS: dict[str, threading.Lock] = {} _AUTH_REFRESH_LOCKS_GUARD = threading.Lock() def _refresh_lock_for(auth_id: str) -> threading.Lock: with _AUTH_REFRESH_LOCKS_GUARD: lock = _AUTH_REFRESH_LOCKS.get(auth_id) if lock is None: lock = threading.Lock() _AUTH_REFRESH_LOCKS[auth_id] = lock return lock class ChatGPTSubscriptionError(RuntimeError): """Base error for ChatGPT subscription provider failures.""" class ChatGPTSubscriptionReauthRequired(ChatGPTSubscriptionError): """Stored OAuth credentials are invalid or expired beyond refresh.""" class ChatGPTSubscriptionRateLimited(ChatGPTSubscriptionError): """Upstream quota/rate limit; reconnecting will not fix it.""" class ChatGPTSubscriptionAuthNotFound(ChatGPTSubscriptionError): """No matching owner-scoped auth session exists.""" def is_chatgpt_subscription_base(url: str) -> bool: try: from urllib.parse import urlparse parsed = urlparse(url or "") host = (parsed.hostname or "").lower().rstrip(".") path = (parsed.path or "").rstrip("/") except Exception: return False return host == "chatgpt.com" and ( path == "/backend-api/codex" or path.startswith("/backend-api/codex/") ) def chatgpt_headers(access_token: Optional[str]) -> Dict[str, str]: headers = { "Accept": "application/json, text/event-stream", "Origin": "https://chatgpt.com", "Referer": "https://chatgpt.com/codex", "User-Agent": "Odysseus ChatGPT Subscription", } if access_token: headers["Authorization"] = f"Bearer {access_token}" return headers def fetch_available_models(access_token: str, timeout: float = 10.0) -> list[str]: if not access_token: return [] try: response = httpx.get( "https://chatgpt.com/backend-api/codex/models?client_version=1.0.0", headers=chatgpt_headers(access_token), timeout=timeout, ) if response.status_code != 200: return [] data = response.json() except Exception: return [] entries = data.get("models", []) if isinstance(data, dict) else [] sortable: list[tuple[int, str]] = [] for item in entries: if not isinstance(item, dict): continue slug = item.get("slug") if not isinstance(slug, str) or not slug.strip(): continue visibility = item.get("visibility", "") if isinstance(visibility, str) and visibility.strip().lower() in {"hide", "hidden"}: continue priority = item.get("priority") rank = int(priority) if isinstance(priority, (int, float)) else 10_000 sortable.append((rank, slug.strip())) sortable.sort(key=lambda item: (item[0], item[1])) ordered: list[str] = [] seen: set[str] = set() for _, slug in sortable: if slug not in seen: ordered.append(slug) seen.add(slug) return ordered def _raise_for_oauth_response(response: httpx.Response, action: str) -> None: if response.status_code < 400: return code = "" message = f"ChatGPT Subscription {action} failed with HTTP {response.status_code}." try: payload = response.json() err = payload.get("error") if isinstance(payload, dict) else None if isinstance(err, dict): code = str(err.get("code") or err.get("type") or "").strip() msg = err.get("message") if msg: message = f"ChatGPT Subscription {action} failed: {msg}" elif isinstance(err, str): code = err.strip() desc = payload.get("error_description") or payload.get("message") if desc: message = f"ChatGPT Subscription {action} failed: {desc}" except Exception: pass if response.status_code == 429: raise ChatGPTSubscriptionRateLimited( "ChatGPT Subscription quota or rate limit was reached. Credentials are still valid." ) if response.status_code in (401, 403) or code in {"invalid_grant", "invalid_token", "invalid_request", "refresh_token_reused"}: raise ChatGPTSubscriptionReauthRequired(message) raise ChatGPTSubscriptionError(message) def _json_or_error(response: httpx.Response, action: str) -> Dict[str, Any]: _raise_for_oauth_response(response, action) try: data = response.json() except Exception as exc: raise ChatGPTSubscriptionError(f"ChatGPT Subscription {action} returned invalid JSON.") from exc if not isinstance(data, dict): raise ChatGPTSubscriptionError(f"ChatGPT Subscription {action} returned an unexpected response.") return data def request_device_code(timeout: float = 15.0) -> Dict[str, Any]: response = httpx.post( f"{CHATGPT_OAUTH_ISSUER}/api/accounts/deviceauth/usercode", json={"client_id": CHATGPT_OAUTH_CLIENT_ID}, headers={"Content-Type": "application/json"}, timeout=timeout, ) data = _json_or_error(response, "device-code request") if not data.get("device_auth_id") or not data.get("user_code"): raise ChatGPTSubscriptionError("ChatGPT device-code response was missing required fields.") data.setdefault("verification_uri", f"{CHATGPT_OAUTH_ISSUER}/codex/device") data.setdefault("interval", 5) data.setdefault("expires_in", 900) return data def poll_device_auth(device_auth_id: str, user_code: str, timeout: float = 15.0) -> Dict[str, Any]: response = httpx.post( f"{CHATGPT_OAUTH_ISSUER}/api/accounts/deviceauth/token", json={"device_auth_id": device_auth_id, "user_code": user_code}, headers={"Content-Type": "application/json"}, timeout=timeout, ) if response.status_code in (403, 404): return {"status": "pending", "error": "authorization_pending"} return _json_or_error(response, "device-code poll") def exchange_authorization_code(authorization_code: str, code_verifier: str, timeout: float = 15.0) -> Dict[str, Any]: response = httpx.post( CHATGPT_OAUTH_TOKEN_URL, headers={"Content-Type": "application/x-www-form-urlencoded"}, data={ "grant_type": "authorization_code", "code": authorization_code, "redirect_uri": CHATGPT_OAUTH_REDIRECT_URI, "client_id": CHATGPT_OAUTH_CLIENT_ID, "code_verifier": code_verifier, }, timeout=timeout, ) data = _json_or_error(response, "token exchange") if not data.get("access_token"): raise ChatGPTSubscriptionReauthRequired("ChatGPT token exchange did not return an access token.") return data def refresh_oauth_tokens(access_token: str, refresh_token: str, timeout: float = 20.0) -> Dict[str, Any]: del access_token if not refresh_token: raise ChatGPTSubscriptionReauthRequired("ChatGPT Subscription is missing a refresh token. Reconnect the provider.") response = httpx.post( CHATGPT_OAUTH_TOKEN_URL, headers={"Content-Type": "application/x-www-form-urlencoded"}, data={ "grant_type": "refresh_token", "refresh_token": refresh_token, "client_id": CHATGPT_OAUTH_CLIENT_ID, }, timeout=timeout, ) data = _json_or_error(response, "token refresh") if not data.get("access_token"): raise ChatGPTSubscriptionReauthRequired("ChatGPT token refresh did not return an access token.") return data def _decode_jwt_payload(token: str) -> Dict[str, Any]: parts = (token or "").split(".") if len(parts) < 2: raise ValueError("not a JWT") segment = parts[1] segment += "=" * (-len(segment) % 4) raw = base64.urlsafe_b64decode(segment.encode("ascii")) payload = json.loads(raw.decode("utf-8")) return payload if isinstance(payload, dict) else {} def access_token_is_expiring(access_token: str, skew_seconds: int = CHATGPT_ACCESS_TOKEN_REFRESH_SKEW_SECONDS) -> bool: try: exp = int(_decode_jwt_payload(access_token).get("exp") or 0) except Exception: return True return exp <= int(time.time()) + int(skew_seconds) def resolve_runtime_credentials(auth_id: str, owner: Optional[str] = None, *, force_refresh: bool = False) -> Dict[str, Any]: db = SessionLocal() try: q = db.query(ProviderAuthSession).filter( ProviderAuthSession.id == auth_id, ProviderAuthSession.provider == CHATGPT_SUBSCRIPTION_PROVIDER, ) if owner: q = q.filter(ProviderAuthSession.owner == owner) row = q.first() if row is None: raise ChatGPTSubscriptionAuthNotFound("ChatGPT Subscription credentials were not found for this user.") access_token = row.access_token or "" if force_refresh or access_token_is_expiring(access_token): with _refresh_lock_for(auth_id): db.refresh(row) access_token = row.access_token or "" refresh_token = row.refresh_token or "" if force_refresh or access_token_is_expiring(access_token): refreshed = refresh_oauth_tokens(access_token, refresh_token) row.access_token = refreshed["access_token"] if refreshed.get("refresh_token"): row.refresh_token = refreshed["refresh_token"] row.last_refresh = utcnow_naive() db.commit() db.refresh(row) access_token = row.access_token or "" return { "provider": CHATGPT_SUBSCRIPTION_PROVIDER, "base_url": (row.base_url or DEFAULT_CHATGPT_SUBSCRIPTION_BASE_URL).rstrip("/"), "api_key": access_token, "auth_mode": row.auth_mode or "chatgpt", } finally: db.close() def to_http_exception(exc: Exception) -> HTTPException: if isinstance(exc, ChatGPTSubscriptionRateLimited): return HTTPException(429, str(exc)) if isinstance(exc, (ChatGPTSubscriptionReauthRequired, ChatGPTSubscriptionAuthNotFound)): return HTTPException(401, f"{exc} Reconnect the provider.") return HTTPException(502, str(exc)) def build_responses_input(messages: list[dict]) -> list[dict]: input_items: list[dict] = [] for msg in messages or []: role = msg.get("role") or "user" if role == "tool": role = "user" content = msg.get("content") if isinstance(content, list): text = "\n".join(str(part.get("text") or part.get("content") or "") for part in content if isinstance(part, dict)) else: text = "" if content is None else str(content) input_type = "output_text" if role == "assistant" else "input_text" input_items.append({"role": role, "content": [{"type": input_type, "text": text}]}) return input_items