"""
vault_routes.py

Vaultwarden / Bitwarden CLI integration — config and unlock endpoints.
Stores the BW_SESSION key in data/vault.json with restrictive permissions.
"""
import asyncio
import contextlib
import glob
import json
import logging
import os
import shutil
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from fastapi import APIRouter, Request
from pydantic import BaseModel

from core.middleware import require_admin

logger = logging.getLogger(__name__)

VAULT_FILE = Path("data/vault.json")

# Upper bound for a single bw invocation so a stuck CLI cannot hang a request.
BW_TIMEOUT_SECONDS = 60.0

# Environment variable used to hand the master password to `bw --passwordenv`.
_PASSWORD_ENV_VAR = "BW_PASSWORD"


def _find_bw() -> str:
    """Locate the bw binary, checking PATH and common npm-global locations.

    Returns the first executable found. If nothing is found, returns the bare
    name "bw" so the later exec attempt raises FileNotFoundError, which
    `_run_bw` turns into an error tuple.
    """
    on_path = shutil.which("bw")
    if on_path:
        return on_path

    home = os.path.expanduser("~")
    patterns = (
        f"{home}/.npm-global/bin/bw",
        f"{home}/.nvm/versions/node/*/bin/bw",
        "/usr/local/bin/bw",
        "/opt/homebrew/bin/bw",
    )
    for pattern in patterns:
        # Only the nvm entry contains a wildcard; plain paths are checked as-is.
        matches = glob.glob(pattern) if "*" in pattern else [pattern]
        for path in matches:
            if os.path.isfile(path) and os.access(path, os.X_OK):
                return path
    return "bw"  # fall back to PATH lookup (will FileNotFoundError, handled in _run_bw)


