From c01c09559a65e81193ecb4ae41603eb45c68eaf1 Mon Sep 17 00:00:00 2001 From: tanmayraut45 Date: Sun, 28 Jun 2026 05:18:35 +0530 Subject: [PATCH] fix(ai): offload model resolution from async paths Wrap blocking _resolve_model calls in asyncio.to_thread across async model interaction paths so endpoint/model resolution does not stall the event loop. Preserve owner-scoped resolution and add focused regression coverage. --- mcp_servers/image_gen_server.py | 4 +- routes/preset_routes.py | 3 +- src/agent_tools/model_interaction_tools.py | 5 +- src/agent_tools/session_tools.py | 3 +- src/ai_interaction.py | 9 ++-- src/teacher_escalation.py | 4 +- tests/test_ai_interaction_owner_scope.py | 5 +- tests/test_resolve_model_offloaded.py | 61 ++++++++++++++++++++++ 8 files changed, 80 insertions(+), 14 deletions(-) create mode 100644 tests/test_resolve_model_offloaded.py diff --git a/mcp_servers/image_gen_server.py b/mcp_servers/image_gen_server.py index 0c8d3884a..6cb77f780 100644 --- a/mcp_servers/image_gen_server.py +++ b/mcp_servers/image_gen_server.py @@ -73,7 +73,7 @@ async def call_tool(name: str, arguments: dict) -> list[TextContent]: if not model_spec: for candidate in ("gpt-image-1.5", "gpt-image-1", "dall-e-3"): try: - _resolve_model(candidate) + await asyncio.to_thread(_resolve_model, candidate) model_spec = candidate break except ValueError: @@ -81,7 +81,7 @@ async def call_tool(name: str, arguments: dict) -> list[TextContent]: if not model_spec: return [TextContent(type="text", text="Error: No image model found. Configure one in Admin.")] - url, model_id, headers = _resolve_model(model_spec) + url, model_id, headers = await asyncio.to_thread(_resolve_model, model_spec) is_gpt_image = "gpt-image" in model_id.lower() base_url = url.replace("/chat/completions", "").replace("/v1/messages", "").rstrip("/") diff --git a/routes/preset_routes.py b/routes/preset_routes.py index 20c6c830a..097b9ceca 100644 --- a/routes/preset_routes.py +++ b/routes/preset_routes.py @@ -1,5 +1,6 @@ """Preset routes — /api/presets GET, /api/presets/custom POST, user templates CRUD.""" +import asyncio import logging import uuid from typing import Dict, Any, List @@ -102,7 +103,7 @@ def setup_preset_routes(preset_manager) -> APIRouter: try: model_spec = data.get("model") or "" user = effective_user(request) - url, model, headers = _resolve_model(model_spec, owner=user) + url, model, headers = await asyncio.to_thread(_resolve_model, model_spec, owner=user) result = await llm_call_async(url, model, messages, temperature=0.8, max_tokens=500, headers=headers) return {"success": True, "prompt": result.strip()} except Exception as e: diff --git a/src/agent_tools/model_interaction_tools.py b/src/agent_tools/model_interaction_tools.py index 6cbabe919..c07b39e78 100644 --- a/src/agent_tools/model_interaction_tools.py +++ b/src/agent_tools/model_interaction_tools.py @@ -10,6 +10,7 @@ Shared helpers that still live in ``src.ai_interaction`` and are used by tools not yet migrated (``_resolve_model``, ``AI_CHAT_TIMEOUT``) are imported lazily inside the functions to avoid an import cycle at module load. """ +import asyncio import logging from typing import Dict, Optional @@ -46,7 +47,7 @@ async def chat_with_model(content: str, session_id: Optional[str] = None, owner: return {"error": "No message provided (line 2+ is the message)"} try: - url, model, headers = _resolve_model(model_spec, owner=owner) + url, model, headers = await asyncio.to_thread(_resolve_model, model_spec, owner=owner) except ValueError as e: return {"error": str(e)} @@ -90,7 +91,7 @@ async def ask_teacher(content: str, session_id: Optional[str] = None, owner: Opt return {"error": "No teacher model configured. Specify a model name or set teacher_model in settings."} try: - url, model, headers = _resolve_model(model_spec, owner=owner) + url, model, headers = await asyncio.to_thread(_resolve_model, model_spec, owner=owner) except ValueError as e: return {"error": str(e)} diff --git a/src/agent_tools/session_tools.py b/src/agent_tools/session_tools.py index 797235c5d..c5226afcf 100644 --- a/src/agent_tools/session_tools.py +++ b/src/agent_tools/session_tools.py @@ -8,6 +8,7 @@ The session manager is a runtime-set singleton in src.ai_interaction, so each function fetches it via get_session_manager() (imported here); _resolve_model and AI_CHAT_TIMEOUT are reused from there too. """ +import asyncio import json import logging import uuid @@ -40,7 +41,7 @@ async def create_session(content: str, session_id: Optional[str] = None, owner: return {"error": "Session name cannot be empty"} try: - url, model, headers = _resolve_model(model_spec, owner=owner) + url, model, headers = await asyncio.to_thread(_resolve_model, model_spec, owner=owner) except ValueError as e: return {"error": str(e)} diff --git a/src/ai_interaction.py b/src/ai_interaction.py index ca6e24436..43690e623 100644 --- a/src/ai_interaction.py +++ b/src/ai_interaction.py @@ -14,6 +14,7 @@ These are agent tools — the LLM writes fenced code blocks and they execute through the standard agent_tools.py pipeline. """ +import asyncio import json import logging import uuid @@ -229,7 +230,7 @@ async def do_pipeline(content: str, session_id: Optional[str] = None, owner: Opt if not model_spec or not instruction: return {"error": f"Step {i + 1}: both 'model' and 'instruction' are required"} try: - url, model, headers = _resolve_model(model_spec, owner=owner) + url, model, headers = await asyncio.to_thread(_resolve_model, model_spec, owner=owner) resolved.append((url, model, headers, instruction)) except ValueError as e: return {"error": f"Step {i + 1}: {e}"} @@ -624,7 +625,7 @@ async def do_ui_control(content: str, session_id: Optional[str] = None, owner: O # Resolve the model to validate it exists try: - url, model_id, headers = _resolve_model(model_spec, owner=owner) + url, model_id, headers = await asyncio.to_thread(_resolve_model, model_spec, owner=owner) except ValueError as e: return {"error": str(e)} @@ -914,7 +915,7 @@ async def do_generate_image(content: str, session_id: Optional[str] = None, owne if not model_spec: for candidate in ("gpt-image-1.5", "gpt-image-1", "dall-e-3"): try: - _resolve_model(candidate, owner=owner) + await asyncio.to_thread(_resolve_model, candidate, owner=owner) model_spec = candidate break except ValueError: @@ -958,7 +959,7 @@ async def do_generate_image(content: str, session_id: Optional[str] = None, owne # Resolve the model to find the right endpoint try: - url, model_id, headers = _resolve_model(model_spec, owner=owner) + url, model_id, headers = await asyncio.to_thread(_resolve_model, model_spec, owner=owner) except ValueError: return {"error": f"No endpoint found with image model '{model_spec}'. " "Configure an OpenAI-compatible endpoint with image generation support."} diff --git a/src/teacher_escalation.py b/src/teacher_escalation.py index bd1930325..49134991c 100644 --- a/src/teacher_escalation.py +++ b/src/teacher_escalation.py @@ -235,7 +235,7 @@ async def _call_teacher(teacher_model_spec: str, prompt: str, from src.llm_core import llm_call_async from src.ai_interaction import _resolve_model, _TEACHER_SYSTEM_PROMPT try: - url, model, headers = _resolve_model(teacher_model_spec, owner=owner) + url, model, headers = await asyncio.to_thread(_resolve_model, teacher_model_spec, owner=owner) except Exception as e: logger.warning(f"teacher endpoint not resolvable ({teacher_model_spec!r}): {e}") return None @@ -619,7 +619,7 @@ async def run_teacher_inline( # Resolve teacher endpoint try: from src.ai_interaction import _resolve_model - teacher_url, teacher_model, teacher_headers = _resolve_model(teacher_spec, owner=owner) + teacher_url, teacher_model, teacher_headers = await asyncio.to_thread(_resolve_model, teacher_spec, owner=owner) except Exception as e: logger.warning(f"teacher endpoint not resolvable ({teacher_spec!r}): {e}") yield ( diff --git a/tests/test_ai_interaction_owner_scope.py b/tests/test_ai_interaction_owner_scope.py index 1cfe31c23..ade9da166 100644 --- a/tests/test_ai_interaction_owner_scope.py +++ b/tests/test_ai_interaction_owner_scope.py @@ -25,9 +25,10 @@ def test_model_listing_and_image_fallback_are_owner_scoped(): assert "owner: Optional[str] = None" in list_body assert "owner_filter(query, ModelEndpoint, owner)" in list_body - assert "_resolve_model(candidate, owner=owner)" in image_body + # _resolve_model is offloaded to a worker thread (#4589) but stays owner-scoped. + assert "asyncio.to_thread(_resolve_model, candidate, owner=owner)" in image_body assert "owner_filter(_img_q, ModelEndpoint, owner)" in image_body - assert "_resolve_model(model_spec, owner=owner)" in image_body + assert "asyncio.to_thread(_resolve_model, model_spec, owner=owner)" in image_body # chat_with_model, list_models and ask_teacher moved to the registry (#3629) diff --git a/tests/test_resolve_model_offloaded.py b/tests/test_resolve_model_offloaded.py new file mode 100644 index 000000000..94124aa30 --- /dev/null +++ b/tests/test_resolve_model_offloaded.py @@ -0,0 +1,61 @@ +"""Issue #4589 — _resolve_model does a blocking httpx.get, so calling it +directly from an async handler stalls the whole event loop for the duration of +the probe. The async call sites now wrap it in asyncio.to_thread. + +do_pipeline is used as the representative handler: _resolve_model is the first +real work it does, and a ValueError returns early before any LLM call, so these +tests drive the offload path without a live model endpoint. +""" + +import asyncio +import threading +import time + +import src.ai_interaction as ai + + +async def test_do_pipeline_resolves_model_off_the_event_loop(monkeypatch): + # A deliberately blocking _resolve_model that records how many copies run + # at once. If it ran on the event loop, the first call would block the loop + # and the second could not start — peak concurrency would be 1. + state = {"active": 0, "peak": 0} + lock = threading.Lock() + + def slow_resolve(spec, owner=None): + with lock: + state["active"] += 1 + state["peak"] = max(state["peak"], state["active"]) + time.sleep(0.2) + with lock: + state["active"] -= 1 + raise ValueError("no such model") # early-return path, no LLM call + + monkeypatch.setattr(ai, "_resolve_model", slow_resolve) + + content = '[{"model": "m", "instruction": "go"}]' + results = await asyncio.gather( + ai.do_pipeline(content, owner="u"), + ai.do_pipeline(content, owner="u"), + ) + + assert all("error" in r for r in results) + assert state["peak"] == 2, "resolutions did not overlap — call still blocks the loop" + + +async def test_do_pipeline_uses_offloaded_resolution_result(monkeypatch): + # The offload must also return the resolved tuple, not just propagate errors. + monkeypatch.setattr( + ai, "_resolve_model", + lambda spec, owner=None: ("http://x/v1/chat/completions", "resolved-model", {}), + ) + + async def fake_llm(url, model, messages, **kwargs): + return f"output from {model}" + + monkeypatch.setattr("src.llm_core.llm_call_async", fake_llm) + + result = await ai.do_pipeline('[{"model": "m", "instruction": "go"}]', owner="u") + + assert "error" not in result, result + # The model the offloaded _resolve_model returned made it through to the call. + assert "resolved-model" in str(result)