diff --git a/src/tool_implementations.py b/src/tool_implementations.py index 3ef3eac3f..092747452 100644 --- a/src/tool_implementations.py +++ b/src/tool_implementations.py @@ -12,11 +12,9 @@ import os import re from typing import Any, Dict, List, Optional -from fastapi import HTTPException from src.constants import MAX_READ_CHARS, DEEP_RESEARCH_DIR, VAULT_FILE from src.tool_utils import get_mcp_manager from core.constants import internal_api_base -from routes._validators import validate_remote_host, validate_ssh_port # System-domain tools were extracted to src/tools/system.py (slice 1, # #4082/#4071); the admin manage_* tools live in src/agent_tools/admin_tools @@ -32,19 +30,25 @@ from src.agent_tools.admin_tools import ( # noqa: F401 do_manage_tokens, do_manage_settings, _MCP_DENIED_COMMANDS, _validate_mcp_command, ) +# Cookbook (model serving) domain extracted to src/tools/cookbook.py +# (slice 1, #4082/#4071). Re-imported here so this module stays a working +# facade. `_internal_headers` / `_INTERNAL_BASE` stay in this file and are +# pulled back function-locally inside cookbook.py. +from src.tools.cookbook import ( # noqa: F401 + do_download_model, do_serve_model, do_list_served_models, + do_stop_served_model, do_tail_serve_output, do_list_downloads, + do_cancel_download, do_search_hf_models, do_adopt_served_model, + do_list_cookbook_servers, do_list_serve_presets, do_serve_preset, + do_list_cached_models, + _cookbook_servers, _resolve_cookbook_host, _cookbook_env_for_host, + _infer_serve_port, _infer_serve_host, _ensure_served_endpoint, + _cookbook_register_task, _cookbook_apply_retry_suggestion, + _scan_running_model_processes, _cookbook_kill_session, + _MODEL_PROCESS_PATTERNS, +) logger = logging.getLogger(__name__) - -def _string_arg(value: Any) -> str: - return "" if value is None else str(value).strip() - - -def _validate_cookbook_ssh_target(remote_host: Any, ssh_port: Any = "") -> tuple[str, str]: - remote = validate_remote_host(_string_arg(remote_host) or None) or "" - sport = validate_ssh_port(_string_arg(ssh_port) or None) or "" - return remote, sport - # --------------------------------------------------------------------------- # Active email state # --------------------------------------------------------------------------- @@ -934,1456 +938,6 @@ def _internal_headers(owner: Optional[str] = None) -> Dict[str, str]: return headers -async def _cookbook_servers() -> Dict[str, Any]: - """Return the cookbook's configured servers + the currently-selected - default host. Shape: {default_host, hosts: [{host, platform, env, envPath}]}. - The agent uses this to route downloads/serves to the right machine - instead of silently defaulting to localhost.""" - import httpx - try: - async with httpx.AsyncClient(timeout=10) as client: - r = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state", headers=_internal_headers()) - state = r.json() if r.headers.get("content-type", "").startswith("application/json") else {} - except Exception: - return {"default_host": "", "hosts": []} - env = (state or {}).get("env") or {} - if not isinstance(env, dict): - return {"default_host": "", "hosts": []} - hosts = [] - for s in (env.get("servers") or []): - if isinstance(s, dict): - hosts.append({ - "name": s.get("name") or "", - "host": s.get("host") or "", # "" = Local - "platform": s.get("platform") or "", - "env": s.get("env") or "", - "envPath": s.get("envPath") or "", - "port": s.get("port") or "", - }) - return {"default_host": env.get("remoteHost") or "", "hosts": hosts} - - -async def _resolve_cookbook_host(name_or_host: str) -> str: - """Map a friendly server NAME ('gpu-box', 'workstation') to its ssh host - string ('user@192.0.2.10'). If the input already looks like an - ssh host (contains '@' or matches a known host), or matches nothing, - it's returned unchanged. 'local'/'localhost' → '' (this machine).""" - if not name_or_host: - return "" - val = name_or_host.strip() - low = val.lower() - if low in ("local", "localhost", "this machine", "here"): - return "" - servers = await _cookbook_servers() - # Exact host match → already an ssh host - for h in servers.get("hosts") or []: - if h.get("host") and h["host"] == val: - return val - # Name match (case-insensitive) - for h in servers.get("hosts") or []: - if (h.get("name") or "").lower() == low: - return h.get("host") or "" # "" for the Local entry - # Substring name match as a fallback - for h in servers.get("hosts") or []: - if low and low in (h.get("name") or "").lower(): - return h.get("host") or "" - # No match — assume the caller passed a raw host/alias; return as-is - # (ssh can resolve aliases from ~/.ssh/config). - return val - - -async def _cookbook_env_for_host(host: str) -> Dict[str, Any]: - """Resolve env_prefix / gpus / platform / hf_token / ssh_port for a - given host by looking it up in cookbook_state.env. The user - configures these per-host in the Cookbook UI; without them, raw - `vllm serve …` fails with 'command not found' because vLLM lives - inside a venv that has to be sourced first. - - Returns a dict with keys ready to drop into the /api/model/serve - payload: env_prefix, gpus, platform, hf_token, ssh_port. - Falls back to the top-level env settings if no per-host entry exists. - """ - import httpx - headers = _internal_headers() - state: Dict[str, Any] = {} - try: - async with httpx.AsyncClient(timeout=10) as client: - r = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state", headers=headers) - state = r.json() if r.headers.get("content-type", "").startswith("application/json") else {} - except Exception as e: - logger.debug(f"cookbook env lookup failed for host={host!r}: {e}") - return {} - if not isinstance(state, dict): - return {} - env_root = state.get("env") or {} - if not isinstance(env_root, dict): - return {} - - # Per-host entry takes precedence over top-level. - per_host: Dict[str, Any] = {} - for s in (env_root.get("servers") or []): - if isinstance(s, dict) and (s.get("host") or "") == (host or ""): - per_host = s - break - - env_kind = per_host.get("env") or env_root.get("env") or "none" - env_path = per_host.get("envPath") or env_root.get("envPath") or "" - platform = per_host.get("platform") or env_root.get("platform") or "linux" - ssh_port = per_host.get("sshPort") or env_root.get("sshPort") or "" - - env_prefix = "" - if env_kind == "venv" and env_path: - if platform == "windows": - activate = env_path if env_path.endswith("\\Scripts\\Activate.ps1") else env_path.rstrip("\\") + "\\Scripts\\Activate.ps1" - env_prefix = f"& {activate}" - else: - activate = env_path if env_path.endswith("/bin/activate") else env_path.rstrip("/") + "/bin/activate" - env_prefix = f"source {activate}" - elif env_kind == "conda" and env_path: - if platform == "windows": - env_prefix = f"conda activate {env_path}" - else: - env_prefix = f'eval "$(conda shell.bash hook)" && conda activate {env_path}' - - from routes.cookbook_helpers import load_stored_hf_token - return { - "env_prefix": env_prefix, - "env_type": env_kind, - "env_path": env_path, - "gpus": env_root.get("gpus") or "", - "platform": platform, - "hf_token": load_stored_hf_token(), - "ssh_port": ssh_port, - } - - -def _infer_serve_port(cmd: str) -> int: - """Infer likely listen port from a serve command.""" - if not cmd: - return 8080 - m = re.search(r"--port\\s+(\\d+)", cmd) - if m: - try: - return int(m.group(1)) - except Exception: - pass - m = re.search(r"OLLAMA_HOST=[^\\s]*?:(\\d+)", cmd) - if m: - try: - return int(m.group(1)) - except Exception: - pass - if "ollama" in cmd: - return 11434 - return 8080 - - -def _infer_serve_host(host: str | None) -> tuple[str, bool]: - """Return (host, container_local) for registering a served endpoint.""" - if not (host or "").strip(): - return "localhost", True - base_host = host.split("@", 1)[-1] if "@" in host else host - return base_host, False - - -async def _ensure_served_endpoint( - *, - model: str, - cmd: str, - host: str | None, -) -> Dict[str, Any]: - """Register/fetch a model endpoint for a running serve session.""" - import httpx - endpoint_host, container_local = _infer_serve_host(host) - port = _infer_serve_port(cmd) - base_url = f"http://{endpoint_host}:{port}/v1" - short_name = model.split("/")[-1] if "/" in model else model - is_image = "diffusion_server.py" in (cmd or "") - payload = { - "name": short_name if not is_image else f"{short_name} (image)", - "base_url": base_url, - "skip_probe": "true", - "model_type": "image" if is_image else "llm", - "container_local": "true" if container_local else "false", - } - try: - async with httpx.AsyncClient(timeout=30) as client: - resp = await client.post( - f"{_INTERNAL_BASE}/api/model-endpoints", - data=payload, - headers=_internal_headers(), - ) - data = resp.json() if resp.headers.get("content-type", "").startswith("application/json") else {} - if resp.status_code >= 400: - logger.debug( - f"ensure endpoint failed for {model!r}: status={resp.status_code} data={data}" - ) - return {"added": False, "endpoint_id": "", "base_url": base_url, "error": data} - ep_id = data.get("id") if isinstance(data, dict) else None - return { - "added": bool(ep_id), - "endpoint_id": ep_id or "", - "base_url": base_url, - "data": data, - } - except Exception as e: - logger.debug(f"ensure endpoint exception for {model!r}: {e}") - return {"added": False, "endpoint_id": "", "base_url": base_url, "error": str(e)} - - -async def _cookbook_register_task( - session_id: str, - model: str, - host: str, - cmd: str, - task_type: str = "serve", - *, - endpoint_added: bool = False, - endpoint_id: str = "", -) -> bool: - """Append a task entry to cookbook_state.json after the agent - launches via /api/model/serve or /api/model/download. The route - spawns tmux but leaves state-writing to the UI; the agent needs to - do that here so the task shows up in the Cookbook tab. - Returns True on success, False if the write failed (best-effort).""" - import httpx - import time as _time - headers = _internal_headers() - try: - async with httpx.AsyncClient(timeout=10) as client: - r = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state", headers=headers) - state = r.json() if r.headers.get("content-type", "").startswith("application/json") else {} - except Exception as e: - logger.debug(f"cookbook state read failed: {e}") - return False - if not isinstance(state, dict): - state = {} - tasks = state.get("tasks") if isinstance(state.get("tasks"), list) else [] - # Skip duplicate (same session_id) entries - if any(isinstance(t, dict) and t.get("sessionId") == session_id for t in tasks): - return True - display_name = model.split("/")[-1] if "/" in model else model - # Placeholder output — the cookbook UI's CSS hides empty
- # via `.cookbook-output-pre:empty { display: none }`, so an
- # empty-string output makes the expansion appear broken until the
- # frontend's reconnect-polling loop captures tmux output. A short
- # placeholder gives the user something to see immediately; it gets
- # replaced by real tmux output within a few seconds.
- target = f"{host}:" if host else "local:"
- placeholder = (
- f"Launched via agent — waiting for tmux output…\n"
- f" session: {session_id}\n"
- f" target: {target}{(cmd.split() or [''])[0] if cmd else ''}\n"
- f" cmd: {cmd[:200]}{'…' if len(cmd) > 200 else ''}"
- )
- tasks.append({
- "id": session_id,
- "sessionId": session_id,
- "name": display_name,
- "modelId": model,
- "type": task_type,
- "status": "running",
- "output": placeholder,
- "ts": int(_time.time() * 1000),
- "payload": {"repo_id": model, "remote_host": host or "", "_cmd": cmd},
- "remoteHost": host or "",
- "sshPort": "",
- "platform": "linux",
- "_serveReady": False,
- "_endpointAdded": bool(endpoint_added),
- "_endpointId": endpoint_id or "",
- })
- state["tasks"] = tasks
- try:
- async with httpx.AsyncClient(timeout=10) as client:
- r = await client.post(f"{_INTERNAL_BASE}/api/cookbook/state",
- json=state, headers=headers)
- return r.status_code < 400
- except Exception as e:
- logger.debug(f"cookbook state write failed: {e}")
- return False
-
-
-
-
-# Patterns for detecting running LLM/diffusion model servers outside
-# the cookbook's task tracker. Each entry: (label, substring-list).
-# Match is case-insensitive against the FULL cmdline. First-match wins.
-_MODEL_PROCESS_PATTERNS = [
- ("vLLM", ["vllm.entrypoints", "vllm serve", "/vllm/", "vllm-openai"]),
- ("SGLang", ["sglang.launch_server", "sglang/launch_server"]),
- ("llama.cpp", ["llama-server", "llama_cpp_server", "llamacppserver"]),
- ("Ollama", ["ollama serve", "ollama runner", "/ollama "]),
- ("ComfyUI", ["comfyui/main.py", "/ComfyUI/main.py", "ComfyUI"]),
- ("A1111 WebUI", ["stable-diffusion-webui/webui", "stable-diffusion-webui/launch", "webui.sh"]),
- ("Fooocus", ["Fooocus/entry_with_update", "Fooocus/launch"]),
- ("InvokeAI", ["invokeai-web", "invokeai.app", "invokeai/api_app"]),
- ("Forge WebUI", ["stable-diffusion-webui-forge", "forge/webui"]),
- ("SD.Next", ["automatic/webui", "sd.next"]),
- ("TGI", ["text-generation-launcher", "text_generation_launcher"]),
- ("Aphrodite", ["aphrodite.endpoints", "aphrodite-engine"]),
- ("Triton", ["tritonserver", "triton/main"]),
- ("Diffusers", ["diffusers.pipelines", "StableDiffusionInpaintPipeline", "DiffusionPipeline"]),
-]
-
-
-def _cookbook_apply_retry_suggestion(cmd: str, suggestion: Dict[str, Any]) -> str:
- """Apply a structured Cookbook diagnosis suggestion to a serve command."""
- if not cmd or not suggestion:
- return cmd
- op = suggestion.get("op")
- if op == "append":
- arg = (suggestion.get("arg") or "").strip()
- if not arg or arg in cmd:
- return cmd
- return f"{cmd.rstrip()} {arg}"
- if op == "remove":
- flag = (suggestion.get("flag") or "").strip()
- if not flag:
- return cmd
- return re.sub(rf"\s*{re.escape(flag)}(?:\s+\S+)?", "", cmd).strip()
- if op == "replace":
- flag = (suggestion.get("flag") or "").strip()
- value = str(suggestion.get("value") or "").strip()
- if not flag or not value:
- return cmd
- repl = f"{flag} {value}"
- if re.search(rf"(^|\s){re.escape(flag)}(\s+\S+)?", cmd):
- return re.sub(rf"(^|\s){re.escape(flag)}(?:\s+\S+)?", lambda m: (m.group(1) or " ") + repl, cmd).strip()
- return f"{cmd.rstrip()} {repl}"
- return cmd
-
-
-def _scan_running_model_processes() -> List[Dict[str, Any]]:
- """Scan /proc for running model server processes. Linux-only; returns
- [] on other platforms or if /proc isn't accessible. Each match returns
- a dict shaped like a cookbook task so the caller can merge cleanly.
- """
- import os
- if not os.path.isdir("/proc"):
- return []
- out: List[Dict[str, Any]] = []
- seen_keys = set()
- try:
- for pid_dir in os.listdir("/proc"):
- if not pid_dir.isdigit():
- continue
- try:
- with open(f"/proc/{pid_dir}/cmdline", "rb") as f:
- raw = f.read()
- except (OSError, PermissionError):
- continue
- if not raw:
- continue
- # cmdline is NUL-separated; join with spaces for matching/display
- cmdline = raw.replace(b"\x00", b" ").decode("utf-8", errors="replace").strip()
- if not cmdline:
- continue
- lower = cmdline.lower()
- for label, needles in _MODEL_PROCESS_PATTERNS:
- if any(n.lower() in lower for n in needles):
- # Dedupe by (label, first-arg) — multi-worker servers
- # spawn N processes; only show one row per server.
- key = (label, cmdline.split(" ")[0])
- if key in seen_keys:
- break
- seen_keys.add(key)
- # Try to pluck a model name out of the cmdline.
- model = ""
- for tok in cmdline.split():
- if "/" in tok and any(s in tok.lower() for s in (
- "model", "checkpoint", ".safetensors", ".gguf", ".bin", "huggingface"
- )):
- model = tok
- break
- out.append({
- "session_id": f"pid-{pid_dir}",
- "model": model or label,
- "phase": "running (external)",
- "type": "serve",
- "remote": "local",
- "pid": int(pid_dir),
- "label": label,
- "cmdline_preview": cmdline[:140] + ("…" if len(cmdline) > 140 else ""),
- "external": True,
- })
- break
- except Exception as e:
- logger.debug(f"_scan_running_model_processes failed: {e}")
- return out
-
-
-async def do_download_model(content: str, owner: Optional[str] = None) -> Dict:
- """Download a HuggingFace model via the cookbook API."""
- import httpx
- try:
- args = _parse_tool_args(content)
- except ValueError:
- return {"error": "Invalid JSON arguments", "exit_code": 1}
- repo_id = args.get("repo_id", "")
- if not repo_id:
- return {"error": "repo_id is required", "exit_code": 1}
- host = (args.get("host") or "").strip()
- # Resolve a friendly server NAME ("gpu-box") to its ssh host string.
- if host:
- host = await _resolve_cookbook_host(host)
- # No host specified → default to the cookbook's currently-selected
- # server rather than silently downloading to localhost (which is
- # usually NOT where the GPUs / model cache live).
- _host_defaulted = False
- if not host and not args.get("local"):
- _servers = await _cookbook_servers()
- if _servers.get("default_host"):
- host = _servers["default_host"]
- _host_defaulted = True
- backend = (args.get("backend") or "").strip().lower()
- if not backend and "/" not in repo_id and ":" in repo_id:
- backend = "ollama"
- payload = {"repo_id": repo_id}
- if backend:
- payload["backend"] = backend
- if host:
- payload["remote_host"] = host
- if args.get("include"):
- payload["include"] = args["include"]
- # Per-host env_prefix + hf_token from cookbook_state (same as serve).
- env_cfg = await _cookbook_env_for_host(host)
- if env_cfg.get("env_prefix"): payload["env_prefix"] = env_cfg["env_prefix"]
- if env_cfg.get("hf_token"): payload["hf_token"] = env_cfg["hf_token"]
- if env_cfg.get("platform"): payload["platform"] = env_cfg["platform"]
- if env_cfg.get("ssh_port"): payload["ssh_port"] = env_cfg["ssh_port"]
- try:
- async with httpx.AsyncClient(timeout=30) as client:
- resp = await client.post(f"{_INTERNAL_BASE}/api/model/download",
- json=payload, headers=_internal_headers())
- data = resp.json()
- if data.get("ok"):
- sid = data.get("session_id", "?")
- registered = await _cookbook_register_task(
- session_id=sid, model=repo_id, host=host,
- cmd=(f"ollama pull {repo_id}" if backend == "ollama" else f"hf download {repo_id}"),
- task_type="download",
- )
- note = "" if registered else " (state-write failed — download may not show in UI)"
- where = host or "local"
- default_note = " (defaulted to the cookbook's selected server — pass host= or local=true to override)" if _host_defaulted else ""
- return {
- "output": f"Download started: {repo_id} on {where} (session: {sid}){note}{default_note}",
- "session_id": sid,
- "host": host,
- "task_type": "download",
- "phase": "running",
- "exit_code": 0,
- }
- return {"error": data.get("error", "Download failed"), "exit_code": 1}
- except Exception as e:
- return {"error": str(e), "exit_code": 1}
-
-
-async def do_serve_model(content: str, owner: Optional[str] = None) -> Dict:
- """Start serving a model via the cookbook API."""
- import httpx
- try:
- args = _parse_tool_args(content)
- except ValueError:
- return {"error": "Invalid JSON arguments", "exit_code": 1}
- repo_id = args.get("repo_id", "")
- cmd = args.get("cmd", "")
- if not repo_id or not cmd:
- return {"error": "repo_id and cmd are required", "exit_code": 1}
- host = (args.get("host") or "").strip()
- if host:
- host = await _resolve_cookbook_host(host)
- if not host and not args.get("local"):
- _servers = await _cookbook_servers()
- if _servers.get("default_host"):
- host = _servers["default_host"]
- payload = {"repo_id": repo_id, "cmd": cmd}
- if host:
- payload["remote_host"] = host
- # Resolve per-host env settings (venv/conda activate, gpus,
- # hf_token, platform, ssh_port) from cookbook_state — same path
- # the UI uses. Without env_prefix, `vllm serve …` lands in a shell
- # without the user's venv and fails 'command not found'.
- env_cfg = await _cookbook_env_for_host(host)
- # Rewrite bare `vllm` / `python3` leading tokens to the venv's absolute
- # binary path when the target host has a venv configured. SSH non-
- # interactive shells often leave ~/.local/bin ahead of the venv bin on
- # PATH even with the venv activated, so `vllm serve` finds the wrong
- # binary and crashes early (e.g. compute_89 torch ABI errors on an old
- # user-site torch). This mirrors what static/js/cookbook.js does in
- # _buildServeCmd for the UI launch path.
- env_path = (env_cfg.get("env_path") or "").rstrip("/")
- env_type = (env_cfg.get("env_type") or env_cfg.get("env") or "").lower()
- if env_type == "venv" and env_path:
- venv_bin = f"{env_path}/bin"
- # Match the FIRST shell-token: skip leading KEY=VAL env-var prefixes
- # (CUDA_VISIBLE_DEVICES=… VLLM_USE_FLASHINFER_SAMPLER=…) before the binary.
- import re as _re3
- tokens = cmd.split()
- idx = 0
- env_re = _re3.compile(r"^[A-Za-z_][A-Za-z0-9_]*=")
- while idx < len(tokens) and env_re.match(tokens[idx]):
- idx += 1
- if idx < len(tokens):
- head = tokens[idx]
- if head in ("vllm", "python3", "python"):
- tokens[idx] = f"{venv_bin}/{head}"
- cmd = " ".join(tokens)
- payload["cmd"] = cmd
- if env_cfg.get("env_prefix"): payload["env_prefix"] = env_cfg["env_prefix"]
- if env_cfg.get("gpus"): payload["gpus"] = env_cfg["gpus"]
- if env_cfg.get("hf_token"): payload["hf_token"] = env_cfg["hf_token"]
- if env_cfg.get("platform"): payload["platform"] = env_cfg["platform"]
- if env_cfg.get("ssh_port"): payload["ssh_port"] = env_cfg["ssh_port"]
- try:
- async with httpx.AsyncClient(timeout=30) as client:
- resp = await client.post(f"{_INTERNAL_BASE}/api/model/serve",
- json=payload, headers=_internal_headers())
- data = resp.json()
- if data.get("ok"):
- sid = data.get("session_id", "?")
- endpoint_id = data.get("endpoint_id") or ""
- if endpoint_id:
- endpoint_added = True
- else:
- endpoint_meta = await _ensure_served_endpoint(model=repo_id, cmd=cmd, host=host)
- endpoint_added = bool(endpoint_meta.get("added"))
- endpoint_id = endpoint_meta.get("endpoint_id", "") or endpoint_id
- registered = await _cookbook_register_task(
- session_id=sid, model=repo_id,
- host=host, cmd=cmd, task_type="serve",
- endpoint_added=endpoint_added, endpoint_id=endpoint_id or "",
- )
- note = "" if registered else " (state-write failed — task may not show in UI)"
- return {
- "output": f"Serving {repo_id} (session: {sid}){note}",
- "session_id": sid,
- "task_type": "serve",
- "phase": "running",
- "host": host,
- "endpoint_id": endpoint_id,
- "exit_code": 0,
- }
- # FastAPI HTTPException puts the message under `detail`, not `error`.
- # Surface BOTH so the agent sees "Invalid characters in cmd" (from
- # _validate_serve_cmd rejecting `&&`/`source`/`cd`) instead of
- # the generic "Serve failed", which leaves it with nothing to act on.
- err_msg = data.get("error") or data.get("detail") or "Serve failed"
- hint = ""
- if isinstance(err_msg, str) and "cmd" in err_msg.lower():
- hint = (" — the cmd must START with an allowlisted binary "
- "(vllm, python3, llama-server, ollama, sglang, lmdeploy, node, npx). "
- "Do NOT prefix with `cd …`, `source …`, or chain with `&&`. "
- "env_prefix (e.g. `source ~/qwen35-env/bin/activate`) is added "
- "automatically from the host's saved venv settings.")
- return {"error": f"{err_msg}{hint}", "exit_code": 1}
- except Exception as e:
- return {"error": str(e), "exit_code": 1}
-
-
-async def do_list_served_models(content: str, owner: Optional[str] = None) -> Dict:
- """List running model servers — merges cookbook-tracked tasks with
- a /proc scan for externally-launched LLM/diffusion processes
- (vLLM, sglang, llama.cpp, Ollama, ComfyUI, A1111, Fooocus, etc.)."""
- import asyncio
- import httpx
-
- # Cookbook-tracked tasks (best-effort; don't fail the whole call if
- # this is unreachable).
- cookbook_tasks: List[Dict[str, Any]] = []
- try:
- async with httpx.AsyncClient(timeout=15) as client:
- resp = await client.get(f"{_INTERNAL_BASE}/api/cookbook/tasks/status",
- headers=_internal_headers())
- cookbook_tasks = (resp.json() or {}).get("tasks") or []
- except Exception as e:
- logger.debug(f"cookbook tasks/status fetch failed: {e}")
-
- # Local process scan — runs in a worker thread so it doesn't block.
- external = await asyncio.to_thread(_scan_running_model_processes)
-
- merged: List[Dict[str, Any]] = []
- merged.extend(cookbook_tasks)
- # Dedupe: if a process's PID is already mentioned by a cookbook task
- # (cookbook may track the PID via session_id), skip it.
- cookbook_pids = set()
- for t in cookbook_tasks:
- if isinstance(t, dict) and t.get("pid"):
- cookbook_pids.add(t["pid"])
- for p in external:
- if p.get("pid") not in cookbook_pids:
- merged.append(p)
-
- if not merged:
- return {
- "output": "No model servers currently running (cookbook task tracker empty; /proc scan found no vLLM / sglang / llama.cpp / Ollama / ComfyUI / A1111 / Fooocus / InvokeAI / TGI / Aphrodite / Triton / Diffusers processes).",
- "exit_code": 0,
- }
-
- # Sort so the agent sees what's actually LIVE first. Stopped/error/
- # completed tasks are mostly historical noise — they shouldn't lead
- # the list when something is genuinely serving.
- _ORDER = {
- "ready": 0, "running": 1, "loading": 1, "warming": 1,
- "queued": 2, "starting": 2,
- "error": 5, "crashed": 5, "failed": 5,
- "stopped": 6, "killed": 6, "cancelled": 6, "canceled": 6,
- "done": 7, "completed": 7, "finished": 7,
- }
- def _rank(t: Dict[str, Any]) -> int:
- phase = (t.get("phase") or t.get("status") or "unknown").lower()
- return _ORDER.get(phase, 3)
- merged.sort(key=_rank)
-
- cb_n = len(cookbook_tasks)
- ext_n = len(external)
- live_n = sum(1 for t in merged if _rank(t) <= 2)
- header = []
- if cb_n:
- header.append(f"{cb_n} cookbook-tracked")
- if ext_n:
- header.append(f"{ext_n} external")
- if live_n:
- header.insert(0, f"{live_n} LIVE")
- lines = [f"Running: {', '.join(header)}."]
- for t in merged:
- phase = t.get("phase") or t.get("status", "unknown")
- model = t.get("model", "?")
- remote = t.get("remote", "local")
- sid = t.get("session_id", "?")
- tag = " [external]" if t.get("external") else ""
- lines.append(f"- {model}: {phase} ({remote}, session: {sid}){tag}")
- diag = t.get("diagnosis") if isinstance(t.get("diagnosis"), dict) else None
- if diag:
- lines.append(f" diagnosis: {diag.get('message')}")
- cmd = t.get("cmd") or ""
- suggestions = diag.get("suggestions") or []
- actionable = []
- for s in suggestions[:3]:
- label = s.get("label") or "retry"
- retry_cmd = _cookbook_apply_retry_suggestion(cmd, s)
- if retry_cmd and retry_cmd != cmd and s.get("op") in {"append", "replace", "remove"}:
- actionable.append(f"{label}: `{retry_cmd}`")
- else:
- actionable.append(label)
- if actionable:
- lines.append(" suggestions: " + " | ".join(actionable))
- if t.get("status") == "error" and t.get("output_tail"):
- tail = str(t.get("output_tail") or "").strip()
- if tail:
- # Prefer a window around a Python traceback if one exists,
- # falling back to the last 30 lines. The previous 6-line
- # tail showed only the post-crash bash prompt / neofetch
- # banner ("Locale: C / Ubuntu_Odysseus ❯") — useless for
- # diagnosis. The traceback we want is usually 50-200 lines
- # earlier in the buffer.
- _tail_lines = tail.splitlines()
- _shown = _tail_lines[-30:]
- for _i, _ln in enumerate(_tail_lines):
- if "Traceback (most recent call last)" in _ln or "ERROR" in _ln or "Error:" in _ln:
- _shown = _tail_lines[_i:_i + 40]
- break
- lines.append(" recent log:")
- for line in _shown:
- lines.append(f" {line[:220]}")
- if t.get("external") and t.get("cmdline_preview"):
- lines.append(f" cmd: {t['cmdline_preview']}")
- return {"output": "\n".join(lines), "tasks": merged, "exit_code": 0}
-
-
-async def _cookbook_kill_session(session_id: str, *, remote_host: str = "",
- ssh_port: str = "", verb: str = "Stopped") -> Dict:
- """Kill a cookbook tmux session — remote-aware — AND mark the task
- stopped in cookbook_state.json. Shared by stop_served_model and
- cancel_download so both behave identically.
-
- Resolves the task's remote host from state when not passed in. A
- local-only `tmux kill-session` silently no-ops for remote tasks —
- that's the bug where "stop the download" appeared to work but the
- download kept running on the remote host.
- """
- import httpx
- import shlex
- headers = _internal_headers()
- remote = remote_host or ""
- sport = ssh_port or ""
-
- # Look up the task's host + confirm it exists in state.
- state: Dict[str, Any] = {}
- try:
- async with httpx.AsyncClient(timeout=10) as client:
- resp = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state", headers=headers)
- state = resp.json() or {}
- except Exception as e:
- logger.debug(f"cookbook state lookup failed for {session_id}: {e}")
- if not isinstance(state, dict):
- state = {}
- matched = None
- for t in (state.get("tasks") or []):
- if isinstance(t, dict) and (t.get("sessionId") == session_id or t.get("id") == session_id):
- matched = t
- if not remote:
- remote = t.get("remoteHost") or ""
- if not sport:
- sport = t.get("sshPort") or ""
- break
-
- if remote:
- try:
- remote, sport = _validate_cookbook_ssh_target(remote, sport)
- except HTTPException as e:
- return {"error": str(getattr(e, "detail", e)), "exit_code": 1}
- _pf = f"-p {shlex.quote(str(sport))} " if sport and str(sport) != "22" else ""
- cmd = (
- f"ssh -o ConnectTimeout=5 -o StrictHostKeyChecking=no "
- f"{_pf}{shlex.quote(remote)} 'tmux kill-session -t {shlex.quote(session_id)}'"
- )
- target_label = f"{session_id} on {remote}"
- else:
- cmd = f"tmux kill-session -t {shlex.quote(session_id)}"
- target_label = session_id
-
- try:
- async with httpx.AsyncClient(timeout=15) as client:
- resp = await client.post(f"{_INTERNAL_BASE}/api/shell/exec",
- json={"command": cmd}, headers=headers)
- if resp.status_code >= 400:
- return {"error": f"shell/exec returned HTTP {resp.status_code}: {resp.text[:200]}", "exit_code": 1}
- try:
- data = resp.json()
- except Exception:
- data = {}
- kill_failed = isinstance(data, dict) and data.get("exit_code") not in (None, 0)
- kill_err = ((data.get("stderr") or data.get("error") or "").strip() if isinstance(data, dict) else "")
- # "no server running" / "can't find session" means it was already
- # gone — treat as success (the goal is "not running").
- already_gone = any(s in kill_err.lower() for s in ("no server running", "can't find session", "session not found"))
- if kill_failed and not already_gone:
- return {"error": f"Failed to {verb.lower()} {target_label}: {kill_err or 'kill-session returned non-zero'}", "exit_code": 1}
-
- # Update state: mark stopped (so the UI + list reflect reality).
- if matched is not None:
- try:
- matched["status"] = "stopped"
- async with httpx.AsyncClient(timeout=10) as client:
- await client.post(f"{_INTERNAL_BASE}/api/cookbook/state",
- json=state, headers=headers)
- except Exception as e:
- logger.debug(f"failed to mark {session_id} stopped in state: {e}")
-
- suffix = " (was already gone)" if already_gone else ""
- return {"output": f"{verb} {target_label}{suffix}", "exit_code": 0}
- except Exception as e:
- return {"error": str(e), "exit_code": 1}
-
-
-async def do_stop_served_model(content: str, owner: Optional[str] = None) -> Dict:
- """Stop a running model server by killing its tmux session (remote-aware)."""
- try:
- args = _parse_tool_args(content)
- except ValueError:
- return {"error": "Invalid JSON arguments", "exit_code": 1}
- session_id = args.get("session_id", "")
- if not session_id:
- return {"error": "session_id is required", "exit_code": 1}
- return await _cookbook_kill_session(
- session_id,
- remote_host=args.get("remote_host") or args.get("host") or "",
- ssh_port=args.get("ssh_port") or "",
- verb="Stopped server",
- )
-
-
-async def do_tail_serve_output(content: str, owner: Optional[str] = None) -> Dict:
- """Capture the last N lines of a cookbook task's tmux pane — remote-aware.
-
- Used by the agent to debug a failed/stuck serve: list_served_models tells
- you the task is `crashed`, this tool returns the actual stderr/traceback
- so the agent can match it against a known fix (compute_89 nvcc mismatch,
- flashinfer version mismatch, OOM, missing kernels, etc.) and decide
- whether to relaunch via serve_model with new flags.
- """
- import httpx
- import shlex
- try:
- args = _parse_tool_args(content)
- except ValueError:
- return {"error": "Invalid JSON arguments", "exit_code": 1}
- session_id = (args.get("session_id") or "").strip()
- if not session_id:
- return {"error": "session_id is required (from list_served_models)", "exit_code": 1}
- import re as _re
- if not _re.fullmatch(r"[a-zA-Z0-9_-]+", session_id):
- return {"error": "Invalid session_id format", "exit_code": 1}
- try:
- tail = int(args.get("tail") or 400)
- except (TypeError, ValueError):
- tail = 400
- tail = max(20, min(tail, 4000))
- headers = _internal_headers()
- remote = _string_arg(args.get("remote_host") or args.get("host"))
- sport = _string_arg(args.get("ssh_port"))
- # Resolve host from cookbook state if caller didn't pass one — same
- # lookup _cookbook_kill_session uses.
- if not remote:
- state: Dict[str, Any] = {}
- try:
- async with httpx.AsyncClient(timeout=10) as client:
- resp = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state", headers=headers)
- state = resp.json() or {}
- except Exception as e:
- logger.debug(f"cookbook state lookup failed for {session_id}: {e}")
- if isinstance(state, dict):
- for t in (state.get("tasks") or []):
- if isinstance(t, dict) and (t.get("sessionId") == session_id or t.get("id") == session_id):
- remote = t.get("remoteHost") or ""
- if not sport:
- sport = t.get("sshPort") or ""
- break
- if remote:
- try:
- remote, sport = _validate_cookbook_ssh_target(remote, sport)
- except HTTPException as e:
- return {"error": str(getattr(e, "detail", e)), "exit_code": 1}
-
- # Prefer the persisted /tmp/odysseus-tmux/SESSION.log file over the
- # live tmux pane. The pane is what the user would see scrolling on
- # their screen — including the post-crash neofetch banner and the
- # idle bash prompt that overwrites the actual traceback the moment
- # vllm exits. The log file is the raw stdout/stderr of the wrapped
- # process and survives the crash unchanged. We only fall back to
- # the pane when the log file doesn't exist (older sessions launched
- # before the tmux+tee wrapper was added).
- log_path = f"/tmp/odysseus-tmux/{session_id}.log"
- pane_inner = f"tmux capture-pane -t {shlex.quote(session_id)} -p -S -{tail} 2>/dev/null"
- file_inner = f"tail -n {tail} {shlex.quote(log_path)} 2>/dev/null"
- inner = (
- f"if [ -s {shlex.quote(log_path)} ]; then {file_inner}; "
- f"else {pane_inner}; fi"
- )
- if remote:
- _pf = f"-p {shlex.quote(str(sport))} " if sport and str(sport) != "22" else ""
- cmd = (
- f"ssh -o ConnectTimeout=5 -o StrictHostKeyChecking=no "
- f"{_pf}{shlex.quote(remote)} {shlex.quote(inner)}"
- )
- host_label = remote
- else:
- cmd = inner
- host_label = "local"
- try:
- async with httpx.AsyncClient(timeout=20) as client:
- resp = await client.post(f"{_INTERNAL_BASE}/api/shell/exec",
- json={"command": cmd}, headers=headers)
- if resp.status_code >= 400:
- return {"error": f"shell/exec returned HTTP {resp.status_code}: {resp.text[:200]}", "exit_code": 1}
- data = resp.json() if resp.content else {}
- output_text = (data.get("stdout") or "").strip()
- stderr_text = (data.get("stderr") or "").strip()
- rc = data.get("exit_code")
- if rc not in (None, 0) and not output_text:
- already_gone = any(s in (stderr_text or "").lower() for s in ("no server running", "can't find session", "session not found"))
- if already_gone:
- return {"output": f"Tmux session {session_id} on {host_label} is gone (task already exited).", "exit_code": 0, "session_id": session_id, "host": host_label}
- return {"error": f"capture-pane failed on {host_label}: {stderr_text or f'exit {rc}'}", "exit_code": 1}
- # Dedupe download-progress noise. A 100-shard HF download produces
- # tens of thousands of `model-NN-of-MM.safetensors: 91%|...` lines
- # that all look the same to the agent and drown the actual error.
- # Keep only one sample per (file, decile-percent) bucket.
- import re as _re2
- lines = output_text.splitlines()
- dedup_lines = []
- seen_progress = set()
- progress_re = _re2.compile(r"^([\w./\-]+):\s+(\d+)%")
- for ln in lines:
- m = progress_re.match(ln.strip())
- if m:
- key = (m.group(1), int(m.group(2)) // 10) # bucket by 10%
- if key in seen_progress:
- continue
- seen_progress.add(key)
- dedup_lines.append(ln)
- output_text = "\n".join(dedup_lines)
- # Hard cap so the agent doesn't blow its token budget.
- MAX_CHARS = 8000
- if len(output_text) > MAX_CHARS:
- output_text = "…(earlier output truncated)…\n" + output_text[-MAX_CHARS:]
- return {
- "output": output_text or "(empty pane)",
- "session_id": session_id,
- "host": host_label,
- "tail_lines": tail,
- "exit_code": 0,
- }
- except Exception as e:
- return {"error": str(e), "exit_code": 1}
-
-
-async def do_list_downloads(content: str, owner: Optional[str] = None) -> Dict:
- """List in-flight model downloads (filters /api/cookbook/tasks/status to type=download)."""
- import httpx
- try:
- async with httpx.AsyncClient(timeout=15) as client:
- resp = await client.get(f"{_INTERNAL_BASE}/api/cookbook/tasks/status",
- headers=_internal_headers())
- data = resp.json()
- tasks = [t for t in data.get("tasks", []) if (t.get("type") or "").lower() == "download"]
- if not tasks:
- return {"output": "No downloads in progress.", "exit_code": 0}
- lines = [f"{len(tasks)} download(s) in progress:"]
- for t in tasks:
- phase = t.get("phase") or t.get("status", "unknown")
- model = t.get("model", "?")
- pct = t.get("progress_percent") or t.get("percent")
- pct_str = f" {pct}%" if pct is not None else ""
- lines.append(f"- {model}: {phase}{pct_str} ({t.get('remote', 'local')}, session: {t.get('session_id', '?')})")
- return {"output": "\n".join(lines), "downloads": tasks, "exit_code": 0}
- except Exception as e:
- return {"error": str(e), "exit_code": 1}
-
-
-async def do_cancel_download(content: str, owner: Optional[str] = None) -> Dict:
- """Cancel a model download by killing its tmux session (remote-aware)."""
- try:
- args = _parse_tool_args(content)
- except ValueError:
- return {"error": "Invalid JSON arguments", "exit_code": 1}
- session_id = args.get("session_id", "")
- if not session_id:
- return {"error": "session_id is required (from list_downloads)", "exit_code": 1}
- return await _cookbook_kill_session(
- session_id,
- remote_host=args.get("remote_host") or args.get("host") or "",
- ssh_port=args.get("ssh_port") or "",
- verb="Cancelled download",
- )
-
-
-async def do_search_hf_models(content: str, owner: Optional[str] = None) -> Dict:
- """Search HuggingFace via the cookbook /api/cookbook/hf-latest endpoint."""
- import httpx
- try:
- args = _parse_tool_args(content)
- except ValueError:
- return {"error": "Invalid JSON arguments", "exit_code": 1}
- query = args.get("query", "") or args.get("search", "")
- limit = args.get("limit", 10)
- params: Dict[str, str] = {}
- if query:
- params["search"] = query
- if limit:
- params["limit"] = str(limit)
- try:
- async with httpx.AsyncClient(timeout=30) as client:
- resp = await client.get(f"{_INTERNAL_BASE}/api/cookbook/hf-latest",
- params=params, headers=_internal_headers())
- data = resp.json()
- models = data.get("models") if isinstance(data, dict) else data
- if not models:
- return {"output": f"No models found for query: {query!r}", "exit_code": 0}
- lines = [f"Found {len(models)} model(s) for {query!r}:" if query else f"{len(models)} model(s):"]
- for m in models[:limit if isinstance(limit, int) else 10]:
- if isinstance(m, dict):
- name = m.get("repo_id") or m.get("modelId") or m.get("id") or "?"
- dl = m.get("downloads")
- size = m.get("size_gb") or m.get("needed_vram_gb")
- bits = []
- if size:
- bits.append(f"~{size}GB")
- if dl:
- bits.append(f"{dl} downloads")
- tail = f" ({', '.join(bits)})" if bits else ""
- lines.append(f"- {name}{tail}")
- else:
- lines.append(f"- {m}")
- return {"output": "\n".join(lines), "models": models, "exit_code": 0}
- except Exception as e:
- return {"error": str(e), "exit_code": 1}
-
-
-async def do_adopt_served_model(content: str, owner: Optional[str] = None) -> Dict:
- """Register an externally-launched model server (bash + tmux + ssh, or
- anything else) into the Cookbook so it appears in list_served_models,
- can be stopped via stop_served_model, and is added to the user's
- endpoint list for chat. Use this when a model was started outside
- the cookbook's serve flow but you want first-class tracking.
-
- Args (JSON):
- host: "user@192.0.2.10" (or omit for localhost)
- tmux_session: "minimax-m27" (existing tmux session name)
- model: "cyankiwi/MiniMax-M2.7-AWQ-4bit" (HF repo or display name)
- port: 8000
- name: optional display name (defaults to model basename)
- add_endpoint: bool (default true) — also register as a chat endpoint
- """
- import httpx
- import shlex
- try:
- args = _parse_tool_args(content)
- except ValueError:
- return {"error": "Invalid JSON arguments", "exit_code": 1}
-
- host = _string_arg(args.get("host") or args.get("remote_host"))
- sess = (args.get("tmux_session") or args.get("session_id") or "").strip()
- model = (args.get("model") or args.get("repo_id") or "").strip()
- port = args.get("port") or 8000
- display_name = (args.get("name") or "").strip() or (model.split("/")[-1] if "/" in model else model)
- add_endpoint = args.get("add_endpoint", True)
-
- if not sess or not model:
- return {"error": "tmux_session and model are required", "exit_code": 1}
-
- # Verify tmux session exists on the target host
- if host:
- try:
- host, _ = _validate_cookbook_ssh_target(host)
- except HTTPException as e:
- return {"error": str(getattr(e, "detail", e)), "exit_code": 1}
-
- headers = _internal_headers()
- if host:
- check = f"ssh -o ConnectTimeout=5 -o StrictHostKeyChecking=no {shlex.quote(host)} 'tmux has-session -t {shlex.quote(sess)} 2>&1'"
- else:
- check = f"tmux has-session -t {shlex.quote(sess)} 2>&1"
- try:
- async with httpx.AsyncClient(timeout=10) as client:
- r = await client.post(f"{_INTERNAL_BASE}/api/shell/exec",
- json={"command": check}, headers=headers)
- data = r.json() if r.headers.get("content-type", "").startswith("application/json") else {}
- if r.status_code >= 400 or (data.get("exit_code") not in (None, 0)):
- err = (data.get("stderr") or data.get("error") or r.text[:200]).strip()
- return {"error": f"tmux session {sess!r} not found on {host or 'local'}: {err}", "exit_code": 1}
- except Exception as e:
- return {"error": f"verify failed: {e}", "exit_code": 1}
-
- # Best-effort health check — does port respond to /v1/models?
- if host:
- health_cmd = f"ssh -o ConnectTimeout=5 {shlex.quote(host)} 'curl -s -m 3 http://localhost:{int(port)}/v1/models'"
- else:
- health_cmd = f"curl -s -m 3 http://localhost:{int(port)}/v1/models"
- server_up = False
- try:
- async with httpx.AsyncClient(timeout=10) as client:
- r = await client.post(f"{_INTERNAL_BASE}/api/shell/exec",
- json={"command": health_cmd}, headers=headers)
- body = (r.json() or {}).get("stdout", "") if r.headers.get("content-type", "").startswith("application/json") else ""
- server_up = '"data"' in body or '"object"' in body
- except Exception:
- pass
-
- # Read+modify+write cookbook state. APPEND a task entry; do NOT
- # overwrite the whole file (that'd nuke presets).
- try:
- async with httpx.AsyncClient(timeout=10) as client:
- r = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state", headers=headers)
- state = r.json() if r.headers.get("content-type", "").startswith("application/json") else {}
- except Exception as e:
- return {"error": f"could not read cookbook state: {e}", "exit_code": 1}
- if not isinstance(state, dict):
- state = {}
- tasks = state.get("tasks") if isinstance(state.get("tasks"), list) else []
- # Skip duplicate adopt of the same session
- if any(isinstance(t, dict) and t.get("sessionId") == sess for t in tasks):
- adopted_already = True
- else:
- adopted_already = False
- import time as _time
- new_task = {
- "id": sess,
- "sessionId": sess,
- "name": display_name,
- "type": "serve",
- "status": "running",
- "output": (
- f"Adopted externally-launched session {sess!r} on {host or 'local'}.\n"
- "Reconnect polling will start streaming tmux output shortly."
- ),
- "ts": int(_time.time() * 1000),
- "payload": {"repo_id": model, "remote_host": host or "", "_cmd": "(adopted — launched outside cookbook)"},
- "remoteHost": host or "",
- "sshPort": "",
- "platform": "linux",
- "_serveReady": bool(server_up),
- "_endpointAdded": False,
- "_adoptedExternally": True,
- }
- tasks.append(new_task)
- state["tasks"] = tasks
- try:
- async with httpx.AsyncClient(timeout=10) as client:
- await client.post(f"{_INTERNAL_BASE}/api/cookbook/state",
- json=state, headers=headers)
- except Exception as e:
- return {"error": f"could not save cookbook state: {e}", "exit_code": 1}
-
- # Optionally register as a chat endpoint
- endpoint_msg = ""
- if add_endpoint:
- # Resolve host to a URL. SSH form `user@host` → just take host.
- host_only = host.split("@", 1)[-1] if host else "localhost"
- endpoint_url = f"http://{host_only}:{int(port)}/v1"
- try:
- from src.tool_implementations import do_manage_endpoints # avoid forward ref issues
- except Exception:
- do_manage_endpoints = None
- if do_manage_endpoints is not None:
- try:
- ep_result = await do_manage_endpoints(json.dumps({
- "action": "add",
- "name": display_name,
- "endpoint_url": endpoint_url,
- "is_local": False,
- }), owner=owner)
- if isinstance(ep_result, dict) and not ep_result.get("error"):
- endpoint_msg = f" Endpoint {endpoint_url} added as {display_name!r}."
- else:
- endpoint_msg = f" Endpoint registration skipped: {(ep_result or {}).get('error', 'unknown')}"
- except Exception as e:
- endpoint_msg = f" Endpoint registration failed: {e}"
-
- return {
- "output": (
- f"Adopted session {sess!r} ({model}) on {host or 'local'}:{port}. "
- + ("Already tracked — skipped state write. " if adopted_already else "Added to cookbook state. ")
- + ("Server responding. " if server_up else "Server not responding yet (still loading?). ")
- + endpoint_msg
- ).strip(),
- "session_id": sess,
- "host": host,
- "port": int(port),
- "server_up": server_up,
- "exit_code": 0,
- }
-
-
-async def do_list_cookbook_servers(content: str, owner: Optional[str] = None) -> Dict:
- """List the cookbook's configured servers and which one is the
- current default. Use this to decide where to download/serve a
- model, or to show the user options when the target host is
- ambiguous."""
- servers = await _cookbook_servers()
- hosts = servers.get("hosts") or []
- default = servers.get("default_host") or ""
- if not hosts:
- return {"output": "No cookbook servers configured. Downloads/serves default to localhost.", "servers": [], "default_host": "", "exit_code": 0}
- # Resolve which server is the default by its friendly name too.
- default_name = next((h.get("name") for h in hosts if h.get("host") == default and h.get("name")), default or "local")
- lines = [f"{len(hosts)} configured server(s) (default: {default_name}):"]
- for h in hosts:
- name = h.get("name") or "(unnamed)"
- host = h.get("host") or "local"
- mark = " ← default" if h.get("host") == default else ""
- env_bit = f" [{h.get('env')}: {h.get('envPath')}]" if h.get("env") and h.get("env") != "none" else ""
- plat = f" ({h.get('platform')})" if h.get("platform") else ""
- lines.append(f"- {name} → {host}{plat}{env_bit}{mark}")
- lines.append("\nRefer to servers by their name (e.g. download_model with host=\"gpu-box\").")
- return {"output": "\n".join(lines), "servers": hosts, "default_host": default, "exit_code": 0}
-
-
-async def do_list_serve_presets(content: str, owner: Optional[str] = None) -> Dict:
- """List saved serve presets from cookbook_state.json. Each preset
- is a launch template: name, model, host, port, cmd. Use this to
- discover what the user has previously configured so you can
- launch by preset instead of fabricating tmux commands."""
- import httpx
- try:
- async with httpx.AsyncClient(timeout=10) as client:
- resp = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state",
- headers=_internal_headers())
- state = resp.json() or {}
- except Exception as e:
- return {"error": f"Failed to fetch cookbook state: {e}", "exit_code": 1}
-
- presets = state.get("presets") or []
- if not presets:
- return {
- "output": "No serve presets saved. Tell the user to save one from the Cookbook UI first, or use serve_model with explicit repo_id + cmd + host.",
- "presets": [],
- "exit_code": 0,
- }
- lines = [f"{len(presets)} saved serve preset(s):"]
- for p in presets:
- if not isinstance(p, dict):
- continue
- name = p.get("name", "?")
- model = p.get("model") or p.get("modelId") or "?"
- host = p.get("host") or p.get("remoteHost") or "local"
- port = p.get("port", "")
- cmd = (p.get("cmd") or "").strip()
- bits = [f"- {name}: {model}", f"host={host}"]
- if port:
- bits.append(f"port={port}")
- lines.append(" ".join(bits))
- if cmd:
- cmd_preview = cmd if len(cmd) < 140 else cmd[:140] + "…"
- lines.append(f" cmd: {cmd_preview}")
- return {"output": "\n".join(lines), "presets": presets, "exit_code": 0}
-
-
-async def do_serve_preset(content: str, owner: Optional[str] = None) -> Dict:
- """Launch a saved serve preset by name. Resolves the preset's
- cmd + host + model from cookbook_state.json, then calls the
- standard model/serve endpoint. Saves the agent from having to
- reinvent tmux launch commands the user already saved."""
- import httpx
- try:
- args = _parse_tool_args(content)
- except ValueError:
- return {"error": "Invalid JSON arguments", "exit_code": 1}
- name = (args.get("name") or args.get("preset") or "").strip()
- if not name:
- return {"error": "name (preset name) is required. Call list_serve_presets to see what's available.", "exit_code": 1}
-
- try:
- async with httpx.AsyncClient(timeout=10) as client:
- resp = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state",
- headers=_internal_headers())
- state = resp.json() or {}
- except Exception as e:
- return {"error": f"Failed to fetch cookbook state: {e}", "exit_code": 1}
-
- presets = state.get("presets") or []
- # Match by exact name first, then case-insensitive substring.
- chosen = None
- lname = name.lower()
- for p in presets:
- if isinstance(p, dict) and (p.get("name") or "").lower() == lname:
- chosen = p
- break
- if chosen is None:
- for p in presets:
- if isinstance(p, dict) and lname in (p.get("name") or "").lower():
- chosen = p
- break
- if chosen is None:
- sample = ", ".join((p.get("name") or "?") for p in presets[:8] if isinstance(p, dict))
- return {"error": f"No preset matching {name!r}. Available: {sample or '(none)'}", "exit_code": 1}
-
- repo_id = chosen.get("model") or chosen.get("modelId") or ""
- cmd = (chosen.get("cmd") or "").strip()
- host = chosen.get("host") or chosen.get("remoteHost") or ""
- if not repo_id or not cmd:
- return {"error": f"Preset {chosen.get('name')!r} is missing model or cmd — can't launch.", "exit_code": 1}
-
- payload: Dict[str, Any] = {"repo_id": repo_id, "cmd": cmd}
- if host:
- payload["remote_host"] = host
- # Resolve per-host env settings the same way the UI does — pulls
- # env_prefix (source ~/vllm-env/bin/activate), gpus, hf_token,
- # etc. from cookbook_state.env so launches actually find vllm.
- env_cfg = await _cookbook_env_for_host(host)
- if env_cfg.get("env_prefix"): payload["env_prefix"] = env_cfg["env_prefix"]
- if env_cfg.get("gpus"): payload["gpus"] = env_cfg["gpus"]
- if env_cfg.get("hf_token"): payload["hf_token"] = env_cfg["hf_token"]
- if env_cfg.get("platform"): payload["platform"] = env_cfg["platform"]
- if env_cfg.get("ssh_port"):
- payload["ssh_port"] = env_cfg["ssh_port"]
-
- try:
- async with httpx.AsyncClient(timeout=30) as client:
- resp = await client.post(f"{_INTERNAL_BASE}/api/model/serve",
- json=payload, headers=_internal_headers())
- data = resp.json()
- if data.get("ok"):
- sid = data.get("session_id", "?")
- endpoint_id = data.get("endpoint_id") or ""
- if endpoint_id:
- endpoint_added = True
- else:
- endpoint_meta = await _ensure_served_endpoint(model=repo_id, cmd=cmd, host=host)
- endpoint_added = bool(endpoint_meta.get("added"))
- endpoint_id = endpoint_meta.get("endpoint_id", "") or endpoint_id
- registered = await _cookbook_register_task(
- session_id=sid, model=repo_id, host=host,
- cmd=cmd, task_type="serve",
- endpoint_added=endpoint_added, endpoint_id=endpoint_id or "",
- )
- note = "" if registered else " (state-write failed — task may not show in UI)"
- return {"output": f"Launched preset {chosen.get('name')!r}: {repo_id} on {host or 'local'} (session: {sid}){note}", "session_id": sid, "host": host, "endpoint_id": endpoint_id, "exit_code": 0}
- return {"error": data.get("error", "Serve failed"), "exit_code": 1}
- except Exception as e:
- return {"error": str(e), "exit_code": 1}
-
-
-async def do_list_cached_models(content: str, owner: Optional[str] = None) -> Dict:
- """List models already cached locally and/or on remote hosts.
-
- With no `host` arg, scans EVERY configured Cookbook server (and local)
- and aggregates — so the agent sees the full inventory in one call
- instead of having to query each server individually.
- """
- import httpx
- try:
- args = _parse_tool_args(content) if content.strip() else {}
- except ValueError:
- return {"error": "Invalid JSON arguments", "exit_code": 1}
- raw_host = (args.get("host") or "").strip()
- headers = _internal_headers()
-
- async def _scan_one(host_label: str, host_val: str, ssh_port: str = "",
- platform: str = "", model_dir: str = "") -> list:
- """Hit /api/model/cached for one host; tag each returned model with its source."""
- p: Dict[str, str] = {}
- if host_val:
- p["host"] = host_val
- # Caller-provided override beats per-server config beats nothing.
- if args.get("model_dir"):
- p["model_dir"] = args["model_dir"]
- elif model_dir:
- p["model_dir"] = model_dir
- if ssh_port:
- p["ssh_port"] = ssh_port
- elif args.get("ssh_port"):
- p["ssh_port"] = str(args["ssh_port"])
- if platform:
- p["platform"] = platform
- elif args.get("platform"):
- p["platform"] = args["platform"]
- try:
- async with httpx.AsyncClient(timeout=60) as client:
- resp = await client.get(f"{_INTERNAL_BASE}/api/model/cached",
- params=p, headers=headers)
- data = resp.json()
- ms = data.get("models", []) if isinstance(data, dict) else (data or [])
- for m in ms:
- m["host"] = host_label or "local"
- return ms or []
- except Exception as e:
- logger.debug(f"list_cached_models scan({host_label}) failed: {e}")
- return []
-
- # When the caller specifies a host explicitly, scan only that one (old behaviour).
- # Otherwise iterate every configured server + local so the agent doesn't
- # have to repeat the call per server.
- try:
- # Pull configured servers from cookbook state (used for resolving
- # modelDirs both when caller specifies a host and when we scan all).
- servers: list = []
- try:
- async with httpx.AsyncClient(timeout=10) as client:
- st = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state", headers=headers)
- st_data = st.json() if st.headers.get("content-type", "").startswith("application/json") else {}
- servers = (st_data.get("env", {}) or {}).get("servers") or []
- except Exception as e:
- logger.debug(f"server list fetch failed: {e}")
- st_data = {}
-
- def _dirs_for(server_record: Dict[str, Any]) -> str:
- """Comma-joined modelDirs from a saved server record (Settings).
-
- Filters out the HF cache (~/.cache/huggingface/hub) — the backend
- scan script always scans it by default, so re-passing it as an
- extra model_dir is redundant AND confuses some path-handling
- edge cases where the extra dir suppresses the deeper scan.
- We only need to forward the NON-default dirs (e.g. /mnt/HADES/models).
- """
- mds = server_record.get("modelDirs") if isinstance(server_record, dict) else None
- HF_DEFAULTS = {"~/.cache/huggingface/hub", "~/.cache/huggingface"}
- if isinstance(mds, list):
- extras = [d for d in mds if isinstance(d, str) and d.strip() and d.strip() not in HF_DEFAULTS]
- return ",".join(extras)
- if isinstance(mds, str) and mds.strip() not in HF_DEFAULTS:
- return mds
- return ""
-
- if raw_host:
- host = await _resolve_cookbook_host(raw_host)
- # Find this host's saved record so its modelDirs apply too.
- srv = next(
- (s for s in servers if isinstance(s, dict)
- and (s.get("name") == raw_host or s.get("host") == host or s.get("host") == raw_host)),
- {},
- )
- models = await _scan_one(raw_host, host, model_dir=_dirs_for(srv))
- else:
- # Always include local. Local's saved record is the one with no host.
- local_srv = next((s for s in servers if isinstance(s, dict) and not (s.get("host") or "").strip()), {})
- scans: list = [_scan_one("local", "", model_dir=_dirs_for(local_srv))]
- for s in servers:
- if not isinstance(s, dict):
- continue
- name = s.get("name") or s.get("host")
- host_val = s.get("host") or ""
- if not host_val:
- continue
- scans.append(_scan_one(
- name,
- host_val,
- ssh_port=str(s.get("port") or ""),
- platform=s.get("platform") or "",
- model_dir=_dirs_for(s),
- ))
- results = await asyncio.gather(*scans, return_exceptions=False)
- # Dedupe by (host, repo_id) — same model could appear in both HF cache + Ollama list.
- seen = set()
- models: list = []
- for batch in results:
- for m in batch:
- key = (m.get("host", ""), m.get("repo_id", ""))
- if key in seen:
- continue
- seen.add(key)
- models.append(m)
- if not models:
- # Cache scans can miss models downloaded into the HF default cache
- # when the server has no explicit model_dir configured. Surface
- # completed Cookbook download tasks so the agent doesn't conclude
- # a model is absent and re-download it.
- downloaded = []
- try:
- async with httpx.AsyncClient(timeout=10) as client:
- st = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state", headers=headers)
- state = st.json() if st.headers.get("content-type", "").startswith("application/json") else {}
- for t in (state.get("tasks") or []):
- if not isinstance(t, dict) or t.get("type") != "download":
- continue
- if (t.get("status") or "").lower() not in {"done", "completed"}:
- continue
- task_host = t.get("remoteHost") or (t.get("payload") or {}).get("remote_host") or ""
- if raw_host and task_host != raw_host:
- continue
- repo = t.get("modelId") or t.get("repoId") or (t.get("payload") or {}).get("repo_id") or t.get("name")
- if repo and repo not in downloaded:
- downloaded.append(repo)
- except Exception:
- downloaded = []
- host_str = f" on {raw_host}" if raw_host else ""
- if downloaded:
- lines = [f"No cache paths were detected{host_str}, but Cookbook has completed download task(s):"]
- lines.extend(f"- {repo} — downloaded via Cookbook task" for repo in downloaded)
- return {"output": "\n".join(lines), "models": [{"repo_id": repo, "source": "cookbook_task"} for repo in downloaded], "exit_code": 0}
- return {"output": f"No cached models found{host_str}.", "exit_code": 0}
- # Multi-host scan: group by host so the agent sees inventory per server.
- # Single-host scan: flat list (matches old output shape).
- if raw_host:
- lines = [f"{len(models)} cached model(s) on {raw_host}:"]
- for m in models:
- name = m.get("repo_id", "?")
- sz = m.get("size") or (f"{m.get('size_bytes', 0) / (1024**3):.1f}GB" if m.get("size_bytes") else "")
- inc = " (incomplete)" if m.get("has_incomplete") else ""
- kind = " [diffusion]" if m.get("is_diffusion") else ""
- lines.append(f"- {name}{kind} — {sz}{inc}")
- else:
- from collections import defaultdict as _dd
- by_host = _dd(list)
- for m in models:
- by_host[m.get("host", "local")].append(m)
- lines = [f"{len(models)} cached model(s) across {len(by_host)} server(s):"]
- for host_name in sorted(by_host.keys()):
- lines.append(f"\n[{host_name}]")
- for m in by_host[host_name]:
- name = m.get("repo_id", "?")
- sz = m.get("size") or (f"{m.get('size_bytes', 0) / (1024**3):.1f}GB" if m.get("size_bytes") else "")
- inc = " (incomplete)" if m.get("has_incomplete") else ""
- kind = " [diffusion]" if m.get("is_diffusion") else ""
- backend = f" ({m.get('backend')})" if m.get("backend") else ""
- lines.append(f"- {name}{kind}{backend} — {sz}{inc}")
- return {"output": "\n".join(lines), "models": models, "exit_code": 0}
- except Exception as e:
- return {"error": str(e), "exit_code": 1}
-
-
# ── Gallery tools ──
async def do_edit_image(content: str, owner: Optional[str] = None) -> Dict:
diff --git a/src/tools/__init__.py b/src/tools/__init__.py
index d5d25831e..50c427baf 100644
--- a/src/tools/__init__.py
+++ b/src/tools/__init__.py
@@ -9,3 +9,15 @@ from src.tools.system import ( # noqa: F401
do_manage_mcp, do_manage_webhooks, do_manage_tokens, do_manage_settings,
do_api_call, do_app_api,
)
+from src.tools.cookbook import ( # noqa: F401
+ do_download_model, do_serve_model, do_list_served_models,
+ do_stop_served_model, do_tail_serve_output, do_list_downloads,
+ do_cancel_download, do_search_hf_models, do_adopt_served_model,
+ do_list_cookbook_servers, do_list_serve_presets, do_serve_preset,
+ do_list_cached_models,
+ _cookbook_servers, _resolve_cookbook_host, _cookbook_env_for_host,
+ _infer_serve_port, _infer_serve_host, _ensure_served_endpoint,
+ _cookbook_register_task, _cookbook_apply_retry_suggestion,
+ _scan_running_model_processes, _cookbook_kill_session,
+ _MODEL_PROCESS_PATTERNS,
+)
diff --git a/src/tools/cookbook.py b/src/tools/cookbook.py
new file mode 100644
index 000000000..22213d162
--- /dev/null
+++ b/src/tools/cookbook.py
@@ -0,0 +1,1498 @@
+"""Cookbook (model serving) tool domain — slice 1 (#4082/#4071).
+
+Download, serve, list, stop, tail, search, adopt and cache HuggingFace / model
+serving operations, plus their private helpers. Extracted verbatim from
+``src.tool_implementations.py``; this module is re-exported by the facade.
+
+Shared constants ``_internal_headers`` and ``_INTERNAL_BASE`` still live in
+``src.tool_implementations`` (used by many domains); each function that needs
+them does a function-local import to avoid a top-level circular dependency,
+matching the system-domain split.
+"""
+import asyncio
+import json
+import logging
+import re
+from typing import Any, Dict, List, Optional
+
+from fastapi import HTTPException
+from routes._validators import validate_remote_host, validate_ssh_port
+
+from src.tools._common import _parse_tool_args
+
+logger = logging.getLogger(__name__)
+
+
+def _string_arg(value: Any) -> str:
+ return "" if value is None else str(value).strip()
+
+
+def _validate_cookbook_ssh_target(remote_host: Any, ssh_port: Any = "") -> tuple[str, str]:
+ remote = validate_remote_host(_string_arg(remote_host) or None) or ""
+ sport = validate_ssh_port(_string_arg(ssh_port) or None) or ""
+ return remote, sport
+
+
+async def _cookbook_servers() -> Dict[str, Any]:
+ """Return the cookbook's configured servers + the currently-selected
+ default host. Shape: {default_host, hosts: [{host, platform, env, envPath}]}.
+ The agent uses this to route downloads/serves to the right machine
+ instead of silently defaulting to localhost."""
+ from src.tool_implementations import _internal_headers, _INTERNAL_BASE # shared, lives in facade
+ import httpx
+ try:
+ async with httpx.AsyncClient(timeout=10) as client:
+ r = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state", headers=_internal_headers())
+ state = r.json() if r.headers.get("content-type", "").startswith("application/json") else {}
+ except Exception:
+ return {"default_host": "", "hosts": []}
+ env = (state or {}).get("env") or {}
+ if not isinstance(env, dict):
+ return {"default_host": "", "hosts": []}
+ hosts = []
+ for s in (env.get("servers") or []):
+ if isinstance(s, dict):
+ hosts.append({
+ "name": s.get("name") or "",
+ "host": s.get("host") or "", # "" = Local
+ "platform": s.get("platform") or "",
+ "env": s.get("env") or "",
+ "envPath": s.get("envPath") or "",
+ "port": s.get("port") or "",
+ })
+ return {"default_host": env.get("remoteHost") or "", "hosts": hosts}
+
+
+async def _resolve_cookbook_host(name_or_host: str) -> str:
+ """Map a friendly server NAME ('gpu-box', 'workstation') to its ssh host
+ string ('user@192.0.2.10'). If the input already looks like an
+ ssh host (contains '@' or matches a known host), or matches nothing,
+ it's returned unchanged. 'local'/'localhost' → '' (this machine)."""
+ if not name_or_host:
+ return ""
+ val = name_or_host.strip()
+ low = val.lower()
+ if low in ("local", "localhost", "this machine", "here"):
+ return ""
+ servers = await _cookbook_servers()
+ # Exact host match → already an ssh host
+ for h in servers.get("hosts") or []:
+ if h.get("host") and h["host"] == val:
+ return val
+ # Name match (case-insensitive)
+ for h in servers.get("hosts") or []:
+ if (h.get("name") or "").lower() == low:
+ return h.get("host") or "" # "" for the Local entry
+ # Substring name match as a fallback
+ for h in servers.get("hosts") or []:
+ if low and low in (h.get("name") or "").lower():
+ return h.get("host") or ""
+ # No match — assume the caller passed a raw host/alias; return as-is
+ # (ssh can resolve aliases from ~/.ssh/config).
+ return val
+
+
+async def _cookbook_env_for_host(host: str) -> Dict[str, Any]:
+ """Resolve env_prefix / gpus / platform / hf_token / ssh_port for a
+ given host by looking it up in cookbook_state.env. The user
+ configures these per-host in the Cookbook UI; without them, raw
+ `vllm serve …` fails with 'command not found' because vLLM lives
+ inside a venv that has to be sourced first.
+
+ Returns a dict with keys ready to drop into the /api/model/serve
+ payload: env_prefix, gpus, platform, hf_token, ssh_port.
+ Falls back to the top-level env settings if no per-host entry exists.
+ """
+ from src.tool_implementations import _internal_headers, _INTERNAL_BASE # shared, lives in facade
+ import httpx
+ headers = _internal_headers()
+ state: Dict[str, Any] = {}
+ try:
+ async with httpx.AsyncClient(timeout=10) as client:
+ r = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state", headers=headers)
+ state = r.json() if r.headers.get("content-type", "").startswith("application/json") else {}
+ except Exception as e:
+ logger.debug(f"cookbook env lookup failed for host={host!r}: {e}")
+ return {}
+ if not isinstance(state, dict):
+ return {}
+ env_root = state.get("env") or {}
+ if not isinstance(env_root, dict):
+ return {}
+
+ # Per-host entry takes precedence over top-level.
+ per_host: Dict[str, Any] = {}
+ for s in (env_root.get("servers") or []):
+ if isinstance(s, dict) and (s.get("host") or "") == (host or ""):
+ per_host = s
+ break
+
+ env_kind = per_host.get("env") or env_root.get("env") or "none"
+ env_path = per_host.get("envPath") or env_root.get("envPath") or ""
+ platform = per_host.get("platform") or env_root.get("platform") or "linux"
+ ssh_port = per_host.get("sshPort") or env_root.get("sshPort") or ""
+
+ env_prefix = ""
+ if env_kind == "venv" and env_path:
+ if platform == "windows":
+ activate = env_path if env_path.endswith("\\Scripts\\Activate.ps1") else env_path.rstrip("\\") + "\\Scripts\\Activate.ps1"
+ env_prefix = f"& {activate}"
+ else:
+ activate = env_path if env_path.endswith("/bin/activate") else env_path.rstrip("/") + "/bin/activate"
+ env_prefix = f"source {activate}"
+ elif env_kind == "conda" and env_path:
+ if platform == "windows":
+ env_prefix = f"conda activate {env_path}"
+ else:
+ env_prefix = f'eval "$(conda shell.bash hook)" && conda activate {env_path}'
+
+ from routes.cookbook_helpers import load_stored_hf_token
+ return {
+ "env_prefix": env_prefix,
+ "env_type": env_kind,
+ "env_path": env_path,
+ "gpus": env_root.get("gpus") or "",
+ "platform": platform,
+ "hf_token": load_stored_hf_token(),
+ "ssh_port": ssh_port,
+ }
+
+
+def _infer_serve_port(cmd: str) -> int:
+ """Infer likely listen port from a serve command."""
+ if not cmd:
+ return 8080
+ m = re.search(r"--port\\s+(\\d+)", cmd)
+ if m:
+ try:
+ return int(m.group(1))
+ except Exception:
+ pass
+ m = re.search(r"OLLAMA_HOST=[^\\s]*?:(\\d+)", cmd)
+ if m:
+ try:
+ return int(m.group(1))
+ except Exception:
+ pass
+ if "ollama" in cmd:
+ return 11434
+ return 8080
+
+
+def _infer_serve_host(host: str | None) -> tuple[str, bool]:
+ """Return (host, container_local) for registering a served endpoint."""
+ if not (host or "").strip():
+ return "localhost", True
+ base_host = host.split("@", 1)[-1] if "@" in host else host
+ return base_host, False
+
+
+async def _ensure_served_endpoint(
+ *,
+ model: str,
+ cmd: str,
+ host: str | None,
+) -> Dict[str, Any]:
+ """Register/fetch a model endpoint for a running serve session."""
+ from src.tool_implementations import _internal_headers, _INTERNAL_BASE # shared, lives in facade
+ import httpx
+ endpoint_host, container_local = _infer_serve_host(host)
+ port = _infer_serve_port(cmd)
+ base_url = f"http://{endpoint_host}:{port}/v1"
+ short_name = model.split("/")[-1] if "/" in model else model
+ is_image = "diffusion_server.py" in (cmd or "")
+ payload = {
+ "name": short_name if not is_image else f"{short_name} (image)",
+ "base_url": base_url,
+ "skip_probe": "true",
+ "model_type": "image" if is_image else "llm",
+ "container_local": "true" if container_local else "false",
+ }
+ try:
+ async with httpx.AsyncClient(timeout=30) as client:
+ resp = await client.post(
+ f"{_INTERNAL_BASE}/api/model-endpoints",
+ data=payload,
+ headers=_internal_headers(),
+ )
+ data = resp.json() if resp.headers.get("content-type", "").startswith("application/json") else {}
+ if resp.status_code >= 400:
+ logger.debug(
+ f"ensure endpoint failed for {model!r}: status={resp.status_code} data={data}"
+ )
+ return {"added": False, "endpoint_id": "", "base_url": base_url, "error": data}
+ ep_id = data.get("id") if isinstance(data, dict) else None
+ return {
+ "added": bool(ep_id),
+ "endpoint_id": ep_id or "",
+ "base_url": base_url,
+ "data": data,
+ }
+ except Exception as e:
+ logger.debug(f"ensure endpoint exception for {model!r}: {e}")
+ return {"added": False, "endpoint_id": "", "base_url": base_url, "error": str(e)}
+
+
+async def _cookbook_register_task(
+ session_id: str,
+ model: str,
+ host: str,
+ cmd: str,
+ task_type: str = "serve",
+ *,
+ endpoint_added: bool = False,
+ endpoint_id: str = "",
+) -> bool:
+ """Append a task entry to cookbook_state.json after the agent
+ launches via /api/model/serve or /api/model/download. The route
+ spawns tmux but leaves state-writing to the UI; the agent needs to
+ do that here so the task shows up in the Cookbook tab.
+ Returns True on success, False if the write failed (best-effort)."""
+ from src.tool_implementations import _internal_headers, _INTERNAL_BASE # shared, lives in facade
+ import httpx
+ import time as _time
+ headers = _internal_headers()
+ try:
+ async with httpx.AsyncClient(timeout=10) as client:
+ r = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state", headers=headers)
+ state = r.json() if r.headers.get("content-type", "").startswith("application/json") else {}
+ except Exception as e:
+ logger.debug(f"cookbook state read failed: {e}")
+ return False
+ if not isinstance(state, dict):
+ state = {}
+ tasks = state.get("tasks") if isinstance(state.get("tasks"), list) else []
+ # Skip duplicate (same session_id) entries
+ if any(isinstance(t, dict) and t.get("sessionId") == session_id for t in tasks):
+ return True
+ display_name = model.split("/")[-1] if "/" in model else model
+ # Placeholder output — the cookbook UI's CSS hides empty
+ # via `.cookbook-output-pre:empty { display: none }`, so an
+ # empty-string output makes the expansion appear broken until the
+ # frontend's reconnect-polling loop captures tmux output. A short
+ # placeholder gives the user something to see immediately; it gets
+ # replaced by real tmux output within a few seconds.
+ target = f"{host}:" if host else "local:"
+ placeholder = (
+ f"Launched via agent — waiting for tmux output…\n"
+ f" session: {session_id}\n"
+ f" target: {target}{(cmd.split() or [''])[0] if cmd else ''}\n"
+ f" cmd: {cmd[:200]}{'…' if len(cmd) > 200 else ''}"
+ )
+ tasks.append({
+ "id": session_id,
+ "sessionId": session_id,
+ "name": display_name,
+ "modelId": model,
+ "type": task_type,
+ "status": "running",
+ "output": placeholder,
+ "ts": int(_time.time() * 1000),
+ "payload": {"repo_id": model, "remote_host": host or "", "_cmd": cmd},
+ "remoteHost": host or "",
+ "sshPort": "",
+ "platform": "linux",
+ "_serveReady": False,
+ "_endpointAdded": bool(endpoint_added),
+ "_endpointId": endpoint_id or "",
+ })
+ state["tasks"] = tasks
+ try:
+ async with httpx.AsyncClient(timeout=10) as client:
+ r = await client.post(f"{_INTERNAL_BASE}/api/cookbook/state",
+ json=state, headers=headers)
+ return r.status_code < 400
+ except Exception as e:
+ logger.debug(f"cookbook state write failed: {e}")
+ return False
+
+
+
+
+# Patterns for detecting running LLM/diffusion model servers outside
+# the cookbook's task tracker. Each entry: (label, substring-list).
+# Match is case-insensitive against the FULL cmdline. First-match wins.
+_MODEL_PROCESS_PATTERNS = [
+ ("vLLM", ["vllm.entrypoints", "vllm serve", "/vllm/", "vllm-openai"]),
+ ("SGLang", ["sglang.launch_server", "sglang/launch_server"]),
+ ("llama.cpp", ["llama-server", "llama_cpp_server", "llamacppserver"]),
+ ("Ollama", ["ollama serve", "ollama runner", "/ollama "]),
+ ("ComfyUI", ["comfyui/main.py", "/ComfyUI/main.py", "ComfyUI"]),
+ ("A1111 WebUI", ["stable-diffusion-webui/webui", "stable-diffusion-webui/launch", "webui.sh"]),
+ ("Fooocus", ["Fooocus/entry_with_update", "Fooocus/launch"]),
+ ("InvokeAI", ["invokeai-web", "invokeai.app", "invokeai/api_app"]),
+ ("Forge WebUI", ["stable-diffusion-webui-forge", "forge/webui"]),
+ ("SD.Next", ["automatic/webui", "sd.next"]),
+ ("TGI", ["text-generation-launcher", "text_generation_launcher"]),
+ ("Aphrodite", ["aphrodite.endpoints", "aphrodite-engine"]),
+ ("Triton", ["tritonserver", "triton/main"]),
+ ("Diffusers", ["diffusers.pipelines", "StableDiffusionInpaintPipeline", "DiffusionPipeline"]),
+]
+
+
+def _cookbook_apply_retry_suggestion(cmd: str, suggestion: Dict[str, Any]) -> str:
+ """Apply a structured Cookbook diagnosis suggestion to a serve command."""
+ if not cmd or not suggestion:
+ return cmd
+ op = suggestion.get("op")
+ if op == "append":
+ arg = (suggestion.get("arg") or "").strip()
+ if not arg or arg in cmd:
+ return cmd
+ return f"{cmd.rstrip()} {arg}"
+ if op == "remove":
+ flag = (suggestion.get("flag") or "").strip()
+ if not flag:
+ return cmd
+ return re.sub(rf"\s*{re.escape(flag)}(?:\s+\S+)?", "", cmd).strip()
+ if op == "replace":
+ flag = (suggestion.get("flag") or "").strip()
+ value = str(suggestion.get("value") or "").strip()
+ if not flag or not value:
+ return cmd
+ repl = f"{flag} {value}"
+ if re.search(rf"(^|\s){re.escape(flag)}(\s+\S+)?", cmd):
+ return re.sub(rf"(^|\s){re.escape(flag)}(?:\s+\S+)?", lambda m: (m.group(1) or " ") + repl, cmd).strip()
+ return f"{cmd.rstrip()} {repl}"
+ return cmd
+
+
+def _scan_running_model_processes() -> List[Dict[str, Any]]:
+ """Scan /proc for running model server processes. Linux-only; returns
+ [] on other platforms or if /proc isn't accessible. Each match returns
+ a dict shaped like a cookbook task so the caller can merge cleanly.
+ """
+ import os
+ if not os.path.isdir("/proc"):
+ return []
+ out: List[Dict[str, Any]] = []
+ seen_keys = set()
+ try:
+ for pid_dir in os.listdir("/proc"):
+ if not pid_dir.isdigit():
+ continue
+ try:
+ with open(f"/proc/{pid_dir}/cmdline", "rb") as f:
+ raw = f.read()
+ except (OSError, PermissionError):
+ continue
+ if not raw:
+ continue
+ # cmdline is NUL-separated; join with spaces for matching/display
+ cmdline = raw.replace(b"\x00", b" ").decode("utf-8", errors="replace").strip()
+ if not cmdline:
+ continue
+ lower = cmdline.lower()
+ for label, needles in _MODEL_PROCESS_PATTERNS:
+ if any(n.lower() in lower for n in needles):
+ # Dedupe by (label, first-arg) — multi-worker servers
+ # spawn N processes; only show one row per server.
+ key = (label, cmdline.split(" ")[0])
+ if key in seen_keys:
+ break
+ seen_keys.add(key)
+ # Try to pluck a model name out of the cmdline.
+ model = ""
+ for tok in cmdline.split():
+ if "/" in tok and any(s in tok.lower() for s in (
+ "model", "checkpoint", ".safetensors", ".gguf", ".bin", "huggingface"
+ )):
+ model = tok
+ break
+ out.append({
+ "session_id": f"pid-{pid_dir}",
+ "model": model or label,
+ "phase": "running (external)",
+ "type": "serve",
+ "remote": "local",
+ "pid": int(pid_dir),
+ "label": label,
+ "cmdline_preview": cmdline[:140] + ("…" if len(cmdline) > 140 else ""),
+ "external": True,
+ })
+ break
+ except Exception as e:
+ logger.debug(f"_scan_running_model_processes failed: {e}")
+ return out
+
+
+async def do_download_model(content: str, owner: Optional[str] = None) -> Dict:
+ """Download a HuggingFace model via the cookbook API."""
+ from src.tool_implementations import _internal_headers, _INTERNAL_BASE # shared, lives in facade
+ import httpx
+ try:
+ args = _parse_tool_args(content)
+ except ValueError:
+ return {"error": "Invalid JSON arguments", "exit_code": 1}
+ repo_id = args.get("repo_id", "")
+ if not repo_id:
+ return {"error": "repo_id is required", "exit_code": 1}
+ host = (args.get("host") or "").strip()
+ # Resolve a friendly server NAME ("gpu-box") to its ssh host string.
+ if host:
+ host = await _resolve_cookbook_host(host)
+ # No host specified → default to the cookbook's currently-selected
+ # server rather than silently downloading to localhost (which is
+ # usually NOT where the GPUs / model cache live).
+ _host_defaulted = False
+ if not host and not args.get("local"):
+ _servers = await _cookbook_servers()
+ if _servers.get("default_host"):
+ host = _servers["default_host"]
+ _host_defaulted = True
+ backend = (args.get("backend") or "").strip().lower()
+ if not backend and "/" not in repo_id and ":" in repo_id:
+ backend = "ollama"
+ payload = {"repo_id": repo_id}
+ if backend:
+ payload["backend"] = backend
+ if host:
+ payload["remote_host"] = host
+ if args.get("include"):
+ payload["include"] = args["include"]
+ # Per-host env_prefix + hf_token from cookbook_state (same as serve).
+ env_cfg = await _cookbook_env_for_host(host)
+ if env_cfg.get("env_prefix"): payload["env_prefix"] = env_cfg["env_prefix"]
+ if env_cfg.get("hf_token"): payload["hf_token"] = env_cfg["hf_token"]
+ if env_cfg.get("platform"): payload["platform"] = env_cfg["platform"]
+ if env_cfg.get("ssh_port"): payload["ssh_port"] = env_cfg["ssh_port"]
+ try:
+ async with httpx.AsyncClient(timeout=30) as client:
+ resp = await client.post(f"{_INTERNAL_BASE}/api/model/download",
+ json=payload, headers=_internal_headers())
+ data = resp.json()
+ if data.get("ok"):
+ sid = data.get("session_id", "?")
+ registered = await _cookbook_register_task(
+ session_id=sid, model=repo_id, host=host,
+ cmd=(f"ollama pull {repo_id}" if backend == "ollama" else f"hf download {repo_id}"),
+ task_type="download",
+ )
+ note = "" if registered else " (state-write failed — download may not show in UI)"
+ where = host or "local"
+ default_note = " (defaulted to the cookbook's selected server — pass host= or local=true to override)" if _host_defaulted else ""
+ return {
+ "output": f"Download started: {repo_id} on {where} (session: {sid}){note}{default_note}",
+ "session_id": sid,
+ "host": host,
+ "task_type": "download",
+ "phase": "running",
+ "exit_code": 0,
+ }
+ return {"error": data.get("error", "Download failed"), "exit_code": 1}
+ except Exception as e:
+ return {"error": str(e), "exit_code": 1}
+
+
+async def do_serve_model(content: str, owner: Optional[str] = None) -> Dict:
+ """Start serving a model via the cookbook API."""
+ from src.tool_implementations import _internal_headers, _INTERNAL_BASE # shared, lives in facade
+ import httpx
+ try:
+ args = _parse_tool_args(content)
+ except ValueError:
+ return {"error": "Invalid JSON arguments", "exit_code": 1}
+ repo_id = args.get("repo_id", "")
+ cmd = args.get("cmd", "")
+ if not repo_id or not cmd:
+ return {"error": "repo_id and cmd are required", "exit_code": 1}
+ host = (args.get("host") or "").strip()
+ if host:
+ host = await _resolve_cookbook_host(host)
+ if not host and not args.get("local"):
+ _servers = await _cookbook_servers()
+ if _servers.get("default_host"):
+ host = _servers["default_host"]
+ payload = {"repo_id": repo_id, "cmd": cmd}
+ if host:
+ payload["remote_host"] = host
+ # Resolve per-host env settings (venv/conda activate, gpus,
+ # hf_token, platform, ssh_port) from cookbook_state — same path
+ # the UI uses. Without env_prefix, `vllm serve …` lands in a shell
+ # without the user's venv and fails 'command not found'.
+ env_cfg = await _cookbook_env_for_host(host)
+ # Rewrite bare `vllm` / `python3` leading tokens to the venv's absolute
+ # binary path when the target host has a venv configured. SSH non-
+ # interactive shells often leave ~/.local/bin ahead of the venv bin on
+ # PATH even with the venv activated, so `vllm serve` finds the wrong
+ # binary and crashes early (e.g. compute_89 torch ABI errors on an old
+ # user-site torch). This mirrors what static/js/cookbook.js does in
+ # _buildServeCmd for the UI launch path.
+ env_path = (env_cfg.get("env_path") or "").rstrip("/")
+ env_type = (env_cfg.get("env_type") or env_cfg.get("env") or "").lower()
+ if env_type == "venv" and env_path:
+ venv_bin = f"{env_path}/bin"
+ # Match the FIRST shell-token: skip leading KEY=VAL env-var prefixes
+ # (CUDA_VISIBLE_DEVICES=… VLLM_USE_FLASHINFER_SAMPLER=…) before the binary.
+ import re as _re3
+ tokens = cmd.split()
+ idx = 0
+ env_re = _re3.compile(r"^[A-Za-z_][A-Za-z0-9_]*=")
+ while idx < len(tokens) and env_re.match(tokens[idx]):
+ idx += 1
+ if idx < len(tokens):
+ head = tokens[idx]
+ if head in ("vllm", "python3", "python"):
+ tokens[idx] = f"{venv_bin}/{head}"
+ cmd = " ".join(tokens)
+ payload["cmd"] = cmd
+ if env_cfg.get("env_prefix"): payload["env_prefix"] = env_cfg["env_prefix"]
+ if env_cfg.get("gpus"): payload["gpus"] = env_cfg["gpus"]
+ if env_cfg.get("hf_token"): payload["hf_token"] = env_cfg["hf_token"]
+ if env_cfg.get("platform"): payload["platform"] = env_cfg["platform"]
+ if env_cfg.get("ssh_port"): payload["ssh_port"] = env_cfg["ssh_port"]
+ try:
+ async with httpx.AsyncClient(timeout=30) as client:
+ resp = await client.post(f"{_INTERNAL_BASE}/api/model/serve",
+ json=payload, headers=_internal_headers())
+ data = resp.json()
+ if data.get("ok"):
+ sid = data.get("session_id", "?")
+ endpoint_id = data.get("endpoint_id") or ""
+ if endpoint_id:
+ endpoint_added = True
+ else:
+ endpoint_meta = await _ensure_served_endpoint(model=repo_id, cmd=cmd, host=host)
+ endpoint_added = bool(endpoint_meta.get("added"))
+ endpoint_id = endpoint_meta.get("endpoint_id", "") or endpoint_id
+ registered = await _cookbook_register_task(
+ session_id=sid, model=repo_id,
+ host=host, cmd=cmd, task_type="serve",
+ endpoint_added=endpoint_added, endpoint_id=endpoint_id or "",
+ )
+ note = "" if registered else " (state-write failed — task may not show in UI)"
+ return {
+ "output": f"Serving {repo_id} (session: {sid}){note}",
+ "session_id": sid,
+ "task_type": "serve",
+ "phase": "running",
+ "host": host,
+ "endpoint_id": endpoint_id,
+ "exit_code": 0,
+ }
+ # FastAPI HTTPException puts the message under `detail`, not `error`.
+ # Surface BOTH so the agent sees "Invalid characters in cmd" (from
+ # _validate_serve_cmd rejecting `&&`/`source`/`cd`) instead of
+ # the generic "Serve failed", which leaves it with nothing to act on.
+ err_msg = data.get("error") or data.get("detail") or "Serve failed"
+ hint = ""
+ if isinstance(err_msg, str) and "cmd" in err_msg.lower():
+ hint = (" — the cmd must START with an allowlisted binary "
+ "(vllm, python3, llama-server, ollama, sglang, lmdeploy, node, npx). "
+ "Do NOT prefix with `cd …`, `source …`, or chain with `&&`. "
+ "env_prefix (e.g. `source ~/qwen35-env/bin/activate`) is added "
+ "automatically from the host's saved venv settings.")
+ return {"error": f"{err_msg}{hint}", "exit_code": 1}
+ except Exception as e:
+ return {"error": str(e), "exit_code": 1}
+
+
+async def do_list_served_models(content: str, owner: Optional[str] = None) -> Dict:
+ """List running model servers — merges cookbook-tracked tasks with
+ a /proc scan for externally-launched LLM/diffusion processes
+ (vLLM, sglang, llama.cpp, Ollama, ComfyUI, A1111, Fooocus, etc.)."""
+ from src.tool_implementations import _internal_headers, _INTERNAL_BASE # shared, lives in facade
+ import asyncio
+ import httpx
+
+ # Cookbook-tracked tasks (best-effort; don't fail the whole call if
+ # this is unreachable).
+ cookbook_tasks: List[Dict[str, Any]] = []
+ try:
+ async with httpx.AsyncClient(timeout=15) as client:
+ resp = await client.get(f"{_INTERNAL_BASE}/api/cookbook/tasks/status",
+ headers=_internal_headers())
+ cookbook_tasks = (resp.json() or {}).get("tasks") or []
+ except Exception as e:
+ logger.debug(f"cookbook tasks/status fetch failed: {e}")
+
+ # Local process scan — runs in a worker thread so it doesn't block.
+ external = await asyncio.to_thread(_scan_running_model_processes)
+
+ merged: List[Dict[str, Any]] = []
+ merged.extend(cookbook_tasks)
+ # Dedupe: if a process's PID is already mentioned by a cookbook task
+ # (cookbook may track the PID via session_id), skip it.
+ cookbook_pids = set()
+ for t in cookbook_tasks:
+ if isinstance(t, dict) and t.get("pid"):
+ cookbook_pids.add(t["pid"])
+ for p in external:
+ if p.get("pid") not in cookbook_pids:
+ merged.append(p)
+
+ if not merged:
+ return {
+ "output": "No model servers currently running (cookbook task tracker empty; /proc scan found no vLLM / sglang / llama.cpp / Ollama / ComfyUI / A1111 / Fooocus / InvokeAI / TGI / Aphrodite / Triton / Diffusers processes).",
+ "exit_code": 0,
+ }
+
+ # Sort so the agent sees what's actually LIVE first. Stopped/error/
+ # completed tasks are mostly historical noise — they shouldn't lead
+ # the list when something is genuinely serving.
+ _ORDER = {
+ "ready": 0, "running": 1, "loading": 1, "warming": 1,
+ "queued": 2, "starting": 2,
+ "error": 5, "crashed": 5, "failed": 5,
+ "stopped": 6, "killed": 6, "cancelled": 6, "canceled": 6,
+ "done": 7, "completed": 7, "finished": 7,
+ }
+ def _rank(t: Dict[str, Any]) -> int:
+ phase = (t.get("phase") or t.get("status") or "unknown").lower()
+ return _ORDER.get(phase, 3)
+ merged.sort(key=_rank)
+
+ cb_n = len(cookbook_tasks)
+ ext_n = len(external)
+ live_n = sum(1 for t in merged if _rank(t) <= 2)
+ header = []
+ if cb_n:
+ header.append(f"{cb_n} cookbook-tracked")
+ if ext_n:
+ header.append(f"{ext_n} external")
+ if live_n:
+ header.insert(0, f"{live_n} LIVE")
+ lines = [f"Running: {', '.join(header)}."]
+ for t in merged:
+ phase = t.get("phase") or t.get("status", "unknown")
+ model = t.get("model", "?")
+ remote = t.get("remote", "local")
+ sid = t.get("session_id", "?")
+ tag = " [external]" if t.get("external") else ""
+ lines.append(f"- {model}: {phase} ({remote}, session: {sid}){tag}")
+ diag = t.get("diagnosis") if isinstance(t.get("diagnosis"), dict) else None
+ if diag:
+ lines.append(f" diagnosis: {diag.get('message')}")
+ cmd = t.get("cmd") or ""
+ suggestions = diag.get("suggestions") or []
+ actionable = []
+ for s in suggestions[:3]:
+ label = s.get("label") or "retry"
+ retry_cmd = _cookbook_apply_retry_suggestion(cmd, s)
+ if retry_cmd and retry_cmd != cmd and s.get("op") in {"append", "replace", "remove"}:
+ actionable.append(f"{label}: `{retry_cmd}`")
+ else:
+ actionable.append(label)
+ if actionable:
+ lines.append(" suggestions: " + " | ".join(actionable))
+ if t.get("status") == "error" and t.get("output_tail"):
+ tail = str(t.get("output_tail") or "").strip()
+ if tail:
+ # Prefer a window around a Python traceback if one exists,
+ # falling back to the last 30 lines. The previous 6-line
+ # tail showed only the post-crash bash prompt / neofetch
+ # banner ("Locale: C / Ubuntu_Odysseus ❯") — useless for
+ # diagnosis. The traceback we want is usually 50-200 lines
+ # earlier in the buffer.
+ _tail_lines = tail.splitlines()
+ _shown = _tail_lines[-30:]
+ for _i, _ln in enumerate(_tail_lines):
+ if "Traceback (most recent call last)" in _ln or "ERROR" in _ln or "Error:" in _ln:
+ _shown = _tail_lines[_i:_i + 40]
+ break
+ lines.append(" recent log:")
+ for line in _shown:
+ lines.append(f" {line[:220]}")
+ if t.get("external") and t.get("cmdline_preview"):
+ lines.append(f" cmd: {t['cmdline_preview']}")
+ return {"output": "\n".join(lines), "tasks": merged, "exit_code": 0}
+
+
+async def _cookbook_kill_session(session_id: str, *, remote_host: str = "",
+ ssh_port: str = "", verb: str = "Stopped") -> Dict:
+ """Kill a cookbook tmux session — remote-aware — AND mark the task
+ stopped in cookbook_state.json. Shared by stop_served_model and
+ cancel_download so both behave identically.
+
+ Resolves the task's remote host from state when not passed in. A
+ local-only `tmux kill-session` silently no-ops for remote tasks —
+ that's the bug where "stop the download" appeared to work but the
+ download kept running on the remote host.
+ """
+ from src.tool_implementations import _internal_headers, _INTERNAL_BASE # shared, lives in facade
+ import httpx
+ import shlex
+ headers = _internal_headers()
+ remote = remote_host or ""
+ sport = ssh_port or ""
+
+ # Look up the task's host + confirm it exists in state.
+ state: Dict[str, Any] = {}
+ try:
+ async with httpx.AsyncClient(timeout=10) as client:
+ resp = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state", headers=headers)
+ state = resp.json() or {}
+ except Exception as e:
+ logger.debug(f"cookbook state lookup failed for {session_id}: {e}")
+ if not isinstance(state, dict):
+ state = {}
+ matched = None
+ for t in (state.get("tasks") or []):
+ if isinstance(t, dict) and (t.get("sessionId") == session_id or t.get("id") == session_id):
+ matched = t
+ if not remote:
+ remote = t.get("remoteHost") or ""
+ if not sport:
+ sport = t.get("sshPort") or ""
+ break
+
+ if remote:
+ try:
+ remote, sport = _validate_cookbook_ssh_target(remote, sport)
+ except HTTPException as e:
+ return {"error": str(getattr(e, "detail", e)), "exit_code": 1}
+ _pf = f"-p {shlex.quote(str(sport))} " if sport and str(sport) != "22" else ""
+ cmd = (
+ f"ssh -o ConnectTimeout=5 -o StrictHostKeyChecking=no "
+ f"{_pf}{shlex.quote(remote)} 'tmux kill-session -t {shlex.quote(session_id)}'"
+ )
+ target_label = f"{session_id} on {remote}"
+ else:
+ cmd = f"tmux kill-session -t {shlex.quote(session_id)}"
+ target_label = session_id
+
+ try:
+ async with httpx.AsyncClient(timeout=15) as client:
+ resp = await client.post(f"{_INTERNAL_BASE}/api/shell/exec",
+ json={"command": cmd}, headers=headers)
+ if resp.status_code >= 400:
+ return {"error": f"shell/exec returned HTTP {resp.status_code}: {resp.text[:200]}", "exit_code": 1}
+ try:
+ data = resp.json()
+ except Exception:
+ data = {}
+ kill_failed = isinstance(data, dict) and data.get("exit_code") not in (None, 0)
+ kill_err = ((data.get("stderr") or data.get("error") or "").strip() if isinstance(data, dict) else "")
+ # "no server running" / "can't find session" means it was already
+ # gone — treat as success (the goal is "not running").
+ already_gone = any(s in kill_err.lower() for s in ("no server running", "can't find session", "session not found"))
+ if kill_failed and not already_gone:
+ return {"error": f"Failed to {verb.lower()} {target_label}: {kill_err or 'kill-session returned non-zero'}", "exit_code": 1}
+
+ # Update state: mark stopped (so the UI + list reflect reality).
+ if matched is not None:
+ try:
+ matched["status"] = "stopped"
+ async with httpx.AsyncClient(timeout=10) as client:
+ await client.post(f"{_INTERNAL_BASE}/api/cookbook/state",
+ json=state, headers=headers)
+ except Exception as e:
+ logger.debug(f"failed to mark {session_id} stopped in state: {e}")
+
+ suffix = " (was already gone)" if already_gone else ""
+ return {"output": f"{verb} {target_label}{suffix}", "exit_code": 0}
+ except Exception as e:
+ return {"error": str(e), "exit_code": 1}
+
+
+async def do_stop_served_model(content: str, owner: Optional[str] = None) -> Dict:
+ """Stop a running model server by killing its tmux session (remote-aware)."""
+ try:
+ args = _parse_tool_args(content)
+ except ValueError:
+ return {"error": "Invalid JSON arguments", "exit_code": 1}
+ session_id = args.get("session_id", "")
+ if not session_id:
+ return {"error": "session_id is required", "exit_code": 1}
+ return await _cookbook_kill_session(
+ session_id,
+ remote_host=args.get("remote_host") or args.get("host") or "",
+ ssh_port=args.get("ssh_port") or "",
+ verb="Stopped server",
+ )
+
+
+async def do_tail_serve_output(content: str, owner: Optional[str] = None) -> Dict:
+ """Capture the last N lines of a cookbook task's tmux pane — remote-aware.
+
+ Used by the agent to debug a failed/stuck serve: list_served_models tells
+ you the task is `crashed`, this tool returns the actual stderr/traceback
+ so the agent can match it against a known fix (compute_89 nvcc mismatch,
+ flashinfer version mismatch, OOM, missing kernels, etc.) and decide
+ whether to relaunch via serve_model with new flags.
+ """
+ from src.tool_implementations import _internal_headers, _INTERNAL_BASE # shared, lives in facade
+ import httpx
+ import shlex
+ try:
+ args = _parse_tool_args(content)
+ except ValueError:
+ return {"error": "Invalid JSON arguments", "exit_code": 1}
+ session_id = (args.get("session_id") or "").strip()
+ if not session_id:
+ return {"error": "session_id is required (from list_served_models)", "exit_code": 1}
+ import re as _re
+ if not _re.fullmatch(r"[a-zA-Z0-9_-]+", session_id):
+ return {"error": "Invalid session_id format", "exit_code": 1}
+ try:
+ tail = int(args.get("tail") or 400)
+ except (TypeError, ValueError):
+ tail = 400
+ tail = max(20, min(tail, 4000))
+ headers = _internal_headers()
+ remote = _string_arg(args.get("remote_host") or args.get("host"))
+ sport = _string_arg(args.get("ssh_port"))
+ # Resolve host from cookbook state if caller didn't pass one — same
+ # lookup _cookbook_kill_session uses.
+ if not remote:
+ state: Dict[str, Any] = {}
+ try:
+ async with httpx.AsyncClient(timeout=10) as client:
+ resp = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state", headers=headers)
+ state = resp.json() or {}
+ except Exception as e:
+ logger.debug(f"cookbook state lookup failed for {session_id}: {e}")
+ if isinstance(state, dict):
+ for t in (state.get("tasks") or []):
+ if isinstance(t, dict) and (t.get("sessionId") == session_id or t.get("id") == session_id):
+ remote = t.get("remoteHost") or ""
+ if not sport:
+ sport = t.get("sshPort") or ""
+ break
+ if remote:
+ try:
+ remote, sport = _validate_cookbook_ssh_target(remote, sport)
+ except HTTPException as e:
+ return {"error": str(getattr(e, "detail", e)), "exit_code": 1}
+
+ # Prefer the persisted /tmp/odysseus-tmux/SESSION.log file over the
+ # live tmux pane. The pane is what the user would see scrolling on
+ # their screen — including the post-crash neofetch banner and the
+ # idle bash prompt that overwrites the actual traceback the moment
+ # vllm exits. The log file is the raw stdout/stderr of the wrapped
+ # process and survives the crash unchanged. We only fall back to
+ # the pane when the log file doesn't exist (older sessions launched
+ # before the tmux+tee wrapper was added).
+ log_path = f"/tmp/odysseus-tmux/{session_id}.log"
+ pane_inner = f"tmux capture-pane -t {shlex.quote(session_id)} -p -S -{tail} 2>/dev/null"
+ file_inner = f"tail -n {tail} {shlex.quote(log_path)} 2>/dev/null"
+ inner = (
+ f"if [ -s {shlex.quote(log_path)} ]; then {file_inner}; "
+ f"else {pane_inner}; fi"
+ )
+ if remote:
+ _pf = f"-p {shlex.quote(str(sport))} " if sport and str(sport) != "22" else ""
+ cmd = (
+ f"ssh -o ConnectTimeout=5 -o StrictHostKeyChecking=no "
+ f"{_pf}{shlex.quote(remote)} {shlex.quote(inner)}"
+ )
+ host_label = remote
+ else:
+ cmd = inner
+ host_label = "local"
+ try:
+ async with httpx.AsyncClient(timeout=20) as client:
+ resp = await client.post(f"{_INTERNAL_BASE}/api/shell/exec",
+ json={"command": cmd}, headers=headers)
+ if resp.status_code >= 400:
+ return {"error": f"shell/exec returned HTTP {resp.status_code}: {resp.text[:200]}", "exit_code": 1}
+ data = resp.json() if resp.content else {}
+ output_text = (data.get("stdout") or "").strip()
+ stderr_text = (data.get("stderr") or "").strip()
+ rc = data.get("exit_code")
+ if rc not in (None, 0) and not output_text:
+ already_gone = any(s in (stderr_text or "").lower() for s in ("no server running", "can't find session", "session not found"))
+ if already_gone:
+ return {"output": f"Tmux session {session_id} on {host_label} is gone (task already exited).", "exit_code": 0, "session_id": session_id, "host": host_label}
+ return {"error": f"capture-pane failed on {host_label}: {stderr_text or f'exit {rc}'}", "exit_code": 1}
+ # Dedupe download-progress noise. A 100-shard HF download produces
+ # tens of thousands of `model-NN-of-MM.safetensors: 91%|...` lines
+ # that all look the same to the agent and drown the actual error.
+ # Keep only one sample per (file, decile-percent) bucket.
+ import re as _re2
+ lines = output_text.splitlines()
+ dedup_lines = []
+ seen_progress = set()
+ progress_re = _re2.compile(r"^([\w./\-]+):\s+(\d+)%")
+ for ln in lines:
+ m = progress_re.match(ln.strip())
+ if m:
+ key = (m.group(1), int(m.group(2)) // 10) # bucket by 10%
+ if key in seen_progress:
+ continue
+ seen_progress.add(key)
+ dedup_lines.append(ln)
+ output_text = "\n".join(dedup_lines)
+ # Hard cap so the agent doesn't blow its token budget.
+ MAX_CHARS = 8000
+ if len(output_text) > MAX_CHARS:
+ output_text = "…(earlier output truncated)…\n" + output_text[-MAX_CHARS:]
+ return {
+ "output": output_text or "(empty pane)",
+ "session_id": session_id,
+ "host": host_label,
+ "tail_lines": tail,
+ "exit_code": 0,
+ }
+ except Exception as e:
+ return {"error": str(e), "exit_code": 1}
+
+
+async def do_list_downloads(content: str, owner: Optional[str] = None) -> Dict:
+ """List in-flight model downloads (filters /api/cookbook/tasks/status to type=download)."""
+ from src.tool_implementations import _internal_headers, _INTERNAL_BASE # shared, lives in facade
+ import httpx
+ try:
+ async with httpx.AsyncClient(timeout=15) as client:
+ resp = await client.get(f"{_INTERNAL_BASE}/api/cookbook/tasks/status",
+ headers=_internal_headers())
+ data = resp.json()
+ tasks = [t for t in data.get("tasks", []) if (t.get("type") or "").lower() == "download"]
+ if not tasks:
+ return {"output": "No downloads in progress.", "exit_code": 0}
+ lines = [f"{len(tasks)} download(s) in progress:"]
+ for t in tasks:
+ phase = t.get("phase") or t.get("status", "unknown")
+ model = t.get("model", "?")
+ pct = t.get("progress_percent") or t.get("percent")
+ pct_str = f" {pct}%" if pct is not None else ""
+ lines.append(f"- {model}: {phase}{pct_str} ({t.get('remote', 'local')}, session: {t.get('session_id', '?')})")
+ return {"output": "\n".join(lines), "downloads": tasks, "exit_code": 0}
+ except Exception as e:
+ return {"error": str(e), "exit_code": 1}
+
+
+async def do_cancel_download(content: str, owner: Optional[str] = None) -> Dict:
+ """Cancel a model download by killing its tmux session (remote-aware)."""
+ try:
+ args = _parse_tool_args(content)
+ except ValueError:
+ return {"error": "Invalid JSON arguments", "exit_code": 1}
+ session_id = args.get("session_id", "")
+ if not session_id:
+ return {"error": "session_id is required (from list_downloads)", "exit_code": 1}
+ return await _cookbook_kill_session(
+ session_id,
+ remote_host=args.get("remote_host") or args.get("host") or "",
+ ssh_port=args.get("ssh_port") or "",
+ verb="Cancelled download",
+ )
+
+
+async def do_search_hf_models(content: str, owner: Optional[str] = None) -> Dict:
+ """Search HuggingFace via the cookbook /api/cookbook/hf-latest endpoint."""
+ from src.tool_implementations import _internal_headers, _INTERNAL_BASE # shared, lives in facade
+ import httpx
+ try:
+ args = _parse_tool_args(content)
+ except ValueError:
+ return {"error": "Invalid JSON arguments", "exit_code": 1}
+ query = args.get("query", "") or args.get("search", "")
+ limit = args.get("limit", 10)
+ params: Dict[str, str] = {}
+ if query:
+ params["search"] = query
+ if limit:
+ params["limit"] = str(limit)
+ try:
+ async with httpx.AsyncClient(timeout=30) as client:
+ resp = await client.get(f"{_INTERNAL_BASE}/api/cookbook/hf-latest",
+ params=params, headers=_internal_headers())
+ data = resp.json()
+ models = data.get("models") if isinstance(data, dict) else data
+ if not models:
+ return {"output": f"No models found for query: {query!r}", "exit_code": 0}
+ lines = [f"Found {len(models)} model(s) for {query!r}:" if query else f"{len(models)} model(s):"]
+ for m in models[:limit if isinstance(limit, int) else 10]:
+ if isinstance(m, dict):
+ name = m.get("repo_id") or m.get("modelId") or m.get("id") or "?"
+ dl = m.get("downloads")
+ size = m.get("size_gb") or m.get("needed_vram_gb")
+ bits = []
+ if size:
+ bits.append(f"~{size}GB")
+ if dl:
+ bits.append(f"{dl} downloads")
+ tail = f" ({', '.join(bits)})" if bits else ""
+ lines.append(f"- {name}{tail}")
+ else:
+ lines.append(f"- {m}")
+ return {"output": "\n".join(lines), "models": models, "exit_code": 0}
+ except Exception as e:
+ return {"error": str(e), "exit_code": 1}
+
+
+async def do_adopt_served_model(content: str, owner: Optional[str] = None) -> Dict:
+ """Register an externally-launched model server (bash + tmux + ssh, or
+ anything else) into the Cookbook so it appears in list_served_models,
+ can be stopped via stop_served_model, and is added to the user's
+ endpoint list for chat. Use this when a model was started outside
+ the cookbook's serve flow but you want first-class tracking.
+
+ Args (JSON):
+ host: "user@192.0.2.10" (or omit for localhost)
+ tmux_session: "minimax-m27" (existing tmux session name)
+ model: "cyankiwi/MiniMax-M2.7-AWQ-4bit" (HF repo or display name)
+ port: 8000
+ name: optional display name (defaults to model basename)
+ add_endpoint: bool (default true) — also register as a chat endpoint
+ """
+ from src.tool_implementations import _internal_headers, _INTERNAL_BASE # shared, lives in facade
+ import httpx
+ import shlex
+ try:
+ args = _parse_tool_args(content)
+ except ValueError:
+ return {"error": "Invalid JSON arguments", "exit_code": 1}
+
+ host = _string_arg(args.get("host") or args.get("remote_host"))
+ sess = (args.get("tmux_session") or args.get("session_id") or "").strip()
+ model = (args.get("model") or args.get("repo_id") or "").strip()
+ port = args.get("port") or 8000
+ display_name = (args.get("name") or "").strip() or (model.split("/")[-1] if "/" in model else model)
+ add_endpoint = args.get("add_endpoint", True)
+
+ if not sess or not model:
+ return {"error": "tmux_session and model are required", "exit_code": 1}
+
+ # Verify tmux session exists on the target host
+ if host:
+ try:
+ host, _ = _validate_cookbook_ssh_target(host)
+ except HTTPException as e:
+ return {"error": str(getattr(e, "detail", e)), "exit_code": 1}
+
+ headers = _internal_headers()
+ if host:
+ check = f"ssh -o ConnectTimeout=5 -o StrictHostKeyChecking=no {shlex.quote(host)} 'tmux has-session -t {shlex.quote(sess)} 2>&1'"
+ else:
+ check = f"tmux has-session -t {shlex.quote(sess)} 2>&1"
+ try:
+ async with httpx.AsyncClient(timeout=10) as client:
+ r = await client.post(f"{_INTERNAL_BASE}/api/shell/exec",
+ json={"command": check}, headers=headers)
+ data = r.json() if r.headers.get("content-type", "").startswith("application/json") else {}
+ if r.status_code >= 400 or (data.get("exit_code") not in (None, 0)):
+ err = (data.get("stderr") or data.get("error") or r.text[:200]).strip()
+ return {"error": f"tmux session {sess!r} not found on {host or 'local'}: {err}", "exit_code": 1}
+ except Exception as e:
+ return {"error": f"verify failed: {e}", "exit_code": 1}
+
+ # Best-effort health check — does port respond to /v1/models?
+ if host:
+ health_cmd = f"ssh -o ConnectTimeout=5 {shlex.quote(host)} 'curl -s -m 3 http://localhost:{int(port)}/v1/models'"
+ else:
+ health_cmd = f"curl -s -m 3 http://localhost:{int(port)}/v1/models"
+ server_up = False
+ try:
+ async with httpx.AsyncClient(timeout=10) as client:
+ r = await client.post(f"{_INTERNAL_BASE}/api/shell/exec",
+ json={"command": health_cmd}, headers=headers)
+ body = (r.json() or {}).get("stdout", "") if r.headers.get("content-type", "").startswith("application/json") else ""
+ server_up = '"data"' in body or '"object"' in body
+ except Exception:
+ pass
+
+ # Read+modify+write cookbook state. APPEND a task entry; do NOT
+ # overwrite the whole file (that'd nuke presets).
+ try:
+ async with httpx.AsyncClient(timeout=10) as client:
+ r = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state", headers=headers)
+ state = r.json() if r.headers.get("content-type", "").startswith("application/json") else {}
+ except Exception as e:
+ return {"error": f"could not read cookbook state: {e}", "exit_code": 1}
+ if not isinstance(state, dict):
+ state = {}
+ tasks = state.get("tasks") if isinstance(state.get("tasks"), list) else []
+ # Skip duplicate adopt of the same session
+ if any(isinstance(t, dict) and t.get("sessionId") == sess for t in tasks):
+ adopted_already = True
+ else:
+ adopted_already = False
+ import time as _time
+ new_task = {
+ "id": sess,
+ "sessionId": sess,
+ "name": display_name,
+ "type": "serve",
+ "status": "running",
+ "output": (
+ f"Adopted externally-launched session {sess!r} on {host or 'local'}.\n"
+ "Reconnect polling will start streaming tmux output shortly."
+ ),
+ "ts": int(_time.time() * 1000),
+ "payload": {"repo_id": model, "remote_host": host or "", "_cmd": "(adopted — launched outside cookbook)"},
+ "remoteHost": host or "",
+ "sshPort": "",
+ "platform": "linux",
+ "_serveReady": bool(server_up),
+ "_endpointAdded": False,
+ "_adoptedExternally": True,
+ }
+ tasks.append(new_task)
+ state["tasks"] = tasks
+ try:
+ async with httpx.AsyncClient(timeout=10) as client:
+ await client.post(f"{_INTERNAL_BASE}/api/cookbook/state",
+ json=state, headers=headers)
+ except Exception as e:
+ return {"error": f"could not save cookbook state: {e}", "exit_code": 1}
+
+ # Optionally register as a chat endpoint
+ endpoint_msg = ""
+ if add_endpoint:
+ # Resolve host to a URL. SSH form `user@host` → just take host.
+ host_only = host.split("@", 1)[-1] if host else "localhost"
+ endpoint_url = f"http://{host_only}:{int(port)}/v1"
+ try:
+ from src.tool_implementations import do_manage_endpoints # avoid forward ref issues
+ except Exception:
+ do_manage_endpoints = None
+ if do_manage_endpoints is not None:
+ try:
+ ep_result = await do_manage_endpoints(json.dumps({
+ "action": "add",
+ "name": display_name,
+ "endpoint_url": endpoint_url,
+ "is_local": False,
+ }), owner=owner)
+ if isinstance(ep_result, dict) and not ep_result.get("error"):
+ endpoint_msg = f" Endpoint {endpoint_url} added as {display_name!r}."
+ else:
+ endpoint_msg = f" Endpoint registration skipped: {(ep_result or {}).get('error', 'unknown')}"
+ except Exception as e:
+ endpoint_msg = f" Endpoint registration failed: {e}"
+
+ return {
+ "output": (
+ f"Adopted session {sess!r} ({model}) on {host or 'local'}:{port}. "
+ + ("Already tracked — skipped state write. " if adopted_already else "Added to cookbook state. ")
+ + ("Server responding. " if server_up else "Server not responding yet (still loading?). ")
+ + endpoint_msg
+ ).strip(),
+ "session_id": sess,
+ "host": host,
+ "port": int(port),
+ "server_up": server_up,
+ "exit_code": 0,
+ }
+
+
+async def do_list_cookbook_servers(content: str, owner: Optional[str] = None) -> Dict:
+ """List the cookbook's configured servers and which one is the
+ current default. Use this to decide where to download/serve a
+ model, or to show the user options when the target host is
+ ambiguous."""
+ servers = await _cookbook_servers()
+ hosts = servers.get("hosts") or []
+ default = servers.get("default_host") or ""
+ if not hosts:
+ return {"output": "No cookbook servers configured. Downloads/serves default to localhost.", "servers": [], "default_host": "", "exit_code": 0}
+ # Resolve which server is the default by its friendly name too.
+ default_name = next((h.get("name") for h in hosts if h.get("host") == default and h.get("name")), default or "local")
+ lines = [f"{len(hosts)} configured server(s) (default: {default_name}):"]
+ for h in hosts:
+ name = h.get("name") or "(unnamed)"
+ host = h.get("host") or "local"
+ mark = " ← default" if h.get("host") == default else ""
+ env_bit = f" [{h.get('env')}: {h.get('envPath')}]" if h.get("env") and h.get("env") != "none" else ""
+ plat = f" ({h.get('platform')})" if h.get("platform") else ""
+ lines.append(f"- {name} → {host}{plat}{env_bit}{mark}")
+ lines.append("\nRefer to servers by their name (e.g. download_model with host=\"gpu-box\").")
+ return {"output": "\n".join(lines), "servers": hosts, "default_host": default, "exit_code": 0}
+
+
+async def do_list_serve_presets(content: str, owner: Optional[str] = None) -> Dict:
+ """List saved serve presets from cookbook_state.json. Each preset
+ is a launch template: name, model, host, port, cmd. Use this to
+ discover what the user has previously configured so you can
+ launch by preset instead of fabricating tmux commands."""
+ from src.tool_implementations import _internal_headers, _INTERNAL_BASE # shared, lives in facade
+ import httpx
+ try:
+ async with httpx.AsyncClient(timeout=10) as client:
+ resp = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state",
+ headers=_internal_headers())
+ state = resp.json() or {}
+ except Exception as e:
+ return {"error": f"Failed to fetch cookbook state: {e}", "exit_code": 1}
+
+ presets = state.get("presets") or []
+ if not presets:
+ return {
+ "output": "No serve presets saved. Tell the user to save one from the Cookbook UI first, or use serve_model with explicit repo_id + cmd + host.",
+ "presets": [],
+ "exit_code": 0,
+ }
+ lines = [f"{len(presets)} saved serve preset(s):"]
+ for p in presets:
+ if not isinstance(p, dict):
+ continue
+ name = p.get("name", "?")
+ model = p.get("model") or p.get("modelId") or "?"
+ host = p.get("host") or p.get("remoteHost") or "local"
+ port = p.get("port", "")
+ cmd = (p.get("cmd") or "").strip()
+ bits = [f"- {name}: {model}", f"host={host}"]
+ if port:
+ bits.append(f"port={port}")
+ lines.append(" ".join(bits))
+ if cmd:
+ cmd_preview = cmd if len(cmd) < 140 else cmd[:140] + "…"
+ lines.append(f" cmd: {cmd_preview}")
+ return {"output": "\n".join(lines), "presets": presets, "exit_code": 0}
+
+
+async def do_serve_preset(content: str, owner: Optional[str] = None) -> Dict:
+ """Launch a saved serve preset by name. Resolves the preset's
+ cmd + host + model from cookbook_state.json, then calls the
+ standard model/serve endpoint. Saves the agent from having to
+ reinvent tmux launch commands the user already saved."""
+ from src.tool_implementations import _internal_headers, _INTERNAL_BASE # shared, lives in facade
+ import httpx
+ try:
+ args = _parse_tool_args(content)
+ except ValueError:
+ return {"error": "Invalid JSON arguments", "exit_code": 1}
+ name = (args.get("name") or args.get("preset") or "").strip()
+ if not name:
+ return {"error": "name (preset name) is required. Call list_serve_presets to see what's available.", "exit_code": 1}
+
+ try:
+ async with httpx.AsyncClient(timeout=10) as client:
+ resp = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state",
+ headers=_internal_headers())
+ state = resp.json() or {}
+ except Exception as e:
+ return {"error": f"Failed to fetch cookbook state: {e}", "exit_code": 1}
+
+ presets = state.get("presets") or []
+ # Match by exact name first, then case-insensitive substring.
+ chosen = None
+ lname = name.lower()
+ for p in presets:
+ if isinstance(p, dict) and (p.get("name") or "").lower() == lname:
+ chosen = p
+ break
+ if chosen is None:
+ for p in presets:
+ if isinstance(p, dict) and lname in (p.get("name") or "").lower():
+ chosen = p
+ break
+ if chosen is None:
+ sample = ", ".join((p.get("name") or "?") for p in presets[:8] if isinstance(p, dict))
+ return {"error": f"No preset matching {name!r}. Available: {sample or '(none)'}", "exit_code": 1}
+
+ repo_id = chosen.get("model") or chosen.get("modelId") or ""
+ cmd = (chosen.get("cmd") or "").strip()
+ host = chosen.get("host") or chosen.get("remoteHost") or ""
+ if not repo_id or not cmd:
+ return {"error": f"Preset {chosen.get('name')!r} is missing model or cmd — can't launch.", "exit_code": 1}
+
+ payload: Dict[str, Any] = {"repo_id": repo_id, "cmd": cmd}
+ if host:
+ payload["remote_host"] = host
+ # Resolve per-host env settings the same way the UI does — pulls
+ # env_prefix (source ~/vllm-env/bin/activate), gpus, hf_token,
+ # etc. from cookbook_state.env so launches actually find vllm.
+ env_cfg = await _cookbook_env_for_host(host)
+ if env_cfg.get("env_prefix"): payload["env_prefix"] = env_cfg["env_prefix"]
+ if env_cfg.get("gpus"): payload["gpus"] = env_cfg["gpus"]
+ if env_cfg.get("hf_token"): payload["hf_token"] = env_cfg["hf_token"]
+ if env_cfg.get("platform"): payload["platform"] = env_cfg["platform"]
+ if env_cfg.get("ssh_port"):
+ payload["ssh_port"] = env_cfg["ssh_port"]
+
+ try:
+ async with httpx.AsyncClient(timeout=30) as client:
+ resp = await client.post(f"{_INTERNAL_BASE}/api/model/serve",
+ json=payload, headers=_internal_headers())
+ data = resp.json()
+ if data.get("ok"):
+ sid = data.get("session_id", "?")
+ endpoint_id = data.get("endpoint_id") or ""
+ if endpoint_id:
+ endpoint_added = True
+ else:
+ endpoint_meta = await _ensure_served_endpoint(model=repo_id, cmd=cmd, host=host)
+ endpoint_added = bool(endpoint_meta.get("added"))
+ endpoint_id = endpoint_meta.get("endpoint_id", "") or endpoint_id
+ registered = await _cookbook_register_task(
+ session_id=sid, model=repo_id, host=host,
+ cmd=cmd, task_type="serve",
+ endpoint_added=endpoint_added, endpoint_id=endpoint_id or "",
+ )
+ note = "" if registered else " (state-write failed — task may not show in UI)"
+ return {"output": f"Launched preset {chosen.get('name')!r}: {repo_id} on {host or 'local'} (session: {sid}){note}", "session_id": sid, "host": host, "endpoint_id": endpoint_id, "exit_code": 0}
+ return {"error": data.get("error", "Serve failed"), "exit_code": 1}
+ except Exception as e:
+ return {"error": str(e), "exit_code": 1}
+
+
+async def do_list_cached_models(content: str, owner: Optional[str] = None) -> Dict:
+ """List models already cached locally and/or on remote hosts.
+
+ With no `host` arg, scans EVERY configured Cookbook server (and local)
+ and aggregates — so the agent sees the full inventory in one call
+ instead of having to query each server individually.
+ """
+ from src.tool_implementations import _internal_headers, _INTERNAL_BASE # shared, lives in facade
+ import httpx
+ try:
+ args = _parse_tool_args(content) if content.strip() else {}
+ except ValueError:
+ return {"error": "Invalid JSON arguments", "exit_code": 1}
+ raw_host = (args.get("host") or "").strip()
+ headers = _internal_headers()
+
+ async def _scan_one(host_label: str, host_val: str, ssh_port: str = "",
+ platform: str = "", model_dir: str = "") -> list:
+ """Hit /api/model/cached for one host; tag each returned model with its source."""
+ p: Dict[str, str] = {}
+ if host_val:
+ p["host"] = host_val
+ # Caller-provided override beats per-server config beats nothing.
+ if args.get("model_dir"):
+ p["model_dir"] = args["model_dir"]
+ elif model_dir:
+ p["model_dir"] = model_dir
+ if ssh_port:
+ p["ssh_port"] = ssh_port
+ elif args.get("ssh_port"):
+ p["ssh_port"] = str(args["ssh_port"])
+ if platform:
+ p["platform"] = platform
+ elif args.get("platform"):
+ p["platform"] = args["platform"]
+ try:
+ async with httpx.AsyncClient(timeout=60) as client:
+ resp = await client.get(f"{_INTERNAL_BASE}/api/model/cached",
+ params=p, headers=headers)
+ data = resp.json()
+ ms = data.get("models", []) if isinstance(data, dict) else (data or [])
+ for m in ms:
+ m["host"] = host_label or "local"
+ return ms or []
+ except Exception as e:
+ logger.debug(f"list_cached_models scan({host_label}) failed: {e}")
+ return []
+
+ # When the caller specifies a host explicitly, scan only that one (old behaviour).
+ # Otherwise iterate every configured server + local so the agent doesn't
+ # have to repeat the call per server.
+ try:
+ # Pull configured servers from cookbook state (used for resolving
+ # modelDirs both when caller specifies a host and when we scan all).
+ servers: list = []
+ try:
+ async with httpx.AsyncClient(timeout=10) as client:
+ st = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state", headers=headers)
+ st_data = st.json() if st.headers.get("content-type", "").startswith("application/json") else {}
+ servers = (st_data.get("env", {}) or {}).get("servers") or []
+ except Exception as e:
+ logger.debug(f"server list fetch failed: {e}")
+ st_data = {}
+
+ def _dirs_for(server_record: Dict[str, Any]) -> str:
+ """Comma-joined modelDirs from a saved server record (Settings).
+
+ Filters out the HF cache (~/.cache/huggingface/hub) — the backend
+ scan script always scans it by default, so re-passing it as an
+ extra model_dir is redundant AND confuses some path-handling
+ edge cases where the extra dir suppresses the deeper scan.
+ We only need to forward the NON-default dirs (e.g. /mnt/HADES/models).
+ """
+ mds = server_record.get("modelDirs") if isinstance(server_record, dict) else None
+ HF_DEFAULTS = {"~/.cache/huggingface/hub", "~/.cache/huggingface"}
+ if isinstance(mds, list):
+ extras = [d for d in mds if isinstance(d, str) and d.strip() and d.strip() not in HF_DEFAULTS]
+ return ",".join(extras)
+ if isinstance(mds, str) and mds.strip() not in HF_DEFAULTS:
+ return mds
+ return ""
+
+ if raw_host:
+ host = await _resolve_cookbook_host(raw_host)
+ # Find this host's saved record so its modelDirs apply too.
+ srv = next(
+ (s for s in servers if isinstance(s, dict)
+ and (s.get("name") == raw_host or s.get("host") == host or s.get("host") == raw_host)),
+ {},
+ )
+ models = await _scan_one(raw_host, host, model_dir=_dirs_for(srv))
+ else:
+ # Always include local. Local's saved record is the one with no host.
+ local_srv = next((s for s in servers if isinstance(s, dict) and not (s.get("host") or "").strip()), {})
+ scans: list = [_scan_one("local", "", model_dir=_dirs_for(local_srv))]
+ for s in servers:
+ if not isinstance(s, dict):
+ continue
+ name = s.get("name") or s.get("host")
+ host_val = s.get("host") or ""
+ if not host_val:
+ continue
+ scans.append(_scan_one(
+ name,
+ host_val,
+ ssh_port=str(s.get("port") or ""),
+ platform=s.get("platform") or "",
+ model_dir=_dirs_for(s),
+ ))
+ results = await asyncio.gather(*scans, return_exceptions=False)
+ # Dedupe by (host, repo_id) — same model could appear in both HF cache + Ollama list.
+ seen = set()
+ models: list = []
+ for batch in results:
+ for m in batch:
+ key = (m.get("host", ""), m.get("repo_id", ""))
+ if key in seen:
+ continue
+ seen.add(key)
+ models.append(m)
+ if not models:
+ # Cache scans can miss models downloaded into the HF default cache
+ # when the server has no explicit model_dir configured. Surface
+ # completed Cookbook download tasks so the agent doesn't conclude
+ # a model is absent and re-download it.
+ downloaded = []
+ try:
+ async with httpx.AsyncClient(timeout=10) as client:
+ st = await client.get(f"{_INTERNAL_BASE}/api/cookbook/state", headers=headers)
+ state = st.json() if st.headers.get("content-type", "").startswith("application/json") else {}
+ for t in (state.get("tasks") or []):
+ if not isinstance(t, dict) or t.get("type") != "download":
+ continue
+ if (t.get("status") or "").lower() not in {"done", "completed"}:
+ continue
+ task_host = t.get("remoteHost") or (t.get("payload") or {}).get("remote_host") or ""
+ if raw_host and task_host != raw_host:
+ continue
+ repo = t.get("modelId") or t.get("repoId") or (t.get("payload") or {}).get("repo_id") or t.get("name")
+ if repo and repo not in downloaded:
+ downloaded.append(repo)
+ except Exception:
+ downloaded = []
+ host_str = f" on {raw_host}" if raw_host else ""
+ if downloaded:
+ lines = [f"No cache paths were detected{host_str}, but Cookbook has completed download task(s):"]
+ lines.extend(f"- {repo} — downloaded via Cookbook task" for repo in downloaded)
+ return {"output": "\n".join(lines), "models": [{"repo_id": repo, "source": "cookbook_task"} for repo in downloaded], "exit_code": 0}
+ return {"output": f"No cached models found{host_str}.", "exit_code": 0}
+ # Multi-host scan: group by host so the agent sees inventory per server.
+ # Single-host scan: flat list (matches old output shape).
+ if raw_host:
+ lines = [f"{len(models)} cached model(s) on {raw_host}:"]
+ for m in models:
+ name = m.get("repo_id", "?")
+ sz = m.get("size") or (f"{m.get('size_bytes', 0) / (1024**3):.1f}GB" if m.get("size_bytes") else "")
+ inc = " (incomplete)" if m.get("has_incomplete") else ""
+ kind = " [diffusion]" if m.get("is_diffusion") else ""
+ lines.append(f"- {name}{kind} — {sz}{inc}")
+ else:
+ from collections import defaultdict as _dd
+ by_host = _dd(list)
+ for m in models:
+ by_host[m.get("host", "local")].append(m)
+ lines = [f"{len(models)} cached model(s) across {len(by_host)} server(s):"]
+ for host_name in sorted(by_host.keys()):
+ lines.append(f"\n[{host_name}]")
+ for m in by_host[host_name]:
+ name = m.get("repo_id", "?")
+ sz = m.get("size") or (f"{m.get('size_bytes', 0) / (1024**3):.1f}GB" if m.get("size_bytes") else "")
+ inc = " (incomplete)" if m.get("has_incomplete") else ""
+ kind = " [diffusion]" if m.get("is_diffusion") else ""
+ backend = f" ({m.get('backend')})" if m.get("backend") else ""
+ lines.append(f"- {name}{kind}{backend} — {sz}{inc}")
+ return {"output": "\n".join(lines), "models": models, "exit_code": 0}
+ except Exception as e:
+ return {"error": str(e), "exit_code": 1}