def _load_config() -> dict:
    """Read the vault config from VAULT_FILE.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object. Problems other than a missing
    file are logged instead of being silently discarded.
    """
    try:
        data = json.loads(VAULT_FILE.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Ignoring unreadable vault config %s: %s", VAULT_FILE, exc)
        return {}
    if not isinstance(data, dict):
        # Callers use .get()/.pop(); anything but an object would crash them.
        logger.warning("Ignoring vault config %s: root is not a JSON object", VAULT_FILE)
        return {}
    return data


def _save_config(cfg: dict) -> None:
    """Persist the vault config to VAULT_FILE, replacing it atomically.

    The data is written to a temporary file in the same directory, which
    `tempfile.mkstemp` creates readable and writable only by the owner, and
    then moved over VAULT_FILE with `os.replace`. The session key is therefore
    never on disk with wider permissions, and a crash mid-write cannot leave a
    truncated config behind.

    Raises:
        OSError: if the directory or file cannot be written.
    """
    VAULT_FILE.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(cfg, indent=2)
    fd, tmp_name = tempfile.mkstemp(
        prefix=VAULT_FILE.name + ".",
        suffix=".tmp",
        dir=str(VAULT_FILE.parent),
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(payload)
        os.replace(tmp_name, str(VAULT_FILE))
    except BaseException:
        # Do not leave a stray temp file (it may contain the session key).
        with contextlib.suppress(OSError):
            os.unlink(tmp_name)
        raise


def _utc_timestamp() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    Produces the same format as the deprecated `datetime.utcnow().isoformat()`
    (no UTC offset suffix), so previously stored values stay comparable.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


async def _kill_process(proc) -> None:
    """Best-effort kill and reap of a child process that is still running."""
    if proc.returncode is not None:
        return
    with contextlib.suppress(ProcessLookupError):
        proc.kill()
    with contextlib.suppress(Exception):
        await proc.wait()


async def _run_bw(
    args: list,
    session: Optional[str] = None,
    input_text: Optional[str] = None,
    extra_env: Optional[dict] = None,
    timeout: Optional[float] = BW_TIMEOUT_SECONDS,
) -> tuple:
    """Run the bw CLI and capture its output.

    Args:
        args: Arguments passed to bw (without the binary name).
        session: If given, exported to the child as BW_SESSION.
        input_text: If given, written to the child's stdin. Otherwise stdin is
            connected to the null device so bw can never block waiting on the
            server's own stdin.
        extra_env: Additional environment variables for the child only, e.g.
            to pass a secret without putting it on the command line.
        timeout: Seconds to wait before killing the child; None waits forever.

    Returns:
        A `(stdout, stderr, returncode)` tuple with both streams decoded and
        stripped. Launch failures are reported through the tuple rather than
        raised: return code 127 when the binary is missing, 124 on timeout,
        1 for any other launch or communication error.
    """
    env = dict(os.environ)
    if extra_env:
        env.update(extra_env)
    if session:
        env["BW_SESSION"] = session

    has_input = input_text is not None
    bw_path = _find_bw()
    try:
        proc = await asyncio.create_subprocess_exec(
            bw_path,
            *args,
            stdin=asyncio.subprocess.PIPE if has_input else asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env,
        )
    except FileNotFoundError:
        return "", "bw CLI not installed (install `nodejs-bitwarden-cli` or `bitwarden-cli`)", 127
    except Exception as e:
        return "", f"Failed to launch bw: {e}", 1

    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(input=input_text.encode() if has_input else None),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        await _kill_process(proc)
        return "", f"bw timed out after {timeout:g}s", 124
    except Exception as e:
        await _kill_process(proc)
        return "", f"bw subprocess error: {e}", 1

    return (
        stdout.decode(errors="replace").strip(),
        stderr.decode(errors="replace").strip(),
        proc.returncode,
    )


class VaultConfig(BaseModel):
    """Request body for saving the vault server URL and account email."""

    server_url: str = ""
    email: str = ""


class VaultUnlockRequest(BaseModel):
    """Request body for unlocking an already logged-in vault."""

    master_password: str


class VaultLoginRequest(BaseModel):
    """Request body for logging the CLI in to an account."""

    email: str
    master_password: str


def setup_vault_routes():
    """Build and return the `/api/vault` router. Every route requires admin."""
    router = APIRouter(prefix="/api/vault", tags=["vault"])

    @router.get("/config")
    async def get_config(request: Request):
        """Return vault config (no sensitive fields)."""
        require_admin(request)
        cfg = _load_config()
        return {
            "server_url": cfg.get("server_url", ""),
            "email": cfg.get("email", ""),
            # Only report whether a session exists, never the key itself.
            "unlocked": bool(cfg.get("session")),
            "unlocked_at": cfg.get("unlocked_at", ""),
            "bw_installed": await _check_bw_installed(),
        }

    @router.post("/config")
    async def save_config(req: VaultConfig, request: Request):
        """Save vault URL + email. Runs 'bw config server' to point at Vaultwarden."""
        require_admin(request)
        cfg = _load_config()
        cfg["server_url"] = req.server_url.strip().rstrip("/")
        cfg["email"] = req.email.strip()
        if cfg["server_url"]:
            _, stderr, rc = await _run_bw(["config", "server", cfg["server_url"]])
            if rc != 0:
                # Nothing is persisted when the CLI rejects the server URL.
                return {"ok": False, "error": f"bw config failed: {stderr[:300]}"}
        _save_config(cfg)
        return {"ok": True}

    @router.post("/login")
    async def login(req: VaultLoginRequest, request: Request):
        """Log in to Vaultwarden (required once per account)."""
        require_admin(request)
        email = req.email.strip()  # same normalisation as save_config
        cfg = _load_config()
        # Update email
        cfg["email"] = email
        _save_config(cfg)

        # The password goes over stdin, so it never appears in the process list.
        stdout, stderr, rc = await _run_bw(
            ["login", email, "--raw"],
            input_text=req.master_password + "\n",
        )
        if rc != 0:
            # Already logged in is OK
            if "already logged in" in stderr.lower():
                return {"ok": True, "already": True}
            return {"ok": False, "error": f"Login failed: {stderr[:300]}"}

        # bw login --raw prints session key on success (when 2FA disabled)
        if stdout:
            cfg["session"] = stdout
            cfg["unlocked_at"] = _utc_timestamp()
            _save_config(cfg)
        return {"ok": True}

    @router.post("/unlock")
    async def unlock(req: VaultUnlockRequest, request: Request):
        """Unlock the vault and save the session key.

        The master password is handed to bw through an environment variable
        (`--passwordenv`) rather than as a command-line argument, because
        argv is visible to other local users via the process list.
        NOTE(review): `--passwordenv` needs a reasonably recent bw CLI —
        confirm the minimum version deployed.
        """
        require_admin(request)
        stdout, stderr, rc = await _run_bw(
            ["unlock", "--passwordenv", _PASSWORD_ENV_VAR, "--raw"],
            extra_env={_PASSWORD_ENV_VAR: req.master_password},
        )
        if rc != 0:
            return {"ok": False, "error": f"Unlock failed: {stderr[:300]}"}

        session = stdout.strip()
        if not session:
            return {"ok": False, "error": "bw returned empty session"}

        cfg = _load_config()
        cfg["session"] = session
        cfg["unlocked_at"] = _utc_timestamp()
        _save_config(cfg)
        return {"ok": True, "message": "Vault unlocked"}

    @router.post("/lock")
    async def lock(request: Request):
        """Lock the vault (clear session from config)."""
        require_admin(request)
        cfg = _load_config()
        cfg.pop("session", None)
        cfg.pop("unlocked_at", None)
        _save_config(cfg)
        # Also tell bw to lock; the stored session is already gone, so a CLI
        # failure here is only logged.
        _, stderr, rc = await _run_bw(["lock"])
        if rc != 0:
            logger.warning("bw lock exited with %s: %s", rc, stderr[:300])
        return {"ok": True, "message": "Vault locked"}

    @router.post("/logout")
    async def logout(request: Request):
        """Log out of the Bitwarden CLI completely."""
        require_admin(request)
        await _run_bw(["logout"])
        cfg = _load_config()
        cfg.pop("session", None)
        cfg.pop("email", None)
        cfg.pop("unlocked_at", None)
        _save_config(cfg)
        return {"ok": True}

    return router


async def _check_bw_installed() -> bool:
    """Return True when `bw --version` can be launched and exits with 0."""
    try:
        _, _, rc = await _run_bw(["--version"], timeout=15.0)
    except Exception:
        return False
    return rc == 0