Cookbook model workflow fixes

This commit is contained in:
pewdiepie-archdaemon
2026-06-21 11:02:35 +00:00
parent 8c46172e87
commit c504214925
38 changed files with 3042 additions and 459 deletions
+174 -8
View File
@@ -751,6 +751,17 @@ def _extract_last_user_message(messages: List[Dict]) -> str:
_LOW_SIGNAL_RE = re.compile(r"^[\W_]*$", re.UNICODE)
_CASUAL_OPENING_RE = re.compile(
r"^\s*(?:h+i+|hey+|hello+|yo+|sup+|what'?s up|wass?up|hiya|howdy|"
r"lol|lmao|haha+|hehe+|thanks?|thank you|ty|idk|dunno|meh|bruh|bro)\b(?P<tail>.*)$",
re.IGNORECASE,
)
_CASUAL_BLOCKLIST_RE = re.compile(
r"\b(?:cookbook|serve|serving|launch|start|vllm|sglang|llama\.?cpp|ollama|"
r"download|model|email|document|doc|note|calendar|task|search|web|research|"
r"file|folder|repo|git|settings?|endpoint|api|token|mcp)\b",
re.IGNORECASE,
)
_EXPLICIT_CONTINUATION_RE = re.compile(
r"^\s*(?:"
r"yes|y|yeah|yep|ok|okay|sure|do it|go ahead|continue|carry on|"
@@ -760,6 +771,17 @@ _EXPLICIT_CONTINUATION_RE = re.compile(
r")\s*[.!?]*\s*$",
re.IGNORECASE,
)
_RETRY_CONTINUATION_RE = re.compile(
r"\b(?:try again|retry|again|rerun|re-run|run it again|launch it again|"
r"start it again|failed|fails?|died|crashed|broke|insta|instantly)\b",
re.IGNORECASE,
)
_COOKBOOK_CONTEXT_RE = re.compile(
r"\b(?:cookbook|serve|serving|served|launch|start|preset|vllm|sglang|"
r"llama\.?cpp|ollama|download|cached models?|model servers?|running models?|"
r"gpu box|ajax|qwen|gemma|llama|mistral|minimax)\b",
re.IGNORECASE,
)
def _is_explicit_continuation(text: str) -> bool:
@@ -767,6 +789,37 @@ def _is_explicit_continuation(text: str) -> bool:
return bool(_EXPLICIT_CONTINUATION_RE.match(str(text or "").strip()))
def _is_casual_low_signal(text: str) -> bool:
"""True for short greetings/slang that should not inherit stale context."""
s = str(text or "").strip()
m = _CASUAL_OPENING_RE.match(s)
if not m:
return False
tail = m.group("tail") or ""
if _CASUAL_BLOCKLIST_RE.search(tail):
return False
# Allow a short vocative/address after the opener without hardcoding the
# address term itself: "hey man", "yo dude", "sup <name>". Longer tails are
# more likely to be an actual request and should get normal context/tooling.
tail_words = re.findall(r"[A-Za-z0-9_'-]+", tail)
return len(tail_words) <= 2
def _is_contextual_retry_continuation(messages: List[Dict], text: str) -> bool:
"""Treat "try again / it failed" as a continuation only for active tool work.
These follow-ups are common after Cookbook launches: the latest user turn
says only "try again it failed", while the actionable model/host/command
details live one or two turns back. Keep this intentionally narrow so
ordinary chat does not inherit stale Cookbook context.
"""
latest = str(text or "").strip()
if not latest or not _RETRY_CONTINUATION_RE.search(latest):
return False
recent = _recent_context_for_retrieval(messages, max_user=5, max_chars=1200)
return bool(_COOKBOOK_CONTEXT_RE.search(recent))
def _assistant_requested_followup(messages: List[Dict]) -> bool:
"""True when the previous assistant turn asked for missing task details.
@@ -808,11 +861,12 @@ def _classify_agent_request(messages: List[Dict], last_user: str) -> Dict[str, o
which domain rule packs get appended to the system prompt.
"""
text = str(last_user or "").strip()
continuation = _is_explicit_continuation(text) or _assistant_requested_followup(messages)
retry_continuation = _is_contextual_retry_continuation(messages, text)
continuation = _is_explicit_continuation(text) or _assistant_requested_followup(messages) or retry_continuation
retrieval_query = _recent_context_for_retrieval(messages) if continuation else text
q = retrieval_query.lower()
if not text or bool(_LOW_SIGNAL_RE.match(text)):
if not text or bool(_LOW_SIGNAL_RE.match(text)) or _is_casual_low_signal(text):
return {
"low_signal": True,
"continuation": False,
@@ -907,6 +961,7 @@ def _build_system_prompt(
compact: bool = False,
owner: Optional[str] = None,
suppress_local_context: bool = False,
suppress_skills: bool = False,
active_email: Optional[Dict[str, str]] = None,
) -> List[Dict]:
"""Build agent system prompt, inject MCP/document context, merge consecutive system msgs."""
@@ -924,7 +979,7 @@ def _build_system_prompt(
_ov_sig = _hl.sha256(_json.dumps(get_builtin_overrides() or {}, sort_keys=True).encode()).hexdigest()
except Exception:
_ov_sig = ""
cache_key = (frozenset(disabled_tools or []), bool(mcp_mgr), needs_admin, _rt_key, compact, _ov_sig, owner, suppress_local_context)
cache_key = (frozenset(disabled_tools or []), bool(mcp_mgr), needs_admin, _rt_key, compact, _ov_sig, owner, suppress_local_context, suppress_skills)
if _cached_base_prompt and _cached_base_prompt_key == cache_key and not active_document:
agent_prompt = _cached_base_prompt
# Skill index is user-editable (name + description), so it must never
@@ -934,6 +989,7 @@ def _build_system_prompt(
disabled_tools, mcp_mgr, needs_admin, relevant_tools,
mcp_disabled_map=mcp_disabled_map, compact=compact, owner=owner,
suppress_local_context=suppress_local_context,
suppress_skills=suppress_skills,
)
else:
agent_prompt, _skill_index_block = _build_base_prompt(
@@ -945,6 +1001,7 @@ def _build_system_prompt(
compact=compact,
owner=owner,
suppress_local_context=suppress_local_context,
suppress_skills=suppress_skills,
)
if not active_document:
_cached_base_prompt = agent_prompt
@@ -1228,7 +1285,7 @@ def _build_system_prompt(
# few. If the teacher wrote a procedure for "open my X chat" last
# time the student failed, this is where the student finds it
# before deciding which tool to call.
if not suppress_local_context:
if not suppress_local_context and not suppress_skills:
try:
last_user = _extract_last_user_message(messages)
# Respect the user's skills-enabled toggle (mirrors memory_enabled).
@@ -1395,6 +1452,7 @@ def _build_base_prompt(
compact: bool = False,
owner: Optional[str] = None,
suppress_local_context: bool = False,
suppress_skills: bool = False,
):
"""Build the agent prompt with only relevant tools included.
@@ -1447,7 +1505,7 @@ def _build_base_prompt(
# The caller wraps it in untrusted_context_message and ships it as a
# user-role message — same treatment as the matched-skills block.
skill_index_block = ""
if not suppress_local_context:
if not suppress_local_context and not suppress_skills:
try:
from services.memory.skills import SkillsManager
from src.constants import DATA_DIR
@@ -1866,6 +1924,7 @@ async def stream_agent_loop(
approved_plan: Optional[str] = None,
tool_policy: Optional[ToolPolicy] = None,
workspace: Optional[str] = None,
forced_tools: Optional[Set[str]] = None,
_is_teacher_run: bool = False,
) -> AsyncGenerator[str, None]:
"""Streaming agent loop generator.
@@ -1905,6 +1964,18 @@ async def stream_agent_loop(
_needs_admin = _detect_admin_intent(messages)
_last_user = _extract_last_user_message(messages)
_intent = _classify_agent_request(messages, _last_user)
_low_signal_turn = bool(_intent.get("low_signal"))
_casual_low_signal_turn = _is_casual_low_signal(_last_user)
_direct_low_signal = (
_low_signal_turn
and not bool(_intent.get("continuation"))
and not plan_mode
and not approved_plan
and (_casual_low_signal_turn or active_document is None)
and (_casual_low_signal_turn or not active_email)
and (_casual_low_signal_turn or not workspace)
and not forced_tools
)
# Tool retrieval uses the latest message by default. It may inherit recent
# user turns only for explicit continuations ("yes", "do it", "1").
_retrieval_query = str(_intent.get("retrieval_query") or _last_user)
@@ -1912,11 +1983,86 @@ async def stream_agent_loop(
"[agent-intent] latest=%r continuation=%s low_signal=%s domains=%s retrieval_query=%r",
_last_user[:120],
bool(_intent.get("continuation")),
bool(_intent.get("low_signal")),
_low_signal_turn,
sorted(_intent.get("domains") or []),
_retrieval_query[:200],
)
_mcp_disabled_map = _load_mcp_disabled_map() if mcp_mgr else {}
if _direct_low_signal:
logger.info("[agent] direct low-signal reply path for latest=%r", _last_user[:80])
direct_messages = [{"role": "user", "content": _last_user}]
direct_response = ""
direct_start = time.time()
direct_actual_model = model
real_input_tokens = 0
real_output_tokens = 0
try:
async for chunk in stream_llm_with_fallback(
[(endpoint_url, model, headers)] + list(fallbacks or []),
direct_messages,
temperature=temperature,
max_tokens=min(max_tokens or 128, 128),
prompt_type=None,
tools=None,
timeout=int(get_setting("agent_stream_timeout_seconds", 300) or 300),
session_id=session_id,
):
if chunk.startswith("data: ") and not chunk.startswith("data: [DONE]"):
try:
data = json.loads(chunk[6:])
except json.JSONDecodeError:
yield chunk
continue
if data.get("type") == "usage":
usage = data.get("data", {}) or {}
direct_actual_model = usage.get("model") or direct_actual_model
real_input_tokens += usage.get("input_tokens", 0) or 0
real_output_tokens += usage.get("output_tokens", 0) or 0
continue
if data.get("type") == "model_actual":
direct_actual_model = data.get("model") or direct_actual_model
data["requested_model"] = model
yield f"data: {json.dumps(data)}\n\n"
continue
if data.get("type") == "fallback":
direct_actual_model = data.get("answered_by") or direct_actual_model
yield chunk
continue
if "delta" in data:
if not data.get("thinking"):
direct_response += data.get("delta", "")
yield chunk
continue
yield chunk
elif chunk.startswith("event: "):
yield chunk
except Exception as _direct_err:
logger.warning("[agent] direct low-signal path failed: %s", _direct_err)
fallback = "Hey."
direct_response += fallback
yield f"data: {json.dumps({'delta': fallback})}\n\n"
if not direct_response.strip():
fallback = "Hey."
direct_response = fallback
yield f"data: {json.dumps({'delta': fallback})}\n\n"
duration = time.time() - direct_start
metrics = {
"model": direct_actual_model,
"requested_model": model,
"input_tokens": real_input_tokens or estimate_tokens(direct_messages),
"output_tokens": real_output_tokens or max(len(direct_response) // 4, 1),
"total_time": round(duration, 2),
"response_time": round(duration, 2),
"agent_rounds": 0,
"tool_calls": 0,
"direct_low_signal": True,
}
yield f"data: {json.dumps({'type': 'metrics', 'data': metrics})}\n\n"
yield "data: [DONE]\n\n"
return
if plan_mode and mcp_mgr:
# Allow read-only MCP tools to investigate, block write/unknown ones:
# hide them from the schemas AND reject them at runtime by qualified name.
@@ -1932,7 +2078,7 @@ async def stream_agent_loop(
_t1 = time.time()
if _relevant_tools:
logger.info(f"[tool-rag] Using caller-provided relevant_tools ({len(_relevant_tools)} tools)")
if not guide_only and not _relevant_tools and bool(_intent.get("low_signal")):
if not guide_only and not _relevant_tools and _low_signal_turn:
from src.tool_index import ALWAYS_AVAILABLE
if workspace:
# An active workspace IS the file-work signal: a vague "look at the
@@ -2023,6 +2169,15 @@ async def stream_agent_loop(
if _relevant_tools is not None and active_document is not None:
_relevant_tools.update({"edit_document", "update_document", "suggest_document"})
# Per-request UI toggles are stronger than retrieval. If the user turns on
# Search, the model must see the search tools even when the latest text is a
# typo or otherwise low-signal for tool RAG.
if not guide_only and forced_tools:
if _relevant_tools is None:
from src.tool_index import ALWAYS_AVAILABLE
_relevant_tools = set(ALWAYS_AVAILABLE)
_relevant_tools.update(t for t in forced_tools if t not in disabled_tools)
# The skill index injected by _build_system_prompt tells the model to
# call `manage_skills action=view`, and Jaccard-matched skills are pasted
# into the prompt as procedures to follow — but neither path goes through
@@ -2030,7 +2185,7 @@ async def stream_agent_loop(
# (grep, read_file, ...) that aren't in its schema list. Keep the schemas
# in lockstep: manage_skills is callable whenever any skill is indexed,
# and a matched skill's declared requires_toolsets ride along with it.
if not guide_only and _relevant_tools is not None:
if not guide_only and _relevant_tools is not None and not _low_signal_turn:
try:
from services.memory.skills import SkillsManager
from src.constants import DATA_DIR
@@ -2147,6 +2302,7 @@ async def stream_agent_loop(
compact=_compact_agent_prompt,
owner=owner,
suppress_local_context=guide_only,
suppress_skills=_low_signal_turn,
active_email=active_email,
)
if plan_mode and not guide_only:
@@ -2753,6 +2909,15 @@ async def stream_agent_loop(
_intent_nudge_count += 1
_matched_phrase = _intent_match.group(0).strip()
logger.info(f"[agent] intent-without-action nudge #{_intent_nudge_count} on round {round_num}: {_matched_phrase!r}")
_lower_phrase = _matched_phrase.lower()
_cookbook_log_hint = ""
if any(_word in _lower_phrase for _word in ("log", "logs", "output", "tail", "status")):
_cookbook_log_hint = (
" If this is about a Cookbook/model serve, the concrete calls are: "
"`list_served_models` first, then `tail_serve_output` with the "
"session_id from the serve/list result. Never answer with "
"\"check logs\" when those tools are available."
)
messages.append({
"role": "system",
"content": (
@@ -2761,6 +2926,7 @@ async def stream_agent_loop(
"see you announced the action but didn't run it, which "
"is the most frustrating thing you can do. "
"DO IT NOW: emit the actual function call this turn. "
f"{_cookbook_log_hint}"
"If you decided not to do it after all, say so plainly in "
"one sentence instead of restating the plan."
),
+33 -11
View File
@@ -7,6 +7,7 @@ from src.constants import MAX_OUTPUT_CHARS
class WebSearchTool:
async def execute(self, content: str, ctx: dict) -> dict:
from src.search import comprehensive_web_search
progress_cb = ctx.get("progress_cb") if isinstance(ctx, dict) else None
raw = content.strip()
query = raw
time_filter = None
@@ -37,18 +38,39 @@ class WebSearchTool:
elif " news" in q_lc or q_lc.startswith("news ") or q_lc.endswith(" news"):
time_filter = "week"
loop = asyncio.get_running_loop()
text, sources = await asyncio.wait_for(
loop.run_in_executor(
None,
lambda: comprehensive_web_search(
query,
max_pages=max_pages,
time_filter=time_filter,
return_sources=True,
if progress_cb:
await progress_cb({
"elapsed_s": 0,
"tail": f"Searching web for: {query[:160]}",
})
try:
text, sources = await asyncio.wait_for(
loop.run_in_executor(
None,
lambda: comprehensive_web_search(
query,
max_pages=max_pages,
time_filter=time_filter,
return_sources=True,
),
),
),
timeout=30,
)
timeout=30,
)
except asyncio.TimeoutError:
return {
"error": f"web_search timed out after 30s: {query[:200]}",
"exit_code": 1,
}
except Exception as e:
return {
"error": f"web_search failed: {type(e).__name__}: {str(e) or 'no details'}",
"exit_code": 1,
}
if progress_cb:
await progress_cb({
"elapsed_s": 30,
"tail": "Search completed; preparing sources.",
})
output = text[:MAX_OUTPUT_CHARS] if len(text) > MAX_OUTPUT_CHARS else text
if sources:
output += "\n\n<!-- SOURCES:" + json.dumps(sources) + " -->"
+50 -48
View File
@@ -76,8 +76,7 @@ async def action_consolidate_memory(owner: str, **kwargs) -> Tuple[str, bool]:
import json
import re
from src.constants import DATA_DIR
from src.endpoint_resolver import resolve_endpoint
from src.llm_core import llm_call_async
from src.llm_core import llm_call_async_with_fallback
from src.memory import MemoryManager
manager = MemoryManager(DATA_DIR)
@@ -116,10 +115,9 @@ async def action_consolidate_memory(owner: str, **kwargs) -> Tuple[str, bool]:
if len(group_memories) < 2:
return False
url, model, headers = resolve_endpoint("utility", owner=group_owner or None)
if not url or not model:
url, model, headers = resolve_endpoint("default", owner=group_owner or None)
if not url or not model:
from src.task_endpoint import resolve_task_candidates
candidates = resolve_task_candidates(owner=group_owner or None)
if not candidates:
return False
try:
@@ -147,13 +145,11 @@ async def action_consolidate_memory(owner: str, **kwargs) -> Tuple[str, bool]:
"\"drop\":[{\"id\":\"existing id\",\"reason\":\"short reason\"}]}\n\n"
f"MEMORIES:\n{json.dumps(items, ensure_ascii=False)}"
)
raw = await llm_call_async(
url=url,
model=model,
raw = await llm_call_async_with_fallback(
candidates,
messages=[{"role": "user", "content": prompt}],
temperature=0.0,
max_tokens=4096,
headers=headers,
timeout=120,
)
from src.text_helpers import strip_think
@@ -604,8 +600,7 @@ async def action_classify_events(owner: str, **kwargs) -> Tuple[str, bool]:
try:
from datetime import timedelta
from core.database import SessionLocal, CalendarEvent
from src.endpoint_resolver import resolve_endpoint
from src.llm_core import llm_call_async
from src.llm_core import llm_call_async_with_fallback
import re as _re, json as _json
db = SessionLocal()
@@ -620,10 +615,9 @@ async def action_classify_events(owner: str, **kwargs) -> Tuple[str, bool]:
if not events:
return "No upcoming events to classify", True
llm_url, llm_model, llm_headers = resolve_endpoint("utility", owner=owner)
if not llm_url:
llm_url, llm_model, llm_headers = resolve_endpoint("default", owner=owner)
llm_available = bool(llm_url and llm_model)
from src.task_endpoint import resolve_task_candidates
llm_candidates = resolve_task_candidates(owner=owner)
llm_available = bool(llm_candidates)
# Pull user memories so the LLM has personal context (relationships,
# job, hobbies). Helps it know e.g. "<name> is your spouse" so their
@@ -699,11 +693,11 @@ async def action_classify_events(owner: str, **kwargs) -> Tuple[str, bool]:
f"EVENTS: {_json.dumps(items)}"
)
try:
raw = await llm_call_async(
url=llm_url, model=llm_model,
raw = await llm_call_async_with_fallback(
llm_candidates,
messages=[{"role": "user", "content": prompt}],
temperature=0.1, max_tokens=16384,
headers=llm_headers, timeout=180,
timeout=180,
)
from src.text_helpers import strip_think as _st
raw = _st(raw or "", prose=False, prompt_echo=False)
@@ -810,8 +804,7 @@ async def action_learn_sender_signatures(owner: str, **kwargs) -> Tuple[str, boo
import asyncio as _aio
from datetime import datetime as _dt, timedelta as _td
from routes.email_helpers import _email_cache_owner_clause, _imap_connect, SCHEDULED_DB
from src.endpoint_resolver import resolve_endpoint
from src.llm_core import llm_call_async
from src.llm_core import llm_call_async_with_fallback
# 1. Pull recent UIDs + From headers cheaply (header-only fetch).
def _pull_headers():
@@ -891,11 +884,11 @@ async def action_learn_sender_signatures(owner: str, **kwargs) -> Tuple[str, boo
if not eligible:
return "All sender sigs already cached (or no eligible senders)", True
url, model, headers = resolve_endpoint("utility", owner=owner)
if not url or not model:
url, model, headers = resolve_endpoint("default", owner=owner)
if not url or not model:
from src.task_endpoint import resolve_task_candidates
candidates = resolve_task_candidates(owner=owner)
if not candidates:
return "No LLM endpoint available", False
model = candidates[0][1]
analyzed = 0
no_sig = 0
@@ -949,11 +942,11 @@ async def action_learn_sender_signatures(owner: str, **kwargs) -> Tuple[str, boo
)
try:
raw = await llm_call_async(
url=url, model=model,
raw = await llm_call_async_with_fallback(
candidates,
messages=[{"role": "user", "content": prompt}],
temperature=0.0, max_tokens=600,
headers=headers, timeout=60,
timeout=60,
)
from src.text_helpers import strip_think as _st
sig = _st(raw or "", prose=False, prompt_echo=False).strip()
@@ -1137,7 +1130,6 @@ async def action_test_skills(owner: str, **kwargs) -> Tuple[str, bool]:
from services.memory.skills import SkillsManager
from src.constants import DATA_DIR
from routes.skills_routes import _run_skill_test_once, _skill_test_task
from src.endpoint_resolver import resolve_endpoint
# #3 SCOPE GUARD: refuse to run on a None/empty owner — otherwise
# `sm.load(owner=None)` returns every user's skills and we'd cross-
@@ -1152,27 +1144,40 @@ async def action_test_skills(owner: str, **kwargs) -> Tuple[str, bool]:
if not names:
raise TaskNoop("no skills to test")
url, model, headers = resolve_endpoint("default", owner=owner)
if not url or not model:
from src.task_endpoint import resolve_task_candidates
candidates = resolve_task_candidates(owner=owner)
if not candidates:
return "No Default/Utility model configured — set one in Settings.", False
# #2 NO SILENT MODEL SWAP: if the configured model isn't served by the
# endpoint, try a basename match — but fail loudly instead of grabbing
# `avail[0]` which could be an embedding-only model and produce 36
# garbage transcripts → 36 'unknown' verdicts with no hint why.
url, model, headers = candidates[0]
try:
from src.llm_core import list_model_ids
avail = list_model_ids(url, headers=headers)
if avail and model not in avail:
import os as _os
base = _os.path.basename((model or "").rstrip("/"))
m = next((a for a in avail if _os.path.basename(a.rstrip("/")) == base), None)
if m:
model = m
else:
return (f"Default model '{model}' not served by endpoint {url}. "
f"Available: {', '.join(avail[:8])}{'' if len(avail) > 8 else ''}. "
"Set a valid Default model in Settings."), False
import os as _os
selected = None
mismatch_notes = []
for cand_url, cand_model, cand_headers in candidates:
avail = list_model_ids(cand_url, headers=cand_headers)
if not avail or cand_model in avail:
selected = (cand_url, cand_model, cand_headers)
break
base = _os.path.basename((cand_model or "").rstrip("/"))
matched = next((a for a in avail if _os.path.basename(a.rstrip("/")) == base), None)
if matched:
selected = (cand_url, matched, cand_headers)
break
mismatch_notes.append(
f"{cand_model} not served by {cand_url}; available: "
f"{', '.join(avail[:8])}{'...' if len(avail) > 8 else ''}"
)
if selected:
url, model, headers = selected
elif mismatch_notes:
return "No configured task fallback model is served. " + " | ".join(mismatch_notes[:3]), False
except Exception as _e:
logger.warning(f"test_skills model resolve check failed (continuing): {_e}")
@@ -1483,7 +1488,6 @@ async def action_check_email_urgency(owner: str, **kwargs) -> Tuple[str, bool]:
from pathlib import Path as _P
from core.database import SessionLocal as _SL, EmailAccount as _EA
from routes.email_helpers import _imap_connect, _decode_header
from src.endpoint_resolver import resolve_endpoint, resolve_utility_fallback_candidates
from src.llm_core import llm_call_async_with_fallback
# Per-owner state file so multi-user runs don't clobber each other's
@@ -1505,12 +1509,10 @@ async def action_check_email_urgency(owner: str, **kwargs) -> Tuple[str, bool]:
# ── 1. Resolve LLM candidates (utility primary + utility fallbacks; fall
# through to default chat as a last resort).
url, model, headers = resolve_endpoint("utility", owner=owner)
if not url or not model:
url, model, headers = resolve_endpoint("default", owner=owner)
if not url or not model:
from src.task_endpoint import resolve_task_candidates
candidates = resolve_task_candidates(owner=owner)
if not candidates:
return "No LLM endpoint available", False
candidates = [(url, model, headers)] + resolve_utility_fallback_candidates(owner=owner)
# ── 2. Enumerate enabled accounts. Match this task's owner AND fall
# back to the legacy "unowned account whose imap_user / from_address
+3
View File
@@ -396,6 +396,9 @@ def resolve_utility_fallback_candidates(owner: Optional[str] = None) -> list:
settings = load_settings()
utility_ep = (get_user_setting("utility_endpoint_id", owner or "", settings.get("utility_endpoint_id", "")) or "").strip()
if not utility_ep:
utility_chain = get_user_setting("utility_model_fallbacks", owner or "", settings.get("utility_model_fallbacks") or []) or []
if utility_chain:
return _resolve_fallback_candidates("utility_model_fallbacks", owner=owner)
return _resolve_fallback_candidates("default_model_fallbacks", owner=owner)
except Exception:
pass
+2
View File
@@ -2130,6 +2130,8 @@ async def stream_llm(url: str, model: str, messages: List[Dict], temperature: fl
yield _stream_delta_event(reasoning, thinking=True)
content = delta.get("content") or ""
if content:
content = re.sub(r"<mm:think(\s+[^>]*)?>", r"<think\1>", content, flags=re.IGNORECASE)
content = re.sub(r"</mm:think>", "</think>", content, flags=re.IGNORECASE)
stripped = content.lstrip()
# gpt-oss harmony format (<|channel|>analysis/final): route via the harmony
# stream router. Sticky once the first marker appears — distinct from the
+64 -2
View File
@@ -1,6 +1,11 @@
"""Shared resolver for background-task AI endpoint (auto-naming, memory, sorting)."""
"""Shared resolver for background-task AI endpoints."""
from src.endpoint_resolver import resolve_endpoint
from src.endpoint_resolver import (
resolve_chat_fallback_candidates,
resolve_endpoint,
resolve_utility_fallback_candidates,
)
from src.llm_core import llm_call_async_with_fallback
def resolve_task_endpoint(fallback_url=None, fallback_model=None, fallback_headers=None, owner=None):
@@ -11,3 +16,60 @@ def resolve_task_endpoint(fallback_url=None, fallback_model=None, fallback_heade
endpoint cannot be resolved.
"""
return resolve_endpoint("task", fallback_url, fallback_model, fallback_headers, owner=owner)
def resolve_task_candidates(
fallback_url=None,
fallback_model=None,
fallback_headers=None,
owner=None,
):
"""Return ordered background-task LLM candidates.
Order:
1. configured Background Tasks endpoint/model, or caller fallback
2. Utility endpoint/model
3. Default endpoint/model
4. Utility fallback chain
5. Default fallback chain
"""
candidates = []
def _append(url, model, headers):
if not url or not model:
return
key = (url, model)
if any((u, m) == key for u, m, _ in candidates):
return
candidates.append((url, model, headers or {}))
_append(*resolve_task_endpoint(fallback_url, fallback_model, fallback_headers, owner=owner))
_append(*resolve_endpoint("utility", owner=owner))
_append(*resolve_endpoint("default", owner=owner))
for url, model, headers in resolve_utility_fallback_candidates(owner=owner):
_append(url, model, headers)
for url, model, headers in resolve_chat_fallback_candidates(owner=owner):
_append(url, model, headers)
return candidates
async def task_llm_call_async(
messages,
*,
fallback_url=None,
fallback_model=None,
fallback_headers=None,
owner=None,
**kwargs,
):
"""Call the shared background-task LLM candidate chain."""
candidates = resolve_task_candidates(
fallback_url=fallback_url,
fallback_model=fallback_model,
fallback_headers=fallback_headers,
owner=owner,
)
if not candidates:
raise RuntimeError("No LLM endpoint available for background task")
return await llm_call_async_with_fallback(candidates, messages=messages, **kwargs)
+32 -13
View File
@@ -833,6 +833,14 @@ class TaskScheduler:
owner=task.owner,
body=run.result if output == "notification" else None,
)
elif run.status == "error":
self.add_notification(
task.name,
"error",
task_id,
owner=task.owner,
body=run.error or run.result,
)
# Log result to the assistant chat so all task activity is visible.
# Skip skipped/error rows — user shouldn't see "skipped: …" noise
@@ -1406,12 +1414,18 @@ class TaskScheduler:
)
except Exception as e:
logger.warning(f"Agent loop failed for task '{task.name}', falling back to simple call: {e}")
from src.llm_core import llm_call_async
from src.task_endpoint import task_llm_call_async
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": task.prompt},
]
result = await llm_call_async(url=endpoint_url, model=model, messages=messages, timeout=120)
result = await task_llm_call_async(
messages,
fallback_url=endpoint_url,
fallback_model=model,
owner=task.owner,
timeout=120,
)
# Strip the model's chain-of-thought before saving/delivering. Task
# output is LLM-only, so prose=True (which also removes untagged
@@ -1636,13 +1650,17 @@ class TaskScheduler:
# Honor per-task max_steps (defense against runaway agent loops).
# Falls back to 20 if not set — the historical default.
_task_max_rounds = task.max_steps if task.max_steps and task.max_steps > 0 else 20
# Tasks are background workloads they share the Utility model's
# fallback chain (Settings → Utility Model → Fallbacks). A downed
# primary endpoint won't silently yield `(no output)` — same recipe
# chat uses but with the utility list (`utility_model_fallbacks`).
# Tasks are background workloads: use the shared task fallback chain
# behind the primary endpoint so a downed primary won't silently yield
# `(no output)`.
try:
from src.endpoint_resolver import resolve_utility_fallback_candidates
_task_fallbacks = resolve_utility_fallback_candidates(owner=task.owner or None)
from src.task_endpoint import resolve_task_candidates
_task_fallbacks = resolve_task_candidates(
fallback_url=endpoint_url,
fallback_model=model,
fallback_headers=headers,
owner=task.owner or None,
)[1:]
except Exception:
_task_fallbacks = []
async for event_str in stream_agent_loop(
@@ -1679,21 +1697,22 @@ class TaskScheduler:
# asking it to summarize what it did. Guarantees output.
if not full_text.strip():
try:
from src.llm_core import llm_call_async_with_fallback
from src.endpoint_resolver import resolve_utility_fallback_candidates
from src.task_endpoint import task_llm_call_async
grace_context = "You ran out of steps. "
if tool_results:
grace_context += "Here's what your tools returned:\n" + "\n".join(tool_results[-5:])
else:
grace_context += "No tool results were captured."
grace_context += "\n\nSummarize what you accomplished and what's still pending. Be concise."
_grace_candidates = [(endpoint_url, model, headers)] + resolve_utility_fallback_candidates(owner=task.owner or None)
full_text = await llm_call_async_with_fallback(
_grace_candidates,
full_text = await task_llm_call_async(
messages=[
{"role": "system", "content": system_content},
{"role": "user", "content": grace_context},
],
fallback_url=endpoint_url,
fallback_model=model,
fallback_headers=headers,
owner=task.owner or None,
timeout=30,
)
full_text = (full_text or "").strip()
+26 -5
View File
@@ -1119,8 +1119,8 @@ async def do_manage_settings(content: str, owner: Optional[str] = None) -> Dict:
_ALIASES = {
"shell": ["bash"],
"terminal": ["bash"],
"search": ["web_search"],
"web": ["web_search"],
"search": ["web_search", "web_fetch"],
"web": ["web_search", "web_fetch"],
"browser": ["builtin_browser"],
"documents": ["create_document", "edit_document", "update_document", "suggest_document"],
"doc": ["create_document", "edit_document", "update_document", "suggest_document"],
@@ -1132,7 +1132,7 @@ async def do_manage_settings(content: str, owner: Optional[str] = None) -> Dict:
"notes": ["manage_notes"],
"calendar": ["manage_calendar"],
"email": ["mcp__email__list_emails", "mcp__email__read_email", "mcp__email__send_email"],
"research": ["web_search"], # research is a per-request flag, not a tool — closest analog
"research": ["web_search", "web_fetch"], # research is a per-request flag, not a tool — closest analog
}
if action == "list_tools":
@@ -2714,13 +2714,25 @@ async def do_serve_model(content: str, owner: Optional[str] = None) -> Dict:
endpoint_added=endpoint_added, endpoint_id=endpoint_id or "",
)
note = "" if registered else " (state-write failed — task may not show in UI)"
where = host or "local"
log_path = f"/tmp/odysseus-tmux/{sid}.log"
return {
"output": f"Serving {repo_id} (session: {sid}){note}",
"output": (
f"Serving {repo_id} on {where} (session: {sid}){note}\n"
f"Next required check: call list_served_models. If this task is not ready, "
f"call tail_serve_output with session_id={sid} and tail=400 before answering. "
f"Do not tell the user to check logs; you have the log tool."
),
"session_id": sid,
"task_type": "serve",
"phase": "running",
"host": host,
"endpoint_id": endpoint_id,
"log_path": log_path,
"next_tools": [
{"name": "list_served_models", "arguments": {}},
{"name": "tail_serve_output", "arguments": {"session_id": sid, "tail": 400}},
],
"exit_code": 0,
}
# FastAPI HTTPException puts the message under `detail`, not `error`.
@@ -3057,8 +3069,17 @@ async def do_tail_serve_output(content: str, owner: Optional[str] = None) -> Dic
MAX_CHARS = 8000
if len(output_text) > MAX_CHARS:
output_text = "…(earlier output truncated)…\n" + output_text[-MAX_CHARS:]
if not output_text:
output_text = (
f"No log output captured yet for {session_id} on {host_label}. "
"This usually means the tmux wrapper has started but the model process "
"has not printed anything yet. Do not stop here: call list_served_models "
"again to check whether it is still loading, ready, or crashed; if it is "
"still not ready, call tail_serve_output again with a larger tail after "
"the next status check."
)
return {
"output": output_text or "(empty pane)",
"output": output_text,
"session_id": session_id,
"host": host_label,
"tail_lines": tail,