fix(personal): confine remove_directory_from_rag to PERSONAL_DIR

Resolve remove_directory_from_rag paths through the same PERSONAL_DIR confinement helper used by add_directory_to_rag before removal sinks are reached.
This commit is contained in:
Tom
2026-06-15 07:00:35 +01:00
committed by GitHub
parent dbd1e6572f
commit 59efa8a44b
2 changed files with 48 additions and 2 deletions
+5 -2
View File
@@ -160,8 +160,11 @@ def setup_personal_routes(personal_docs_manager, rag_manager, rag_available):
JSON response confirming removal
"""
try:
if not directory:
raise HTTPException(400, "Directory path is required")
# Confine to PERSONAL_DIR — parity with add_directory_to_rag (which
# resolves the path the same way). Without this, an arbitrary or
# `..`-escaping path is passed straight to
# personal_docs_manager.remove_directory / rag.remove_directory.
directory = _resolve_allowed_personal_dir(directory)
logger.info(f"Removing directory from RAG: {directory}")
@@ -0,0 +1,43 @@
"""Regression: remove_directory_from_rag must confine its path to PERSONAL_DIR.
DELETE /api/personal/remove_directory took a raw ``directory`` query parameter
and passed it straight to ``personal_docs_manager.remove_directory`` /
``rag.remove_directory`` with no containment check — unlike add_directory_to_rag,
which resolves the path via ``_resolve_allowed_personal_dir`` first. This pins
the parity fix.
``_resolve_allowed_personal_dir`` is a closure inside ``setup_personal_routes``,
so this is a source-level test, matching test_personal_dir_symlink_escape.py.
"""
import ast
from pathlib import Path
SRC = Path(__file__).resolve().parent.parent / "routes" / "personal_routes.py"
def _function_source(src_text: str, name: str) -> str:
tree = ast.parse(src_text)
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == name:
return ast.get_source_segment(src_text, node)
raise AssertionError(f"{name} not found in {SRC}")
def test_remove_directory_confines_path():
body = _function_source(SRC.read_text(), "remove_directory_from_rag")
assert "_resolve_allowed_personal_dir(" in body, (
"remove_directory_from_rag must call _resolve_allowed_personal_dir to "
"confine the user-supplied directory to PERSONAL_DIR (parity with "
"add_directory_to_rag)"
)
def test_confinement_runs_before_removal_sinks():
"""The confinement must happen before the path reaches either removal sink."""
body = _function_source(SRC.read_text(), "remove_directory_from_rag")
resolve_idx = body.index("_resolve_allowed_personal_dir(")
for sink in ("personal_docs_manager.remove_directory(", "rag.remove_directory("):
assert sink in body, f"expected sink {sink} in remove_directory_from_rag"
assert body.index(sink) > resolve_idx, (
f"{sink} runs before _resolve_allowed_personal_dir — path not confined"
)