Files
odysseus/tests/test_bg_job_tools.py
T
Kenny Van de Maele cdae9879f2 feat(agent): add manage_bg_jobs tool to inspect and kill background bash jobs (#4577)
Detached bash jobs (#!bg) could be launched and auto-reported on completion,
but the agent had no way to act on a running one: no on-demand output read and
no kill (it blocked until the 1h max-runtime). bg_jobs had the pieces
(_read_output, list_for_session, internal _kill) but none was exposed.

Adds:
- bg_jobs.kill(job_id): tears down the process tree, marks the job killed, and
  sets followed_up so the monitor does not also auto-continue a deliberate kill.
- manage_bg_jobs registry tool with actions list / output / kill, scoped to the
  chat that launched the job (cross-session access reads as not found).
- Wiring: TOOL_HANDLERS/TAGS, function schema, RAG index + keyword hints, parser
  name map, dispatch (threads session_id via _direct_fallback). Gated like bash
  (NON_ADMIN_BLOCKED_TOOLS; plan-mode mutator).
- agent_loop: background-job intent regex maps to the files domain (and the tool
  joins _DOMAIN_TOOL_MAP[files]) so short commands like 'kill that job' are not
  dropped by the low-signal gate that skips tool retrieval.
- bg launch message tells the model to call manage_bg_jobs itself for check/stop
  rather than printing raw tool syntax to the user.

Tests: tests/test_bg_job_tools.py (kill semantics, per-chat scoping, actions,
and the intent classifier).
2026-06-19 00:28:22 -07:00

175 lines
6.3 KiB
Python

"""Tests for bg_jobs.kill and the manage_bg_jobs agent tool.
Process-free: the store/dir are redirected to tmp, _pid_alive is forced True so
seeded "running" jobs stay running through refresh(), and _kill is stubbed so no
real signal is sent. Jobs are scoped to a chat (session_id), which is the main
invariant under test.
"""
import asyncio
import json
import time
import pytest
from src import bg_jobs
from src.agent_tools.bg_job_tools import ManageBgJobsTool
@pytest.fixture
def store(tmp_path, monkeypatch):
jobs_dir = tmp_path / "bg_jobs"
jobs_dir.mkdir()
monkeypatch.setattr(bg_jobs, "_STORE", tmp_path / "bg_jobs.json")
monkeypatch.setattr(bg_jobs, "_JOBS_DIR", jobs_dir)
monkeypatch.setattr(bg_jobs, "_pid_alive", lambda pid: True)
killed: list = []
monkeypatch.setattr(bg_jobs, "_kill", lambda pid: killed.append(pid))
return {"dir": jobs_dir, "killed": killed}
def _seed(session_id="sess-a", status="running", job_id="job0001", output="", pid=4321):
rec = {
"id": job_id, "session_id": session_id, "command": "sleep 60",
"status": status, "pid": pid, "started_at": time.time(),
"ended_at": None if status == "running" else time.time(),
"exit_code": None if status == "running" else 0,
"max_runtime_s": 3600, "followed_up": False,
"log_path": str(bg_jobs._JOBS_DIR / f"{job_id}.log"),
"exit_path": str(bg_jobs._JOBS_DIR / f"{job_id}.exit"),
}
if output:
(bg_jobs._JOBS_DIR / f"{job_id}.log").write_text(output, encoding="utf-8")
jobs = bg_jobs._load()
jobs[job_id] = rec
bg_jobs._save(jobs)
return rec
def _run(args, session_id="sess-a"):
return asyncio.run(ManageBgJobsTool().execute(json.dumps(args), {"session_id": session_id, "owner": None}))
# ── bg_jobs.kill ────────────────────────────────────────────────────────────
def test_kill_marks_killed_and_suppresses_followup(store):
_seed(job_id="job0001", pid=4321)
rec = bg_jobs.kill("job0001")
assert rec["status"] == "failed"
assert rec["killed"] is True
assert rec["exit_code"] == -1
# followed_up True so the monitor won't ALSO auto-continue a deliberate kill.
assert rec["followed_up"] is True
assert store["killed"] == [4321]
def test_kill_unknown_job_returns_none(store):
assert bg_jobs.kill("nope") is None
def test_kill_finished_job_is_noop(store):
_seed(job_id="done01", status="done")
rec = bg_jobs.kill("done01")
assert rec["status"] == "done"
assert store["killed"] == [] # no signal sent to an already-finished job
def test_result_text_reports_killed(store):
rec = _seed(job_id="job0001")
bg_jobs.kill("job0001")
assert "killed" in bg_jobs.result_text(bg_jobs.get("job0001")).lower()
# ── manage_bg_jobs tool ─────────────────────────────────────────────────────
def test_no_session_is_rejected(store):
out = asyncio.run(ManageBgJobsTool().execute('{"action":"list"}', {"session_id": None}))
assert "error" in out
def test_list_empty(store):
assert "No background jobs" in _run({"action": "list"})["output"]
def test_list_scoped_to_session(store):
_seed(session_id="sess-a", job_id="aaaa")
_seed(session_id="sess-b", job_id="bbbb")
out = _run({"action": "list"}, session_id="sess-a")["output"]
assert "aaaa" in out and "bbbb" not in out
def test_output_returns_captured_log(store):
_seed(job_id="job0001", output="hello from the job\n")
out = _run({"action": "output", "job_id": "job0001"})["output"]
assert "hello from the job" in out
def test_output_cross_session_denied(store):
_seed(session_id="sess-a", job_id="job0001", output="secret")
out = _run({"action": "output", "job_id": "job0001"}, session_id="sess-b")
assert "error" in out and "secret" not in out.get("error", "")
def test_kill_via_tool(store):
_seed(job_id="job0001", pid=999)
out = _run({"action": "kill", "job_id": "job0001"})
assert "Killed" in out["output"]
assert store["killed"] == [999]
assert bg_jobs.get("job0001")["killed"] is True
def test_kill_cross_session_denied(store):
_seed(session_id="sess-a", job_id="job0001")
out = _run({"action": "kill", "job_id": "job0001"}, session_id="sess-b")
assert "error" in out
assert store["killed"] == [] # never touched another chat's job
def test_kill_requires_job_id(store):
assert "error" in _run({"action": "kill"})
def test_unknown_action(store):
assert "error" in _run({"action": "frobnicate"})
def test_action_aliases(store):
_seed(job_id="job0001", output="aliased")
# 'read' aliases to output, 'jobs' to list, 'stop' to kill
assert "aliased" in _run({"action": "read", "job_id": "job0001"})["output"]
assert "job0001" in _run({"action": "jobs"})["output"]
assert "Killed" in _run({"action": "stop", "job_id": "job0001"})["output"]
# ── intent classifier: short bg-job commands must not be dropped as low-signal ─
# A short imperative ("kill that job") otherwise trips the low-signal gate, which
# skips tool retrieval entirely and never surfaces manage_bg_jobs (the live bug
# this feature hit). These lock in that bg-job control reaches the files domain.
@pytest.mark.parametrize("msg", [
"stop the job",
"kill that job",
"Now kill that background job.",
"is the job done?",
"check the job output",
"list my jobs",
"kill the bg task",
])
def test_bg_job_commands_are_not_low_signal(msg):
from src.agent_loop import _classify_agent_request, _DOMAIN_TOOL_MAP
r = _classify_agent_request([{"role": "user", "content": msg}], msg)
assert r["low_signal"] is False
assert "files" in r["domains"]
# files domain seeds manage_bg_jobs, so it gets offered to the model.
assert "manage_bg_jobs" in _DOMAIN_TOOL_MAP["files"]
@pytest.mark.parametrize("msg", [
"run this in the background", # launching, not managing
"find me a job listing", # unrelated use of "job"
])
def test_non_bg_messages_do_not_trip_files_domain(msg):
from src.agent_loop import _classify_agent_request
r = _classify_agent_request([{"role": "user", "content": msg}], msg)
assert "files" not in r["domains"]