diff --git a/src/agent_loop.py b/src/agent_loop.py index 9b63119bb..219765a18 100644 --- a/src/agent_loop.py +++ b/src/agent_loop.py @@ -281,7 +281,7 @@ _DOMAIN_TOOL_MAP = { "notes_calendar_tasks": {"manage_notes", "manage_calendar", "manage_tasks"}, "ui": {"ui_control"}, "sessions": {"create_session", "list_sessions", "manage_session", "send_to_session", "search_chats"}, - "files": {"bash", "python", "read_file", "write_file", "edit_file", "grep", "glob", "ls", "get_workspace"}, + "files": {"bash", "python", "read_file", "write_file", "edit_file", "grep", "glob", "ls", "get_workspace", "manage_bg_jobs"}, "settings": {"manage_settings", "manage_endpoints", "manage_mcp", "manage_webhooks", "manage_tokens", "app_api"}, "contacts": {"resolve_contact", "manage_contact"}, "integrations": {"api_call"}, @@ -816,6 +816,12 @@ def _classify_agent_request(messages: List[Dict], last_user: str) -> Dict[str, o domains.add("sessions") if has(r"\b(file|folder|directory|repo|git|grep|find in files|read file|edit file|shell|terminal|bash|python)\b"): domains.add("files") + # Managing detached bash jobs: "kill the background job", "stop the job", + # "kill that job", "check the job output", "is the bg job done". + if (has(r"\b(background|bg)\s+(jobs?|task)\b") + or has(r"\b(kill|stop|cancel|terminate|check|tail|show|list)\b.{0,16}\bjobs?\b") + or has(r"\bjobs?\b.{0,16}\b(output|status|done|finished|running)\b")): + domains.add("files") if has(r"\b(endpoint|api token|mcp|webhook|preference|configure|config|setting)\b"): domains.add("settings") if has(r"\b(contact|contacts|phone|phone number|address book|vcard)\b"): diff --git a/src/agent_tools/__init__.py b/src/agent_tools/__init__.py index c2d910627..372765cec 100644 --- a/src/agent_tools/__init__.py +++ b/src/agent_tools/__init__.py @@ -23,6 +23,7 @@ from .web_tools import WebSearchTool, WebFetchTool from .filesystem_tools import ReadFileTool, WriteFileTool, EditFileTool, LsTool, GlobTool, GrepTool, GetWorkspaceTool from .document_tools import CreateDocumentTool, UpdateDocumentTool, EditDocumentTool, SuggestDocumentTool, ManageDocumentTool from .model_interaction_tools import ChatWithModelTool, AskTeacherTool, ListModelsTool +from .bg_job_tools import ManageBgJobsTool TOOL_HANDLERS = { "bash": BashTool().execute, @@ -44,6 +45,7 @@ TOOL_HANDLERS = { "chat_with_model": ChatWithModelTool().execute, "ask_teacher": AskTeacherTool().execute, "list_models": ListModelsTool().execute, + "manage_bg_jobs": ManageBgJobsTool().execute, } # --------------------------------------------------------------------------- @@ -56,7 +58,7 @@ PYTHON_TIMEOUT = 30 # Tool types that trigger execution TOOL_TAGS = {"bash", "python", "web_search", "web_fetch", "read_file", "write_file", "edit_file", - "grep", "glob", "ls", "get_workspace", + "grep", "glob", "ls", "get_workspace", "manage_bg_jobs", "create_document", "update_document", "edit_document", "search_chats", "chat_with_model", "create_session", "list_sessions", diff --git a/src/agent_tools/bg_job_tools.py b/src/agent_tools/bg_job_tools.py new file mode 100644 index 000000000..a29e813cc --- /dev/null +++ b/src/agent_tools/bg_job_tools.py @@ -0,0 +1,98 @@ +"""Agent tool to inspect and control detached background `bash` jobs. + +`bash` blocks prefixed with a `#!bg` marker run detached via `src.bg_jobs`; the +agent is auto-re-invoked with the output when they finish. This tool covers the +gaps in that flow: list the jobs in the current chat, read a still-running job's +output on demand, and kill a runaway job instead of waiting out its max-runtime. + +Registry tool (`TOOL_HANDLERS["manage_bg_jobs"]`). Jobs are scoped to the chat +that launched them, so every action requires the caller's `session_id` and a job +from another session is treated as not found. +""" + +import json +import time +from typing import Any, Dict, List + +_LIST_ACTIONS = {"list", "ls", "jobs"} +_OUTPUT_ACTIONS = {"output", "get", "read", "tail", "status", "show"} +_KILL_ACTIONS = {"kill", "stop", "cancel", "terminate"} + + +def _age(rec: Dict[str, Any]) -> str: + start = rec.get("started_at") + if not start: + return "?" + secs = int(time.time() - start) + if secs < 60: + return f"{secs}s" + if secs < 3600: + return f"{secs // 60}m" + return f"{secs // 3600}h{(secs % 3600) // 60}m" + + +def _status_label(rec: Dict[str, Any]) -> str: + status = rec.get("status", "?") + if rec.get("killed"): + return "killed" + if rec.get("timed_out"): + return "timed out" + if rec.get("died"): + return "died" + if status in ("done", "failed"): + return f"{status} (exit {rec.get('exit_code')})" + return status + + +def _row(rec: Dict[str, Any]) -> str: + cmd = (rec.get("command") or "").strip().splitlines()[0][:80] + return f"[{rec.get('id')}] {_status_label(rec)} | {_age(rec)} | {cmd}" + + +class ManageBgJobsTool: + async def execute(self, content: str, ctx: dict) -> dict: + from src import bg_jobs + + session_id = ctx.get("session_id") + raw = (content or "").strip() + try: + args = json.loads(raw) if raw else {} + except (ValueError, TypeError): + args = {} + if not isinstance(args, dict): + args = {} + action = str(args.get("action", "list")).strip().lower() + job_id = str(args.get("job_id") or args.get("id") or "").strip() + + if not session_id: + return {"error": "manage_bg_jobs: no active chat session; background jobs are scoped to a chat.", "exit_code": 1} + + if action in _LIST_ACTIONS: + jobs: List[Dict[str, Any]] = bg_jobs.list_for_session(session_id) + if not jobs: + return {"output": "No background jobs in this chat.", "exit_code": 0} + jobs.sort(key=lambda r: r.get("started_at") or 0, reverse=True) + lines = "\n".join(_row(r) for r in jobs) + return {"output": f"{len(jobs)} background job(s):\n{lines}", "exit_code": 0} + + if action in _OUTPUT_ACTIONS or action in _KILL_ACTIONS: + if not job_id: + return {"error": f"manage_bg_jobs: action '{action}' requires a job_id (see action='list').", "exit_code": 1} + rec = bg_jobs.get(job_id) + # Scope: only the chat that launched a job may see or control it. + if rec is None or rec.get("session_id") != session_id: + return {"error": f"manage_bg_jobs: no background job '{job_id}' in this chat.", "exit_code": 1} + + if action in _KILL_ACTIONS: + if rec.get("status") != "running": + return {"output": f"Job `{job_id}` already {_status_label(rec)}; nothing to kill.", "exit_code": 0} + killed = bg_jobs.kill(job_id) + return {"output": f"Killed background job `{job_id}` ({(killed or {}).get('command', '').splitlines()[0][:80]}).", "exit_code": 0} + + out = rec.get("output") or "(no output yet)" + return { + "output": f"Job `{job_id}` [{_status_label(rec)}, {_age(rec)}]\nCommand: {rec.get('command')}\n\nOutput:\n{out}", + "exit_code": 0, + } + + return {"error": f"manage_bg_jobs: unknown action '{action}'. Use list, output, or kill.", "exit_code": 1} diff --git a/src/bg_jobs.py b/src/bg_jobs.py index 8e452106b..f864f8ef1 100644 --- a/src/bg_jobs.py +++ b/src/bg_jobs.py @@ -263,10 +263,32 @@ def list_for_session(session_id: str) -> List[Dict[str, Any]]: return [r for r in refresh().values() if r.get("session_id") == session_id] +def kill(job_id: str) -> Optional[Dict[str, Any]]: + """Terminate a running job's process tree and mark it killed. Returns the + updated record, or None if the id is unknown. Idempotent: a job that already + finished is returned unchanged. Sets followed_up so the monitor does not also + fire an auto-continue for a job the agent deliberately stopped.""" + jobs = _load() + rec = jobs.get(job_id) + if rec is None: + return None + if rec.get("status") == "running": + _kill(rec.get("pid")) + rec["status"] = "failed" + rec["exit_code"] = -1 + rec["ended_at"] = time.time() + rec["killed"] = True + rec["followed_up"] = True + _save(jobs) + return rec + + def result_text(rec: Dict[str, Any]) -> str: """Human/agent-readable summary of a finished job, for the follow-up.""" out = _read_output(rec) - if rec.get("timed_out"): + if rec.get("killed"): + head = "Background job was killed." + elif rec.get("timed_out"): head = f"Background job timed out after {rec.get('max_runtime_s')}s." elif rec.get("died"): head = "Background job process died unexpectedly (no exit code)." diff --git a/src/tool_execution.py b/src/tool_execution.py index 05022bdba..c13910e3a 100644 --- a/src/tool_execution.py +++ b/src/tool_execution.py @@ -471,6 +471,8 @@ async def _direct_fallback( tool: str, content: str, progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None, + session_id: Optional[str] = None, + owner: Optional[str] = None, ) -> Optional[Dict]: _subproc_env = { **os.environ, @@ -484,6 +486,8 @@ async def _direct_fallback( ctx = { "progress_cb": progress_cb, "subproc_env": _subproc_env, + "session_id": session_id, + "owner": owner, } from src.agent_tools import TOOL_HANDLERS @@ -731,10 +735,13 @@ async def _execute_tool_block_impl( desc = f"bash (background): {short}" result = { "output": ( - f"Started background job `{rec['id']}`. It is running detached — " + f"Started background job `{rec['id']}`. It is running detached; " f"do NOT wait for it or poll it. You will be automatically re-invoked " f"with its full output when it finishes. Continue with other work, or " - f"end your turn now and resume when the result arrives." + f"end your turn now and resume when the result arrives. If the user " + f"later asks to check progress or stop it, call the manage_bg_jobs " + f"tool yourself (output or kill); do not tell them to run a tool " + f"command, and do not surface raw tool syntax in your reply." ), "exit_code": 0, "bg_job_id": rec["id"], @@ -755,6 +762,11 @@ async def _execute_tool_block_impl( desc = f"{tool}: {first_line}" result = await _direct_fallback(tool, content, progress_cb=progress_cb) \ or {"error": f"{tool}: execution failed", "exit_code": 1} + elif tool == "manage_bg_jobs": + # Inspect/kill detached `bash` jobs; needs session_id to scope to chat. + desc = f"manage_bg_jobs: {content.split(chr(10))[0][:80]}" + result = await _direct_fallback(tool, content, session_id=session_id, owner=owner) \ + or {"error": "manage_bg_jobs: execution failed", "exit_code": 1} elif tool in ("create_document", "update_document", "edit_document", "suggest_document", "manage_documents"): desc = f"{tool}: {content.split(chr(10))[0][:80]}" diff --git a/src/tool_index.py b/src/tool_index.py index 291c0984e..64640bcef 100644 --- a/src/tool_index.py +++ b/src/tool_index.py @@ -135,6 +135,7 @@ BUILTIN_TOOL_DESCRIPTIONS: Dict[str, str] = { "app_api": "Generic loopback to allowed Odysseus internal endpoints. Use this when the user wants something the UI can do but there's no named tool for it. Covers calendar, gallery, library/documents, memory, notes, tasks, settings, research, compare, cookbook GPUs/state — allowed UI buttons hit /api/* endpoints and you can hit them too. Sensitive auth/user/admin/shell paths and host-control Cookbook mutation routes are blocked; do NOT use app_api for shell commands, package installs, engine rebuilds, or PID signalling. Use named command tooling for shell commands. action='endpoints' with filter= lists available endpoints. action='call' takes method+path+body. Hits same routes the UI uses — auth flows free. NOTE: themes are NOT an API endpoint — use the ui_control tool (create_theme / set_theme), not app_api. SESSIONS/CHATS: do NOT use app_api for these — GET /api/sessions returns EMPTY for tool calls (it's owner-filtered and tool calls authenticate as a different identity). EMAIL ACCOUNTS: do NOT use /api/email/accounts via app_api; use list_email_accounts, list_emails, and read_email instead. To list/rename/archive/delete/fork chats use the list_sessions and manage_session tools instead.", "edit_image": "Edit an image in the gallery: upscale (increase resolution), remove background (rembg), inpaint (fill selected area), or harmonize (blend edits). Specify image ID and action.", "trigger_research": "Start a deep research job on any topic — appears in the Deep Research sidebar, streams progress, produces a detailed report. Use for 'research X', 'look into Y', 'do deep research on Z', 'investigate'. NOT a scheduled task — it runs now and surfaces in the sidebar.", + "manage_bg_jobs": "Inspect and control detached background `bash` jobs (the ones started with a `#!bg` marker). action='list' shows this chat's jobs (id/status/age/command); action='output' returns a job's captured output so far (check on a long-running job, or re-read a finished one); action='kill' stops a runaway job by id. Use for 'is the background job done', 'check on that job', 'show the build output', 'kill the background job', 'stop the bg task'. output/kill need a job_id from list.", } @@ -349,6 +350,12 @@ class ToolIndex: {"list_email_accounts", "list_emails", "read_email", "send_email", "reply_to_email", "bulk_email", "delete_email", "archive_email", "mark_email_read", "resolve_contact", "ui_control"}, frozenset({"calendar", "event", "meeting", "schedule", "appointment"}): {"manage_calendar"}, + # Detached background `bash` jobs (#!bg): check on / read output / kill. + frozenset({"background job", "background jobs", "bg job", "bg jobs", + "background task", "is the job done", "check the job", + "check on that job", "job output", "kill the job", + "kill the background", "stop the background", "running job"}): + {"manage_bg_jobs"}, frozenset({"note", "todo", "reminder", "remind", "checklist", "remember to"}): {"manage_notes"}, # Chat/session management. "rename" alone maps to documents below, so a diff --git a/src/tool_parsing.py b/src/tool_parsing.py index 97d3f3477..c9548cce9 100644 --- a/src/tool_parsing.py +++ b/src/tool_parsing.py @@ -175,6 +175,9 @@ _TOOL_NAME_MAP = { "notes": "manage_notes", "todo": "manage_notes", "todos": "manage_notes", + "manage_bg_jobs": "manage_bg_jobs", + "bg_jobs": "manage_bg_jobs", + "background_jobs": "manage_bg_jobs", } _MISFENCED_WEB_TOOL_NAMES = { diff --git a/src/tool_schemas.py b/src/tool_schemas.py index 4e233317b..1d64b5db6 100644 --- a/src/tool_schemas.py +++ b/src/tool_schemas.py @@ -1188,6 +1188,21 @@ FUNCTION_TOOL_SCHEMAS = [ } } }, + { + "type": "function", + "function": { + "name": "manage_bg_jobs", + "description": "Inspect and control detached background `bash` jobs (started with the `#!bg` marker). action='list' shows this chat's jobs with id/status/age/command; action='output' returns a job's captured output so far (use for a still-running job, or to re-read a finished one); action='kill' terminates a runaway job's process tree instead of waiting out its max-runtime. output and kill need job_id from list.", + "parameters": { + "type": "object", + "properties": { + "action": {"type": "string", "enum": ["list", "output", "kill"], "description": "list | output | kill (default: list)"}, + "job_id": {"type": "string", "description": "Background job id (required for output/kill; from action='list')"}, + }, + "required": ["action"] + } + } + }, ] diff --git a/src/tool_security.py b/src/tool_security.py index 3dc53ff26..2a7dca3c0 100644 --- a/src/tool_security.py +++ b/src/tool_security.py @@ -14,6 +14,7 @@ logger = logging.getLogger(__name__) NON_ADMIN_BLOCKED_TOOLS = { "bash", "python", + "manage_bg_jobs", "read_file", "write_file", "edit_file", @@ -114,6 +115,8 @@ _PLAN_MODE_KNOWN_MUTATORS = { # Shell is never read-only-safe; block it explicitly so it stays out of plan # mode even if the schema list fails to load. "bash", "python", + # Controls shell processes (kill); plan mode can't run bash anyway. + "manage_bg_jobs", } diff --git a/tests/test_bg_job_tools.py b/tests/test_bg_job_tools.py new file mode 100644 index 000000000..a21fde88f --- /dev/null +++ b/tests/test_bg_job_tools.py @@ -0,0 +1,174 @@ +"""Tests for bg_jobs.kill and the manage_bg_jobs agent tool. + +Process-free: the store/dir are redirected to tmp, _pid_alive is forced True so +seeded "running" jobs stay running through refresh(), and _kill is stubbed so no +real signal is sent. Jobs are scoped to a chat (session_id), which is the main +invariant under test. +""" +import asyncio +import json +import time + +import pytest + +from src import bg_jobs +from src.agent_tools.bg_job_tools import ManageBgJobsTool + + +@pytest.fixture +def store(tmp_path, monkeypatch): + jobs_dir = tmp_path / "bg_jobs" + jobs_dir.mkdir() + monkeypatch.setattr(bg_jobs, "_STORE", tmp_path / "bg_jobs.json") + monkeypatch.setattr(bg_jobs, "_JOBS_DIR", jobs_dir) + monkeypatch.setattr(bg_jobs, "_pid_alive", lambda pid: True) + killed: list = [] + monkeypatch.setattr(bg_jobs, "_kill", lambda pid: killed.append(pid)) + return {"dir": jobs_dir, "killed": killed} + + +def _seed(session_id="sess-a", status="running", job_id="job0001", output="", pid=4321): + rec = { + "id": job_id, "session_id": session_id, "command": "sleep 60", + "status": status, "pid": pid, "started_at": time.time(), + "ended_at": None if status == "running" else time.time(), + "exit_code": None if status == "running" else 0, + "max_runtime_s": 3600, "followed_up": False, + "log_path": str(bg_jobs._JOBS_DIR / f"{job_id}.log"), + "exit_path": str(bg_jobs._JOBS_DIR / f"{job_id}.exit"), + } + if output: + (bg_jobs._JOBS_DIR / f"{job_id}.log").write_text(output, encoding="utf-8") + jobs = bg_jobs._load() + jobs[job_id] = rec + bg_jobs._save(jobs) + return rec + + +def _run(args, session_id="sess-a"): + return asyncio.run(ManageBgJobsTool().execute(json.dumps(args), {"session_id": session_id, "owner": None})) + + +# ── bg_jobs.kill ──────────────────────────────────────────────────────────── + +def test_kill_marks_killed_and_suppresses_followup(store): + _seed(job_id="job0001", pid=4321) + rec = bg_jobs.kill("job0001") + assert rec["status"] == "failed" + assert rec["killed"] is True + assert rec["exit_code"] == -1 + # followed_up True so the monitor won't ALSO auto-continue a deliberate kill. + assert rec["followed_up"] is True + assert store["killed"] == [4321] + + +def test_kill_unknown_job_returns_none(store): + assert bg_jobs.kill("nope") is None + + +def test_kill_finished_job_is_noop(store): + _seed(job_id="done01", status="done") + rec = bg_jobs.kill("done01") + assert rec["status"] == "done" + assert store["killed"] == [] # no signal sent to an already-finished job + + +def test_result_text_reports_killed(store): + rec = _seed(job_id="job0001") + bg_jobs.kill("job0001") + assert "killed" in bg_jobs.result_text(bg_jobs.get("job0001")).lower() + + +# ── manage_bg_jobs tool ───────────────────────────────────────────────────── + +def test_no_session_is_rejected(store): + out = asyncio.run(ManageBgJobsTool().execute('{"action":"list"}', {"session_id": None})) + assert "error" in out + + +def test_list_empty(store): + assert "No background jobs" in _run({"action": "list"})["output"] + + +def test_list_scoped_to_session(store): + _seed(session_id="sess-a", job_id="aaaa") + _seed(session_id="sess-b", job_id="bbbb") + out = _run({"action": "list"}, session_id="sess-a")["output"] + assert "aaaa" in out and "bbbb" not in out + + +def test_output_returns_captured_log(store): + _seed(job_id="job0001", output="hello from the job\n") + out = _run({"action": "output", "job_id": "job0001"})["output"] + assert "hello from the job" in out + + +def test_output_cross_session_denied(store): + _seed(session_id="sess-a", job_id="job0001", output="secret") + out = _run({"action": "output", "job_id": "job0001"}, session_id="sess-b") + assert "error" in out and "secret" not in out.get("error", "") + + +def test_kill_via_tool(store): + _seed(job_id="job0001", pid=999) + out = _run({"action": "kill", "job_id": "job0001"}) + assert "Killed" in out["output"] + assert store["killed"] == [999] + assert bg_jobs.get("job0001")["killed"] is True + + +def test_kill_cross_session_denied(store): + _seed(session_id="sess-a", job_id="job0001") + out = _run({"action": "kill", "job_id": "job0001"}, session_id="sess-b") + assert "error" in out + assert store["killed"] == [] # never touched another chat's job + + +def test_kill_requires_job_id(store): + assert "error" in _run({"action": "kill"}) + + +def test_unknown_action(store): + assert "error" in _run({"action": "frobnicate"}) + + +def test_action_aliases(store): + _seed(job_id="job0001", output="aliased") + # 'read' aliases to output, 'jobs' to list, 'stop' to kill + assert "aliased" in _run({"action": "read", "job_id": "job0001"})["output"] + assert "job0001" in _run({"action": "jobs"})["output"] + assert "Killed" in _run({"action": "stop", "job_id": "job0001"})["output"] + + +# ── intent classifier: short bg-job commands must not be dropped as low-signal ─ +# A short imperative ("kill that job") otherwise trips the low-signal gate, which +# skips tool retrieval entirely and never surfaces manage_bg_jobs (the live bug +# this feature hit). These lock in that bg-job control reaches the files domain. + + +@pytest.mark.parametrize("msg", [ + "stop the job", + "kill that job", + "Now kill that background job.", + "is the job done?", + "check the job output", + "list my jobs", + "kill the bg task", +]) +def test_bg_job_commands_are_not_low_signal(msg): + from src.agent_loop import _classify_agent_request, _DOMAIN_TOOL_MAP + r = _classify_agent_request([{"role": "user", "content": msg}], msg) + assert r["low_signal"] is False + assert "files" in r["domains"] + # files domain seeds manage_bg_jobs, so it gets offered to the model. + assert "manage_bg_jobs" in _DOMAIN_TOOL_MAP["files"] + + +@pytest.mark.parametrize("msg", [ + "run this in the background", # launching, not managing + "find me a job listing", # unrelated use of "job" +]) +def test_non_bg_messages_do_not_trip_files_domain(msg): + from src.agent_loop import _classify_agent_request + r = _classify_agent_request([{"role": "user", "content": msg}], msg) + assert "files" not in r["domains"]