diff --git a/src/tool_implementations.py b/src/tool_implementations.py index ccc41c9d5..85e3376b7 100644 --- a/src/tool_implementations.py +++ b/src/tool_implementations.py @@ -59,6 +59,11 @@ from src.tools.image import do_edit_image # noqa: F401 from src.tools.research import do_manage_research, do_trigger_research # noqa: F401 # Contacts domain extracted to src/tools/contacts.py (slice 1, #4082/#4071). from src.tools.contacts import do_resolve_contact, do_manage_contact # noqa: F401 +# Vault domain extracted to src/tools/vault.py (slice 1, #4082/#4071). +from src.tools.vault import ( # noqa: F401 + _load_vault_config, _run_bw, + do_vault_search, do_vault_get, do_vault_unlock, +) logger = logging.getLogger(__name__) @@ -149,182 +154,3 @@ def _internal_headers(owner: Optional[str] = None) -> Dict[str, str]: if owner: headers["X-Odysseus-Owner"] = owner return headers - - -# ── Vaultwarden / Bitwarden CLI tools ── - -def _load_vault_config() -> Dict: - """Load Vaultwarden config from data/vault.json.""" - from pathlib import Path - p = Path(VAULT_FILE) - if p.exists(): - try: - return json.loads(p.read_text(encoding="utf-8")) - except Exception: - pass - return {} - - -async def _run_bw(args: list, session: Optional[str] = None, input_text: Optional[str] = None) -> tuple: - """Run a bw CLI command with optional session + stdin. Returns (stdout, stderr, returncode).""" - import asyncio - env = {} - import os as _os - env.update(_os.environ) - if session: - env["BW_SESSION"] = session - - proc = await asyncio.create_subprocess_exec( - "bw", *args, - stdin=asyncio.subprocess.PIPE if input_text else None, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - env=env, - ) - stdout, stderr = await proc.communicate(input=input_text.encode() if input_text else None) - return stdout.decode(errors="replace").strip(), stderr.decode(errors="replace").strip(), proc.returncode - - -async def do_vault_search(content: str, owner: Optional[str] = None) -> Dict: - """Search the vault by keyword. Returns matching item names + URLs, NO passwords.""" - try: - args = _parse_tool_args(content) - except ValueError: - return {"error": "Invalid JSON arguments", "exit_code": 1} - query = args.get("query", "").strip() - if not query: - return {"error": "query is required", "exit_code": 1} - - cfg = _load_vault_config() - session = cfg.get("session") - if not session: - return {"error": "Vault is locked. Run vault_unlock or provide session key in settings.", "exit_code": 1} - - stdout, stderr, rc = await _run_bw(["list", "items", "--search", query], session=session) - if rc != 0: - return {"error": f"bw failed: {stderr[:300]}", "exit_code": 1} - - try: - items = json.loads(stdout) - except json.JSONDecodeError: - return {"error": "Failed to parse bw output", "exit_code": 1} - - if not items: - return {"output": f"No vault items match '{query}'.", "exit_code": 0} - - lines = [f"Found {len(items)} item(s) matching '{query}':"] - for it in items[:20]: - item_id = it.get("id", "?") - name = it.get("name", "?") - login = it.get("login") or {} - username = login.get("username", "") - uris = login.get("uris") or [] - url = uris[0].get("uri", "") if uris else "" - parts = [f"[{item_id[:8]}] {name}"] - if username: - parts.append(f"user: {username}") - if url: - parts.append(f"url: {url}") - lines.append("- " + " · ".join(parts)) - lines.append("\nUse vault_get(item_id, reason) to retrieve the password.") - return {"output": "\n".join(lines), "exit_code": 0} - - -async def do_vault_get(content: str, owner: Optional[str] = None) -> Dict: - """Retrieve a full vault entry (including password) by item ID. Logs access to assistant chat.""" - try: - args = _parse_tool_args(content) - except ValueError: - return {"error": "Invalid JSON arguments", "exit_code": 1} - item_id = args.get("item_id", "").strip() - reason = args.get("reason", "").strip() - if not item_id: - return {"error": "item_id is required", "exit_code": 1} - if not reason: - return {"error": "reason is required — explain WHY you need this password", "exit_code": 1} - - cfg = _load_vault_config() - session = cfg.get("session") - if not session: - return {"error": "Vault is locked. Unlock first.", "exit_code": 1} - - stdout, stderr, rc = await _run_bw(["get", "item", item_id], session=session) - if rc != 0: - return {"error": f"bw failed: {stderr[:300]}", "exit_code": 1} - - try: - item = json.loads(stdout) - except json.JSONDecodeError: - return {"error": "Failed to parse bw output", "exit_code": 1} - - login = item.get("login") or {} - name = item.get("name", "?") - - # Audit log to assistant chat - try: - from src.assistant_log import log_to_assistant - if owner: - log_to_assistant( - owner, - f"Retrieved password for **{name}** — reason: {reason}", - category="Vault", - ) - except Exception: - pass - - output = [ - f"Vault item: {name}", - f"Username: {login.get('username', '(none)')}", - f"Password: {login.get('password', '(none)')}", - ] - if login.get("totp"): - output.append(f"TOTP secret: {login['totp']}") - uris = login.get("uris") or [] - if uris: - output.append("URLs: " + ", ".join(u.get("uri", "") for u in uris)) - if item.get("notes"): - output.append(f"Notes: {item['notes']}") - - return {"output": "\n".join(output), "exit_code": 0} - - -async def do_vault_unlock(content: str, owner: Optional[str] = None) -> Dict: - """Unlock the vault using a master password. Stores the resulting session key.""" - try: - args = _parse_tool_args(content) - except ValueError: - return {"error": "Invalid JSON arguments", "exit_code": 1} - master_password = args.get("master_password", "") - if not master_password: - return {"error": "master_password is required", "exit_code": 1} - - # Do not pass the master password as an argv element. Local process lists - # can expose argv to other users; stdin keeps the secret out of `ps`. - stdout, stderr, rc = await _run_bw(["unlock", "--raw"], input_text=master_password + "\n") - if rc != 0: - return {"error": f"Unlock failed: {stderr[:300]}", "exit_code": 1} - - session = stdout.strip() - if not session: - return {"error": "bw returned empty session", "exit_code": 1} - - # Save session to vault.json - from pathlib import Path - p = Path(VAULT_FILE) - cfg = {} - if p.exists(): - try: - cfg = json.loads(p.read_text(encoding="utf-8")) - except Exception: - pass - cfg["session"] = session - from datetime import datetime as _dt - cfg["unlocked_at"] = _dt.utcnow().isoformat() - p.write_text(json.dumps(cfg, indent=2), encoding="utf-8") - try: - import os as _os - _os.chmod(str(p), 0o600) - except Exception: - pass - - return {"output": "Vault unlocked. Session saved.", "exit_code": 0} diff --git a/src/tools/__init__.py b/src/tools/__init__.py index 397c5d37b..7f847488e 100644 --- a/src/tools/__init__.py +++ b/src/tools/__init__.py @@ -27,3 +27,7 @@ from src.tools.calendar import do_manage_calendar # noqa: F401 from src.tools.image import do_edit_image # noqa: F401 from src.tools.research import do_manage_research, do_trigger_research # noqa: F401 from src.tools.contacts import do_resolve_contact, do_manage_contact # noqa: F401 +from src.tools.vault import ( # noqa: F401 + _load_vault_config, _run_bw, + do_vault_search, do_vault_get, do_vault_unlock, +) diff --git a/src/tools/vault.py b/src/tools/vault.py new file mode 100644 index 000000000..fbb3bfcf9 --- /dev/null +++ b/src/tools/vault.py @@ -0,0 +1,189 @@ +"""Vault-domain tool implementations. + +Extracted from tool_implementations.py as part of slice 1 (#4082/#4071). +Holds the Bitwarden CLI wrappers (vault_search / vault_get / vault_unlock) +and their helpers (_load_vault_config, _run_bw). +``src.tool_implementations`` re-exports these for backward compatibility. +""" +import json +from typing import Dict, Optional + +from src.constants import VAULT_FILE +from src.tools._common import _parse_tool_args + + +def _load_vault_config() -> Dict: + """Load Vaultwarden config from data/vault.json.""" + from pathlib import Path + p = Path(VAULT_FILE) + if p.exists(): + try: + return json.loads(p.read_text(encoding="utf-8")) + except Exception: + pass + return {} + + +async def _run_bw(args: list, session: Optional[str] = None, input_text: Optional[str] = None) -> tuple: + """Run a bw CLI command with optional session + stdin. Returns (stdout, stderr, returncode).""" + import asyncio + env = {} + import os as _os + env.update(_os.environ) + if session: + env["BW_SESSION"] = session + + proc = await asyncio.create_subprocess_exec( + "bw", *args, + stdin=asyncio.subprocess.PIPE if input_text else None, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env, + ) + stdout, stderr = await proc.communicate(input=input_text.encode() if input_text else None) + return stdout.decode(errors="replace").strip(), stderr.decode(errors="replace").strip(), proc.returncode + + +async def do_vault_search(content: str, owner: Optional[str] = None) -> Dict: + """Search the vault by keyword. Returns matching item names + URLs, NO passwords.""" + try: + args = _parse_tool_args(content) + except ValueError: + return {"error": "Invalid JSON arguments", "exit_code": 1} + query = args.get("query", "").strip() + if not query: + return {"error": "query is required", "exit_code": 1} + + cfg = _load_vault_config() + session = cfg.get("session") + if not session: + return {"error": "Vault is locked. Run vault_unlock or provide session key in settings.", "exit_code": 1} + + stdout, stderr, rc = await _run_bw(["list", "items", "--search", query], session=session) + if rc != 0: + return {"error": f"bw failed: {stderr[:300]}", "exit_code": 1} + + try: + items = json.loads(stdout) + except json.JSONDecodeError: + return {"error": "Failed to parse bw output", "exit_code": 1} + + if not items: + return {"output": f"No vault items match '{query}'.", "exit_code": 0} + + lines = [f"Found {len(items)} item(s) matching '{query}':"] + for it in items[:20]: + item_id = it.get("id", "?") + name = it.get("name", "?") + login = it.get("login") or {} + username = login.get("username", "") + uris = login.get("uris") or [] + url = uris[0].get("uri", "") if uris else "" + parts = [f"[{item_id[:8]}] {name}"] + if username: + parts.append(f"user: {username}") + if url: + parts.append(f"url: {url}") + lines.append("- " + " · ".join(parts)) + lines.append("\nUse vault_get(item_id, reason) to retrieve the password.") + return {"output": "\n".join(lines), "exit_code": 0} + + +async def do_vault_get(content: str, owner: Optional[str] = None) -> Dict: + """Retrieve a full vault entry (including password) by item ID. Logs access to assistant chat.""" + try: + args = _parse_tool_args(content) + except ValueError: + return {"error": "Invalid JSON arguments", "exit_code": 1} + item_id = args.get("item_id", "").strip() + reason = args.get("reason", "").strip() + if not item_id: + return {"error": "item_id is required", "exit_code": 1} + if not reason: + return {"error": "reason is required — explain WHY you need this password", "exit_code": 1} + + cfg = _load_vault_config() + session = cfg.get("session") + if not session: + return {"error": "Vault is locked. Unlock first.", "exit_code": 1} + + stdout, stderr, rc = await _run_bw(["get", "item", item_id], session=session) + if rc != 0: + return {"error": f"bw failed: {stderr[:300]}", "exit_code": 1} + + try: + item = json.loads(stdout) + except json.JSONDecodeError: + return {"error": "Failed to parse bw output", "exit_code": 1} + + login = item.get("login") or {} + name = item.get("name", "?") + + # Audit log to assistant chat + try: + from src.assistant_log import log_to_assistant + if owner: + log_to_assistant( + owner, + f"Retrieved password for **{name}** — reason: {reason}", + category="Vault", + ) + except Exception: + pass + + output = [ + f"Vault item: {name}", + f"Username: {login.get('username', '(none)')}", + f"Password: {login.get('password', '(none)')}", + ] + if login.get("totp"): + output.append(f"TOTP secret: {login['totp']}") + uris = login.get("uris") or [] + if uris: + output.append("URLs: " + ", ".join(u.get("uri", "") for u in uris)) + if item.get("notes"): + output.append(f"Notes: {item['notes']}") + + return {"output": "\n".join(output), "exit_code": 0} + + +async def do_vault_unlock(content: str, owner: Optional[str] = None) -> Dict: + """Unlock the vault using a master password. Stores the resulting session key.""" + try: + args = _parse_tool_args(content) + except ValueError: + return {"error": "Invalid JSON arguments", "exit_code": 1} + master_password = args.get("master_password", "") + if not master_password: + return {"error": "master_password is required", "exit_code": 1} + + # Do not pass the master password as an argv element. Local process lists + # can expose argv to other users; stdin keeps the secret out of `ps`. + stdout, stderr, rc = await _run_bw(["unlock", "--raw"], input_text=master_password + "\n") + if rc != 0: + return {"error": f"Unlock failed: {stderr[:300]}", "exit_code": 1} + + session = stdout.strip() + if not session: + return {"error": "bw returned empty session", "exit_code": 1} + + # Save session to vault.json + from pathlib import Path + p = Path(VAULT_FILE) + cfg = {} + if p.exists(): + try: + cfg = json.loads(p.read_text(encoding="utf-8")) + except Exception: + pass + cfg["session"] = session + from datetime import datetime as _dt + cfg["unlocked_at"] = _dt.utcnow().isoformat() + p.write_text(json.dumps(cfg, indent=2), encoding="utf-8") + try: + import os as _os + _os.chmod(str(p), 0o600) + except Exception: + pass + + return {"output": "Vault unlocked. Session saved.", "exit_code": 0} diff --git a/tests/test_vault_password_not_in_argv.py b/tests/test_vault_password_not_in_argv.py index 32267a925..f23cddcd8 100644 --- a/tests/test_vault_password_not_in_argv.py +++ b/tests/test_vault_password_not_in_argv.py @@ -102,7 +102,7 @@ def test_unlock_handler_feeds_password_on_stdin_not_argv(): def test_tool_vault_unlock_feeds_password_on_stdin_not_argv(): - text = open("src/tool_implementations.py", encoding="utf-8").read() + text = open("src/tools/vault.py", encoding="utf-8").read() assert '["unlock", master_password, "--raw"]' not in text assert '_run_bw(["unlock", master_password' not in text