mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-30 00:22:10 -04:00
fix(ai): offload model resolution from async paths
Wrap blocking _resolve_model calls in asyncio.to_thread across async model interaction paths so endpoint/model resolution does not stall the event loop. Preserve owner-scoped resolution and add focused regression coverage.
This commit is contained in:
@@ -73,7 +73,7 @@ async def call_tool(name: str, arguments: dict) -> list[TextContent]:
|
|||||||
if not model_spec:
|
if not model_spec:
|
||||||
for candidate in ("gpt-image-1.5", "gpt-image-1", "dall-e-3"):
|
for candidate in ("gpt-image-1.5", "gpt-image-1", "dall-e-3"):
|
||||||
try:
|
try:
|
||||||
_resolve_model(candidate)
|
await asyncio.to_thread(_resolve_model, candidate)
|
||||||
model_spec = candidate
|
model_spec = candidate
|
||||||
break
|
break
|
||||||
except ValueError:
|
except ValueError:
|
||||||
@@ -81,7 +81,7 @@ async def call_tool(name: str, arguments: dict) -> list[TextContent]:
|
|||||||
if not model_spec:
|
if not model_spec:
|
||||||
return [TextContent(type="text", text="Error: No image model found. Configure one in Admin.")]
|
return [TextContent(type="text", text="Error: No image model found. Configure one in Admin.")]
|
||||||
|
|
||||||
url, model_id, headers = _resolve_model(model_spec)
|
url, model_id, headers = await asyncio.to_thread(_resolve_model, model_spec)
|
||||||
|
|
||||||
is_gpt_image = "gpt-image" in model_id.lower()
|
is_gpt_image = "gpt-image" in model_id.lower()
|
||||||
base_url = url.replace("/chat/completions", "").replace("/v1/messages", "").rstrip("/")
|
base_url = url.replace("/chat/completions", "").replace("/v1/messages", "").rstrip("/")
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
"""Preset routes — /api/presets GET, /api/presets/custom POST, user templates CRUD."""
|
"""Preset routes — /api/presets GET, /api/presets/custom POST, user templates CRUD."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import uuid
|
import uuid
|
||||||
from typing import Dict, Any, List
|
from typing import Dict, Any, List
|
||||||
@@ -102,7 +103,7 @@ def setup_preset_routes(preset_manager) -> APIRouter:
|
|||||||
try:
|
try:
|
||||||
model_spec = data.get("model") or ""
|
model_spec = data.get("model") or ""
|
||||||
user = effective_user(request)
|
user = effective_user(request)
|
||||||
url, model, headers = _resolve_model(model_spec, owner=user)
|
url, model, headers = await asyncio.to_thread(_resolve_model, model_spec, owner=user)
|
||||||
result = await llm_call_async(url, model, messages, temperature=0.8, max_tokens=500, headers=headers)
|
result = await llm_call_async(url, model, messages, temperature=0.8, max_tokens=500, headers=headers)
|
||||||
return {"success": True, "prompt": result.strip()}
|
return {"success": True, "prompt": result.strip()}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ Shared helpers that still live in ``src.ai_interaction`` and are used by tools
|
|||||||
not yet migrated (``_resolve_model``, ``AI_CHAT_TIMEOUT``) are imported lazily
|
not yet migrated (``_resolve_model``, ``AI_CHAT_TIMEOUT``) are imported lazily
|
||||||
inside the functions to avoid an import cycle at module load.
|
inside the functions to avoid an import cycle at module load.
|
||||||
"""
|
"""
|
||||||
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from typing import Dict, Optional
|
from typing import Dict, Optional
|
||||||
|
|
||||||
@@ -46,7 +47,7 @@ async def chat_with_model(content: str, session_id: Optional[str] = None, owner:
|
|||||||
return {"error": "No message provided (line 2+ is the message)"}
|
return {"error": "No message provided (line 2+ is the message)"}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
url, model, headers = _resolve_model(model_spec, owner=owner)
|
url, model, headers = await asyncio.to_thread(_resolve_model, model_spec, owner=owner)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
return {"error": str(e)}
|
return {"error": str(e)}
|
||||||
|
|
||||||
@@ -90,7 +91,7 @@ async def ask_teacher(content: str, session_id: Optional[str] = None, owner: Opt
|
|||||||
return {"error": "No teacher model configured. Specify a model name or set teacher_model in settings."}
|
return {"error": "No teacher model configured. Specify a model name or set teacher_model in settings."}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
url, model, headers = _resolve_model(model_spec, owner=owner)
|
url, model, headers = await asyncio.to_thread(_resolve_model, model_spec, owner=owner)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
return {"error": str(e)}
|
return {"error": str(e)}
|
||||||
|
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ The session manager is a runtime-set singleton in src.ai_interaction, so each
|
|||||||
function fetches it via get_session_manager() (imported here); _resolve_model and
|
function fetches it via get_session_manager() (imported here); _resolve_model and
|
||||||
AI_CHAT_TIMEOUT are reused from there too.
|
AI_CHAT_TIMEOUT are reused from there too.
|
||||||
"""
|
"""
|
||||||
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import uuid
|
import uuid
|
||||||
@@ -40,7 +41,7 @@ async def create_session(content: str, session_id: Optional[str] = None, owner:
|
|||||||
return {"error": "Session name cannot be empty"}
|
return {"error": "Session name cannot be empty"}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
url, model, headers = _resolve_model(model_spec, owner=owner)
|
url, model, headers = await asyncio.to_thread(_resolve_model, model_spec, owner=owner)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
return {"error": str(e)}
|
return {"error": str(e)}
|
||||||
|
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ These are agent tools — the LLM writes fenced code blocks and they execute
|
|||||||
through the standard agent_tools.py pipeline.
|
through the standard agent_tools.py pipeline.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import uuid
|
import uuid
|
||||||
@@ -229,7 +230,7 @@ async def do_pipeline(content: str, session_id: Optional[str] = None, owner: Opt
|
|||||||
if not model_spec or not instruction:
|
if not model_spec or not instruction:
|
||||||
return {"error": f"Step {i + 1}: both 'model' and 'instruction' are required"}
|
return {"error": f"Step {i + 1}: both 'model' and 'instruction' are required"}
|
||||||
try:
|
try:
|
||||||
url, model, headers = _resolve_model(model_spec, owner=owner)
|
url, model, headers = await asyncio.to_thread(_resolve_model, model_spec, owner=owner)
|
||||||
resolved.append((url, model, headers, instruction))
|
resolved.append((url, model, headers, instruction))
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
return {"error": f"Step {i + 1}: {e}"}
|
return {"error": f"Step {i + 1}: {e}"}
|
||||||
@@ -624,7 +625,7 @@ async def do_ui_control(content: str, session_id: Optional[str] = None, owner: O
|
|||||||
|
|
||||||
# Resolve the model to validate it exists
|
# Resolve the model to validate it exists
|
||||||
try:
|
try:
|
||||||
url, model_id, headers = _resolve_model(model_spec, owner=owner)
|
url, model_id, headers = await asyncio.to_thread(_resolve_model, model_spec, owner=owner)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
return {"error": str(e)}
|
return {"error": str(e)}
|
||||||
|
|
||||||
@@ -914,7 +915,7 @@ async def do_generate_image(content: str, session_id: Optional[str] = None, owne
|
|||||||
if not model_spec:
|
if not model_spec:
|
||||||
for candidate in ("gpt-image-1.5", "gpt-image-1", "dall-e-3"):
|
for candidate in ("gpt-image-1.5", "gpt-image-1", "dall-e-3"):
|
||||||
try:
|
try:
|
||||||
_resolve_model(candidate, owner=owner)
|
await asyncio.to_thread(_resolve_model, candidate, owner=owner)
|
||||||
model_spec = candidate
|
model_spec = candidate
|
||||||
break
|
break
|
||||||
except ValueError:
|
except ValueError:
|
||||||
@@ -958,7 +959,7 @@ async def do_generate_image(content: str, session_id: Optional[str] = None, owne
|
|||||||
|
|
||||||
# Resolve the model to find the right endpoint
|
# Resolve the model to find the right endpoint
|
||||||
try:
|
try:
|
||||||
url, model_id, headers = _resolve_model(model_spec, owner=owner)
|
url, model_id, headers = await asyncio.to_thread(_resolve_model, model_spec, owner=owner)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return {"error": f"No endpoint found with image model '{model_spec}'. "
|
return {"error": f"No endpoint found with image model '{model_spec}'. "
|
||||||
"Configure an OpenAI-compatible endpoint with image generation support."}
|
"Configure an OpenAI-compatible endpoint with image generation support."}
|
||||||
|
|||||||
@@ -235,7 +235,7 @@ async def _call_teacher(teacher_model_spec: str, prompt: str,
|
|||||||
from src.llm_core import llm_call_async
|
from src.llm_core import llm_call_async
|
||||||
from src.ai_interaction import _resolve_model, _TEACHER_SYSTEM_PROMPT
|
from src.ai_interaction import _resolve_model, _TEACHER_SYSTEM_PROMPT
|
||||||
try:
|
try:
|
||||||
url, model, headers = _resolve_model(teacher_model_spec, owner=owner)
|
url, model, headers = await asyncio.to_thread(_resolve_model, teacher_model_spec, owner=owner)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"teacher endpoint not resolvable ({teacher_model_spec!r}): {e}")
|
logger.warning(f"teacher endpoint not resolvable ({teacher_model_spec!r}): {e}")
|
||||||
return None
|
return None
|
||||||
@@ -619,7 +619,7 @@ async def run_teacher_inline(
|
|||||||
# Resolve teacher endpoint
|
# Resolve teacher endpoint
|
||||||
try:
|
try:
|
||||||
from src.ai_interaction import _resolve_model
|
from src.ai_interaction import _resolve_model
|
||||||
teacher_url, teacher_model, teacher_headers = _resolve_model(teacher_spec, owner=owner)
|
teacher_url, teacher_model, teacher_headers = await asyncio.to_thread(_resolve_model, teacher_spec, owner=owner)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"teacher endpoint not resolvable ({teacher_spec!r}): {e}")
|
logger.warning(f"teacher endpoint not resolvable ({teacher_spec!r}): {e}")
|
||||||
yield (
|
yield (
|
||||||
|
|||||||
@@ -25,9 +25,10 @@ def test_model_listing_and_image_fallback_are_owner_scoped():
|
|||||||
|
|
||||||
assert "owner: Optional[str] = None" in list_body
|
assert "owner: Optional[str] = None" in list_body
|
||||||
assert "owner_filter(query, ModelEndpoint, owner)" in list_body
|
assert "owner_filter(query, ModelEndpoint, owner)" in list_body
|
||||||
assert "_resolve_model(candidate, owner=owner)" in image_body
|
# _resolve_model is offloaded to a worker thread (#4589) but stays owner-scoped.
|
||||||
|
assert "asyncio.to_thread(_resolve_model, candidate, owner=owner)" in image_body
|
||||||
assert "owner_filter(_img_q, ModelEndpoint, owner)" in image_body
|
assert "owner_filter(_img_q, ModelEndpoint, owner)" in image_body
|
||||||
assert "_resolve_model(model_spec, owner=owner)" in image_body
|
assert "asyncio.to_thread(_resolve_model, model_spec, owner=owner)" in image_body
|
||||||
|
|
||||||
|
|
||||||
# chat_with_model, list_models and ask_teacher moved to the registry (#3629)
|
# chat_with_model, list_models and ask_teacher moved to the registry (#3629)
|
||||||
|
|||||||
@@ -0,0 +1,61 @@
|
|||||||
|
"""Issue #4589 — _resolve_model does a blocking httpx.get, so calling it
|
||||||
|
directly from an async handler stalls the whole event loop for the duration of
|
||||||
|
the probe. The async call sites now wrap it in asyncio.to_thread.
|
||||||
|
|
||||||
|
do_pipeline is used as the representative handler: _resolve_model is the first
|
||||||
|
real work it does, and a ValueError returns early before any LLM call, so these
|
||||||
|
tests drive the offload path without a live model endpoint.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
import src.ai_interaction as ai
|
||||||
|
|
||||||
|
|
||||||
|
async def test_do_pipeline_resolves_model_off_the_event_loop(monkeypatch):
|
||||||
|
# A deliberately blocking _resolve_model that records how many copies run
|
||||||
|
# at once. If it ran on the event loop, the first call would block the loop
|
||||||
|
# and the second could not start — peak concurrency would be 1.
|
||||||
|
state = {"active": 0, "peak": 0}
|
||||||
|
lock = threading.Lock()
|
||||||
|
|
||||||
|
def slow_resolve(spec, owner=None):
|
||||||
|
with lock:
|
||||||
|
state["active"] += 1
|
||||||
|
state["peak"] = max(state["peak"], state["active"])
|
||||||
|
time.sleep(0.2)
|
||||||
|
with lock:
|
||||||
|
state["active"] -= 1
|
||||||
|
raise ValueError("no such model") # early-return path, no LLM call
|
||||||
|
|
||||||
|
monkeypatch.setattr(ai, "_resolve_model", slow_resolve)
|
||||||
|
|
||||||
|
content = '[{"model": "m", "instruction": "go"}]'
|
||||||
|
results = await asyncio.gather(
|
||||||
|
ai.do_pipeline(content, owner="u"),
|
||||||
|
ai.do_pipeline(content, owner="u"),
|
||||||
|
)
|
||||||
|
|
||||||
|
assert all("error" in r for r in results)
|
||||||
|
assert state["peak"] == 2, "resolutions did not overlap — call still blocks the loop"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_do_pipeline_uses_offloaded_resolution_result(monkeypatch):
|
||||||
|
# The offload must also return the resolved tuple, not just propagate errors.
|
||||||
|
monkeypatch.setattr(
|
||||||
|
ai, "_resolve_model",
|
||||||
|
lambda spec, owner=None: ("http://x/v1/chat/completions", "resolved-model", {}),
|
||||||
|
)
|
||||||
|
|
||||||
|
async def fake_llm(url, model, messages, **kwargs):
|
||||||
|
return f"output from {model}"
|
||||||
|
|
||||||
|
monkeypatch.setattr("src.llm_core.llm_call_async", fake_llm)
|
||||||
|
|
||||||
|
result = await ai.do_pipeline('[{"model": "m", "instruction": "go"}]', owner="u")
|
||||||
|
|
||||||
|
assert "error" not in result, result
|
||||||
|
# The model the offloaded _resolve_model returned made it through to the call.
|
||||||
|
assert "resolved-model" in str(result)
|
||||||
Reference in New Issue
Block a user