refactor(tools): extract vault domain into src/tools/vault.py

Repoints tests/test_vault_password_not_in_argv.py source-introspection
to src/tools/vault.py (the vault do_* helpers moved there).
This commit is contained in:
yuandonghao
2026-06-16 15:15:08 +08:00
parent 2fd53856b1
commit 059b6602d1
4 changed files with 199 additions and 180 deletions
+5 -179
View File
@@ -59,6 +59,11 @@ from src.tools.image import do_edit_image # noqa: F401
from src.tools.research import do_manage_research, do_trigger_research # noqa: F401
# Contacts domain extracted to src/tools/contacts.py (slice 1, #4082/#4071).
from src.tools.contacts import do_resolve_contact, do_manage_contact # noqa: F401
# Vault domain extracted to src/tools/vault.py (slice 1, #4082/#4071).
from src.tools.vault import ( # noqa: F401
_load_vault_config, _run_bw,
do_vault_search, do_vault_get, do_vault_unlock,
)
logger = logging.getLogger(__name__)
@@ -149,182 +154,3 @@ def _internal_headers(owner: Optional[str] = None) -> Dict[str, str]:
if owner:
headers["X-Odysseus-Owner"] = owner
return headers
# ── Vaultwarden / Bitwarden CLI tools ──
def _load_vault_config() -> Dict:
"""Load Vaultwarden config from data/vault.json."""
from pathlib import Path
p = Path(VAULT_FILE)
if p.exists():
try:
return json.loads(p.read_text(encoding="utf-8"))
except Exception:
pass
return {}
async def _run_bw(args: list, session: Optional[str] = None, input_text: Optional[str] = None) -> tuple:
"""Run a bw CLI command with optional session + stdin. Returns (stdout, stderr, returncode)."""
import asyncio
env = {}
import os as _os
env.update(_os.environ)
if session:
env["BW_SESSION"] = session
proc = await asyncio.create_subprocess_exec(
"bw", *args,
stdin=asyncio.subprocess.PIPE if input_text else None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
stdout, stderr = await proc.communicate(input=input_text.encode() if input_text else None)
return stdout.decode(errors="replace").strip(), stderr.decode(errors="replace").strip(), proc.returncode
async def do_vault_search(content: str, owner: Optional[str] = None) -> Dict:
"""Search the vault by keyword. Returns matching item names + URLs, NO passwords."""
try:
args = _parse_tool_args(content)
except ValueError:
return {"error": "Invalid JSON arguments", "exit_code": 1}
query = args.get("query", "").strip()
if not query:
return {"error": "query is required", "exit_code": 1}
cfg = _load_vault_config()
session = cfg.get("session")
if not session:
return {"error": "Vault is locked. Run vault_unlock or provide session key in settings.", "exit_code": 1}
stdout, stderr, rc = await _run_bw(["list", "items", "--search", query], session=session)
if rc != 0:
return {"error": f"bw failed: {stderr[:300]}", "exit_code": 1}
try:
items = json.loads(stdout)
except json.JSONDecodeError:
return {"error": "Failed to parse bw output", "exit_code": 1}
if not items:
return {"output": f"No vault items match '{query}'.", "exit_code": 0}
lines = [f"Found {len(items)} item(s) matching '{query}':"]
for it in items[:20]:
item_id = it.get("id", "?")
name = it.get("name", "?")
login = it.get("login") or {}
username = login.get("username", "")
uris = login.get("uris") or []
url = uris[0].get("uri", "") if uris else ""
parts = [f"[{item_id[:8]}] {name}"]
if username:
parts.append(f"user: {username}")
if url:
parts.append(f"url: {url}")
lines.append("- " + " · ".join(parts))
lines.append("\nUse vault_get(item_id, reason) to retrieve the password.")
return {"output": "\n".join(lines), "exit_code": 0}
async def do_vault_get(content: str, owner: Optional[str] = None) -> Dict:
"""Retrieve a full vault entry (including password) by item ID. Logs access to assistant chat."""
try:
args = _parse_tool_args(content)
except ValueError:
return {"error": "Invalid JSON arguments", "exit_code": 1}
item_id = args.get("item_id", "").strip()
reason = args.get("reason", "").strip()
if not item_id:
return {"error": "item_id is required", "exit_code": 1}
if not reason:
return {"error": "reason is required — explain WHY you need this password", "exit_code": 1}
cfg = _load_vault_config()
session = cfg.get("session")
if not session:
return {"error": "Vault is locked. Unlock first.", "exit_code": 1}
stdout, stderr, rc = await _run_bw(["get", "item", item_id], session=session)
if rc != 0:
return {"error": f"bw failed: {stderr[:300]}", "exit_code": 1}
try:
item = json.loads(stdout)
except json.JSONDecodeError:
return {"error": "Failed to parse bw output", "exit_code": 1}
login = item.get("login") or {}
name = item.get("name", "?")
# Audit log to assistant chat
try:
from src.assistant_log import log_to_assistant
if owner:
log_to_assistant(
owner,
f"Retrieved password for **{name}** — reason: {reason}",
category="Vault",
)
except Exception:
pass
output = [
f"Vault item: {name}",
f"Username: {login.get('username', '(none)')}",
f"Password: {login.get('password', '(none)')}",
]
if login.get("totp"):
output.append(f"TOTP secret: {login['totp']}")
uris = login.get("uris") or []
if uris:
output.append("URLs: " + ", ".join(u.get("uri", "") for u in uris))
if item.get("notes"):
output.append(f"Notes: {item['notes']}")
return {"output": "\n".join(output), "exit_code": 0}
async def do_vault_unlock(content: str, owner: Optional[str] = None) -> Dict:
"""Unlock the vault using a master password. Stores the resulting session key."""
try:
args = _parse_tool_args(content)
except ValueError:
return {"error": "Invalid JSON arguments", "exit_code": 1}
master_password = args.get("master_password", "")
if not master_password:
return {"error": "master_password is required", "exit_code": 1}
# Do not pass the master password as an argv element. Local process lists
# can expose argv to other users; stdin keeps the secret out of `ps`.
stdout, stderr, rc = await _run_bw(["unlock", "--raw"], input_text=master_password + "\n")
if rc != 0:
return {"error": f"Unlock failed: {stderr[:300]}", "exit_code": 1}
session = stdout.strip()
if not session:
return {"error": "bw returned empty session", "exit_code": 1}
# Save session to vault.json
from pathlib import Path
p = Path(VAULT_FILE)
cfg = {}
if p.exists():
try:
cfg = json.loads(p.read_text(encoding="utf-8"))
except Exception:
pass
cfg["session"] = session
from datetime import datetime as _dt
cfg["unlocked_at"] = _dt.utcnow().isoformat()
p.write_text(json.dumps(cfg, indent=2), encoding="utf-8")
try:
import os as _os
_os.chmod(str(p), 0o600)
except Exception:
pass
return {"output": "Vault unlocked. Session saved.", "exit_code": 0}
+4
View File
@@ -27,3 +27,7 @@ from src.tools.calendar import do_manage_calendar # noqa: F401
from src.tools.image import do_edit_image # noqa: F401
from src.tools.research import do_manage_research, do_trigger_research # noqa: F401
from src.tools.contacts import do_resolve_contact, do_manage_contact # noqa: F401
from src.tools.vault import ( # noqa: F401
_load_vault_config, _run_bw,
do_vault_search, do_vault_get, do_vault_unlock,
)
+189
View File
@@ -0,0 +1,189 @@
"""Vault-domain tool implementations.
Extracted from tool_implementations.py as part of slice 1 (#4082/#4071).
Holds the Bitwarden CLI wrappers (vault_search / vault_get / vault_unlock)
and their helpers (_load_vault_config, _run_bw).
``src.tool_implementations`` re-exports these for backward compatibility.
"""
import json
from typing import Dict, Optional
from src.constants import VAULT_FILE
from src.tools._common import _parse_tool_args
def _load_vault_config() -> Dict:
"""Load Vaultwarden config from data/vault.json."""
from pathlib import Path
p = Path(VAULT_FILE)
if p.exists():
try:
return json.loads(p.read_text(encoding="utf-8"))
except Exception:
pass
return {}
async def _run_bw(args: list, session: Optional[str] = None, input_text: Optional[str] = None) -> tuple:
"""Run a bw CLI command with optional session + stdin. Returns (stdout, stderr, returncode)."""
import asyncio
env = {}
import os as _os
env.update(_os.environ)
if session:
env["BW_SESSION"] = session
proc = await asyncio.create_subprocess_exec(
"bw", *args,
stdin=asyncio.subprocess.PIPE if input_text else None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
stdout, stderr = await proc.communicate(input=input_text.encode() if input_text else None)
return stdout.decode(errors="replace").strip(), stderr.decode(errors="replace").strip(), proc.returncode
async def do_vault_search(content: str, owner: Optional[str] = None) -> Dict:
"""Search the vault by keyword. Returns matching item names + URLs, NO passwords."""
try:
args = _parse_tool_args(content)
except ValueError:
return {"error": "Invalid JSON arguments", "exit_code": 1}
query = args.get("query", "").strip()
if not query:
return {"error": "query is required", "exit_code": 1}
cfg = _load_vault_config()
session = cfg.get("session")
if not session:
return {"error": "Vault is locked. Run vault_unlock or provide session key in settings.", "exit_code": 1}
stdout, stderr, rc = await _run_bw(["list", "items", "--search", query], session=session)
if rc != 0:
return {"error": f"bw failed: {stderr[:300]}", "exit_code": 1}
try:
items = json.loads(stdout)
except json.JSONDecodeError:
return {"error": "Failed to parse bw output", "exit_code": 1}
if not items:
return {"output": f"No vault items match '{query}'.", "exit_code": 0}
lines = [f"Found {len(items)} item(s) matching '{query}':"]
for it in items[:20]:
item_id = it.get("id", "?")
name = it.get("name", "?")
login = it.get("login") or {}
username = login.get("username", "")
uris = login.get("uris") or []
url = uris[0].get("uri", "") if uris else ""
parts = [f"[{item_id[:8]}] {name}"]
if username:
parts.append(f"user: {username}")
if url:
parts.append(f"url: {url}")
lines.append("- " + " · ".join(parts))
lines.append("\nUse vault_get(item_id, reason) to retrieve the password.")
return {"output": "\n".join(lines), "exit_code": 0}
async def do_vault_get(content: str, owner: Optional[str] = None) -> Dict:
"""Retrieve a full vault entry (including password) by item ID. Logs access to assistant chat."""
try:
args = _parse_tool_args(content)
except ValueError:
return {"error": "Invalid JSON arguments", "exit_code": 1}
item_id = args.get("item_id", "").strip()
reason = args.get("reason", "").strip()
if not item_id:
return {"error": "item_id is required", "exit_code": 1}
if not reason:
return {"error": "reason is required — explain WHY you need this password", "exit_code": 1}
cfg = _load_vault_config()
session = cfg.get("session")
if not session:
return {"error": "Vault is locked. Unlock first.", "exit_code": 1}
stdout, stderr, rc = await _run_bw(["get", "item", item_id], session=session)
if rc != 0:
return {"error": f"bw failed: {stderr[:300]}", "exit_code": 1}
try:
item = json.loads(stdout)
except json.JSONDecodeError:
return {"error": "Failed to parse bw output", "exit_code": 1}
login = item.get("login") or {}
name = item.get("name", "?")
# Audit log to assistant chat
try:
from src.assistant_log import log_to_assistant
if owner:
log_to_assistant(
owner,
f"Retrieved password for **{name}** — reason: {reason}",
category="Vault",
)
except Exception:
pass
output = [
f"Vault item: {name}",
f"Username: {login.get('username', '(none)')}",
f"Password: {login.get('password', '(none)')}",
]
if login.get("totp"):
output.append(f"TOTP secret: {login['totp']}")
uris = login.get("uris") or []
if uris:
output.append("URLs: " + ", ".join(u.get("uri", "") for u in uris))
if item.get("notes"):
output.append(f"Notes: {item['notes']}")
return {"output": "\n".join(output), "exit_code": 0}
async def do_vault_unlock(content: str, owner: Optional[str] = None) -> Dict:
"""Unlock the vault using a master password. Stores the resulting session key."""
try:
args = _parse_tool_args(content)
except ValueError:
return {"error": "Invalid JSON arguments", "exit_code": 1}
master_password = args.get("master_password", "")
if not master_password:
return {"error": "master_password is required", "exit_code": 1}
# Do not pass the master password as an argv element. Local process lists
# can expose argv to other users; stdin keeps the secret out of `ps`.
stdout, stderr, rc = await _run_bw(["unlock", "--raw"], input_text=master_password + "\n")
if rc != 0:
return {"error": f"Unlock failed: {stderr[:300]}", "exit_code": 1}
session = stdout.strip()
if not session:
return {"error": "bw returned empty session", "exit_code": 1}
# Save session to vault.json
from pathlib import Path
p = Path(VAULT_FILE)
cfg = {}
if p.exists():
try:
cfg = json.loads(p.read_text(encoding="utf-8"))
except Exception:
pass
cfg["session"] = session
from datetime import datetime as _dt
cfg["unlocked_at"] = _dt.utcnow().isoformat()
p.write_text(json.dumps(cfg, indent=2), encoding="utf-8")
try:
import os as _os
_os.chmod(str(p), 0o600)
except Exception:
pass
return {"output": "Vault unlocked. Session saved.", "exit_code": 0}
+1 -1
View File
@@ -102,7 +102,7 @@ def test_unlock_handler_feeds_password_on_stdin_not_argv():
def test_tool_vault_unlock_feeds_password_on_stdin_not_argv():
text = open("src/tool_implementations.py", encoding="utf-8").read()
text = open("src/tools/vault.py", encoding="utf-8").read()
assert '["unlock", master_password, "--raw"]' not in text
assert '_run_bw(["unlock", master_password' not in text