From 86abcb75d088e9a7b8215d46aafea84fe72478d0 Mon Sep 17 00:00:00 2001 From: Nicholai Date: Sat, 6 Jun 2026 03:17:19 -0600 Subject: [PATCH] fix: split Chroma embedding lanes (#3046) --- routes/embedding_routes.py | 20 + src/embedding_lanes.py | 380 ++++++++++++ src/memory_vector.py | 230 ++++--- src/rag_vector.py | 385 +++++++----- src/tool_index.py | 170 +++-- tests/test_embedding_lanes.py | 1104 +++++++++++++++++++++++++++++++++ 6 files changed, 1995 insertions(+), 294 deletions(-) create mode 100644 src/embedding_lanes.py create mode 100644 tests/test_embedding_lanes.py diff --git a/routes/embedding_routes.py b/routes/embedding_routes.py index c6f0645a7..d79fe91f1 100644 --- a/routes/embedding_routes.py +++ b/routes/embedding_routes.py @@ -316,6 +316,16 @@ def setup_embedding_routes(): reset_http_embed_state() except Exception: pass + try: + from src.embedding_lanes import reset_embedding_lane_state + reset_embedding_lane_state() + except Exception: + pass + try: + from src.tool_index import reset_tool_index + reset_tool_index() + except Exception: + pass # Reset ChromaDB client (collections will be recreated with new embeddings) try: @@ -347,6 +357,16 @@ def setup_embedding_routes(): reset_http_embed_state() except Exception: pass + try: + from src.embedding_lanes import reset_embedding_lane_state + reset_embedding_lane_state() + except Exception: + pass + try: + from src.tool_index import reset_tool_index + reset_tool_index() + except Exception: + pass # Reset ChromaDB client try: diff --git a/src/embedding_lanes.py b/src/embedding_lanes.py new file mode 100644 index 000000000..bca4eaef2 --- /dev/null +++ b/src/embedding_lanes.py @@ -0,0 +1,380 @@ +""" +embedding_lanes.py + +Helpers for keeping FastEmbed fallback vectors separate from user-configured +embedding vectors. ChromaDB fixes a collection's dimension on first insert, so +different embedding models must never share one collection. +""" + +from __future__ import annotations + +from dataclasses import dataclass +import hashlib +import logging +import os +from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence + +logger = logging.getLogger(__name__) + +LANE_FASTEMBED = "fastembed" +LANE_CUSTOM = "custom" + + +@dataclass +class EmbeddingLane: + name: str + client: Any + collection: Any + collection_name: str + model: str + url: str + dimension: int + fingerprint: str + + @property + def healthy(self) -> bool: + return self.collection is not None and self.client is not None + + def encode(self, texts: Sequence[str]) -> List[List[float]]: + vecs = self.client.encode(list(texts), normalize_embeddings=True) + return vecs.tolist() if hasattr(vecs, "tolist") else [list(v) for v in vecs] + + def count(self) -> int: + try: + return int(self.collection.count()) + except Exception: + return 0 + + def stats(self) -> Dict[str, Any]: + return { + "name": self.name, + "collection": self.collection_name, + "model": self.model, + "url": self.url, + "dimension": self.dimension, + "fingerprint": self.fingerprint, + "count": self.count(), + "healthy": self.healthy, + } + + +def reset_embedding_lane_state() -> None: + """Reset process-local embedding lane state after endpoint config changes.""" + try: + from src.embeddings import reset_http_embed_state + reset_http_embed_state() + except Exception: + pass + + +def collection_name(base_name: str, lane_name: str) -> str: + return f"{base_name}_{lane_name}" + + +def _fingerprint(lane_name: str, url: str, model: str, dimension: int) -> str: + raw = f"{lane_name}\n{url}\n{model}\n{dimension}" + return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16] + + +def _metadata(lane_name: str, url: str, model: str, dimension: int, fingerprint: str) -> Dict[str, Any]: + return { + "hnsw:space": "cosine", + "embedding_lane": lane_name, + "embedding_url": url, + "embedding_model": model, + "embedding_dimension": dimension, + "embedding_fingerprint": fingerprint, + } + + +def _load_custom_endpoint() -> Dict[str, str]: + try: + from src.embeddings import _load_persisted_endpoint + persisted = _load_persisted_endpoint() + except Exception: + persisted = {} + + url = persisted.get("url") or os.environ.get("EMBEDDING_URL", "") + if not url: + return {} + + model = persisted.get("model") or os.environ.get("EMBEDDING_MODEL", "") + api_key = persisted.get("api_key") or os.environ.get("EMBEDDING_API_KEY", "") + if persisted.get("api_key"): + try: + from src.secret_storage import decrypt + api_key = decrypt(api_key) + except Exception: + logger.warning("Could not decrypt saved embedding endpoint API key") + api_key = "" + + return {"url": url, "model": model, "api_key": api_key} + + +def _build_fastembed_client(): + from src.embeddings import FastEmbedClient + + client = FastEmbedClient() + client.get_sentence_embedding_dimension() + return client + + +def _build_custom_client(): + from src.embeddings import EmbeddingClient, get_embedding_client + + client = get_embedding_client() + if isinstance(client, EmbeddingClient): + return client + raise RuntimeError("HTTP embedding lane unavailable") + + +def _encode_with_client(client: Any, texts: Sequence[str]) -> List[List[float]]: + vecs = client.encode(list(texts), normalize_embeddings=True) + return vecs.tolist() if hasattr(vecs, "tolist") else [list(v) for v in vecs] + + +def _get_or_reset_collection(chroma_client, name: str, metadata: Dict[str, Any], client: Any): + try: + collection = chroma_client.get_collection(name) + except Exception: + return chroma_client.get_or_create_collection(name=name, metadata=metadata) + + current = collection.metadata or {} + if not ( + current.get("embedding_fingerprint") not in (None, metadata["embedding_fingerprint"]) + or current.get("embedding_dimension") not in (None, metadata["embedding_dimension"]) + or current.get("embedding_lane") not in (None, metadata["embedding_lane"]) + ): + return collection + + logger.info( + "Recreating Chroma collection %s for embedding lane change (%s -> %s)", + name, + current.get("embedding_fingerprint"), + metadata["embedding_fingerprint"], + ) + preserved = {"ids": [], "documents": [], "metadatas": [], "embeddings": []} + try: + preserved = collection.get(include=["documents", "metadatas", "embeddings"]) or preserved + except Exception as e: + raise RuntimeError(f"Could not preserve documents before resetting {name}: {e}") from e + + ids = preserved.get("ids") or [] + docs = preserved.get("documents") or [] + metas = preserved.get("metadatas") or [] + prepared_batches = [] + if ids and docs: + try: + for start in range(0, len(ids), 100): + batch_ids = ids[start:start + 100] + batch_docs = docs[start:start + 100] + batch_metas = metas[start:start + 100] + if len(batch_metas) < len(batch_ids): + batch_metas += [{}] * (len(batch_ids) - len(batch_metas)) + prepared_batches.append(( + batch_ids, + batch_docs, + batch_metas, + _encode_with_client(client, batch_docs), + )) + except Exception as e: + raise RuntimeError(f"Could not re-embed preserved rows for {name}: {e}") from e + + chroma_client.delete_collection(name) + collection = chroma_client.get_or_create_collection(name=name, metadata=metadata) + + try: + for batch_ids, batch_docs, batch_metas, embeddings in prepared_batches: + collection.add( + ids=batch_ids, + documents=batch_docs, + metadatas=batch_metas, + embeddings=embeddings, + ) + except Exception as e: + logger.warning("Could not write reset collection %s; restoring previous rows: %s", name, e) + try: + chroma_client.delete_collection(name) + restored = chroma_client.get_or_create_collection(name=name, metadata=current) + old_embeddings = preserved.get("embeddings") or [] + if ids and docs and old_embeddings: + for start in range(0, len(ids), 100): + batch_ids = ids[start:start + 100] + batch_docs = docs[start:start + 100] + batch_metas = metas[start:start + 100] + batch_embeddings = old_embeddings[start:start + 100] + if len(batch_metas) < len(batch_ids): + batch_metas += [{}] * (len(batch_ids) - len(batch_metas)) + restored.add( + ids=batch_ids, + documents=batch_docs, + metadatas=batch_metas, + embeddings=batch_embeddings, + ) + except Exception as restore_error: + logger.warning("Could not restore previous collection %s: %s", name, restore_error) + raise RuntimeError(f"Could not write reset collection {name}: {e}") from e + if prepared_batches: + logger.info("Re-embedded %s rows after resetting %s", len(ids), name) + + return collection + + +def _create_lane(chroma_client, base_name: str, lane_name: str, client: Any) -> EmbeddingLane: + dimension = int(client.get_sentence_embedding_dimension()) + model = getattr(client, "model", "") + url = getattr(client, "url", "") + fp = _fingerprint(lane_name, url, model, dimension) + name = collection_name(base_name, lane_name) + metadata = _metadata(lane_name, url, model, dimension, fp) + collection = _get_or_reset_collection(chroma_client, name, metadata, client) + return EmbeddingLane( + name=lane_name, + client=client, + collection=collection, + collection_name=name, + model=model, + url=url, + dimension=dimension, + fingerprint=fp, + ) + + +def build_embedding_lanes(base_name: str) -> List[EmbeddingLane]: + """Return healthy lanes in retrieval preference order: custom, fastembed.""" + from src.chroma_client import get_chroma_client + + chroma_client = get_chroma_client() + lanes: List[EmbeddingLane] = [] + + try: + custom = _build_custom_client() + if custom is not None: + lanes.append(_create_lane(chroma_client, base_name, LANE_CUSTOM, custom)) + except Exception as e: + logger.warning("Custom embedding lane unavailable for %s: %s", base_name, e) + + try: + fastembed = _build_fastembed_client() + lanes.append(_create_lane(chroma_client, base_name, LANE_FASTEMBED, fastembed)) + except Exception as e: + logger.warning("FastEmbed lane unavailable for %s: %s", base_name, e) + + return lanes + + +def migrate_legacy_collection(base_name: str, lanes: Sequence[EmbeddingLane]) -> None: + """Backfill empty lanes from a legacy unsuffixed collection, if present.""" + if not lanes: + return + + try: + from src.chroma_client import get_chroma_client + + chroma_client = get_chroma_client() + legacy = chroma_client.get_collection(base_name) + data = legacy.get(include=["documents", "metadatas"]) + except Exception: + return + + ids = data.get("ids") or [] + docs = data.get("documents") or [] + metas = data.get("metadatas") or [] + if not ids or not docs: + return + + for lane in lanes: + try: + existing = lane.collection.get(ids=ids) + existing_ids = set(existing.get("ids") or []) + except Exception: + existing_ids = set() + all_metas = list(metas or []) + if len(all_metas) < len(ids): + all_metas += [{}] * (len(ids) - len(all_metas)) + missing = [ + (row_id, doc, meta) + for row_id, doc, meta in zip(ids, docs, all_metas) + if row_id not in existing_ids + ] + if not missing: + continue + + for start in range(0, len(missing), 100): + batch = missing[start:start + 100] + batch_ids = [row_id for row_id, _doc, _meta in batch] + batch_docs = [doc for _row_id, doc, _meta in batch] + batch_metas = [meta or {} for _row_id, _doc, meta in batch] + if len(batch_metas) < len(batch_ids): + batch_metas += [{}] * (len(batch_ids) - len(batch_metas)) + try: + embeddings = lane.encode(batch_docs) + lane.collection.add( + ids=batch_ids, + documents=batch_docs, + metadatas=batch_metas, + embeddings=embeddings, + ) + except Exception as e: + logger.warning( + "Could not backfill %s lane from legacy collection %s: %s", + lane.name, + base_name, + e, + ) + break + else: + logger.info("Backfilled %s %s lane rows from legacy collection %s", len(missing), lane.name, base_name) + + +def lane_count(lanes: Sequence[EmbeddingLane]) -> int: + return max((lane.count() for lane in lanes), default=0) + + +def dedupe_results(results: Iterable[Dict[str, Any]], id_key: str = "id", limit: Optional[int] = None) -> List[Dict[str, Any]]: + seen = set() + out: List[Dict[str, Any]] = [] + for row in results: + row_id = row.get(id_key) + if not row_id or row_id in seen: + continue + seen.add(row_id) + out.append(row) + if limit is not None and len(out) >= limit: + break + return out + + +def query_lanes( + lanes: Sequence[EmbeddingLane], + query: str, + n_results: Callable[[EmbeddingLane], int], + include: Sequence[str], + where: Optional[Dict[str, Any]] = None, + raise_if_all_failed: bool = False, +) -> List[tuple[EmbeddingLane, Dict[str, Any]]]: + out: List[tuple[EmbeddingLane, Dict[str, Any]]] = [] + attempted = 0 + failures: List[str] = [] + for lane in lanes: + try: + count = lane.count() + if count == 0: + continue + attempted += 1 + n = min(n_results(lane), count) + if n <= 0: + continue + results = lane.collection.query( + query_embeddings=lane.encode([query]), + n_results=n, + where=where, + include=list(include), + ) + out.append((lane, results)) + except Exception as e: + failures.append(f"{lane.name}: {e}") + logger.warning("%s lane query failed for %s: %s", lane.name, lane.collection_name, e) + if raise_if_all_failed and attempted and not out and failures: + raise RuntimeError("; ".join(failures)) + return out diff --git a/src/memory_vector.py b/src/memory_vector.py index 9f482b309..5b57f38d7 100644 --- a/src/memory_vector.py +++ b/src/memory_vector.py @@ -9,6 +9,16 @@ Stores pre-computed embeddings (ChromaDB does not manage embedding). import logging from typing import List, Dict, Optional +from src.embedding_lanes import ( + LANE_CUSTOM, + LANE_FASTEMBED, + build_embedding_lanes, + collection_name, + dedupe_results, + lane_count, + migrate_legacy_collection, +) + logger = logging.getLogger(__name__) @@ -20,30 +30,28 @@ class MemoryVectorStore: def __init__(self, data_dir: str, embedding_model=None): self._model = embedding_model self._collection = None + self._lanes = [] self._healthy = False self._initialize() def _initialize(self): try: - from src.chroma_client import get_chroma_client - - if self._model is None: - from src.embeddings import get_embedding_client - self._model = get_embedding_client() - if self._model is None: - raise RuntimeError("No embedding backend available") - logger.info(f"MemoryVectorStore using embeddings: {self._model.url}") - - client = get_chroma_client() - self._collection = client.get_or_create_collection( - name=self.COLLECTION_NAME, - metadata={"hnsw:space": "cosine"}, - ) + self._lanes = build_embedding_lanes(self.COLLECTION_NAME) + if not self._lanes: + raise RuntimeError("No embedding lanes available") self._healthy = True - count = self._collection.count() - logger.info(f"MemoryVectorStore ready (entries={count})") + self._collection = next( + (lane.collection for lane in self._lanes if lane.name == LANE_FASTEMBED), + self._lanes[0].collection, + ) + migrate_legacy_collection(self.COLLECTION_NAME, self._lanes) + logger.info( + "MemoryVectorStore ready (lanes=%s entries=%s)", + [lane.name for lane in self._lanes], + self.count(), + ) except Exception as e: logger.error(f"MemoryVectorStore init failed: {e}") @@ -53,39 +61,73 @@ class MemoryVectorStore: return self._healthy def _embed(self, texts: List[str]) -> List[List[float]]: - vecs = self._model.encode(texts, normalize_embeddings=True) - return vecs.tolist() + if not self._lanes: + return [] + return self._lanes[0].encode(texts) def count(self) -> int: """Return the number of stored vectors.""" if not self._healthy: return 0 - return self._collection.count() + return lane_count(self._lanes) + + def _collections_for_delete(self): + collections = [] + seen = set() + + def add(collection) -> None: + if collection is None: + return + key = getattr(collection, "name", None) or id(collection) + if key in seen: + return + seen.add(key) + collections.append(collection) + + for lane in self._lanes: + add(lane.collection) + + try: + from src.chroma_client import get_chroma_client + + client = get_chroma_client() + for lane_name in (LANE_CUSTOM, LANE_FASTEMBED): + try: + add(client.get_collection(collection_name(self.COLLECTION_NAME, lane_name))) + except Exception: + pass + except Exception: + pass + + return collections def add(self, memory_id: str, text: str): """Add a single memory entry to the vector index.""" if not self._healthy: return - # Skip if already exists - existing = self._collection.get(ids=[memory_id]) - if existing["ids"]: - return - embeddings = self._embed([text]) - self._collection.add( - ids=[memory_id], - embeddings=embeddings, - documents=[text], - metadatas=[{"source": "memory"}], - ) + for lane in self._lanes: + try: + existing = lane.collection.get(ids=[memory_id]) + if existing["ids"]: + continue + lane.collection.add( + ids=[memory_id], + embeddings=lane.encode([text]), + documents=[text], + metadatas=[{"source": "memory"}], + ) + except Exception as e: + logger.warning("memory add failed in %s lane for %s: %s", lane.name, memory_id, e) def remove(self, memory_id: str): """Remove a memory entry. O(1) — no rebuild needed.""" if not self._healthy: return - try: - self._collection.delete(ids=[memory_id]) - except Exception as e: - logger.warning(f"memory remove {memory_id}: {e}") + for collection in self._collections_for_delete(): + try: + collection.delete(ids=[memory_id]) + except Exception as e: + logger.warning(f"memory remove {memory_id}: {e}") def search(self, query: str, k: int = 8) -> List[Dict]: """Search for the most relevant memory IDs by semantic similarity. @@ -94,41 +136,53 @@ class MemoryVectorStore: ChromaDB cosine distance = 1 - cosine_similarity. We convert back: similarity = 1.0 - distance. """ - if not self._healthy or self._collection.count() == 0: + if not self._healthy or self.count() == 0: return [] - embeddings = self._embed([query]) - actual_k = min(k, self._collection.count()) - results = self._collection.query( - query_embeddings=embeddings, - n_results=actual_k, - ) - out = [] - for idx, mid in enumerate(results["ids"][0]): - distance = results["distances"][0][idx] - out.append({ - "memory_id": mid, - "score": round(1.0 - distance, 4), - }) - return out + lane_priority = {LANE_CUSTOM: 0, LANE_FASTEMBED: 1} + for lane in self._lanes: + try: + if lane.count() == 0: + continue + results = lane.collection.query( + query_embeddings=lane.encode([query]), + n_results=min(k, lane.count()), + include=["distances"], + ) + for idx, mid in enumerate(results["ids"][0]): + distance = results["distances"][0][idx] + out.append({ + "memory_id": mid, + "score": round(1.0 - distance, 4), + "embedding_lane": lane.name, + }) + except Exception as e: + logger.warning("memory search failed in %s lane: %s", lane.name, e) + out.sort(key=lambda row: (-row["score"], lane_priority.get(row["embedding_lane"], 99))) + return dedupe_results(out, id_key="memory_id", limit=k) def find_similar(self, text: str, threshold: float = 0.92) -> Optional[str]: """Check if a near-duplicate exists. Returns memory_id if found, else None.""" - if not self._healthy or self._collection.count() == 0: + if not self._healthy or self.count() == 0: return None - embeddings = self._embed([text]) - results = self._collection.query( - query_embeddings=embeddings, - n_results=1, - ) - - if results["ids"][0]: - distance = results["distances"][0][0] - similarity = 1.0 - distance - if similarity >= threshold: - return results["ids"][0][0] + for lane in self._lanes: + try: + if lane.count() == 0: + continue + results = lane.collection.query( + query_embeddings=lane.encode([text]), + n_results=1, + include=["distances"], + ) + if results["ids"][0]: + distance = results["distances"][0][0] + similarity = 1.0 - distance + if similarity >= threshold: + return results["ids"][0][0] + except Exception as e: + logger.warning("memory similarity search failed in %s lane: %s", lane.name, e) return None def rebuild(self, memories: List[Dict]): @@ -139,15 +193,23 @@ class MemoryVectorStore: from src.chroma_client import get_chroma_client - # Delete and recreate collection for a clean rebuild client = get_chroma_client() - try: - client.delete_collection(self.COLLECTION_NAME) - except Exception: - pass - self._collection = client.get_or_create_collection( - name=self.COLLECTION_NAME, - metadata={"hnsw:space": "cosine"}, + lane_names = [ + self.COLLECTION_NAME, + collection_name(self.COLLECTION_NAME, LANE_CUSTOM), + collection_name(self.COLLECTION_NAME, LANE_FASTEMBED), + ] + for name in lane_names: + try: + client.delete_collection(name) + except Exception: + pass + # Explicit rebuilds must start from the supplied memory list, so clear + # legacy unsuffixed collections too. + self._lanes = build_embedding_lanes(self.COLLECTION_NAME) + self._collection = next( + (lane.collection for lane in self._lanes if lane.name == LANE_FASTEMBED), + self._lanes[0].collection if self._lanes else None, ) texts = [] @@ -161,15 +223,29 @@ class MemoryVectorStore: if texts: # Batch in chunks of 100 to avoid oversized requests + failed_lanes = set() for i in range(0, len(texts), 100): batch_texts = texts[i:i + 100] batch_ids = ids[i:i + 100] - embeddings = self._embed(batch_texts) - self._collection.add( - ids=batch_ids, - embeddings=embeddings, - documents=batch_texts, - metadatas=[{"source": "memory"}] * len(batch_ids), - ) + for lane in self._lanes: + if lane.name in failed_lanes: + continue + try: + lane.collection.add( + ids=batch_ids, + embeddings=lane.encode(batch_texts), + documents=batch_texts, + metadatas=[{"source": "memory"}] * len(batch_ids), + ) + except Exception as e: + failed_lanes.add(lane.name) + logger.warning("memory rebuild failed in %s lane: %s", lane.name, e) - logger.info(f"MemoryVectorStore rebuilt with {len(ids)} entries") + logger.info(f"MemoryVectorStore rebuilt with {len(ids)} entries across {len(self._lanes)} lanes") + + def get_stats(self) -> Dict: + return { + "healthy": self.healthy, + "count": self.count(), + "lanes": [lane.stats() for lane in self._lanes], + } diff --git a/src/rag_vector.py b/src/rag_vector.py index 5f2b880b7..b10680c45 100644 --- a/src/rag_vector.py +++ b/src/rag_vector.py @@ -14,6 +14,17 @@ import numpy as np from typing import List, Dict, Any, Optional, Set from pathlib import Path +from src.embedding_lanes import ( + LANE_CUSTOM, + LANE_FASTEMBED, + build_embedding_lanes, + collection_name, + dedupe_results, + lane_count, + migrate_legacy_collection, + query_lanes, +) + logger = logging.getLogger(__name__) DEFAULT_FILE_EXTENSIONS: Set[str] = { @@ -44,6 +55,7 @@ class VectorRAG: self.persist_directory = persist_directory self._collection = None self._model = None + self._lanes = [] self._healthy = False Path(self.persist_directory).mkdir(parents=True, exist_ok=True) @@ -55,22 +67,20 @@ class VectorRAG: def _initialize_system(self) -> bool: try: - from src.chroma_client import get_chroma_client - from src.embeddings import get_embedding_client - - self._model = get_embedding_client() - if self._model is None: - raise RuntimeError("No embedding backend available") - logger.info(f"Embedding: {self._model.url} model={self._model.model}") - - client = get_chroma_client() - self._collection = client.get_or_create_collection( - name=COLLECTION_NAME, - metadata={"hnsw:space": "cosine"}, + self._lanes = build_embedding_lanes(COLLECTION_NAME) + if not self._lanes: + raise RuntimeError("No embedding lanes available") + self._collection = next( + (lane.collection for lane in self._lanes if lane.name == LANE_FASTEMBED), + self._lanes[0].collection, + ) + self._model = self._lanes[0].client + migrate_legacy_collection(COLLECTION_NAME, self._lanes) + logger.info( + "VectorRAG ready (lanes=%s docs=%s)", + [lane.name for lane in self._lanes], + lane_count(self._lanes), ) - - count = self._collection.count() - logger.info(f"VectorRAG ready ({count} docs)") self._healthy = True return True @@ -80,8 +90,9 @@ class VectorRAG: return False def _embed(self, texts: List[str]) -> List[List[float]]: - vecs = self._model.encode(texts, normalize_embeddings=True) - return np.array(vecs, dtype=np.float32).tolist() + if not self._lanes: + return [] + return np.array(self._lanes[0].encode(texts), dtype=np.float32).tolist() # ------------------------------------------------------------------ # Properties @@ -89,13 +100,57 @@ class VectorRAG: @property def healthy(self) -> bool: - return self._healthy and self._collection is not None + if getattr(self, "_lanes", None): + return self._healthy and bool(self._lanes) + return self._healthy and getattr(self, "_collection", None) is not None @property def collection(self): """Expose the ChromaDB collection for direct access by personal_routes etc.""" return self._collection + def _active_collections(self): + lanes = getattr(self, "_lanes", None) + if lanes: + return [(lane.name, lane.collection) for lane in lanes] + collection = getattr(self, "_collection", None) + return [("legacy", collection)] if collection is not None else [] + + def _collections_for_delete(self): + collections = [] + seen = set() + + def add(lane_name: str, collection) -> None: + if collection is None: + return + key = getattr(collection, "name", None) or id(collection) + if key in seen: + return + seen.add(key) + collections.append((lane_name, collection)) + + for lane_name, collection in self._active_collections(): + add(lane_name, collection) + + if getattr(self, "_lanes", None): + try: + from src.chroma_client import get_chroma_client + + client = get_chroma_client() + try: + add("legacy", client.get_collection(COLLECTION_NAME)) + except Exception: + pass + for lane_name in (LANE_CUSTOM, LANE_FASTEMBED): + try: + add(lane_name, client.get_collection(collection_name(COLLECTION_NAME, lane_name))) + except Exception: + pass + except Exception: + pass + + return collections + # ------------------------------------------------------------------ # Document operations # ------------------------------------------------------------------ @@ -109,23 +164,24 @@ class VectorRAG: if not metadata or not isinstance(metadata, dict): return False - try: - doc_id = _generate_doc_id(text, metadata.get("owner") or "") - # Check if already exists - existing = self._collection.get(ids=[doc_id]) - if existing["ids"]: - return True # already exists - embeddings = self._embed([text]) - self._collection.add( - ids=[doc_id], - embeddings=embeddings, - documents=[text], - metadatas=[metadata], - ) - return True - except Exception as e: - logger.error(f"add_document failed: {e}") - return False + doc_id = _generate_doc_id(text, metadata.get("owner") or "") + wrote = False + for lane in self._lanes: + try: + existing = lane.collection.get(ids=[doc_id]) + if existing["ids"]: + wrote = True + continue + lane.collection.add( + ids=[doc_id], + embeddings=lane.encode([text]), + documents=[text], + metadatas=[metadata], + ) + wrote = True + except Exception as e: + logger.warning("add_document failed in %s lane: %s", lane.name, e) + return wrote def add_documents_batch(self, docs: List[tuple]) -> Dict[str, Any]: if not self.healthy: @@ -140,42 +196,57 @@ class VectorRAG: if not valid: return {"success": False, "message": "No valid documents"} - try: - # Get existing IDs to avoid duplicates + added_ids = set() + attempted_new = False + write_failed = False + for lane in self._lanes: + all_ids = [_generate_doc_id(t, m.get("owner") or "") for t, m in valid] + try: + existing = lane.collection.get(ids=all_ids) + existing_ids = set(existing.get("ids") or []) + except Exception: + existing_ids = set() + new_texts = [] new_metas = [] new_ids = [] - for t, m in valid: - doc_id = _generate_doc_id(t, m.get("owner") or "") - existing = self._collection.get(ids=[doc_id]) - if not existing["ids"]: - new_texts.append(t) - new_metas.append(m) + for (text, meta), doc_id in zip(valid, all_ids): + if doc_id not in existing_ids: + new_texts.append(text) + new_metas.append(meta) new_ids.append(doc_id) if new_texts: - # Batch in chunks of 100 + attempted_new = True + lane_failed = False for i in range(0, len(new_texts), 100): batch_texts = new_texts[i:i + 100] batch_ids = new_ids[i:i + 100] batch_metas = new_metas[i:i + 100] - embeddings = self._embed(batch_texts) - self._collection.add( - ids=batch_ids, - embeddings=embeddings, - documents=batch_texts, - metadatas=batch_metas, - ) + try: + lane.collection.add( + ids=batch_ids, + embeddings=lane.encode(batch_texts), + documents=batch_texts, + metadatas=batch_metas, + ) + except Exception as e: + lane_failed = True + write_failed = True + logger.warning("add_documents_batch failed in %s lane: %s", lane.name, e) + break + if not lane_failed: + added_ids.update(new_ids) - return { - "success": True, - "added_count": len(new_texts), - "total_count": len(docs), - "failed_count": len(docs) - len(valid), - } - except Exception as e: - logger.error(f"add_documents_batch failed: {e}") - return {"success": False, "message": str(e)} + if attempted_new and write_failed and not added_ids: + return {"success": False, "message": "No embedding lane accepted the batch"} + + return { + "success": True, + "added_count": len(added_ids), + "total_count": len(docs), + "failed_count": len(docs) - len(valid), + } # ------------------------------------------------------------------ # Search — hybrid: vector similarity + keyword overlap @@ -186,58 +257,51 @@ class VectorRAG: return [] if not query or not isinstance(query, str): return [] - if self._collection.count() == 0: + if lane_count(self._lanes) == 0: return [] try: - # Fetch extra candidates when owner-filtering - fetch_k = min(k * 3, max(k, 20), self._collection.count()) - if owner: - fetch_k = min(fetch_k * 2, self._collection.count()) - - query_embeddings = self._embed([query]) - - # Use ChromaDB where filter for owner if specified where_filter = {"owner": owner} if owner else None - - results = self._collection.query( - query_embeddings=query_embeddings, - n_results=fetch_k, - where=where_filter, - include=["documents", "metadatas", "distances"], - ) - query_words = set(query.lower().split()) candidates = [] - for idx in range(len(results["ids"][0])): - doc_id = results["ids"][0][idx] - distance = results["distances"][0][idx] - doc_text = results["documents"][0][idx] - meta = results["metadatas"][0][idx] + for lane, results in query_lanes( + self._lanes, + query, + n_results=lambda lane: min( + (k * 6 if owner else k * 3), + max(k, 20), + lane.count(), + ), + where=where_filter, + include=["documents", "metadatas", "distances"], + raise_if_all_failed=True, + ): + for idx in range(len(results["ids"][0])): + doc_id = results["ids"][0][idx] + distance = results["distances"][0][idx] + doc_text = results["documents"][0][idx] + meta = results["metadatas"][0][idx] - # ChromaDB cosine distance = 1 - cosine_similarity - vector_sim = 1.0 - distance + vector_sim = 1.0 - distance + doc_words = set(doc_text.lower().split()) + overlap = len(query_words & doc_words) + keyword_score = overlap / len(query_words) if query_words else 0.0 + hybrid_score = (VECTOR_WEIGHT * vector_sim) + (KEYWORD_WEIGHT * keyword_score) - # Keyword overlap score - doc_words = set(doc_text.lower().split()) - overlap = len(query_words & doc_words) - keyword_score = overlap / len(query_words) if query_words else 0.0 - - hybrid_score = (VECTOR_WEIGHT * vector_sim) + (KEYWORD_WEIGHT * keyword_score) - - candidates.append({ - "id": doc_id, - "document": doc_text, - "metadata": meta, - "distance": round(distance, 4), - "similarity": round(hybrid_score, 4), - "vector_similarity": round(vector_sim, 4), - "keyword_score": round(keyword_score, 4), - }) + candidates.append({ + "id": doc_id, + "document": doc_text, + "metadata": meta, + "distance": round(distance, 4), + "similarity": round(hybrid_score, 4), + "vector_similarity": round(vector_sim, 4), + "keyword_score": round(keyword_score, 4), + "embedding_lane": lane.name, + }) candidates.sort(key=lambda c: c["similarity"], reverse=True) - top = candidates[:k] + top = dedupe_results(candidates, limit=k) logger.info(f"Hybrid search for '{query[:60]}': {len(top)} results") return top @@ -247,39 +311,36 @@ class VectorRAG: def _keyword_search_fallback(self, query: str, k: int = 5, owner: Optional[str] = None) -> List[Dict[str, Any]]: try: - if self._collection.count() == 0: - return [] - - # Fetch all documents for keyword search fallback - all_docs = self._collection.get(include=["documents", "metadatas"]) - if not all_docs["ids"]: + if not self._active_collections(): return [] query_words = query.lower().split() scored = [] - for i, doc in enumerate(all_docs["documents"]): - meta = all_docs["metadatas"][i] - if owner: - # Match the primary path's strict where={"owner": owner} - # filter. The old `if doc_owner and doc_owner != owner` - # let docs with a missing/empty owner fall through, leaking - # owner-less documents into another user's results. - if meta.get("owner") != owner: + for lane_name, collection in self._active_collections(): + if collection.count() == 0: + continue + all_docs = collection.get(include=["documents", "metadatas"]) + if not all_docs["ids"]: + continue + for i, doc in enumerate(all_docs["documents"]): + meta = all_docs["metadatas"][i] + if owner and meta.get("owner") != owner: continue - doc_lower = doc.lower() - score = sum(1 for w in query_words if w in doc_lower) - if score > 0: - scored.append({ - "id": all_docs["ids"][i], - "document": doc, - "metadata": meta, - "distance": 0, - "similarity": score, - "search_type": "keyword_fallback", - }) + doc_lower = doc.lower() + score = sum(1 for w in query_words if w in doc_lower) + if score > 0: + scored.append({ + "id": all_docs["ids"][i], + "document": doc, + "metadata": meta, + "distance": 0, + "similarity": score, + "search_type": "keyword_fallback", + "embedding_lane": lane_name, + }) scored.sort(key=lambda x: x["similarity"], reverse=True) - return scored[:k] + return dedupe_results(scored, limit=k) except Exception as e: logger.error(f"keyword fallback failed: {e}") return [] @@ -296,9 +357,20 @@ class VectorRAG: client.delete_collection(COLLECTION_NAME) except Exception: pass - self._collection = client.get_or_create_collection( - name=COLLECTION_NAME, - metadata={"hnsw:space": "cosine"}, + for name in ( + collection_name(COLLECTION_NAME, LANE_CUSTOM), + collection_name(COLLECTION_NAME, LANE_FASTEMBED), + ): + try: + client.delete_collection(name) + except Exception: + pass + # Rebuild means empty current lanes. Clear the legacy unsuffixed + # collection too so startup migration cannot resurrect stale docs. + self._lanes = build_embedding_lanes(COLLECTION_NAME) + self._collection = next( + (lane.collection for lane in self._lanes if lane.name == LANE_FASTEMBED), + self._lanes[0].collection if self._lanes else None, ) self._healthy = True return True @@ -312,10 +384,11 @@ class VectorRAG: return {"error": "Collection not initialized"} try: return { - "document_count": self._collection.count(), - "embedding_model": f"{self._model.model} @ {self._model.url}" if self._model else "N/A", + "document_count": lane_count(self._lanes), + "embedding_model": f"{self._lanes[0].model} @ {self._lanes[0].url}" if self._lanes else "N/A", "persist_directory": self.persist_directory, "collection_name": COLLECTION_NAME, + "embedding_lanes": [lane.stats() for lane in self._lanes], "healthy": True, } except Exception as e: @@ -400,19 +473,23 @@ class VectorRAG: return {"success": False, "message": "Collection not initialized"} directory = os.path.abspath(directory) try: - results = self._collection.get(include=["metadatas"]) - ids = [ - results["ids"][i] - for i, m in enumerate(results["metadatas"]) - if isinstance(m, dict) - and isinstance(m.get("source"), str) - and (m["source"] == directory or m["source"].startswith(directory + os.sep)) - ] - if not ids: + removed_ids = set() + for _lane_name, collection in self._collections_for_delete(): + results = collection.get(include=["metadatas"]) + ids = [ + results["ids"][i] + for i, m in enumerate(results["metadatas"]) + if isinstance(m, dict) + and isinstance(m.get("source"), str) + and (m["source"] == directory or m["source"].startswith(directory + os.sep)) + ] + if ids: + collection.delete(ids=ids) + removed_ids.update(ids) + if not removed_ids: return {"success": True, "removed_count": 0, "message": "No docs found"} - self._collection.delete(ids=ids) - n = len(ids) + n = len(removed_ids) logger.info(f"Removed {n} chunks from {directory}") return {"success": True, "removed_count": n, "message": f"Removed {n} chunks"} except Exception as e: @@ -504,16 +581,18 @@ class VectorRAG: if not self.healthy: return 0 try: - results = self._collection.get( - where={"source": source}, - include=[], - ) - ids = results.get("ids", []) - if not ids: - return 0 - self._collection.delete(ids=ids) - logger.info(f"Deleted {len(ids)} chunks for source={source}") - return len(ids) + removed_ids = set() + for _lane_name, collection in self._collections_for_delete(): + results = collection.get( + where={"source": source}, + include=[], + ) + ids = results.get("ids", []) + if ids: + collection.delete(ids=ids) + removed_ids.update(ids) + logger.info(f"Deleted {len(removed_ids)} chunks for source={source}") + return len(removed_ids) except Exception as e: logger.error(f"delete_by_source failed: {e}") return 0 diff --git a/src/tool_index.py b/src/tool_index.py index ae8c4ec31..f6acd0828 100644 --- a/src/tool_index.py +++ b/src/tool_index.py @@ -12,6 +12,14 @@ import re import time from typing import Dict, List, Optional, Set +from src.embedding_lanes import ( + LANE_CUSTOM, + LANE_FASTEMBED, + build_embedding_lanes, + dedupe_results, + migrate_legacy_collection, +) + try: import numpy as np except ImportError: @@ -155,32 +163,30 @@ class ToolIndex: """ChromaDB-backed tool index for RAG-based tool selection.""" def __init__(self): - from src.chroma_client import get_chroma_client - from src.embeddings import get_embedding_client - - self._embedder = get_embedding_client() - if not self._embedder: - raise RuntimeError("No embedding client available") - - client = get_chroma_client() - self._collection = client.get_or_create_collection( - name=COLLECTION_NAME, - metadata={"hnsw:space": "cosine"}, + self._lanes = build_embedding_lanes(COLLECTION_NAME) + if not self._lanes: + raise RuntimeError("No embedding lanes available") + self._embedder = self._lanes[0].client + self._collection = next( + (lane.collection for lane in self._lanes if lane.name == LANE_FASTEMBED), + self._lanes[0].collection, ) + migrate_legacy_collection(COLLECTION_NAME, self._lanes) self._fingerprint = "" self._mcp_generation = -1 self._healthy = True - logger.info("ToolIndex initialized") + logger.info("ToolIndex initialized (lanes=%s)", [lane.name for lane in self._lanes]) @property def healthy(self): return self._healthy def _embed(self, texts: List[str]) -> List[List[float]]: - vecs = self._embedder.encode(texts, normalize_embeddings=True) + if not self._lanes: + return [] + vecs = self._lanes[0].encode(texts) if np is not None: return np.array(vecs, dtype=np.float32).tolist() - # Fallback without numpy return [list(v) for v in vecs] def index_builtin_tools(self): @@ -201,23 +207,31 @@ class ToolIndex: # registry (e.g. removed tools like the old vault_* set). # Without this, upsert leaves them in place and RAG keeps # surfacing tools that no longer exist. - try: - existing = self._collection.get(where={"tool_type": "builtin"}) - existing_ids = (existing or {}).get("ids") or [] - stale = [i for i in existing_ids if i not in set(ids)] - if stale: - self._collection.delete(ids=stale) - logger.info(f"Pruned {len(stale)} stale builtin tool entries from index") - except Exception as e: - logger.debug(f"Stale-pruning skipped: {e}") + indexed = False + for lane in self._lanes: + try: + existing = lane.collection.get(where={"tool_type": "builtin"}) + existing_ids = (existing or {}).get("ids") or [] + stale = [i for i in existing_ids if i not in set(ids)] + if stale: + lane.collection.delete(ids=stale) + logger.info(f"Pruned {len(stale)} stale builtin tool entries from {lane.name} index") + except Exception as e: + logger.debug(f"Stale-pruning skipped for {lane.name}: {e}") - embeddings = self._embed(docs) - self._collection.upsert( - ids=ids, - documents=docs, - embeddings=embeddings, - metadatas=metadatas, - ) + try: + lane.collection.upsert( + ids=ids, + documents=docs, + embeddings=lane.encode(docs), + metadatas=metadatas, + ) + indexed = True + except Exception as e: + logger.warning("Builtin tool indexing failed in %s lane: %s", lane.name, e) + if not indexed: + self._healthy = False + raise RuntimeError("Builtin tool indexing failed in all embedding lanes") self._fingerprint = hashlib.sha256( ",".join(sorted(BUILTIN_TOOL_DESCRIPTIONS.keys())).encode() ).hexdigest() @@ -232,15 +246,15 @@ class ToolIndex: gen = getattr(mcp_mgr, '_generation', 0) if gen == self._mcp_generation: return - self._mcp_generation = gen # Remove old MCP entries - try: - existing = self._collection.get(where={"tool_type": "mcp"}) - if existing and existing["ids"]: - self._collection.delete(ids=existing["ids"]) - except Exception: - pass + for lane in self._lanes: + try: + existing = lane.collection.get(where={"tool_type": "mcp"}) + if existing and existing["ids"]: + lane.collection.delete(ids=existing["ids"]) + except Exception: + pass # Get current MCP tools try: @@ -249,6 +263,7 @@ class ToolIndex: all_tools = "" if not all_tools: + self._mcp_generation = gen return # Parse MCP tool descriptions from the prompt text @@ -276,39 +291,59 @@ class ToolIndex: metadatas.append({"tool_name": name, "tool_type": "mcp"}) if not docs: + self._mcp_generation = gen return - embeddings = self._embed(docs) - self._collection.upsert( - ids=ids, - documents=docs, - embeddings=embeddings, - metadatas=metadatas, - ) + indexed = False + for lane in self._lanes: + try: + lane.collection.upsert( + ids=ids, + documents=docs, + embeddings=lane.encode(docs), + metadatas=metadatas, + ) + indexed = True + except Exception as e: + logger.warning("MCP tool indexing failed in %s lane: %s", lane.name, e) + if not indexed: + logger.warning("MCP tool indexing failed in all embedding lanes") + return + self._mcp_generation = gen logger.info(f"Indexed {len(docs)} MCP tools") def retrieve(self, query: str, k: int = 8) -> List[str]: """Retrieve the top-K most relevant tool names for a query.""" - try: - query_embedding = self._embed([query]) - results = self._collection.query( - query_embeddings=query_embedding, - n_results=min(k, self._collection.count() or k), - include=["metadatas", "distances"], - ) - if not results or not results.get("metadatas"): - return [] - - tool_names = [] - for meta_list in results["metadatas"]: - for meta in meta_list: - name = meta.get("tool_name", "") - if name and name not in tool_names: - tool_names.append(name) - return tool_names - except Exception as e: - logger.warning(f"Tool retrieval failed: {e}") - return [] + rows = [] + lane_priority = {LANE_CUSTOM: 0, LANE_FASTEMBED: 1} + for lane in self._lanes: + try: + count = lane.count() + if count == 0: + continue + results = lane.collection.query( + query_embeddings=lane.encode([query]), + n_results=min(k, count), + include=["metadatas", "distances"], + ) + if not results or not results.get("metadatas"): + continue + distances = results.get("distances") or [] + for list_idx, meta_list in enumerate(results["metadatas"]): + distance_list = distances[list_idx] if list_idx < len(distances) else [] + for idx, meta in enumerate(meta_list): + name = meta.get("tool_name", "") + if name: + distance = distance_list[idx] if idx < len(distance_list) else 1.0 + rows.append({ + "tool_name": name, + "score": round(1.0 - distance, 4), + "embedding_lane": lane.name, + }) + except Exception as e: + logger.warning("Tool retrieval failed in %s lane: %s", lane.name, e) + rows.sort(key=lambda row: (-row["score"], lane_priority.get(row["embedding_lane"], 99))) + return [row["tool_name"] for row in dedupe_results(rows, id_key="tool_name", limit=k)] # Structural recurring-schedule intent. Typo-resilient (matches "every dya" # via "every "), and catches bare clock times ("at 7:30 am", "7am"). @@ -511,3 +546,10 @@ def get_tool_index() -> Optional[ToolIndex]: logger.warning(f"ToolIndex init failed (will retry in {_RETRY_INTERVAL}s): {e}") _tool_index = None return None + + +def reset_tool_index() -> None: + """Clear the singleton so embedding endpoint changes rebuild tool lanes.""" + global _tool_index, _last_attempt + _tool_index = None + _last_attempt = 0.0 diff --git a/tests/test_embedding_lanes.py b/tests/test_embedding_lanes.py new file mode 100644 index 000000000..e7adf88bf --- /dev/null +++ b/tests/test_embedding_lanes.py @@ -0,0 +1,1104 @@ +import pytest + +from src.embedding_lanes import ( + EmbeddingLane, + LANE_CUSTOM, + LANE_FASTEMBED, + build_embedding_lanes, +) + + +class FakeEmbedder: + def __init__(self, dim, model, url): + self.dim = dim + self.model = model + self.url = url + + def get_sentence_embedding_dimension(self): + return self.dim + + def encode(self, texts, normalize_embeddings=True): + return [[float(i + 1)] * self.dim for i, _ in enumerate(texts)] + + +class FailingEmbedder(FakeEmbedder): + def encode(self, texts, normalize_embeddings=True): + raise RuntimeError("embedding endpoint rate limited") + + +class FakeCollection: + def __init__(self, name, metadata=None): + self.name = name + self.metadata = metadata or {} + self.rows = {} + self.dim = None + + def count(self): + return len(self.rows) + + def add(self, ids, embeddings, documents=None, metadatas=None): + self._check_dim(embeddings) + documents = documents or [None] * len(ids) + metadatas = metadatas or [{}] * len(ids) + for row_id, emb, doc, meta in zip(ids, embeddings, documents, metadatas): + self.rows[row_id] = {"embedding": emb, "document": doc, "metadata": meta} + + def upsert(self, ids, embeddings, documents=None, metadatas=None): + self.add(ids, embeddings, documents=documents, metadatas=metadatas) + + def get(self, ids=None, include=None, where=None, limit=None): + selected = list(self.rows.items()) + if ids is not None: + id_set = set(ids) + selected = [(row_id, row) for row_id, row in selected if row_id in id_set] + if where: + selected = [ + (row_id, row) + for row_id, row in selected + if all(row["metadata"].get(k) == v for k, v in where.items()) + ] + if limit is not None: + selected = selected[:limit] + return { + "ids": [row_id for row_id, _ in selected], + "documents": [row["document"] for _, row in selected], + "metadatas": [row["metadata"] for _, row in selected], + "embeddings": [row["embedding"] for _, row in selected], + } + + def query(self, query_embeddings, n_results, where=None, include=None): + self._check_dim(query_embeddings) + rows = self.get(where=where) + ids = rows["ids"][:n_results] + docs = rows["documents"][:n_results] + metas = rows["metadatas"][:n_results] + return { + "ids": [ids], + "documents": [docs], + "metadatas": [metas], + "distances": [[0.1 + i * 0.01 for i in range(len(ids))]], + } + + def delete(self, ids): + for row_id in ids: + self.rows.pop(row_id, None) + + def _check_dim(self, embeddings): + if not embeddings: + return + dim = len(embeddings[0]) + if self.dim is None: + self.dim = dim + elif self.dim != dim: + raise RuntimeError(f"Collection expecting embedding with dimension of {self.dim}, got {dim}") + + +class FakeChroma: + def __init__(self): + self.collections = {} + self.deleted = [] + self.fail_next_add_for = {} + + def get_or_create_collection(self, name, metadata=None): + if name not in self.collections: + self.collections[name] = FakeCollection(name, metadata=metadata) + if self.fail_next_add_for.get(name, 0) > 0: + original_add = self.collections[name].add + + def fail_once(*args, **kwargs): + self.fail_next_add_for[name] -= 1 + self.collections[name].add = original_add + raise RuntimeError("chroma write failed") + + self.collections[name].add = fail_once + elif metadata is not None: + self.collections[name].metadata = metadata + return self.collections[name] + + def get_collection(self, name): + if name not in self.collections: + raise KeyError(name) + return self.collections[name] + + def delete_collection(self, name): + self.deleted.append(name) + self.collections.pop(name, None) + + +def _patch_chroma(monkeypatch, fake): + import src.chroma_client as chroma_client + + monkeypatch.setattr(chroma_client, "get_chroma_client", lambda: fake) + + +def test_build_embedding_lanes_keeps_custom_and_fastembed_dimensions_separate(monkeypatch): + fake = FakeChroma() + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr( + lanes, + "_build_custom_client", + lambda: FakeEmbedder(768, "nomic-embed-text", "http://embeddings/v1"), + ) + monkeypatch.setattr( + lanes, + "_build_fastembed_client", + lambda: FakeEmbedder(384, "sentence-transformers/all-MiniLM-L6-v2", "local://fastembed"), + ) + + built = build_embedding_lanes("odysseus_memories") + + assert [lane.name for lane in built] == [LANE_CUSTOM, LANE_FASTEMBED] + assert built[0].collection_name == "odysseus_memories_custom" + assert built[0].dimension == 768 + assert built[1].collection_name == "odysseus_memories_fastembed" + assert built[1].dimension == 384 + + built[0].collection.add(ids=["custom"], embeddings=built[0].encode(["a"]), documents=["a"]) + built[1].collection.add(ids=["fast"], embeddings=built[1].encode(["a"]), documents=["a"]) + + with pytest.raises(RuntimeError, match="dimension"): + built[0].collection.query(query_embeddings=built[1].encode(["bad"]), n_results=1) + + +def test_build_embedding_lanes_recreates_only_custom_when_fingerprint_changes(monkeypatch): + fake = FakeChroma() + old_custom = fake.get_or_create_collection( + "odysseus_rag_custom", + metadata={ + "embedding_lane": "custom", + "embedding_dimension": 768, + "embedding_fingerprint": "old", + }, + ) + old_custom.add(ids=["old"], embeddings=[[0.0] * 768], documents=["old"]) + fast = fake.get_or_create_collection( + "odysseus_rag_fastembed", + metadata={ + "embedding_lane": "fastembed", + "embedding_dimension": 384, + }, + ) + fast.add(ids=["fast"], embeddings=[[0.0] * 384], documents=["fast"]) + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: FakeEmbedder(1024, "bge-large", "http://embeddings/v1")) + monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "sentence-transformers/all-MiniLM-L6-v2", "local://fastembed")) + + built = build_embedding_lanes("odysseus_rag") + + assert "odysseus_rag_custom" in fake.deleted + assert fake.collections["odysseus_rag_custom"].count() == 1 + assert len(fake.collections["odysseus_rag_custom"].rows["old"]["embedding"]) == 1024 + assert fake.collections["odysseus_rag_fastembed"].count() == 1 + assert built[0].dimension == 1024 + + +def test_lane_reset_reembeds_existing_documents_on_fingerprint_change(monkeypatch): + fake = FakeChroma() + old_custom = fake.get_or_create_collection( + "odysseus_memories_custom", + metadata={ + "embedding_lane": "custom", + "embedding_dimension": 384, + "embedding_fingerprint": "old", + }, + ) + old_custom.add( + ids=["existing-memory"], + embeddings=[[0.0] * 384], + documents=["existing custom memory"], + metadatas=[{"source": "memory"}], + ) + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: FakeEmbedder(768, "nomic", "http://embeddings/v1")) + + def fail_fastembed(): + raise RuntimeError("fastembed missing") + + monkeypatch.setattr(lanes, "_build_fastembed_client", fail_fastembed) + + built = build_embedding_lanes("odysseus_memories") + + assert [lane.name for lane in built] == [LANE_CUSTOM] + assert "odysseus_memories_custom" in fake.deleted + rebuilt = fake.collections["odysseus_memories_custom"] + assert rebuilt.count() == 1 + assert rebuilt.get()["ids"] == ["existing-memory"] + assert len(rebuilt.rows["existing-memory"]["embedding"]) == 768 + + +def test_lane_reset_keeps_existing_collection_when_reembed_fails(monkeypatch): + fake = FakeChroma() + old_custom = fake.get_or_create_collection( + "odysseus_memories_custom", + metadata={ + "embedding_lane": "custom", + "embedding_dimension": 384, + "embedding_fingerprint": "old", + }, + ) + old_custom.add( + ids=["existing-memory"], + embeddings=[[0.0] * 384], + documents=["existing custom memory"], + metadatas=[{"source": "memory"}], + ) + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: FailingEmbedder(768, "nomic", "http://embeddings/v1")) + monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed")) + + built = build_embedding_lanes("odysseus_memories") + + assert [lane.name for lane in built] == [LANE_FASTEMBED] + assert "odysseus_memories_custom" not in fake.deleted + assert fake.collections["odysseus_memories_custom"].count() == 1 + assert len(fake.collections["odysseus_memories_custom"].rows["existing-memory"]["embedding"]) == 384 + + +def test_lane_reset_keeps_existing_collection_when_preserve_read_fails(monkeypatch): + fake = FakeChroma() + old_custom = fake.get_or_create_collection( + "odysseus_memories_custom", + metadata={ + "embedding_lane": "custom", + "embedding_dimension": 384, + "embedding_fingerprint": "old", + }, + ) + old_custom.add( + ids=["existing-memory"], + embeddings=[[0.0] * 384], + documents=["existing custom memory"], + metadatas=[{"source": "memory"}], + ) + + def fail_get(*_args, **_kwargs): + raise RuntimeError("chroma read failed") + + old_custom.get = fail_get + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: FakeEmbedder(768, "nomic", "http://embeddings/v1")) + + def fail_fastembed(): + raise RuntimeError("fastembed missing") + + monkeypatch.setattr(lanes, "_build_fastembed_client", fail_fastembed) + + built = build_embedding_lanes("odysseus_memories") + + assert built == [] + assert "odysseus_memories_custom" not in fake.deleted + assert "odysseus_memories_custom" in fake.collections + + +def test_lane_reset_restores_existing_collection_when_rewrite_fails(monkeypatch): + fake = FakeChroma() + old_custom = fake.get_or_create_collection( + "odysseus_memories_custom", + metadata={ + "embedding_lane": "custom", + "embedding_dimension": 384, + "embedding_fingerprint": "old", + }, + ) + old_custom.add( + ids=["existing-memory"], + embeddings=[[0.0] * 384], + documents=["existing custom memory"], + metadatas=[{"source": "memory"}], + ) + fake.fail_next_add_for["odysseus_memories_custom"] = 1 + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: FakeEmbedder(768, "nomic", "http://embeddings/v1")) + + def fail_fastembed(): + raise RuntimeError("fastembed missing") + + monkeypatch.setattr(lanes, "_build_fastembed_client", fail_fastembed) + + built = build_embedding_lanes("odysseus_memories") + + assert built == [] + restored = fake.collections["odysseus_memories_custom"] + assert restored.count() == 1 + assert restored.get()["ids"] == ["existing-memory"] + assert len(restored.rows["existing-memory"]["embedding"]) == 384 + + +def test_build_embedding_lanes_uses_fastembed_when_custom_unavailable(monkeypatch): + fake = FakeChroma() + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + def fail_custom(): + raise RuntimeError("down") + + monkeypatch.setattr(lanes, "_build_custom_client", fail_custom) + monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed")) + + built = build_embedding_lanes("odysseus_tool_index") + + assert [lane.name for lane in built] == [LANE_FASTEMBED] + assert built[0].collection_name == "odysseus_tool_index_fastembed" + + +def test_custom_lane_preserves_default_embedding_client_probe(monkeypatch): + import src.embedding_lanes as lanes + import src.embeddings as embeddings + + embeddings.reset_http_embed_state() + monkeypatch.setattr(lanes, "_load_custom_endpoint", lambda: {}) + + calls = [] + + class DefaultClient(FakeEmbedder): + def __init__(self, url=None, model=None, api_key=None): + calls.append({"url": url, "model": model, "api_key": api_key}) + super().__init__(768, model or "all-minilm:l6-v2", url or "http://localhost:11434/v1/embeddings") + + monkeypatch.setattr(embeddings, "EmbeddingClient", DefaultClient) + + client = lanes._build_custom_client() + + assert calls == [{"url": None, "model": None, "api_key": None}] + assert client.url == "http://localhost:11434/v1/embeddings" + embeddings.reset_http_embed_state() + + +def test_custom_lane_uses_http_down_latch(monkeypatch): + import src.embedding_lanes as lanes + import src.embeddings as embeddings + + embeddings.reset_http_embed_state() + calls = [] + + class DownClient: + def __init__(self, url=None, model=None, api_key=None): + calls.append({"url": url, "model": model, "api_key": api_key}) + + def get_sentence_embedding_dimension(self): + raise RuntimeError("endpoint down") + + class LocalFastEmbed(FakeEmbedder): + def __init__(self): + super().__init__(384, "mini", "local://fastembed") + + monkeypatch.setattr(embeddings, "EmbeddingClient", DownClient) + monkeypatch.setattr(embeddings, "FastEmbedClient", LocalFastEmbed) + + with pytest.raises(RuntimeError, match="HTTP embedding lane unavailable"): + lanes._build_custom_client() + with pytest.raises(RuntimeError, match="HTTP embedding lane unavailable"): + lanes._build_custom_client() + + assert calls == [{"url": None, "model": None, "api_key": None}] + embeddings.reset_http_embed_state() + + +def test_memory_vector_store_writes_both_lanes_and_prefers_custom(monkeypatch): + fake = FakeChroma() + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: FakeEmbedder(768, "nomic", "http://embeddings/v1")) + monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed")) + + from src.memory_vector import MemoryVectorStore + + store = MemoryVectorStore("data") + store.add("mem-1", "Nicholai likes direct memory systems") + + assert fake.collections["odysseus_memories_custom"].count() == 1 + assert fake.collections["odysseus_memories_fastembed"].count() == 1 + + results = store.search("direct memory", k=5) + assert results[0]["memory_id"] == "mem-1" + assert results[0]["embedding_lane"] == LANE_CUSTOM + + +def test_memory_search_merges_fallback_only_results_before_limit(): + custom_collection = FakeCollection("odysseus_memories_custom", metadata={"embedding_lane": "custom"}) + fast_collection = FakeCollection("odysseus_memories_fastembed", metadata={"embedding_lane": "fastembed"}) + custom_collection.add( + ids=["old-1", "old-2"], + embeddings=[[0.0] * 768, [0.0] * 768], + documents=["older custom memory", "another custom memory"], + metadatas=[{"source": "memory"}, {"source": "memory"}], + ) + fast_collection.add( + ids=["fallback-only"], + embeddings=[[0.0] * 384], + documents=["fallback only relevant memory"], + metadatas=[{"source": "memory"}], + ) + + custom_collection.query = lambda **_kwargs: { + "ids": [["old-1", "old-2"]], + "distances": [[0.20, 0.21]], + } + fast_collection.query = lambda **_kwargs: { + "ids": [["fallback-only"]], + "distances": [[0.05]], + } + + custom_lane = EmbeddingLane( + name=LANE_CUSTOM, + client=FakeEmbedder(768, "nomic", "http://embeddings/v1"), + collection=custom_collection, + collection_name="odysseus_memories_custom", + model="nomic", + url="http://embeddings/v1", + dimension=768, + fingerprint="custom", + ) + fast_lane = EmbeddingLane( + name=LANE_FASTEMBED, + client=FakeEmbedder(384, "mini", "local://fastembed"), + collection=fast_collection, + collection_name="odysseus_memories_fastembed", + model="mini", + url="local://fastembed", + dimension=384, + fingerprint="fast", + ) + + from src.memory_vector import MemoryVectorStore + + store = MemoryVectorStore.__new__(MemoryVectorStore) + store._lanes = [custom_lane, fast_lane] + store._healthy = True + + results = store.search("fallback relevant", k=2) + + assert [row["memory_id"] for row in results] == ["fallback-only", "old-1"] + + +def test_vector_rag_writes_both_lanes_and_falls_back_to_fastembed(monkeypatch): + fake = FakeChroma() + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: None) + monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed")) + + from src.rag_vector import VectorRAG + + rag = VectorRAG() + assert rag.add_document("session search belongs in tools", {"source": "/tmp/a.md", "owner": "alice"}) + assert "odysseus_rag_custom" not in fake.collections + assert fake.collections["odysseus_rag_fastembed"].count() == 1 + + results = rag.search("session search", k=3, owner="alice") + assert results[0]["document"] == "session search belongs in tools" + assert results[0]["embedding_lane"] == LANE_FASTEMBED + + +def test_vector_rag_batch_index_continues_when_custom_lane_fails(monkeypatch, tmp_path): + fake = FakeChroma() + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: FailingEmbedder(768, "nomic", "http://embeddings/v1")) + monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed")) + + from src.rag_vector import VectorRAG + + rag = VectorRAG(persist_directory=str(tmp_path)) + result = rag.add_documents_batch([ + ("batch fallback document", {"source": "/tmp/a.md", "owner": "alice"}), + ]) + + assert result["success"] + assert result["added_count"] == 1 + assert fake.collections["odysseus_rag_custom"].count() == 0 + assert fake.collections["odysseus_rag_fastembed"].count() == 1 + + +def test_vector_rag_batch_index_reports_failure_when_all_lanes_fail(monkeypatch, tmp_path): + fake = FakeChroma() + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: FailingEmbedder(768, "nomic", "http://embeddings/v1")) + monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FailingEmbedder(384, "mini", "local://fastembed")) + + from src.rag_vector import VectorRAG + + rag = VectorRAG(persist_directory=str(tmp_path)) + result = rag.add_documents_batch([ + ("batch outage document", {"source": "/tmp/a.md", "owner": "alice"}), + ]) + + assert not result["success"] + assert fake.collections["odysseus_rag_custom"].count() == 0 + assert fake.collections["odysseus_rag_fastembed"].count() == 0 + + +def test_tool_index_indexes_and_retrieves_from_available_lanes(monkeypatch): + fake = FakeChroma() + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: FakeEmbedder(768, "nomic", "http://embeddings/v1")) + monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed")) + + from src.tool_index import ToolIndex + + index = ToolIndex() + index.index_builtin_tools() + + assert fake.collections["odysseus_tool_index_custom"].count() > 0 + assert fake.collections["odysseus_tool_index_fastembed"].count() > 0 + assert "bash" in index.retrieve("run a shell command", k=10) + + +def test_tool_index_builtin_indexing_fails_when_all_lanes_fail(): + custom_lane = EmbeddingLane( + name=LANE_CUSTOM, + client=FailingEmbedder(768, "nomic", "http://embeddings/v1"), + collection=FakeCollection("odysseus_tool_index_custom", metadata={"embedding_lane": "custom"}), + collection_name="odysseus_tool_index_custom", + model="nomic", + url="http://embeddings/v1", + dimension=768, + fingerprint="custom", + ) + fast_lane = EmbeddingLane( + name=LANE_FASTEMBED, + client=FailingEmbedder(384, "mini", "local://fastembed"), + collection=FakeCollection("odysseus_tool_index_fastembed", metadata={"embedding_lane": "fastembed"}), + collection_name="odysseus_tool_index_fastembed", + model="mini", + url="local://fastembed", + dimension=384, + fingerprint="fast", + ) + + from src.tool_index import ToolIndex + + index = ToolIndex.__new__(ToolIndex) + index._lanes = [custom_lane, fast_lane] + index._healthy = True + + with pytest.raises(RuntimeError, match="all embedding lanes"): + index.index_builtin_tools() + assert not index.healthy + + +def test_tool_index_retrieval_continues_when_custom_lane_query_fails(): + custom_collection = FakeCollection("odysseus_tool_index_custom", metadata={"embedding_lane": "custom"}) + fast_collection = FakeCollection("odysseus_tool_index_fastembed", metadata={"embedding_lane": "fastembed"}) + fast_collection.add( + ids=["builtin_bash"], + embeddings=[[0.0] * 384], + documents=["Tool: bash\nRun shell commands"], + metadatas=[{"tool_name": "bash", "tool_type": "builtin"}], + ) + + def fail_query(*_args, **_kwargs): + raise RuntimeError("custom endpoint down") + + custom_collection.add( + ids=["builtin_python"], + embeddings=[[0.0] * 768], + documents=["Tool: python\nRun Python"], + metadatas=[{"tool_name": "python", "tool_type": "builtin"}], + ) + custom_collection.query = fail_query + + custom_lane = EmbeddingLane( + name=LANE_CUSTOM, + client=FakeEmbedder(768, "nomic", "http://embeddings/v1"), + collection=custom_collection, + collection_name="odysseus_tool_index_custom", + model="nomic", + url="http://embeddings/v1", + dimension=768, + fingerprint="custom", + ) + fast_lane = EmbeddingLane( + name=LANE_FASTEMBED, + client=FakeEmbedder(384, "mini", "local://fastembed"), + collection=fast_collection, + collection_name="odysseus_tool_index_fastembed", + model="mini", + url="local://fastembed", + dimension=384, + fingerprint="fast", + ) + + from src.tool_index import ToolIndex + + index = ToolIndex.__new__(ToolIndex) + index._lanes = [custom_lane, fast_lane] + + assert index.retrieve("run shell", k=5) == ["bash"] + + +def test_tool_index_merges_fallback_tool_results_before_limit(): + custom_collection = FakeCollection("odysseus_tool_index_custom", metadata={"embedding_lane": "custom"}) + fast_collection = FakeCollection("odysseus_tool_index_fastembed", metadata={"embedding_lane": "fastembed"}) + custom_collection.add( + ids=["builtin_one", "builtin_two"], + embeddings=[[0.0] * 768, [0.0] * 768], + documents=["Tool: one", "Tool: two"], + metadatas=[ + {"tool_name": "one", "tool_type": "builtin"}, + {"tool_name": "two", "tool_type": "builtin"}, + ], + ) + fast_collection.add( + ids=["mcp_current"], + embeddings=[[0.0] * 384], + documents=["Tool: current MCP"], + metadatas=[{"tool_name": "current_mcp", "tool_type": "mcp"}], + ) + + custom_collection.query = lambda **_kwargs: { + "ids": [["builtin_one", "builtin_two"]], + "metadatas": [[ + {"tool_name": "one", "tool_type": "builtin"}, + {"tool_name": "two", "tool_type": "builtin"}, + ]], + "distances": [[0.20, 0.21]], + } + fast_collection.query = lambda **_kwargs: { + "ids": [["mcp_current"]], + "metadatas": [[{"tool_name": "current_mcp", "tool_type": "mcp"}]], + "distances": [[0.05]], + } + + custom_lane = EmbeddingLane( + name=LANE_CUSTOM, + client=FakeEmbedder(768, "nomic", "http://embeddings/v1"), + collection=custom_collection, + collection_name="odysseus_tool_index_custom", + model="nomic", + url="http://embeddings/v1", + dimension=768, + fingerprint="custom", + ) + fast_lane = EmbeddingLane( + name=LANE_FASTEMBED, + client=FakeEmbedder(384, "mini", "local://fastembed"), + collection=fast_collection, + collection_name="odysseus_tool_index_fastembed", + model="mini", + url="local://fastembed", + dimension=384, + fingerprint="fast", + ) + + from src.tool_index import ToolIndex + + index = ToolIndex.__new__(ToolIndex) + index._lanes = [custom_lane, fast_lane] + + assert index.retrieve("current mcp", k=2) == ["current_mcp", "one"] + + +def test_legacy_collection_backfills_fastembed_lane(monkeypatch): + fake = FakeChroma() + legacy = fake.get_or_create_collection("odysseus_memories", metadata={"hnsw:space": "cosine"}) + legacy.add( + ids=["legacy-memory"], + embeddings=[[0.0] * 384], + documents=["legacy memory row"], + metadatas=[{"source": "memory"}], + ) + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: None) + monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed")) + + from src.memory_vector import MemoryVectorStore + + store = MemoryVectorStore("data") + + assert store.count() == 1 + assert fake.collections["odysseus_memories"].count() == 1 + assert fake.collections["odysseus_memories_fastembed"].count() == 1 + + +def test_legacy_collection_backfills_custom_only_lane(monkeypatch): + fake = FakeChroma() + legacy = fake.get_or_create_collection("odysseus_memories", metadata={"hnsw:space": "cosine"}) + legacy.add( + ids=["legacy-memory"], + embeddings=[[0.0] * 384], + documents=["legacy memory row"], + metadatas=[{"source": "memory"}], + ) + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: FakeEmbedder(768, "nomic", "http://embeddings/v1")) + + def fail_fastembed(): + raise RuntimeError("fastembed missing") + + monkeypatch.setattr(lanes, "_build_fastembed_client", fail_fastembed) + + from src.memory_vector import MemoryVectorStore + + store = MemoryVectorStore("data") + + assert store.count() == 1 + assert "odysseus_memories_fastembed" not in fake.collections + assert fake.collections["odysseus_memories_custom"].count() == 1 + assert len(fake.collections["odysseus_memories_custom"].rows["legacy-memory"]["embedding"]) == 768 + + +def test_legacy_migration_continues_when_custom_backfill_fails(monkeypatch): + fake = FakeChroma() + legacy = fake.get_or_create_collection("odysseus_memories", metadata={"hnsw:space": "cosine"}) + legacy.add( + ids=["legacy-memory"], + embeddings=[[0.0] * 384], + documents=["legacy memory row"], + metadatas=[{"source": "memory"}], + ) + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: FailingEmbedder(768, "nomic", "http://embeddings/v1")) + monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed")) + + from src.memory_vector import MemoryVectorStore + + store = MemoryVectorStore("data") + + assert store.healthy + assert fake.collections["odysseus_memories_custom"].count() == 0 + assert fake.collections["odysseus_memories_fastembed"].count() == 1 + + +def test_legacy_migration_resumes_partial_lane_backfill(monkeypatch): + fake = FakeChroma() + legacy = fake.get_or_create_collection("odysseus_memories", metadata={"hnsw:space": "cosine"}) + legacy.add( + ids=["legacy-1", "legacy-2"], + embeddings=[[0.0] * 384, [0.0] * 384], + documents=["legacy memory one", "legacy memory two"], + metadatas=[{"source": "memory"}, {"source": "memory"}], + ) + partial = fake.get_or_create_collection("odysseus_memories_fastembed", metadata={"embedding_lane": "fastembed"}) + partial.add( + ids=["legacy-1"], + embeddings=[[0.0] * 384], + documents=["legacy memory one"], + metadatas=[{"source": "memory"}], + ) + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: None) + monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed")) + + from src.memory_vector import MemoryVectorStore + + store = MemoryVectorStore("data") + + assert store.count() == 2 + assert set(fake.collections["odysseus_memories_fastembed"].get()["ids"]) == {"legacy-1", "legacy-2"} + + +def test_memory_rebuild_does_not_reimport_legacy_collection(monkeypatch): + fake = FakeChroma() + legacy = fake.get_or_create_collection("odysseus_memories", metadata={"hnsw:space": "cosine"}) + legacy.add( + ids=["stale-memory"], + embeddings=[[0.0] * 384], + documents=["stale legacy memory"], + metadatas=[{"source": "memory"}], + ) + inactive_custom = fake.get_or_create_collection("odysseus_memories_custom", metadata={"embedding_lane": "custom"}) + inactive_custom.add( + ids=["stale-custom"], + embeddings=[[0.0] * 768], + documents=["stale inactive custom memory"], + metadatas=[{"source": "memory"}], + ) + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: None) + monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed")) + + from src.memory_vector import MemoryVectorStore + + store = MemoryVectorStore("data") + assert fake.collections["odysseus_memories_fastembed"].count() == 1 + + store.rebuild([{"id": "current-memory", "text": "current rebuilt memory"}]) + + assert "odysseus_memories" not in fake.collections + assert "odysseus_memories_custom" not in fake.collections + assert fake.collections["odysseus_memories_fastembed"].count() == 1 + assert fake.collections["odysseus_memories_fastembed"].get()["ids"] == ["current-memory"] + + +def test_memory_remove_deletes_inactive_lane_collection(monkeypatch): + fake = FakeChroma() + custom_collection = fake.get_or_create_collection("odysseus_memories_custom", metadata={"embedding_lane": "custom"}) + fast_collection = fake.get_or_create_collection("odysseus_memories_fastembed", metadata={"embedding_lane": "fastembed"}) + custom_collection.add( + ids=["mem-1"], + embeddings=[[0.0] * 768], + documents=["custom stale memory"], + metadatas=[{"source": "memory"}], + ) + fast_collection.add( + ids=["mem-1"], + embeddings=[[0.0] * 384], + documents=["fast memory"], + metadatas=[{"source": "memory"}], + ) + _patch_chroma(monkeypatch, fake) + + fast_lane = EmbeddingLane( + name=LANE_FASTEMBED, + client=FakeEmbedder(384, "mini", "local://fastembed"), + collection=fast_collection, + collection_name="odysseus_memories_fastembed", + model="mini", + url="local://fastembed", + dimension=384, + fingerprint="fast", + ) + + from src.memory_vector import MemoryVectorStore + + store = MemoryVectorStore.__new__(MemoryVectorStore) + store._lanes = [fast_lane] + store._healthy = True + + store.remove("mem-1") + + assert custom_collection.count() == 0 + assert fast_collection.count() == 0 + + +def test_memory_rebuild_continues_when_custom_lane_fails(monkeypatch): + fake = FakeChroma() + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: FailingEmbedder(768, "nomic", "http://embeddings/v1")) + monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed")) + + from src.memory_vector import MemoryVectorStore + + store = MemoryVectorStore("data") + store.rebuild([{"id": "current-memory", "text": "current rebuilt memory"}]) + + assert fake.collections["odysseus_memories_custom"].count() == 0 + assert fake.collections["odysseus_memories_fastembed"].count() == 1 + assert fake.collections["odysseus_memories_fastembed"].get()["ids"] == ["current-memory"] + + +def test_rag_rebuild_does_not_reimport_legacy_collection(monkeypatch, tmp_path): + fake = FakeChroma() + legacy = fake.get_or_create_collection("odysseus_rag", metadata={"hnsw:space": "cosine"}) + legacy.add( + ids=["stale-doc"], + embeddings=[[0.0] * 384], + documents=["stale legacy document"], + metadatas=[{"source": "/tmp/stale.md"}], + ) + inactive_custom = fake.get_or_create_collection("odysseus_rag_custom", metadata={"embedding_lane": "custom"}) + inactive_custom.add( + ids=["stale-custom-doc"], + embeddings=[[0.0] * 768], + documents=["stale inactive custom document"], + metadatas=[{"source": "/tmp/stale.md"}], + ) + _patch_chroma(monkeypatch, fake) + + import src.embedding_lanes as lanes + + monkeypatch.setattr(lanes, "_build_custom_client", lambda: None) + monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed")) + + from src.rag_vector import VectorRAG + + rag = VectorRAG(persist_directory=str(tmp_path)) + assert fake.collections["odysseus_rag_fastembed"].count() == 1 + + assert rag.rebuild_index() + + assert "odysseus_rag" not in fake.collections + assert "odysseus_rag_custom" not in fake.collections + assert fake.collections["odysseus_rag_fastembed"].count() == 0 + assert rag.search("stale legacy", k=3) == [] + + +def test_rag_remove_directory_deletes_inactive_lane_collection(monkeypatch, tmp_path): + fake = FakeChroma() + legacy_collection = fake.get_or_create_collection("odysseus_rag", metadata={"hnsw:space": "cosine"}) + custom_collection = fake.get_or_create_collection("odysseus_rag_custom", metadata={"embedding_lane": "custom"}) + fast_collection = fake.get_or_create_collection("odysseus_rag_fastembed", metadata={"embedding_lane": "fastembed"}) + source = str(tmp_path / "docs" / "note.md") + directory = str(tmp_path / "docs") + legacy_collection.add( + ids=["legacy-doc"], + embeddings=[[0.0] * 384], + documents=["legacy stale doc"], + metadatas=[{"source": source}], + ) + custom_collection.add( + ids=["custom-doc"], + embeddings=[[0.0] * 768], + documents=["custom stale doc"], + metadatas=[{"source": source}], + ) + fast_collection.add( + ids=["fast-doc"], + embeddings=[[0.0] * 384], + documents=["fast current doc"], + metadatas=[{"source": source}], + ) + _patch_chroma(monkeypatch, fake) + + fast_lane = EmbeddingLane( + name=LANE_FASTEMBED, + client=FakeEmbedder(384, "mini", "local://fastembed"), + collection=fast_collection, + collection_name="odysseus_rag_fastembed", + model="mini", + url="local://fastembed", + dimension=384, + fingerprint="fast", + ) + + from src.rag_vector import VectorRAG + + rag = VectorRAG.__new__(VectorRAG) + rag._lanes = [fast_lane] + rag._collection = fast_collection + rag._healthy = True + + result = rag.remove_directory(directory) + + assert result["success"] + assert result["removed_count"] == 3 + assert legacy_collection.count() == 0 + assert custom_collection.count() == 0 + assert fast_collection.count() == 0 + + +def test_rag_delete_by_source_deletes_inactive_lane_collection(monkeypatch, tmp_path): + fake = FakeChroma() + legacy_collection = fake.get_or_create_collection("odysseus_rag", metadata={"hnsw:space": "cosine"}) + custom_collection = fake.get_or_create_collection("odysseus_rag_custom", metadata={"embedding_lane": "custom"}) + fast_collection = fake.get_or_create_collection("odysseus_rag_fastembed", metadata={"embedding_lane": "fastembed"}) + source = str(tmp_path / "docs" / "note.md") + legacy_collection.add( + ids=["legacy-doc"], + embeddings=[[0.0] * 384], + documents=["legacy stale doc"], + metadatas=[{"source": source}], + ) + custom_collection.add( + ids=["shared-doc"], + embeddings=[[0.0] * 768], + documents=["custom stale doc"], + metadatas=[{"source": source}], + ) + fast_collection.add( + ids=["shared-doc"], + embeddings=[[0.0] * 384], + documents=["fast current doc"], + metadatas=[{"source": source}], + ) + _patch_chroma(monkeypatch, fake) + + fast_lane = EmbeddingLane( + name=LANE_FASTEMBED, + client=FakeEmbedder(384, "mini", "local://fastembed"), + collection=fast_collection, + collection_name="odysseus_rag_fastembed", + model="mini", + url="local://fastembed", + dimension=384, + fingerprint="fast", + ) + + from src.rag_vector import VectorRAG + + rag = VectorRAG.__new__(VectorRAG) + rag._lanes = [fast_lane] + rag._collection = fast_collection + rag._healthy = True + + assert rag.delete_by_source(source) == 2 + assert legacy_collection.count() == 0 + assert custom_collection.count() == 0 + assert fast_collection.count() == 0 + + +def test_vector_rag_uses_keyword_fallback_when_all_lanes_query_fail(): + collection = FakeCollection("odysseus_rag_fastembed", metadata={"embedding_lane": "fastembed"}) + collection.add( + ids=["doc-1"], + embeddings=[[0.0] * 384], + documents=["fallback keyword document"], + metadatas=[{"source": "/tmp/doc.md"}], + ) + + def fail_query(*_args, **_kwargs): + raise RuntimeError("embedding query down") + + collection.query = fail_query + lane = EmbeddingLane( + name=LANE_FASTEMBED, + client=FakeEmbedder(384, "mini", "local://fastembed"), + collection=collection, + collection_name="odysseus_rag_fastembed", + model="mini", + url="local://fastembed", + dimension=384, + fingerprint="fp", + ) + + from src.rag_vector import VectorRAG + + rag = VectorRAG.__new__(VectorRAG) + rag._lanes = [lane] + rag._collection = collection + rag._healthy = True + + results = rag.search("fallback keyword", k=3) + + assert results[0]["id"] == "doc-1" + assert results[0]["search_type"] == "keyword_fallback"