feat(agent): add manage_bg_jobs tool to inspect and kill background bash jobs (#4577)

Detached bash jobs (#!bg) could be launched and auto-reported on completion,
but the agent had no way to act on a running one: no on-demand output read and
no kill (it blocked until the 1h max-runtime). bg_jobs had the pieces
(_read_output, list_for_session, internal _kill) but none was exposed.

Adds:
- bg_jobs.kill(job_id): tears down the process tree, marks the job killed, and
  sets followed_up so the monitor does not also auto-continue a deliberate kill.
- manage_bg_jobs registry tool with actions list / output / kill, scoped to the
  chat that launched the job (cross-session access reads as not found).
- Wiring: TOOL_HANDLERS/TAGS, function schema, RAG index + keyword hints, parser
  name map, dispatch (threads session_id via _direct_fallback). Gated like bash
  (NON_ADMIN_BLOCKED_TOOLS; plan-mode mutator).
- agent_loop: background-job intent regex maps to the files domain (and the tool
  joins _DOMAIN_TOOL_MAP[files]) so short commands like 'kill that job' are not
  dropped by the low-signal gate that skips tool retrieval.
- bg launch message tells the model to call manage_bg_jobs itself for check/stop
  rather than printing raw tool syntax to the user.

Tests: tests/test_bg_job_tools.py (kill semantics, per-chat scoping, actions,
and the intent classifier).
This commit is contained in:
Kenny Van de Maele
2026-06-19 09:28:22 +02:00
committed by GitHub
parent 39a802bea2
commit cdae9879f2
10 changed files with 347 additions and 5 deletions
+7 -1
View File
@@ -281,7 +281,7 @@ _DOMAIN_TOOL_MAP = {
"notes_calendar_tasks": {"manage_notes", "manage_calendar", "manage_tasks"},
"ui": {"ui_control"},
"sessions": {"create_session", "list_sessions", "manage_session", "send_to_session", "search_chats"},
"files": {"bash", "python", "read_file", "write_file", "edit_file", "grep", "glob", "ls", "get_workspace"},
"files": {"bash", "python", "read_file", "write_file", "edit_file", "grep", "glob", "ls", "get_workspace", "manage_bg_jobs"},
"settings": {"manage_settings", "manage_endpoints", "manage_mcp", "manage_webhooks", "manage_tokens", "app_api"},
"contacts": {"resolve_contact", "manage_contact"},
"integrations": {"api_call"},
@@ -816,6 +816,12 @@ def _classify_agent_request(messages: List[Dict], last_user: str) -> Dict[str, o
domains.add("sessions")
if has(r"\b(file|folder|directory|repo|git|grep|find in files|read file|edit file|shell|terminal|bash|python)\b"):
domains.add("files")
# Managing detached bash jobs: "kill the background job", "stop the job",
# "kill that job", "check the job output", "is the bg job done".
if (has(r"\b(background|bg)\s+(jobs?|task)\b")
or has(r"\b(kill|stop|cancel|terminate|check|tail|show|list)\b.{0,16}\bjobs?\b")
or has(r"\bjobs?\b.{0,16}\b(output|status|done|finished|running)\b")):
domains.add("files")
if has(r"\b(endpoint|api token|mcp|webhook|preference|configure|config|setting)\b"):
domains.add("settings")
if has(r"\b(contact|contacts|phone|phone number|address book|vcard)\b"):
+3 -1
View File
@@ -23,6 +23,7 @@ from .web_tools import WebSearchTool, WebFetchTool
from .filesystem_tools import ReadFileTool, WriteFileTool, EditFileTool, LsTool, GlobTool, GrepTool, GetWorkspaceTool
from .document_tools import CreateDocumentTool, UpdateDocumentTool, EditDocumentTool, SuggestDocumentTool, ManageDocumentTool
from .model_interaction_tools import ChatWithModelTool, AskTeacherTool, ListModelsTool
from .bg_job_tools import ManageBgJobsTool
TOOL_HANDLERS = {
"bash": BashTool().execute,
@@ -44,6 +45,7 @@ TOOL_HANDLERS = {
"chat_with_model": ChatWithModelTool().execute,
"ask_teacher": AskTeacherTool().execute,
"list_models": ListModelsTool().execute,
"manage_bg_jobs": ManageBgJobsTool().execute,
}
# ---------------------------------------------------------------------------
@@ -56,7 +58,7 @@ PYTHON_TIMEOUT = 30
# Tool types that trigger execution
TOOL_TAGS = {"bash", "python", "web_search", "web_fetch", "read_file", "write_file", "edit_file",
"grep", "glob", "ls", "get_workspace",
"grep", "glob", "ls", "get_workspace", "manage_bg_jobs",
"create_document", "update_document", "edit_document",
"search_chats",
"chat_with_model", "create_session", "list_sessions",
+98
View File
@@ -0,0 +1,98 @@
"""Agent tool to inspect and control detached background `bash` jobs.
`bash` blocks prefixed with a `#!bg` marker run detached via `src.bg_jobs`; the
agent is auto-re-invoked with the output when they finish. This tool covers the
gaps in that flow: list the jobs in the current chat, read a still-running job's
output on demand, and kill a runaway job instead of waiting out its max-runtime.
Registry tool (`TOOL_HANDLERS["manage_bg_jobs"]`). Jobs are scoped to the chat
that launched them, so every action requires the caller's `session_id` and a job
from another session is treated as not found.
"""
import json
import time
from typing import Any, Dict, List
_LIST_ACTIONS = {"list", "ls", "jobs"}
_OUTPUT_ACTIONS = {"output", "get", "read", "tail", "status", "show"}
_KILL_ACTIONS = {"kill", "stop", "cancel", "terminate"}
def _age(rec: Dict[str, Any]) -> str:
start = rec.get("started_at")
if not start:
return "?"
secs = int(time.time() - start)
if secs < 60:
return f"{secs}s"
if secs < 3600:
return f"{secs // 60}m"
return f"{secs // 3600}h{(secs % 3600) // 60}m"
def _status_label(rec: Dict[str, Any]) -> str:
status = rec.get("status", "?")
if rec.get("killed"):
return "killed"
if rec.get("timed_out"):
return "timed out"
if rec.get("died"):
return "died"
if status in ("done", "failed"):
return f"{status} (exit {rec.get('exit_code')})"
return status
def _row(rec: Dict[str, Any]) -> str:
cmd = (rec.get("command") or "").strip().splitlines()[0][:80]
return f"[{rec.get('id')}] {_status_label(rec)} | {_age(rec)} | {cmd}"
class ManageBgJobsTool:
async def execute(self, content: str, ctx: dict) -> dict:
from src import bg_jobs
session_id = ctx.get("session_id")
raw = (content or "").strip()
try:
args = json.loads(raw) if raw else {}
except (ValueError, TypeError):
args = {}
if not isinstance(args, dict):
args = {}
action = str(args.get("action", "list")).strip().lower()
job_id = str(args.get("job_id") or args.get("id") or "").strip()
if not session_id:
return {"error": "manage_bg_jobs: no active chat session; background jobs are scoped to a chat.", "exit_code": 1}
if action in _LIST_ACTIONS:
jobs: List[Dict[str, Any]] = bg_jobs.list_for_session(session_id)
if not jobs:
return {"output": "No background jobs in this chat.", "exit_code": 0}
jobs.sort(key=lambda r: r.get("started_at") or 0, reverse=True)
lines = "\n".join(_row(r) for r in jobs)
return {"output": f"{len(jobs)} background job(s):\n{lines}", "exit_code": 0}
if action in _OUTPUT_ACTIONS or action in _KILL_ACTIONS:
if not job_id:
return {"error": f"manage_bg_jobs: action '{action}' requires a job_id (see action='list').", "exit_code": 1}
rec = bg_jobs.get(job_id)
# Scope: only the chat that launched a job may see or control it.
if rec is None or rec.get("session_id") != session_id:
return {"error": f"manage_bg_jobs: no background job '{job_id}' in this chat.", "exit_code": 1}
if action in _KILL_ACTIONS:
if rec.get("status") != "running":
return {"output": f"Job `{job_id}` already {_status_label(rec)}; nothing to kill.", "exit_code": 0}
killed = bg_jobs.kill(job_id)
return {"output": f"Killed background job `{job_id}` ({(killed or {}).get('command', '').splitlines()[0][:80]}).", "exit_code": 0}
out = rec.get("output") or "(no output yet)"
return {
"output": f"Job `{job_id}` [{_status_label(rec)}, {_age(rec)}]\nCommand: {rec.get('command')}\n\nOutput:\n{out}",
"exit_code": 0,
}
return {"error": f"manage_bg_jobs: unknown action '{action}'. Use list, output, or kill.", "exit_code": 1}
+23 -1
View File
@@ -263,10 +263,32 @@ def list_for_session(session_id: str) -> List[Dict[str, Any]]:
return [r for r in refresh().values() if r.get("session_id") == session_id]
def kill(job_id: str) -> Optional[Dict[str, Any]]:
"""Terminate a running job's process tree and mark it killed. Returns the
updated record, or None if the id is unknown. Idempotent: a job that already
finished is returned unchanged. Sets followed_up so the monitor does not also
fire an auto-continue for a job the agent deliberately stopped."""
jobs = _load()
rec = jobs.get(job_id)
if rec is None:
return None
if rec.get("status") == "running":
_kill(rec.get("pid"))
rec["status"] = "failed"
rec["exit_code"] = -1
rec["ended_at"] = time.time()
rec["killed"] = True
rec["followed_up"] = True
_save(jobs)
return rec
def result_text(rec: Dict[str, Any]) -> str:
"""Human/agent-readable summary of a finished job, for the follow-up."""
out = _read_output(rec)
if rec.get("timed_out"):
if rec.get("killed"):
head = "Background job was killed."
elif rec.get("timed_out"):
head = f"Background job timed out after {rec.get('max_runtime_s')}s."
elif rec.get("died"):
head = "Background job process died unexpectedly (no exit code)."
+14 -2
View File
@@ -471,6 +471,8 @@ async def _direct_fallback(
tool: str,
content: str,
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
session_id: Optional[str] = None,
owner: Optional[str] = None,
) -> Optional[Dict]:
_subproc_env = {
**os.environ,
@@ -484,6 +486,8 @@ async def _direct_fallback(
ctx = {
"progress_cb": progress_cb,
"subproc_env": _subproc_env,
"session_id": session_id,
"owner": owner,
}
from src.agent_tools import TOOL_HANDLERS
@@ -731,10 +735,13 @@ async def _execute_tool_block_impl(
desc = f"bash (background): {short}"
result = {
"output": (
f"Started background job `{rec['id']}`. It is running detached "
f"Started background job `{rec['id']}`. It is running detached; "
f"do NOT wait for it or poll it. You will be automatically re-invoked "
f"with its full output when it finishes. Continue with other work, or "
f"end your turn now and resume when the result arrives."
f"end your turn now and resume when the result arrives. If the user "
f"later asks to check progress or stop it, call the manage_bg_jobs "
f"tool yourself (output or kill); do not tell them to run a tool "
f"command, and do not surface raw tool syntax in your reply."
),
"exit_code": 0,
"bg_job_id": rec["id"],
@@ -755,6 +762,11 @@ async def _execute_tool_block_impl(
desc = f"{tool}: {first_line}"
result = await _direct_fallback(tool, content, progress_cb=progress_cb) \
or {"error": f"{tool}: execution failed", "exit_code": 1}
elif tool == "manage_bg_jobs":
# Inspect/kill detached `bash` jobs; needs session_id to scope to chat.
desc = f"manage_bg_jobs: {content.split(chr(10))[0][:80]}"
result = await _direct_fallback(tool, content, session_id=session_id, owner=owner) \
or {"error": "manage_bg_jobs: execution failed", "exit_code": 1}
elif tool in ("create_document", "update_document", "edit_document",
"suggest_document", "manage_documents"):
desc = f"{tool}: {content.split(chr(10))[0][:80]}"
+7
View File
@@ -135,6 +135,7 @@ BUILTIN_TOOL_DESCRIPTIONS: Dict[str, str] = {
"app_api": "Generic loopback to allowed Odysseus internal endpoints. Use this when the user wants something the UI can do but there's no named tool for it. Covers calendar, gallery, library/documents, memory, notes, tasks, settings, research, compare, cookbook GPUs/state — allowed UI buttons hit /api/* endpoints and you can hit them too. Sensitive auth/user/admin/shell paths and host-control Cookbook mutation routes are blocked; do NOT use app_api for shell commands, package installs, engine rebuilds, or PID signalling. Use named command tooling for shell commands. action='endpoints' with filter=<keyword> lists available endpoints. action='call' takes method+path+body. Hits same routes the UI uses — auth flows free. NOTE: themes are NOT an API endpoint — use the ui_control tool (create_theme / set_theme), not app_api. SESSIONS/CHATS: do NOT use app_api for these — GET /api/sessions returns EMPTY for tool calls (it's owner-filtered and tool calls authenticate as a different identity). EMAIL ACCOUNTS: do NOT use /api/email/accounts via app_api; use list_email_accounts, list_emails, and read_email instead. To list/rename/archive/delete/fork chats use the list_sessions and manage_session tools instead.",
"edit_image": "Edit an image in the gallery: upscale (increase resolution), remove background (rembg), inpaint (fill selected area), or harmonize (blend edits). Specify image ID and action.",
"trigger_research": "Start a deep research job on any topic — appears in the Deep Research sidebar, streams progress, produces a detailed report. Use for 'research X', 'look into Y', 'do deep research on Z', 'investigate'. NOT a scheduled task — it runs now and surfaces in the sidebar.",
"manage_bg_jobs": "Inspect and control detached background `bash` jobs (the ones started with a `#!bg` marker). action='list' shows this chat's jobs (id/status/age/command); action='output' returns a job's captured output so far (check on a long-running job, or re-read a finished one); action='kill' stops a runaway job by id. Use for 'is the background job done', 'check on that job', 'show the build output', 'kill the background job', 'stop the bg task'. output/kill need a job_id from list.",
}
@@ -349,6 +350,12 @@ class ToolIndex:
{"list_email_accounts", "list_emails", "read_email", "send_email", "reply_to_email", "bulk_email", "delete_email", "archive_email", "mark_email_read", "resolve_contact", "ui_control"},
frozenset({"calendar", "event", "meeting", "schedule", "appointment"}):
{"manage_calendar"},
# Detached background `bash` jobs (#!bg): check on / read output / kill.
frozenset({"background job", "background jobs", "bg job", "bg jobs",
"background task", "is the job done", "check the job",
"check on that job", "job output", "kill the job",
"kill the background", "stop the background", "running job"}):
{"manage_bg_jobs"},
frozenset({"note", "todo", "reminder", "remind", "checklist", "remember to"}):
{"manage_notes"},
# Chat/session management. "rename" alone maps to documents below, so a
+3
View File
@@ -175,6 +175,9 @@ _TOOL_NAME_MAP = {
"notes": "manage_notes",
"todo": "manage_notes",
"todos": "manage_notes",
"manage_bg_jobs": "manage_bg_jobs",
"bg_jobs": "manage_bg_jobs",
"background_jobs": "manage_bg_jobs",
}
_MISFENCED_WEB_TOOL_NAMES = {
+15
View File
@@ -1188,6 +1188,21 @@ FUNCTION_TOOL_SCHEMAS = [
}
}
},
{
"type": "function",
"function": {
"name": "manage_bg_jobs",
"description": "Inspect and control detached background `bash` jobs (started with the `#!bg` marker). action='list' shows this chat's jobs with id/status/age/command; action='output' returns a job's captured output so far (use for a still-running job, or to re-read a finished one); action='kill' terminates a runaway job's process tree instead of waiting out its max-runtime. output and kill need job_id from list.",
"parameters": {
"type": "object",
"properties": {
"action": {"type": "string", "enum": ["list", "output", "kill"], "description": "list | output | kill (default: list)"},
"job_id": {"type": "string", "description": "Background job id (required for output/kill; from action='list')"},
},
"required": ["action"]
}
}
},
]
+3
View File
@@ -14,6 +14,7 @@ logger = logging.getLogger(__name__)
NON_ADMIN_BLOCKED_TOOLS = {
"bash",
"python",
"manage_bg_jobs",
"read_file",
"write_file",
"edit_file",
@@ -114,6 +115,8 @@ _PLAN_MODE_KNOWN_MUTATORS = {
# Shell is never read-only-safe; block it explicitly so it stays out of plan
# mode even if the schema list fails to load.
"bash", "python",
# Controls shell processes (kill); plan mode can't run bash anyway.
"manage_bg_jobs",
}
+174
View File
@@ -0,0 +1,174 @@
"""Tests for bg_jobs.kill and the manage_bg_jobs agent tool.
Process-free: the store/dir are redirected to tmp, _pid_alive is forced True so
seeded "running" jobs stay running through refresh(), and _kill is stubbed so no
real signal is sent. Jobs are scoped to a chat (session_id), which is the main
invariant under test.
"""
import asyncio
import json
import time
import pytest
from src import bg_jobs
from src.agent_tools.bg_job_tools import ManageBgJobsTool
@pytest.fixture
def store(tmp_path, monkeypatch):
jobs_dir = tmp_path / "bg_jobs"
jobs_dir.mkdir()
monkeypatch.setattr(bg_jobs, "_STORE", tmp_path / "bg_jobs.json")
monkeypatch.setattr(bg_jobs, "_JOBS_DIR", jobs_dir)
monkeypatch.setattr(bg_jobs, "_pid_alive", lambda pid: True)
killed: list = []
monkeypatch.setattr(bg_jobs, "_kill", lambda pid: killed.append(pid))
return {"dir": jobs_dir, "killed": killed}
def _seed(session_id="sess-a", status="running", job_id="job0001", output="", pid=4321):
rec = {
"id": job_id, "session_id": session_id, "command": "sleep 60",
"status": status, "pid": pid, "started_at": time.time(),
"ended_at": None if status == "running" else time.time(),
"exit_code": None if status == "running" else 0,
"max_runtime_s": 3600, "followed_up": False,
"log_path": str(bg_jobs._JOBS_DIR / f"{job_id}.log"),
"exit_path": str(bg_jobs._JOBS_DIR / f"{job_id}.exit"),
}
if output:
(bg_jobs._JOBS_DIR / f"{job_id}.log").write_text(output, encoding="utf-8")
jobs = bg_jobs._load()
jobs[job_id] = rec
bg_jobs._save(jobs)
return rec
def _run(args, session_id="sess-a"):
return asyncio.run(ManageBgJobsTool().execute(json.dumps(args), {"session_id": session_id, "owner": None}))
# ── bg_jobs.kill ────────────────────────────────────────────────────────────
def test_kill_marks_killed_and_suppresses_followup(store):
_seed(job_id="job0001", pid=4321)
rec = bg_jobs.kill("job0001")
assert rec["status"] == "failed"
assert rec["killed"] is True
assert rec["exit_code"] == -1
# followed_up True so the monitor won't ALSO auto-continue a deliberate kill.
assert rec["followed_up"] is True
assert store["killed"] == [4321]
def test_kill_unknown_job_returns_none(store):
assert bg_jobs.kill("nope") is None
def test_kill_finished_job_is_noop(store):
_seed(job_id="done01", status="done")
rec = bg_jobs.kill("done01")
assert rec["status"] == "done"
assert store["killed"] == [] # no signal sent to an already-finished job
def test_result_text_reports_killed(store):
rec = _seed(job_id="job0001")
bg_jobs.kill("job0001")
assert "killed" in bg_jobs.result_text(bg_jobs.get("job0001")).lower()
# ── manage_bg_jobs tool ─────────────────────────────────────────────────────
def test_no_session_is_rejected(store):
out = asyncio.run(ManageBgJobsTool().execute('{"action":"list"}', {"session_id": None}))
assert "error" in out
def test_list_empty(store):
assert "No background jobs" in _run({"action": "list"})["output"]
def test_list_scoped_to_session(store):
_seed(session_id="sess-a", job_id="aaaa")
_seed(session_id="sess-b", job_id="bbbb")
out = _run({"action": "list"}, session_id="sess-a")["output"]
assert "aaaa" in out and "bbbb" not in out
def test_output_returns_captured_log(store):
_seed(job_id="job0001", output="hello from the job\n")
out = _run({"action": "output", "job_id": "job0001"})["output"]
assert "hello from the job" in out
def test_output_cross_session_denied(store):
_seed(session_id="sess-a", job_id="job0001", output="secret")
out = _run({"action": "output", "job_id": "job0001"}, session_id="sess-b")
assert "error" in out and "secret" not in out.get("error", "")
def test_kill_via_tool(store):
_seed(job_id="job0001", pid=999)
out = _run({"action": "kill", "job_id": "job0001"})
assert "Killed" in out["output"]
assert store["killed"] == [999]
assert bg_jobs.get("job0001")["killed"] is True
def test_kill_cross_session_denied(store):
_seed(session_id="sess-a", job_id="job0001")
out = _run({"action": "kill", "job_id": "job0001"}, session_id="sess-b")
assert "error" in out
assert store["killed"] == [] # never touched another chat's job
def test_kill_requires_job_id(store):
assert "error" in _run({"action": "kill"})
def test_unknown_action(store):
assert "error" in _run({"action": "frobnicate"})
def test_action_aliases(store):
_seed(job_id="job0001", output="aliased")
# 'read' aliases to output, 'jobs' to list, 'stop' to kill
assert "aliased" in _run({"action": "read", "job_id": "job0001"})["output"]
assert "job0001" in _run({"action": "jobs"})["output"]
assert "Killed" in _run({"action": "stop", "job_id": "job0001"})["output"]
# ── intent classifier: short bg-job commands must not be dropped as low-signal ─
# A short imperative ("kill that job") otherwise trips the low-signal gate, which
# skips tool retrieval entirely and never surfaces manage_bg_jobs (the live bug
# this feature hit). These lock in that bg-job control reaches the files domain.
@pytest.mark.parametrize("msg", [
"stop the job",
"kill that job",
"Now kill that background job.",
"is the job done?",
"check the job output",
"list my jobs",
"kill the bg task",
])
def test_bg_job_commands_are_not_low_signal(msg):
from src.agent_loop import _classify_agent_request, _DOMAIN_TOOL_MAP
r = _classify_agent_request([{"role": "user", "content": msg}], msg)
assert r["low_signal"] is False
assert "files" in r["domains"]
# files domain seeds manage_bg_jobs, so it gets offered to the model.
assert "manage_bg_jobs" in _DOMAIN_TOOL_MAP["files"]
@pytest.mark.parametrize("msg", [
"run this in the background", # launching, not managing
"find me a job listing", # unrelated use of "job"
])
def test_non_bg_messages_do_not_trip_files_domain(msg):
from src.agent_loop import _classify_agent_request
r = _classify_agent_request([{"role": "user", "content": msg}], msg)
assert "files" not in r["domains"]