"""Shell routes — user-facing command execution endpoint.""" import asyncio import json import logging import os import shlex import shutil import uuid import tempfile from pathlib import Path from typing import Dict, Any try: import fcntl import pty except ImportError as exc: fcntl = None pty = None _PTY_IMPORT_ERROR = exc else: _PTY_IMPORT_ERROR = None from fastapi import APIRouter, Request, HTTPException from fastapi.responses import StreamingResponse from pydantic import BaseModel def _require_admin(request: Request): """Reject non-admin callers. Shell exec is admin-only — never expose to regular users; that's RCE-after-signup.""" auth_manager = getattr(request.app.state, "auth_manager", None) if not auth_manager: # No auth at all — only safe in fully-trusted localhost dev mode return user = getattr(request.state, "current_user", None) # In-process tool loopback. The AuthMiddleware already validated the # internal token + loopback client before setting this marker, so # honour it here as admin-equivalent. if user == "internal-tool": return if not user or user == "api": raise HTTPException(403, "Admin only") if not auth_manager.is_admin(user): raise HTTPException(403, "Admin only") logger = logging.getLogger(__name__) PTY_SUPPORTED = pty is not None and fcntl is not None and hasattr(os, "setsid") def _find_line_break(buf): """Find next line terminator in buffer. Returns (index, separator_length) or (-1, 0).""" ni = buf.find(b"\n") ri = buf.find(b"\r") if ni == -1 and ri == -1: return -1, 0 if ni == -1: return ri, 1 if ri == -1: return ni, 1 if ri < ni: return ri, (2 if ri + 1 == ni else 1) return ni, 1 EXEC_TIMEOUT = 30 # seconds — shorter than agent's 60s STREAM_TIMEOUT = 120 # default for short commands MAX_OUTPUT = 200_000 # truncate limit TMUX_LOG_DIR = Path(tempfile.gettempdir()) / "odysseus-tmux" PTY_UNSUPPORTED_ERROR = "pty_unsupported" class ShellExecRequest(BaseModel): command: str timeout: int | None = None # optional override; 0 = no timeout (run until client disconnects) use_pty: bool = False # use pseudo-TTY (for progress bars) use_tmux: bool = False # run in tmux session (survives browser disconnect) async def _exec_shell(command: str, timeout: int = EXEC_TIMEOUT) -> Dict[str, Any]: """Run a shell command and return stdout/stderr/exit_code.""" proc = None try: proc = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(Path.home()), ) stdout_b, stderr_b = await asyncio.wait_for( proc.communicate(), timeout=timeout ) stdout = stdout_b.decode(errors="replace")[:MAX_OUTPUT] stderr = stderr_b.decode(errors="replace")[:MAX_OUTPUT] return {"stdout": stdout, "stderr": stderr, "exit_code": proc.returncode} except asyncio.TimeoutError: if proc: try: proc.kill() await proc.wait() except ProcessLookupError: pass return {"stdout": "", "stderr": f"Command timed out after {timeout}s", "exit_code": -1} except Exception as e: return {"stdout": "", "stderr": str(e), "exit_code": -1} async def _generate_pty(cmd: str, timeout: int, request: Request): """Run command in a pseudo-TTY so tqdm/progress bars work natively.""" if not PTY_SUPPORTED: msg = "PTY streaming is not supported on this platform" if _PTY_IMPORT_ERROR: msg += f": {_PTY_IMPORT_ERROR}" yield f"data: {json.dumps({'stream': 'stderr', 'data': msg, 'error': PTY_UNSUPPORTED_ERROR})}\n\n" yield f"data: {json.dumps({'exit_code': -1, 'error': PTY_UNSUPPORTED_ERROR})}\n\n" return loop = asyncio.get_event_loop() master_fd, slave_fd = pty.openpty() # Set master to non-blocking flags = fcntl.fcntl(master_fd, fcntl.F_GETFL) fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) proc = await asyncio.create_subprocess_shell( cmd, stdin=slave_fd, stdout=slave_fd, stderr=slave_fd, cwd=str(Path.home()), preexec_fn=os.setsid, ) os.close(slave_fd) # parent doesn't need the slave side deadline = (loop.time() + timeout) if timeout else None buf = b"" process_done = asyncio.Event() async def _wait_proc(): await proc.wait() process_done.set() wait_task = asyncio.create_task(_wait_proc()) try: while not process_done.is_set(): if deadline and loop.time() > deadline: proc.kill() await proc.wait() yield f"data: {json.dumps({'stream': 'stderr', 'data': f'Command timed out after {timeout}s'})}\n\n" yield f"data: {json.dumps({'exit_code': -1})}\n\n" return # Check client disconnect if await request.is_disconnected(): proc.kill() await proc.wait() return # Read available data from PTY try: chunk = await asyncio.wait_for( loop.run_in_executor(None, _pty_read, master_fd), timeout=2.0, ) except asyncio.TimeoutError: continue except OSError: break if chunk is None: # No data yet, keep waiting continue if chunk == b"": # EOF — process closed the PTY break buf += chunk # Split on \r or \n while True: idx, sep_len = _find_line_break(buf) if idx == -1: break line = buf[:idx].decode(errors="replace") buf = buf[idx + sep_len:] if line: yield f"data: {json.dumps({'stream': 'stdout', 'data': line})}\n\n" # Drain any remaining PTY output after process exits try: while True: rest = _pty_read(master_fd) if rest is None or rest == b"": break buf += rest except OSError: pass # Flush remaining buffer if buf: # Split remaining buffer same as above while True: idx, sep_len = _find_line_break(buf) if idx == -1: break line = buf[:idx].decode(errors="replace") buf = buf[idx + sep_len:] if line: yield f"data: {json.dumps({'stream': 'stdout', 'data': line})}\n\n" if buf: text = buf.decode(errors="replace").strip() if text: yield f"data: {json.dumps({'stream': 'stdout', 'data': text})}\n\n" await wait_task yield f"data: {json.dumps({'exit_code': proc.returncode})}\n\n" except Exception as e: try: proc.kill() await proc.wait() except ProcessLookupError: pass yield f"data: {json.dumps({'stream': 'stderr', 'data': str(e)})}\n\n" yield f"data: {json.dumps({'exit_code': -1})}\n\n" finally: wait_task.cancel() try: os.close(master_fd) except OSError: pass def _pty_read(fd: int) -> bytes | None: """Blocking read from PTY fd. Called via run_in_executor. Returns bytes on data, None on timeout (no data yet).""" import select r, _, _ = select.select([fd], [], [], 1.0) if r: try: data = os.read(fd, 4096) return data if data else b"" # empty = EOF except OSError: return b"" # fd closed = EOF return None # timeout, no data yet async def _generate_tmux(cmd: str, request: Request): """Run command in a tmux session. Streams output via a log file. The tmux session survives browser disconnect — user can reconnect or `tmux attach -t ` to see it live.""" TMUX_LOG_DIR.mkdir(parents=True, exist_ok=True) session_id = f"cookbook-{uuid.uuid4().hex[:8]}" log_path = TMUX_LOG_DIR / f"{session_id}.log" # Write a wrapper script that runs the command, tees output, and records exit code. # Using a script avoids shell quoting issues with the tmux command. script_path = TMUX_LOG_DIR / f"{session_id}.sh" script_path.write_text( f"#!/bin/bash\n" f"ODYSSEUS_USER_SHELL=\"${{SHELL:-}}\"\n" f"if [ -n \"$ODYSSEUS_USER_SHELL\" ] && [ -x \"$ODYSSEUS_USER_SHELL\" ]; then\n" f" ODYSSEUS_USER_PATH=\"$(\"$ODYSSEUS_USER_SHELL\" -ic 'printf \"__ODYSSEUS_PATH__%s\\n\" \"$PATH\"' 2>/dev/null | sed -n 's/^__ODYSSEUS_PATH__//p' | tail -n 1 || true)\"\n" f" if [ -n \"$ODYSSEUS_USER_PATH\" ]; then export PATH=\"$ODYSSEUS_USER_PATH:$PATH\"; fi\n" f"fi\n" f"{cmd} 2>&1 | tee '{log_path}'\n" f"EC=${{PIPESTATUS[0]}}\n" f"echo ':::EXIT_CODE:::'$EC >> '{log_path}'\n" f"rm -f '{script_path}'\n" f"exit $EC\n" ) script_path.chmod(0o755) logger.info("tmux wrapper script created: session=%s path=%s", session_id, script_path) tmux_cmd = f"tmux new-session -d -s {session_id} {shlex.quote(str(script_path))}" proc = await asyncio.create_subprocess_shell( tmux_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) await proc.wait() if proc.returncode != 0: stderr = (await proc.stderr.read()).decode(errors="replace") yield f"data: {json.dumps({'stream': 'stderr', 'data': f'Failed to start tmux: {stderr}'})}\n\n" yield f"data: {json.dumps({'exit_code': -1})}\n\n" return yield f"data: {json.dumps({'stream': 'stdout', 'data': f'Started tmux session: {session_id}'})}\n\n" # Tail the log file, streaming new lines as SSE lines_sent = 0 exit_code = None while True: # Check client disconnect if await request.is_disconnected(): # tmux keeps running — that's the whole point yield f"data: {json.dumps({'stream': 'stdout', 'data': f'Disconnected. tmux session {session_id} continues in background.'})}\n\n" return # Read new lines from log try: if log_path.exists(): lines = log_path.read_text(errors="replace").splitlines() new_lines = lines[lines_sent:] for line in new_lines: if line.startswith(":::EXIT_CODE:::"): try: exit_code = int(line.split(":::")[-1]) except ValueError: exit_code = -1 else: yield f"data: {json.dumps({'stream': 'stdout', 'data': line})}\n\n" lines_sent = len(lines) except Exception as e: logger.debug(f"tmux log read error: {e}") if exit_code is not None: break # Check if tmux session is still alive check = await asyncio.create_subprocess_shell( f"tmux has-session -t {session_id} 2>/dev/null", stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL, ) await check.wait() if check.returncode != 0: # Session ended — do one final read await asyncio.sleep(0.5) if log_path.exists(): lines = log_path.read_text(errors="replace").splitlines() for line in lines[lines_sent:]: if line.startswith(":::EXIT_CODE:::"): try: exit_code = int(line.split(":::")[-1]) except ValueError: exit_code = -1 else: yield f"data: {json.dumps({'stream': 'stdout', 'data': line})}\n\n" if exit_code is None: exit_code = 0 break await asyncio.sleep(1.0) yield f"data: {json.dumps({'exit_code': exit_code})}\n\n" # Clean up log file try: log_path.unlink(missing_ok=True) except Exception: pass def setup_shell_routes() -> APIRouter: router = APIRouter(tags=["shell"]) @router.post("/api/shell/exec") async def shell_exec(request: Request, req: ShellExecRequest) -> Dict[str, Any]: """Execute a shell command and return output. Admin only.""" _require_admin(request) cmd = req.command.strip() if not cmd: return {"stdout": "", "stderr": "No command provided", "exit_code": 1} logger.info("User shell exec requested: length=%d", len(cmd)) result = await _exec_shell(cmd, timeout=EXEC_TIMEOUT) return result @router.post("/api/shell/stream") async def shell_stream(request: Request, req: ShellExecRequest): """Execute a shell command and stream output line-by-line via SSE. Admin only.""" _require_admin(request) cmd = req.command.strip() if not cmd: async def empty(): yield f"data: {json.dumps({'stream': 'stderr', 'data': 'No command provided'})}\n\n" yield f"data: {json.dumps({'exit_code': 1})}\n\n" return StreamingResponse(empty(), media_type="text/event-stream") timeout = req.timeout if req.timeout is not None else STREAM_TIMEOUT use_pty = req.use_pty use_tmux = req.use_tmux logger.info( "User shell stream requested: timeout=%s pty=%s tmux=%s length=%d", "none" if timeout == 0 else f"{timeout}s", use_pty, use_tmux, len(cmd), ) if use_tmux: return StreamingResponse( _generate_tmux(cmd, request), media_type="text/event-stream", ) if use_pty: return StreamingResponse( _generate_pty(cmd, timeout, request), media_type="text/event-stream", ) async def generate(): proc = None reader_tasks = [] try: proc = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(Path.home()), ) q: asyncio.Queue = asyncio.Queue() async def _reader(stream, name): """Read chunks, split on \\n or \\r for progress bar support.""" try: buf = b"" while True: chunk = await stream.read(4096) if not chunk: if buf: await q.put((name, buf.decode(errors="replace").rstrip("\r\n"))) break buf += chunk while True: idx, sep_len = _find_line_break(buf) if idx == -1: break line = buf[:idx].decode(errors="replace") buf = buf[idx + sep_len:] if line: await q.put((name, line)) finally: await q.put((name, None)) reader_tasks = [ asyncio.create_task(_reader(proc.stdout, "stdout")), asyncio.create_task(_reader(proc.stderr, "stderr")), ] finished = 0 deadline = (asyncio.get_event_loop().time() + timeout) if timeout else None while finished < 2: if deadline: remaining = deadline - asyncio.get_event_loop().time() if remaining <= 0: raise asyncio.TimeoutError() wait = min(remaining, 2.0) else: wait = 2.0 try: name, text = await asyncio.wait_for(q.get(), timeout=wait) except asyncio.TimeoutError: if await request.is_disconnected(): if proc: proc.kill() return continue if text is None: finished += 1 continue yield f"data: {json.dumps({'stream': name, 'data': text})}\n\n" await proc.wait() yield f"data: {json.dumps({'exit_code': proc.returncode})}\n\n" except asyncio.TimeoutError: if proc: try: proc.kill() await proc.wait() except ProcessLookupError: pass yield f"data: {json.dumps({'stream': 'stderr', 'data': f'Command timed out after {timeout}s'})}\n\n" yield f"data: {json.dumps({'exit_code': -1})}\n\n" except Exception as e: yield f"data: {json.dumps({'stream': 'stderr', 'data': str(e)})}\n\n" yield f"data: {json.dumps({'exit_code': -1})}\n\n" finally: for t in reader_tasks: t.cancel() return StreamingResponse(generate(), media_type="text/event-stream") @router.get("/api/cookbook/packages") async def list_packages(request: Request, host: str | None = None, ssh_port: str | None = None, venv: str | None = None): """Check which optional packages are installed. Local-target packages are checked in-process. Remote-target packages (vllm, sglang, llama_cpp, diffusers, hf_transfer) are checked on the SELECTED server over SSH, inside its venv — otherwise installing on a remote box never reflected because the check only ever looked at the local host. """ _require_admin(request) import importlib, shlex, json as _json port_arg = "" if ssh_port and str(ssh_port).strip() not in ("", "22"): _port = str(ssh_port).strip() if not _port.isdigit(): raise HTTPException(400, "Invalid ssh_port") port_arg = f"-p {int(_port)} " packages = [ # ── System ── OS binaries, not pip packages {"name": "tmux", "pip": "", "desc": "Required for Linux/Termux Cookbook background downloads and serves", "category": "System", "target": "remote", "kind": "system", "install_hint": "Run Cookbook server setup, or install tmux with apt/pacman/dnf/apk/zypper."}, {"name": "docker", "pip": "", "desc": "Required only for Docker-backed launch commands", "category": "System", "target": "remote", "kind": "system", "install_hint": "Install Docker on the selected server and allow this user to run docker."}, # ── LLM ── installs on GPU servers for model serving/downloading {"name": "hf_transfer", "pip": "hf_transfer", "desc": "Fast model downloads from HuggingFace", "category": "LLM", "target": "remote"}, {"name": "llama_cpp", "pip": "llama-cpp-python[server]", "desc": "Serve GGUF models via llama.cpp", "category": "LLM", "target": "remote"}, {"name": "sglang", "pip": "sglang[all]", "desc": "Serve HF safetensors models via SGLang", "category": "LLM", "target": "remote"}, {"name": "vllm", "pip": "vllm", "desc": "High-throughput LLM serving engine", "category": "LLM", "target": "remote"}, # ── Image ── editor + diffusion model serving {"name": "diffusers", "pip": "diffusers[torch]", "desc": "Image generation pipelines (SD, Flux) with PyTorch", "category": "Image", "target": "remote"}, {"name": "rembg", "pip": "rembg[gpu]", "desc": "AI background removal for image editor", "category": "Image", "target": "local"}, {"name": "realesrgan", "pip": "realesrgan", "desc": "AI denoise + upscale (Real-ESRGAN). Used by editor's Denoise and Upscale tools.", "category": "Image", "target": "local"}, # ── Tools ── {"name": "playwright", "pip": "playwright", "desc": "Browser automation for web tools", "category": "Tools", "target": "local"}, ] # Remote check: for remote-target packages, probe the selected server's # venv over SSH so a remote `pip install` actually reflects here. remote_status: dict = {} remote_names = [p["name"] for p in packages if p.get("target") == "remote" and p.get("kind") != "system"] remote_system_names = [p["name"] for p in packages if p.get("target") == "remote" and p.get("kind") == "system"] if host and remote_names: try: names_lit = ",".join(repr(n) for n in remote_names) py = ( "import importlib.util,json,shutil;" f"names=[{names_lit}];" "status={n:(importlib.util.find_spec(n) is not None) for n in names};" "status['llama_cpp']=status.get('llama_cpp',False) or shutil.which('llama-server') is not None;" "print(json.dumps(status))" ) src = "" if venv: act = venv if venv.endswith("/bin/activate") else venv.rstrip("/") + "/bin/activate" # NOT shlex.quoted: a leading ~ must stay shell-expandable on # the remote (quoting it breaks `~/venv` → activation fails → # the && short-circuits and every package reads as missing). src = f". {act} && " inner = f"{src}python3 -c {shlex.quote(py)}" ssh_cmd = ( f"ssh -o ConnectTimeout=6 -o StrictHostKeyChecking=no {port_arg}" f"{shlex.quote(host)} {shlex.quote(inner)}" ) proc = await asyncio.create_subprocess_shell( ssh_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) out, _err = await asyncio.wait_for(proc.communicate(), timeout=12) txt = out.decode("utf-8", errors="replace").strip() # The activate script can emit noise — take the last JSON line. for line in reversed(txt.splitlines()): line = line.strip() if line.startswith("{"): remote_status = _json.loads(line) break except Exception: remote_status = {} if host and remote_system_names: try: checks = [] for name in remote_system_names: qn = shlex.quote(name) checks.append(f"if command -v {qn} >/dev/null 2>&1; then echo {qn}=1; else echo {qn}=0; fi") inner = " ; ".join(checks) ssh_cmd = ( f"ssh -o ConnectTimeout=6 -o StrictHostKeyChecking=no {port_arg}" f"{shlex.quote(host)} {shlex.quote(inner)}" ) proc = await asyncio.create_subprocess_shell( ssh_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) out, _err = await asyncio.wait_for(proc.communicate(), timeout=12) txt = out.decode("utf-8", errors="replace").strip() for line in txt.splitlines(): name, sep, value = line.strip().partition("=") if sep and name in remote_system_names: remote_status[name] = value == "1" except Exception: pass for pkg in packages: if host and pkg.get("target") == "remote": pkg["installed"] = bool(remote_status.get(pkg["name"], False)) continue if pkg.get("kind") == "system": pkg["installed"] = shutil.which(pkg["name"]) is not None continue try: if pkg["name"] == "llama_cpp" and shutil.which("llama-server"): pkg["installed"] = True continue importlib.import_module(pkg["name"]) pkg["installed"] = True except ImportError: pkg["installed"] = False return {"packages": packages} @router.post("/api/cookbook/packages/install") async def install_package(request: Request): """Install a package via pip. Admin only — pip install is effectively code exec.""" _require_admin(request) import sys as _sys body = await request.json() pip_name = body.get("pip") if not pip_name: return {"ok": False, "error": "No package specified"} # Validate against known packages to prevent arbitrary pip install known = { "rembg[gpu]", "hf_transfer", "llama-cpp-python[server]", "sglang[all]", "diffusers", "diffusers[torch]", "TTS", "bark", "faster-whisper", "playwright", "realesrgan", "gfpgan", "insightface", "onnxruntime-gpu", "onnxruntime", "hdbscan", } if pip_name not in known: return {"ok": False, "error": f"Unknown package: {pip_name}"} cmd = [_sys.executable, "-m", "pip", "install", pip_name] proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode == 0: return {"ok": True, "output": stdout.decode()[-200:]} return {"ok": False, "error": stderr.decode()[-300:]} return router