Secure by default uplift (#511)

Co-authored-by: Alex Kenley <Alex.Kenley@threatvectorsecurity.com>
This commit is contained in:
Alexander Kenley
2026-06-01 23:30:07 +10:00
committed by GitHub
parent 766ddcaa99
commit 3c6b084f08
7 changed files with 190 additions and 69 deletions
+55 -5
View File
@@ -7,6 +7,10 @@ from typing import Dict, List, Optional, Any
import httpx
from core.atomic_io import atomic_write_json
from core.platform_compat import safe_chmod
from src.secret_storage import decrypt, encrypt, is_encrypted
log = logging.getLogger(__name__)
DATA_FILE = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "integrations.json")
@@ -143,23 +147,69 @@ def _ensure_data_dir() -> None:
os.makedirs(os.path.dirname(DATA_FILE), exist_ok=True)
def _encrypt_integration_secrets(integrations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Return storage-safe copies with API keys encrypted at rest."""
safe: List[Dict[str, Any]] = []
for item in integrations:
copy = dict(item)
api_key = copy.get("api_key", "")
if api_key:
copy["api_key"] = encrypt(str(api_key))
safe.append(copy)
return safe
def _decrypt_integration_secrets(integrations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Return runtime copies with API keys decrypted for callers."""
decoded: List[Dict[str, Any]] = []
for item in integrations:
copy = dict(item)
api_key = copy.get("api_key", "")
if api_key:
copy["api_key"] = decrypt(str(api_key))
decoded.append(copy)
return decoded
def _has_plaintext_api_key(integrations: List[Dict[str, Any]]) -> bool:
return any(
bool(item.get("api_key")) and not is_encrypted(str(item.get("api_key")))
for item in integrations
)
def mask_integration_secret(integration: Dict[str, Any]) -> Dict[str, Any]:
"""Return a copy safe for API responses."""
safe = dict(integration)
api_key = safe.get("api_key", "")
if api_key:
safe["api_key"] = f"{str(api_key)[:4]}****"
return safe
def load_integrations() -> List[Dict[str, Any]]:
"""Load all integrations from disk."""
"""Load all integrations from disk with secrets decrypted for runtime use."""
if not os.path.exists(DATA_FILE):
return []
try:
with open(DATA_FILE, "r", encoding="utf-8") as f:
return json.load(f)
integrations = json.load(f)
if not isinstance(integrations, list):
log.error("Invalid integrations file shape: expected a list")
return []
if _has_plaintext_api_key(integrations):
save_integrations(_decrypt_integration_secrets(integrations))
return _decrypt_integration_secrets(integrations)
except (json.JSONDecodeError, IOError) as exc:
log.error("Failed to load integrations: %s", exc)
return []
def save_integrations(integrations: List[Dict[str, Any]]) -> None:
"""Persist integrations list to disk."""
"""Persist integrations list to disk with API keys encrypted at rest."""
_ensure_data_dir()
with open(DATA_FILE, "w", encoding="utf-8") as f:
json.dump(integrations, f, indent=2)
atomic_write_json(DATA_FILE, _encrypt_integration_secrets(integrations), indent=2)
safe_chmod(DATA_FILE, 0o600)
def get_integration(integration_id: str) -> Optional[Dict[str, Any]]: