mirror of
https://github.com/pewdiepie-archdaemon/odysseus.git
synced 2026-06-17 18:25:26 -04:00
fix(cookbook): report dead finished downloads as completed instead of stopped (#4025)
When a download's tmux pane is gone, the status endpoint trusted only the HF-cache probe to tell completed from stopped. The probe derives its cache root from its own environment, but the download runner exports HF_HOME=<local_dir> (the #2722 fix), so custom-dir downloads land in <local_dir>/hub where the probe never looks - and ollama pulls don't touch the HF cache at all. Finished downloads were reported as stopped forever, and tasks already persisted as completed were demoted back to stopped on the next poll. This is the backend half of #3897, deliberately left out of the frontend fix in #4000. - honor the conclusive runner markers first: DOWNLOAD_OK -> completed (keeping the "Fetching 0 files" error guard), DOWNLOAD_FAILED -> error - pass the task's local_dir through to the cache probes so they check the cache the download actually wrote to, keeping the env-var fallback for default-cache downloads - move the probe scripts and marker classification into routes/cookbook_output.py (dependency-free) with behavioral tests Fixes #4017
This commit is contained in:
@@ -4,6 +4,62 @@ Kept dependency-free (no FastAPI / SQLAlchemy imports) so the behavior can be
|
||||
unit-tested without standing up the whole app.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
_FETCHING_ZERO_FILES_RE = re.compile(r"Fetching\s+0\s+files", re.IGNORECASE)
|
||||
|
||||
# Probe scripts for the dead-session download check, run as
|
||||
# `python3 -c <PROBE> <repo_id> <cache_root>` (locally or over SSH).
|
||||
# cache_root is the task's custom download dir, '' for the default HF cache.
|
||||
# It has to be passed explicitly: the download runner exports
|
||||
# HF_HOME=<local_dir>, so that task's cache lives under <local_dir>/hub, and
|
||||
# the probe process's own environment knows nothing about it.
|
||||
HF_CACHE_COMPLETE_PROBE = (
|
||||
"import os,sys;"
|
||||
"repo=sys.argv[1];"
|
||||
"root=os.path.expanduser(sys.argv[2]) if len(sys.argv)>2 and sys.argv[2] else '';"
|
||||
"base=os.path.join(root,'hub') if root else (os.environ.get('HUGGINGFACE_HUB_CACHE') or os.path.join(os.environ.get('HF_HOME', os.path.expanduser('~/.cache/huggingface')), 'hub'));"
|
||||
"d=os.path.join(base,'models--'+repo.replace('/','--'));"
|
||||
"snap=os.path.join(d,'snapshots');"
|
||||
"ok=os.path.isdir(snap) and any(os.path.isdir(os.path.join(snap,x)) and os.listdir(os.path.join(snap,x)) for x in os.listdir(snap));"
|
||||
"inc=False;"
|
||||
"blobs=os.path.join(d,'blobs');"
|
||||
"inc=os.path.isdir(blobs) and any(x.endswith('.incomplete') for x in os.listdir(blobs));"
|
||||
"sys.exit(0 if ok and not inc else 1)"
|
||||
)
|
||||
|
||||
HF_CACHE_INCOMPLETE_PROBE = (
|
||||
"import os,sys;"
|
||||
"repo=sys.argv[1];"
|
||||
"root=os.path.expanduser(sys.argv[2]) if len(sys.argv)>2 and sys.argv[2] else '';"
|
||||
"base=os.path.join(root,'hub') if root else (os.environ.get('HUGGINGFACE_HUB_CACHE') or os.path.join(os.environ.get('HF_HOME', os.path.expanduser('~/.cache/huggingface')), 'hub'));"
|
||||
"d=os.path.join(base,'models--'+repo.replace('/','--'));"
|
||||
"blobs=os.path.join(d,'blobs');"
|
||||
"inc=os.path.isdir(blobs) and any(x.endswith('.incomplete') for x in os.listdir(blobs));"
|
||||
"sys.exit(0 if inc else 1)"
|
||||
)
|
||||
|
||||
|
||||
def classify_dead_download(full_snapshot: str):
|
||||
"""Resolve a dead download session's status from its runner markers.
|
||||
|
||||
The runner prints DOWNLOAD_OK only after exiting 0 (and DOWNLOAD_FAILED
|
||||
otherwise), so the markers stay trustworthy after the tmux pane is gone.
|
||||
Returns (status, zero_files), or None when the snapshot carries no marker
|
||||
and the caller has to fall back to the cache probe. Same precedence as
|
||||
the live-session branch: DOWNLOAD_OK wins, except a "Fetching 0 files"
|
||||
run is an error (nothing matched the include/quant pattern).
|
||||
"""
|
||||
if not full_snapshot:
|
||||
return None
|
||||
if "DOWNLOAD_OK" in full_snapshot:
|
||||
if _FETCHING_ZERO_FILES_RE.search(full_snapshot):
|
||||
return ("error", True)
|
||||
return ("completed", False)
|
||||
if "DOWNLOAD_FAILED" in full_snapshot:
|
||||
return ("error", False)
|
||||
return None
|
||||
|
||||
|
||||
def error_aware_output_tail(full_snapshot: str, status: str) -> str:
|
||||
"""Return the trailing slice of a task log for the status response.
|
||||
|
||||
+23
-31
@@ -30,7 +30,10 @@ from core.platform_compat import (
|
||||
which_tool,
|
||||
)
|
||||
from routes.shell_routes import TMUX_LOG_DIR
|
||||
from routes.cookbook_output import error_aware_output_tail
|
||||
from routes.cookbook_output import (
|
||||
error_aware_output_tail, classify_dead_download,
|
||||
HF_CACHE_COMPLETE_PROBE, HF_CACHE_INCOMPLETE_PROBE,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -2636,30 +2639,20 @@ def setup_cookbook_routes() -> APIRouter:
|
||||
def _cookbook_tasks_status_sync():
|
||||
import subprocess
|
||||
|
||||
def _download_cache_complete(repo_id: str, remote_host: str = "", ssh_port: str = "") -> bool:
|
||||
def _download_cache_complete(repo_id: str, remote_host: str = "", ssh_port: str = "", cache_root: str = "") -> bool:
|
||||
"""Best-effort check for a completed HF cache entry.
|
||||
|
||||
tmux output can stop at a stale progress line if the pane/session
|
||||
disappears before Cookbook captures the final DOWNLOAD_OK marker.
|
||||
In that case, trust the cache shape: a snapshot directory with files
|
||||
and no *.incomplete blobs means HuggingFace finished materializing the
|
||||
model.
|
||||
model. cache_root is the task's custom download dir — the runner
|
||||
pointed HF_HOME there, so the cache lives under <cache_root>/hub,
|
||||
not wherever this probe's environment says.
|
||||
"""
|
||||
if not repo_id or "/" not in repo_id:
|
||||
return False
|
||||
py = (
|
||||
"import os,sys;"
|
||||
"repo=sys.argv[1];"
|
||||
"base=os.environ.get('HUGGINGFACE_HUB_CACHE') or os.path.join(os.environ.get('HF_HOME', os.path.expanduser('~/.cache/huggingface')), 'hub');"
|
||||
"d=os.path.join(base,'models--'+repo.replace('/','--'));"
|
||||
"snap=os.path.join(d,'snapshots');"
|
||||
"ok=os.path.isdir(snap) and any(os.path.isdir(os.path.join(snap,x)) and os.listdir(os.path.join(snap,x)) for x in os.listdir(snap));"
|
||||
"inc=False;"
|
||||
"blobs=os.path.join(d,'blobs');"
|
||||
"inc=os.path.isdir(blobs) and any(x.endswith('.incomplete') for x in os.listdir(blobs));"
|
||||
"sys.exit(0 if ok and not inc else 1)"
|
||||
)
|
||||
cmd = ["python3", "-c", py, repo_id]
|
||||
cmd = ["python3", "-c", HF_CACHE_COMPLETE_PROBE, repo_id, cache_root or ""]
|
||||
try:
|
||||
if remote_host:
|
||||
ssh_base = ["ssh"]
|
||||
@@ -2673,7 +2666,7 @@ def setup_cookbook_routes() -> APIRouter:
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _download_cache_incomplete(repo_id: str, remote_host: str = "", ssh_port: str = "") -> bool:
|
||||
def _download_cache_incomplete(repo_id: str, remote_host: str = "", ssh_port: str = "", cache_root: str = "") -> bool:
|
||||
"""Best-effort check for resumable HF partial blobs.
|
||||
|
||||
A lost SSH/tmux session can leave a real download still incomplete.
|
||||
@@ -2682,16 +2675,7 @@ def setup_cookbook_routes() -> APIRouter:
|
||||
"""
|
||||
if not repo_id or "/" not in repo_id:
|
||||
return False
|
||||
py = (
|
||||
"import os,sys;"
|
||||
"repo=sys.argv[1];"
|
||||
"base=os.environ.get('HUGGINGFACE_HUB_CACHE') or os.path.join(os.environ.get('HF_HOME', os.path.expanduser('~/.cache/huggingface')), 'hub');"
|
||||
"d=os.path.join(base,'models--'+repo.replace('/','--'));"
|
||||
"blobs=os.path.join(d,'blobs');"
|
||||
"inc=os.path.isdir(blobs) and any(x.endswith('.incomplete') for x in os.listdir(blobs));"
|
||||
"sys.exit(0 if inc else 1)"
|
||||
)
|
||||
cmd = ["python3", "-c", py, repo_id]
|
||||
cmd = ["python3", "-c", HF_CACHE_INCOMPLETE_PROBE, repo_id, cache_root or ""]
|
||||
try:
|
||||
if remote_host:
|
||||
ssh_base = ["ssh"]
|
||||
@@ -2896,7 +2880,7 @@ def setup_cookbook_routes() -> APIRouter:
|
||||
and (
|
||||
".incomplete" in full_snapshot
|
||||
or bool(re.search(r'model-\d+-of-\d+\.[A-Za-z0-9_.-]+:\s+(?:[0-9]|[1-8][0-9])%', full_snapshot))
|
||||
or _download_cache_incomplete(_payload.get("repo_id") or model, remote, str(_tport or ""))
|
||||
or _download_cache_incomplete(_payload.get("repo_id") or model, remote, str(_tport or ""), _payload.get("local_dir") or "")
|
||||
)
|
||||
)
|
||||
if is_alive or (local_win_task and full_snapshot):
|
||||
@@ -2937,11 +2921,19 @@ def setup_cookbook_routes() -> APIRouter:
|
||||
else:
|
||||
status = "running"
|
||||
else:
|
||||
# Session is dead — check if it completed or crashed
|
||||
if (
|
||||
# Session is dead — check if it completed or crashed. The
|
||||
# runner markers in the retained output are conclusive
|
||||
# (DOWNLOAD_OK only prints after exit 0), so check them before
|
||||
# the cache probe, which can't see ollama pulls at all.
|
||||
marker = classify_dead_download(full_snapshot) if task_type == "download" else None
|
||||
if marker is not None:
|
||||
status, download_zero_files = marker
|
||||
if status == "completed" and not progress_text:
|
||||
progress_text = "Download complete"
|
||||
elif (
|
||||
task_type == "download"
|
||||
and not download_has_incomplete_evidence
|
||||
and _download_cache_complete(_payload.get("repo_id") or model, remote, str(_tport or ""))
|
||||
and _download_cache_complete(_payload.get("repo_id") or model, remote, str(_tport or ""), _payload.get("local_dir") or "")
|
||||
):
|
||||
status = "completed"
|
||||
if not progress_text:
|
||||
|
||||
@@ -0,0 +1,124 @@
|
||||
"""Behavioral guards for dead-session download classification (issue #4017).
|
||||
|
||||
A download whose tmux pane is gone must not be reported as stopped when its
|
||||
retained output carries DOWNLOAD_OK, or when the files landed in a custom
|
||||
download dir. The runner exports HF_HOME=<local_dir>, so the cache lives
|
||||
under <local_dir>/hub — the probe only finds it if the task's dir is passed
|
||||
in explicitly rather than read from the probe process's environment.
|
||||
"""
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from routes.cookbook_output import (
|
||||
classify_dead_download,
|
||||
HF_CACHE_COMPLETE_PROBE,
|
||||
HF_CACHE_INCOMPLETE_PROBE,
|
||||
)
|
||||
|
||||
REPO = "org/some-model-GGUF"
|
||||
|
||||
|
||||
# ── Marker classification ──
|
||||
|
||||
|
||||
def test_download_ok_resolves_completed():
|
||||
snap = "Fetching 4 files: 100%|####| 4/4\nDownload complete\n\nDOWNLOAD_OK\n$"
|
||||
assert classify_dead_download(snap) == ("completed", False)
|
||||
|
||||
|
||||
def test_download_failed_resolves_error():
|
||||
snap = "some progress\n\nDOWNLOAD_FAILED (exit 1 after 3 attempts)"
|
||||
assert classify_dead_download(snap) == ("error", False)
|
||||
|
||||
|
||||
def test_download_ok_with_zero_files_resolves_error():
|
||||
# A DOWNLOAD_OK from a run that matched no files (bad include/quant
|
||||
# pattern) is still a failure — same guard as the live-session branch.
|
||||
snap = "Fetching 0 files: 0it [00:00, ?it/s]\n\nDOWNLOAD_OK"
|
||||
assert classify_dead_download(snap) == ("error", True)
|
||||
|
||||
|
||||
def test_no_marker_returns_none():
|
||||
# Mid-download tail with no terminal marker — caller must fall back to
|
||||
# the cache probe.
|
||||
assert classify_dead_download("Downloading model.gguf: 42%") is None
|
||||
assert classify_dead_download("") is None
|
||||
|
||||
|
||||
def test_ollama_pull_output_resolves_completed():
|
||||
snap = "pulling manifest\npulling 8f39d1c3...: 100%\nsuccess\n\nDOWNLOAD_OK"
|
||||
assert classify_dead_download(snap) == ("completed", False)
|
||||
|
||||
|
||||
# ── Cache probe scripts ──
|
||||
|
||||
|
||||
def _make_cache(root, repo=REPO, incomplete=False, empty_snapshot=False):
|
||||
d = os.path.join(root, "hub", "models--" + repo.replace("/", "--"))
|
||||
snap = os.path.join(d, "snapshots", "abc123")
|
||||
os.makedirs(snap)
|
||||
if not empty_snapshot:
|
||||
with open(os.path.join(snap, "model.gguf"), "w") as f:
|
||||
f.write("x")
|
||||
if incomplete:
|
||||
blobs = os.path.join(d, "blobs")
|
||||
os.makedirs(blobs)
|
||||
with open(os.path.join(blobs, "deadbeef.incomplete"), "w") as f:
|
||||
f.write("x")
|
||||
|
||||
|
||||
def _run_probe(probe, repo, cache_root, env=None):
|
||||
# Strip the HF cache vars so the probe can't accidentally find a real
|
||||
# cache on the machine running the tests.
|
||||
full_env = {k: v for k, v in os.environ.items()
|
||||
if k not in ("HF_HOME", "HUGGINGFACE_HUB_CACHE", "HF_HUB_CACHE")}
|
||||
full_env.update(env or {})
|
||||
return subprocess.run(
|
||||
[sys.executable, "-c", probe, repo, cache_root],
|
||||
env=full_env, capture_output=True, timeout=30,
|
||||
).returncode
|
||||
|
||||
|
||||
def test_complete_probe_finds_custom_dir_cache(tmp_path):
|
||||
# Model materialized under <local_dir>/hub — found only via the explicit
|
||||
# cache_root argument (issue #4017).
|
||||
root = str(tmp_path)
|
||||
_make_cache(root)
|
||||
assert _run_probe(HF_CACHE_COMPLETE_PROBE, REPO, root) == 0
|
||||
|
||||
|
||||
def test_complete_probe_misses_without_cache_root(tmp_path):
|
||||
# Same on-disk layout, but without the cache_root argument the probe
|
||||
# falls back to the default cache and misses it.
|
||||
_make_cache(str(tmp_path))
|
||||
assert _run_probe(HF_CACHE_COMPLETE_PROBE, REPO, "") == 1
|
||||
|
||||
|
||||
def test_complete_probe_rejects_incomplete_blobs(tmp_path):
|
||||
root = str(tmp_path)
|
||||
_make_cache(root, incomplete=True)
|
||||
assert _run_probe(HF_CACHE_COMPLETE_PROBE, REPO, root) == 1
|
||||
|
||||
|
||||
def test_complete_probe_rejects_empty_snapshot(tmp_path):
|
||||
root = str(tmp_path)
|
||||
_make_cache(root, empty_snapshot=True)
|
||||
assert _run_probe(HF_CACHE_COMPLETE_PROBE, REPO, root) == 1
|
||||
|
||||
|
||||
def test_complete_probe_env_fallback_still_works(tmp_path):
|
||||
# No custom dir on the task — the probe must keep honoring the standard
|
||||
# HF env vars so default-cache downloads classify as before.
|
||||
root = str(tmp_path)
|
||||
_make_cache(root)
|
||||
hub = os.path.join(root, "hub")
|
||||
assert _run_probe(HF_CACHE_COMPLETE_PROBE, REPO, "", env={"HUGGINGFACE_HUB_CACHE": hub}) == 0
|
||||
|
||||
|
||||
def test_incomplete_probe_sees_custom_dir_partials(tmp_path):
|
||||
root = str(tmp_path)
|
||||
_make_cache(root, incomplete=True)
|
||||
assert _run_probe(HF_CACHE_INCOMPLETE_PROBE, REPO, root) == 0
|
||||
# Clean cache → no resumable partials.
|
||||
assert _run_probe(HF_CACHE_INCOMPLETE_PROBE, "org/other-model", root) == 1
|
||||
Reference in New Issue
Block a user