Files
odysseus/tests/test_embedding_lanes.py
2026-06-06 03:17:19 -06:00

1105 lines
38 KiB
Python

import pytest
from src.embedding_lanes import (
EmbeddingLane,
LANE_CUSTOM,
LANE_FASTEMBED,
build_embedding_lanes,
)
class FakeEmbedder:
def __init__(self, dim, model, url):
self.dim = dim
self.model = model
self.url = url
def get_sentence_embedding_dimension(self):
return self.dim
def encode(self, texts, normalize_embeddings=True):
return [[float(i + 1)] * self.dim for i, _ in enumerate(texts)]
class FailingEmbedder(FakeEmbedder):
def encode(self, texts, normalize_embeddings=True):
raise RuntimeError("embedding endpoint rate limited")
class FakeCollection:
def __init__(self, name, metadata=None):
self.name = name
self.metadata = metadata or {}
self.rows = {}
self.dim = None
def count(self):
return len(self.rows)
def add(self, ids, embeddings, documents=None, metadatas=None):
self._check_dim(embeddings)
documents = documents or [None] * len(ids)
metadatas = metadatas or [{}] * len(ids)
for row_id, emb, doc, meta in zip(ids, embeddings, documents, metadatas):
self.rows[row_id] = {"embedding": emb, "document": doc, "metadata": meta}
def upsert(self, ids, embeddings, documents=None, metadatas=None):
self.add(ids, embeddings, documents=documents, metadatas=metadatas)
def get(self, ids=None, include=None, where=None, limit=None):
selected = list(self.rows.items())
if ids is not None:
id_set = set(ids)
selected = [(row_id, row) for row_id, row in selected if row_id in id_set]
if where:
selected = [
(row_id, row)
for row_id, row in selected
if all(row["metadata"].get(k) == v for k, v in where.items())
]
if limit is not None:
selected = selected[:limit]
return {
"ids": [row_id for row_id, _ in selected],
"documents": [row["document"] for _, row in selected],
"metadatas": [row["metadata"] for _, row in selected],
"embeddings": [row["embedding"] for _, row in selected],
}
def query(self, query_embeddings, n_results, where=None, include=None):
self._check_dim(query_embeddings)
rows = self.get(where=where)
ids = rows["ids"][:n_results]
docs = rows["documents"][:n_results]
metas = rows["metadatas"][:n_results]
return {
"ids": [ids],
"documents": [docs],
"metadatas": [metas],
"distances": [[0.1 + i * 0.01 for i in range(len(ids))]],
}
def delete(self, ids):
for row_id in ids:
self.rows.pop(row_id, None)
def _check_dim(self, embeddings):
if not embeddings:
return
dim = len(embeddings[0])
if self.dim is None:
self.dim = dim
elif self.dim != dim:
raise RuntimeError(f"Collection expecting embedding with dimension of {self.dim}, got {dim}")
class FakeChroma:
def __init__(self):
self.collections = {}
self.deleted = []
self.fail_next_add_for = {}
def get_or_create_collection(self, name, metadata=None):
if name not in self.collections:
self.collections[name] = FakeCollection(name, metadata=metadata)
if self.fail_next_add_for.get(name, 0) > 0:
original_add = self.collections[name].add
def fail_once(*args, **kwargs):
self.fail_next_add_for[name] -= 1
self.collections[name].add = original_add
raise RuntimeError("chroma write failed")
self.collections[name].add = fail_once
elif metadata is not None:
self.collections[name].metadata = metadata
return self.collections[name]
def get_collection(self, name):
if name not in self.collections:
raise KeyError(name)
return self.collections[name]
def delete_collection(self, name):
self.deleted.append(name)
self.collections.pop(name, None)
def _patch_chroma(monkeypatch, fake):
import src.chroma_client as chroma_client
monkeypatch.setattr(chroma_client, "get_chroma_client", lambda: fake)
def test_build_embedding_lanes_keeps_custom_and_fastembed_dimensions_separate(monkeypatch):
fake = FakeChroma()
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(
lanes,
"_build_custom_client",
lambda: FakeEmbedder(768, "nomic-embed-text", "http://embeddings/v1"),
)
monkeypatch.setattr(
lanes,
"_build_fastembed_client",
lambda: FakeEmbedder(384, "sentence-transformers/all-MiniLM-L6-v2", "local://fastembed"),
)
built = build_embedding_lanes("odysseus_memories")
assert [lane.name for lane in built] == [LANE_CUSTOM, LANE_FASTEMBED]
assert built[0].collection_name == "odysseus_memories_custom"
assert built[0].dimension == 768
assert built[1].collection_name == "odysseus_memories_fastembed"
assert built[1].dimension == 384
built[0].collection.add(ids=["custom"], embeddings=built[0].encode(["a"]), documents=["a"])
built[1].collection.add(ids=["fast"], embeddings=built[1].encode(["a"]), documents=["a"])
with pytest.raises(RuntimeError, match="dimension"):
built[0].collection.query(query_embeddings=built[1].encode(["bad"]), n_results=1)
def test_build_embedding_lanes_recreates_only_custom_when_fingerprint_changes(monkeypatch):
fake = FakeChroma()
old_custom = fake.get_or_create_collection(
"odysseus_rag_custom",
metadata={
"embedding_lane": "custom",
"embedding_dimension": 768,
"embedding_fingerprint": "old",
},
)
old_custom.add(ids=["old"], embeddings=[[0.0] * 768], documents=["old"])
fast = fake.get_or_create_collection(
"odysseus_rag_fastembed",
metadata={
"embedding_lane": "fastembed",
"embedding_dimension": 384,
},
)
fast.add(ids=["fast"], embeddings=[[0.0] * 384], documents=["fast"])
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: FakeEmbedder(1024, "bge-large", "http://embeddings/v1"))
monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "sentence-transformers/all-MiniLM-L6-v2", "local://fastembed"))
built = build_embedding_lanes("odysseus_rag")
assert "odysseus_rag_custom" in fake.deleted
assert fake.collections["odysseus_rag_custom"].count() == 1
assert len(fake.collections["odysseus_rag_custom"].rows["old"]["embedding"]) == 1024
assert fake.collections["odysseus_rag_fastembed"].count() == 1
assert built[0].dimension == 1024
def test_lane_reset_reembeds_existing_documents_on_fingerprint_change(monkeypatch):
fake = FakeChroma()
old_custom = fake.get_or_create_collection(
"odysseus_memories_custom",
metadata={
"embedding_lane": "custom",
"embedding_dimension": 384,
"embedding_fingerprint": "old",
},
)
old_custom.add(
ids=["existing-memory"],
embeddings=[[0.0] * 384],
documents=["existing custom memory"],
metadatas=[{"source": "memory"}],
)
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: FakeEmbedder(768, "nomic", "http://embeddings/v1"))
def fail_fastembed():
raise RuntimeError("fastembed missing")
monkeypatch.setattr(lanes, "_build_fastembed_client", fail_fastembed)
built = build_embedding_lanes("odysseus_memories")
assert [lane.name for lane in built] == [LANE_CUSTOM]
assert "odysseus_memories_custom" in fake.deleted
rebuilt = fake.collections["odysseus_memories_custom"]
assert rebuilt.count() == 1
assert rebuilt.get()["ids"] == ["existing-memory"]
assert len(rebuilt.rows["existing-memory"]["embedding"]) == 768
def test_lane_reset_keeps_existing_collection_when_reembed_fails(monkeypatch):
fake = FakeChroma()
old_custom = fake.get_or_create_collection(
"odysseus_memories_custom",
metadata={
"embedding_lane": "custom",
"embedding_dimension": 384,
"embedding_fingerprint": "old",
},
)
old_custom.add(
ids=["existing-memory"],
embeddings=[[0.0] * 384],
documents=["existing custom memory"],
metadatas=[{"source": "memory"}],
)
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: FailingEmbedder(768, "nomic", "http://embeddings/v1"))
monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed"))
built = build_embedding_lanes("odysseus_memories")
assert [lane.name for lane in built] == [LANE_FASTEMBED]
assert "odysseus_memories_custom" not in fake.deleted
assert fake.collections["odysseus_memories_custom"].count() == 1
assert len(fake.collections["odysseus_memories_custom"].rows["existing-memory"]["embedding"]) == 384
def test_lane_reset_keeps_existing_collection_when_preserve_read_fails(monkeypatch):
fake = FakeChroma()
old_custom = fake.get_or_create_collection(
"odysseus_memories_custom",
metadata={
"embedding_lane": "custom",
"embedding_dimension": 384,
"embedding_fingerprint": "old",
},
)
old_custom.add(
ids=["existing-memory"],
embeddings=[[0.0] * 384],
documents=["existing custom memory"],
metadatas=[{"source": "memory"}],
)
def fail_get(*_args, **_kwargs):
raise RuntimeError("chroma read failed")
old_custom.get = fail_get
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: FakeEmbedder(768, "nomic", "http://embeddings/v1"))
def fail_fastembed():
raise RuntimeError("fastembed missing")
monkeypatch.setattr(lanes, "_build_fastembed_client", fail_fastembed)
built = build_embedding_lanes("odysseus_memories")
assert built == []
assert "odysseus_memories_custom" not in fake.deleted
assert "odysseus_memories_custom" in fake.collections
def test_lane_reset_restores_existing_collection_when_rewrite_fails(monkeypatch):
fake = FakeChroma()
old_custom = fake.get_or_create_collection(
"odysseus_memories_custom",
metadata={
"embedding_lane": "custom",
"embedding_dimension": 384,
"embedding_fingerprint": "old",
},
)
old_custom.add(
ids=["existing-memory"],
embeddings=[[0.0] * 384],
documents=["existing custom memory"],
metadatas=[{"source": "memory"}],
)
fake.fail_next_add_for["odysseus_memories_custom"] = 1
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: FakeEmbedder(768, "nomic", "http://embeddings/v1"))
def fail_fastembed():
raise RuntimeError("fastembed missing")
monkeypatch.setattr(lanes, "_build_fastembed_client", fail_fastembed)
built = build_embedding_lanes("odysseus_memories")
assert built == []
restored = fake.collections["odysseus_memories_custom"]
assert restored.count() == 1
assert restored.get()["ids"] == ["existing-memory"]
assert len(restored.rows["existing-memory"]["embedding"]) == 384
def test_build_embedding_lanes_uses_fastembed_when_custom_unavailable(monkeypatch):
fake = FakeChroma()
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
def fail_custom():
raise RuntimeError("down")
monkeypatch.setattr(lanes, "_build_custom_client", fail_custom)
monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed"))
built = build_embedding_lanes("odysseus_tool_index")
assert [lane.name for lane in built] == [LANE_FASTEMBED]
assert built[0].collection_name == "odysseus_tool_index_fastembed"
def test_custom_lane_preserves_default_embedding_client_probe(monkeypatch):
import src.embedding_lanes as lanes
import src.embeddings as embeddings
embeddings.reset_http_embed_state()
monkeypatch.setattr(lanes, "_load_custom_endpoint", lambda: {})
calls = []
class DefaultClient(FakeEmbedder):
def __init__(self, url=None, model=None, api_key=None):
calls.append({"url": url, "model": model, "api_key": api_key})
super().__init__(768, model or "all-minilm:l6-v2", url or "http://localhost:11434/v1/embeddings")
monkeypatch.setattr(embeddings, "EmbeddingClient", DefaultClient)
client = lanes._build_custom_client()
assert calls == [{"url": None, "model": None, "api_key": None}]
assert client.url == "http://localhost:11434/v1/embeddings"
embeddings.reset_http_embed_state()
def test_custom_lane_uses_http_down_latch(monkeypatch):
import src.embedding_lanes as lanes
import src.embeddings as embeddings
embeddings.reset_http_embed_state()
calls = []
class DownClient:
def __init__(self, url=None, model=None, api_key=None):
calls.append({"url": url, "model": model, "api_key": api_key})
def get_sentence_embedding_dimension(self):
raise RuntimeError("endpoint down")
class LocalFastEmbed(FakeEmbedder):
def __init__(self):
super().__init__(384, "mini", "local://fastembed")
monkeypatch.setattr(embeddings, "EmbeddingClient", DownClient)
monkeypatch.setattr(embeddings, "FastEmbedClient", LocalFastEmbed)
with pytest.raises(RuntimeError, match="HTTP embedding lane unavailable"):
lanes._build_custom_client()
with pytest.raises(RuntimeError, match="HTTP embedding lane unavailable"):
lanes._build_custom_client()
assert calls == [{"url": None, "model": None, "api_key": None}]
embeddings.reset_http_embed_state()
def test_memory_vector_store_writes_both_lanes_and_prefers_custom(monkeypatch):
fake = FakeChroma()
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: FakeEmbedder(768, "nomic", "http://embeddings/v1"))
monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed"))
from src.memory_vector import MemoryVectorStore
store = MemoryVectorStore("data")
store.add("mem-1", "Nicholai likes direct memory systems")
assert fake.collections["odysseus_memories_custom"].count() == 1
assert fake.collections["odysseus_memories_fastembed"].count() == 1
results = store.search("direct memory", k=5)
assert results[0]["memory_id"] == "mem-1"
assert results[0]["embedding_lane"] == LANE_CUSTOM
def test_memory_search_merges_fallback_only_results_before_limit():
custom_collection = FakeCollection("odysseus_memories_custom", metadata={"embedding_lane": "custom"})
fast_collection = FakeCollection("odysseus_memories_fastembed", metadata={"embedding_lane": "fastembed"})
custom_collection.add(
ids=["old-1", "old-2"],
embeddings=[[0.0] * 768, [0.0] * 768],
documents=["older custom memory", "another custom memory"],
metadatas=[{"source": "memory"}, {"source": "memory"}],
)
fast_collection.add(
ids=["fallback-only"],
embeddings=[[0.0] * 384],
documents=["fallback only relevant memory"],
metadatas=[{"source": "memory"}],
)
custom_collection.query = lambda **_kwargs: {
"ids": [["old-1", "old-2"]],
"distances": [[0.20, 0.21]],
}
fast_collection.query = lambda **_kwargs: {
"ids": [["fallback-only"]],
"distances": [[0.05]],
}
custom_lane = EmbeddingLane(
name=LANE_CUSTOM,
client=FakeEmbedder(768, "nomic", "http://embeddings/v1"),
collection=custom_collection,
collection_name="odysseus_memories_custom",
model="nomic",
url="http://embeddings/v1",
dimension=768,
fingerprint="custom",
)
fast_lane = EmbeddingLane(
name=LANE_FASTEMBED,
client=FakeEmbedder(384, "mini", "local://fastembed"),
collection=fast_collection,
collection_name="odysseus_memories_fastembed",
model="mini",
url="local://fastembed",
dimension=384,
fingerprint="fast",
)
from src.memory_vector import MemoryVectorStore
store = MemoryVectorStore.__new__(MemoryVectorStore)
store._lanes = [custom_lane, fast_lane]
store._healthy = True
results = store.search("fallback relevant", k=2)
assert [row["memory_id"] for row in results] == ["fallback-only", "old-1"]
def test_vector_rag_writes_both_lanes_and_falls_back_to_fastembed(monkeypatch):
fake = FakeChroma()
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: None)
monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed"))
from src.rag_vector import VectorRAG
rag = VectorRAG()
assert rag.add_document("session search belongs in tools", {"source": "/tmp/a.md", "owner": "alice"})
assert "odysseus_rag_custom" not in fake.collections
assert fake.collections["odysseus_rag_fastembed"].count() == 1
results = rag.search("session search", k=3, owner="alice")
assert results[0]["document"] == "session search belongs in tools"
assert results[0]["embedding_lane"] == LANE_FASTEMBED
def test_vector_rag_batch_index_continues_when_custom_lane_fails(monkeypatch, tmp_path):
fake = FakeChroma()
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: FailingEmbedder(768, "nomic", "http://embeddings/v1"))
monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed"))
from src.rag_vector import VectorRAG
rag = VectorRAG(persist_directory=str(tmp_path))
result = rag.add_documents_batch([
("batch fallback document", {"source": "/tmp/a.md", "owner": "alice"}),
])
assert result["success"]
assert result["added_count"] == 1
assert fake.collections["odysseus_rag_custom"].count() == 0
assert fake.collections["odysseus_rag_fastembed"].count() == 1
def test_vector_rag_batch_index_reports_failure_when_all_lanes_fail(monkeypatch, tmp_path):
fake = FakeChroma()
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: FailingEmbedder(768, "nomic", "http://embeddings/v1"))
monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FailingEmbedder(384, "mini", "local://fastembed"))
from src.rag_vector import VectorRAG
rag = VectorRAG(persist_directory=str(tmp_path))
result = rag.add_documents_batch([
("batch outage document", {"source": "/tmp/a.md", "owner": "alice"}),
])
assert not result["success"]
assert fake.collections["odysseus_rag_custom"].count() == 0
assert fake.collections["odysseus_rag_fastembed"].count() == 0
def test_tool_index_indexes_and_retrieves_from_available_lanes(monkeypatch):
fake = FakeChroma()
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: FakeEmbedder(768, "nomic", "http://embeddings/v1"))
monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed"))
from src.tool_index import ToolIndex
index = ToolIndex()
index.index_builtin_tools()
assert fake.collections["odysseus_tool_index_custom"].count() > 0
assert fake.collections["odysseus_tool_index_fastembed"].count() > 0
assert "bash" in index.retrieve("run a shell command", k=10)
def test_tool_index_builtin_indexing_fails_when_all_lanes_fail():
custom_lane = EmbeddingLane(
name=LANE_CUSTOM,
client=FailingEmbedder(768, "nomic", "http://embeddings/v1"),
collection=FakeCollection("odysseus_tool_index_custom", metadata={"embedding_lane": "custom"}),
collection_name="odysseus_tool_index_custom",
model="nomic",
url="http://embeddings/v1",
dimension=768,
fingerprint="custom",
)
fast_lane = EmbeddingLane(
name=LANE_FASTEMBED,
client=FailingEmbedder(384, "mini", "local://fastembed"),
collection=FakeCollection("odysseus_tool_index_fastembed", metadata={"embedding_lane": "fastembed"}),
collection_name="odysseus_tool_index_fastembed",
model="mini",
url="local://fastembed",
dimension=384,
fingerprint="fast",
)
from src.tool_index import ToolIndex
index = ToolIndex.__new__(ToolIndex)
index._lanes = [custom_lane, fast_lane]
index._healthy = True
with pytest.raises(RuntimeError, match="all embedding lanes"):
index.index_builtin_tools()
assert not index.healthy
def test_tool_index_retrieval_continues_when_custom_lane_query_fails():
custom_collection = FakeCollection("odysseus_tool_index_custom", metadata={"embedding_lane": "custom"})
fast_collection = FakeCollection("odysseus_tool_index_fastembed", metadata={"embedding_lane": "fastembed"})
fast_collection.add(
ids=["builtin_bash"],
embeddings=[[0.0] * 384],
documents=["Tool: bash\nRun shell commands"],
metadatas=[{"tool_name": "bash", "tool_type": "builtin"}],
)
def fail_query(*_args, **_kwargs):
raise RuntimeError("custom endpoint down")
custom_collection.add(
ids=["builtin_python"],
embeddings=[[0.0] * 768],
documents=["Tool: python\nRun Python"],
metadatas=[{"tool_name": "python", "tool_type": "builtin"}],
)
custom_collection.query = fail_query
custom_lane = EmbeddingLane(
name=LANE_CUSTOM,
client=FakeEmbedder(768, "nomic", "http://embeddings/v1"),
collection=custom_collection,
collection_name="odysseus_tool_index_custom",
model="nomic",
url="http://embeddings/v1",
dimension=768,
fingerprint="custom",
)
fast_lane = EmbeddingLane(
name=LANE_FASTEMBED,
client=FakeEmbedder(384, "mini", "local://fastembed"),
collection=fast_collection,
collection_name="odysseus_tool_index_fastembed",
model="mini",
url="local://fastembed",
dimension=384,
fingerprint="fast",
)
from src.tool_index import ToolIndex
index = ToolIndex.__new__(ToolIndex)
index._lanes = [custom_lane, fast_lane]
assert index.retrieve("run shell", k=5) == ["bash"]
def test_tool_index_merges_fallback_tool_results_before_limit():
custom_collection = FakeCollection("odysseus_tool_index_custom", metadata={"embedding_lane": "custom"})
fast_collection = FakeCollection("odysseus_tool_index_fastembed", metadata={"embedding_lane": "fastembed"})
custom_collection.add(
ids=["builtin_one", "builtin_two"],
embeddings=[[0.0] * 768, [0.0] * 768],
documents=["Tool: one", "Tool: two"],
metadatas=[
{"tool_name": "one", "tool_type": "builtin"},
{"tool_name": "two", "tool_type": "builtin"},
],
)
fast_collection.add(
ids=["mcp_current"],
embeddings=[[0.0] * 384],
documents=["Tool: current MCP"],
metadatas=[{"tool_name": "current_mcp", "tool_type": "mcp"}],
)
custom_collection.query = lambda **_kwargs: {
"ids": [["builtin_one", "builtin_two"]],
"metadatas": [[
{"tool_name": "one", "tool_type": "builtin"},
{"tool_name": "two", "tool_type": "builtin"},
]],
"distances": [[0.20, 0.21]],
}
fast_collection.query = lambda **_kwargs: {
"ids": [["mcp_current"]],
"metadatas": [[{"tool_name": "current_mcp", "tool_type": "mcp"}]],
"distances": [[0.05]],
}
custom_lane = EmbeddingLane(
name=LANE_CUSTOM,
client=FakeEmbedder(768, "nomic", "http://embeddings/v1"),
collection=custom_collection,
collection_name="odysseus_tool_index_custom",
model="nomic",
url="http://embeddings/v1",
dimension=768,
fingerprint="custom",
)
fast_lane = EmbeddingLane(
name=LANE_FASTEMBED,
client=FakeEmbedder(384, "mini", "local://fastembed"),
collection=fast_collection,
collection_name="odysseus_tool_index_fastembed",
model="mini",
url="local://fastembed",
dimension=384,
fingerprint="fast",
)
from src.tool_index import ToolIndex
index = ToolIndex.__new__(ToolIndex)
index._lanes = [custom_lane, fast_lane]
assert index.retrieve("current mcp", k=2) == ["current_mcp", "one"]
def test_legacy_collection_backfills_fastembed_lane(monkeypatch):
fake = FakeChroma()
legacy = fake.get_or_create_collection("odysseus_memories", metadata={"hnsw:space": "cosine"})
legacy.add(
ids=["legacy-memory"],
embeddings=[[0.0] * 384],
documents=["legacy memory row"],
metadatas=[{"source": "memory"}],
)
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: None)
monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed"))
from src.memory_vector import MemoryVectorStore
store = MemoryVectorStore("data")
assert store.count() == 1
assert fake.collections["odysseus_memories"].count() == 1
assert fake.collections["odysseus_memories_fastembed"].count() == 1
def test_legacy_collection_backfills_custom_only_lane(monkeypatch):
fake = FakeChroma()
legacy = fake.get_or_create_collection("odysseus_memories", metadata={"hnsw:space": "cosine"})
legacy.add(
ids=["legacy-memory"],
embeddings=[[0.0] * 384],
documents=["legacy memory row"],
metadatas=[{"source": "memory"}],
)
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: FakeEmbedder(768, "nomic", "http://embeddings/v1"))
def fail_fastembed():
raise RuntimeError("fastembed missing")
monkeypatch.setattr(lanes, "_build_fastembed_client", fail_fastembed)
from src.memory_vector import MemoryVectorStore
store = MemoryVectorStore("data")
assert store.count() == 1
assert "odysseus_memories_fastembed" not in fake.collections
assert fake.collections["odysseus_memories_custom"].count() == 1
assert len(fake.collections["odysseus_memories_custom"].rows["legacy-memory"]["embedding"]) == 768
def test_legacy_migration_continues_when_custom_backfill_fails(monkeypatch):
fake = FakeChroma()
legacy = fake.get_or_create_collection("odysseus_memories", metadata={"hnsw:space": "cosine"})
legacy.add(
ids=["legacy-memory"],
embeddings=[[0.0] * 384],
documents=["legacy memory row"],
metadatas=[{"source": "memory"}],
)
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: FailingEmbedder(768, "nomic", "http://embeddings/v1"))
monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed"))
from src.memory_vector import MemoryVectorStore
store = MemoryVectorStore("data")
assert store.healthy
assert fake.collections["odysseus_memories_custom"].count() == 0
assert fake.collections["odysseus_memories_fastembed"].count() == 1
def test_legacy_migration_resumes_partial_lane_backfill(monkeypatch):
fake = FakeChroma()
legacy = fake.get_or_create_collection("odysseus_memories", metadata={"hnsw:space": "cosine"})
legacy.add(
ids=["legacy-1", "legacy-2"],
embeddings=[[0.0] * 384, [0.0] * 384],
documents=["legacy memory one", "legacy memory two"],
metadatas=[{"source": "memory"}, {"source": "memory"}],
)
partial = fake.get_or_create_collection("odysseus_memories_fastembed", metadata={"embedding_lane": "fastembed"})
partial.add(
ids=["legacy-1"],
embeddings=[[0.0] * 384],
documents=["legacy memory one"],
metadatas=[{"source": "memory"}],
)
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: None)
monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed"))
from src.memory_vector import MemoryVectorStore
store = MemoryVectorStore("data")
assert store.count() == 2
assert set(fake.collections["odysseus_memories_fastembed"].get()["ids"]) == {"legacy-1", "legacy-2"}
def test_memory_rebuild_does_not_reimport_legacy_collection(monkeypatch):
fake = FakeChroma()
legacy = fake.get_or_create_collection("odysseus_memories", metadata={"hnsw:space": "cosine"})
legacy.add(
ids=["stale-memory"],
embeddings=[[0.0] * 384],
documents=["stale legacy memory"],
metadatas=[{"source": "memory"}],
)
inactive_custom = fake.get_or_create_collection("odysseus_memories_custom", metadata={"embedding_lane": "custom"})
inactive_custom.add(
ids=["stale-custom"],
embeddings=[[0.0] * 768],
documents=["stale inactive custom memory"],
metadatas=[{"source": "memory"}],
)
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: None)
monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed"))
from src.memory_vector import MemoryVectorStore
store = MemoryVectorStore("data")
assert fake.collections["odysseus_memories_fastembed"].count() == 1
store.rebuild([{"id": "current-memory", "text": "current rebuilt memory"}])
assert "odysseus_memories" not in fake.collections
assert "odysseus_memories_custom" not in fake.collections
assert fake.collections["odysseus_memories_fastembed"].count() == 1
assert fake.collections["odysseus_memories_fastembed"].get()["ids"] == ["current-memory"]
def test_memory_remove_deletes_inactive_lane_collection(monkeypatch):
fake = FakeChroma()
custom_collection = fake.get_or_create_collection("odysseus_memories_custom", metadata={"embedding_lane": "custom"})
fast_collection = fake.get_or_create_collection("odysseus_memories_fastembed", metadata={"embedding_lane": "fastembed"})
custom_collection.add(
ids=["mem-1"],
embeddings=[[0.0] * 768],
documents=["custom stale memory"],
metadatas=[{"source": "memory"}],
)
fast_collection.add(
ids=["mem-1"],
embeddings=[[0.0] * 384],
documents=["fast memory"],
metadatas=[{"source": "memory"}],
)
_patch_chroma(monkeypatch, fake)
fast_lane = EmbeddingLane(
name=LANE_FASTEMBED,
client=FakeEmbedder(384, "mini", "local://fastembed"),
collection=fast_collection,
collection_name="odysseus_memories_fastembed",
model="mini",
url="local://fastembed",
dimension=384,
fingerprint="fast",
)
from src.memory_vector import MemoryVectorStore
store = MemoryVectorStore.__new__(MemoryVectorStore)
store._lanes = [fast_lane]
store._healthy = True
store.remove("mem-1")
assert custom_collection.count() == 0
assert fast_collection.count() == 0
def test_memory_rebuild_continues_when_custom_lane_fails(monkeypatch):
fake = FakeChroma()
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: FailingEmbedder(768, "nomic", "http://embeddings/v1"))
monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed"))
from src.memory_vector import MemoryVectorStore
store = MemoryVectorStore("data")
store.rebuild([{"id": "current-memory", "text": "current rebuilt memory"}])
assert fake.collections["odysseus_memories_custom"].count() == 0
assert fake.collections["odysseus_memories_fastembed"].count() == 1
assert fake.collections["odysseus_memories_fastembed"].get()["ids"] == ["current-memory"]
def test_rag_rebuild_does_not_reimport_legacy_collection(monkeypatch, tmp_path):
fake = FakeChroma()
legacy = fake.get_or_create_collection("odysseus_rag", metadata={"hnsw:space": "cosine"})
legacy.add(
ids=["stale-doc"],
embeddings=[[0.0] * 384],
documents=["stale legacy document"],
metadatas=[{"source": "/tmp/stale.md"}],
)
inactive_custom = fake.get_or_create_collection("odysseus_rag_custom", metadata={"embedding_lane": "custom"})
inactive_custom.add(
ids=["stale-custom-doc"],
embeddings=[[0.0] * 768],
documents=["stale inactive custom document"],
metadatas=[{"source": "/tmp/stale.md"}],
)
_patch_chroma(monkeypatch, fake)
import src.embedding_lanes as lanes
monkeypatch.setattr(lanes, "_build_custom_client", lambda: None)
monkeypatch.setattr(lanes, "_build_fastembed_client", lambda: FakeEmbedder(384, "mini", "local://fastembed"))
from src.rag_vector import VectorRAG
rag = VectorRAG(persist_directory=str(tmp_path))
assert fake.collections["odysseus_rag_fastembed"].count() == 1
assert rag.rebuild_index()
assert "odysseus_rag" not in fake.collections
assert "odysseus_rag_custom" not in fake.collections
assert fake.collections["odysseus_rag_fastembed"].count() == 0
assert rag.search("stale legacy", k=3) == []
def test_rag_remove_directory_deletes_inactive_lane_collection(monkeypatch, tmp_path):
fake = FakeChroma()
legacy_collection = fake.get_or_create_collection("odysseus_rag", metadata={"hnsw:space": "cosine"})
custom_collection = fake.get_or_create_collection("odysseus_rag_custom", metadata={"embedding_lane": "custom"})
fast_collection = fake.get_or_create_collection("odysseus_rag_fastembed", metadata={"embedding_lane": "fastembed"})
source = str(tmp_path / "docs" / "note.md")
directory = str(tmp_path / "docs")
legacy_collection.add(
ids=["legacy-doc"],
embeddings=[[0.0] * 384],
documents=["legacy stale doc"],
metadatas=[{"source": source}],
)
custom_collection.add(
ids=["custom-doc"],
embeddings=[[0.0] * 768],
documents=["custom stale doc"],
metadatas=[{"source": source}],
)
fast_collection.add(
ids=["fast-doc"],
embeddings=[[0.0] * 384],
documents=["fast current doc"],
metadatas=[{"source": source}],
)
_patch_chroma(monkeypatch, fake)
fast_lane = EmbeddingLane(
name=LANE_FASTEMBED,
client=FakeEmbedder(384, "mini", "local://fastembed"),
collection=fast_collection,
collection_name="odysseus_rag_fastembed",
model="mini",
url="local://fastembed",
dimension=384,
fingerprint="fast",
)
from src.rag_vector import VectorRAG
rag = VectorRAG.__new__(VectorRAG)
rag._lanes = [fast_lane]
rag._collection = fast_collection
rag._healthy = True
result = rag.remove_directory(directory)
assert result["success"]
assert result["removed_count"] == 3
assert legacy_collection.count() == 0
assert custom_collection.count() == 0
assert fast_collection.count() == 0
def test_rag_delete_by_source_deletes_inactive_lane_collection(monkeypatch, tmp_path):
fake = FakeChroma()
legacy_collection = fake.get_or_create_collection("odysseus_rag", metadata={"hnsw:space": "cosine"})
custom_collection = fake.get_or_create_collection("odysseus_rag_custom", metadata={"embedding_lane": "custom"})
fast_collection = fake.get_or_create_collection("odysseus_rag_fastembed", metadata={"embedding_lane": "fastembed"})
source = str(tmp_path / "docs" / "note.md")
legacy_collection.add(
ids=["legacy-doc"],
embeddings=[[0.0] * 384],
documents=["legacy stale doc"],
metadatas=[{"source": source}],
)
custom_collection.add(
ids=["shared-doc"],
embeddings=[[0.0] * 768],
documents=["custom stale doc"],
metadatas=[{"source": source}],
)
fast_collection.add(
ids=["shared-doc"],
embeddings=[[0.0] * 384],
documents=["fast current doc"],
metadatas=[{"source": source}],
)
_patch_chroma(monkeypatch, fake)
fast_lane = EmbeddingLane(
name=LANE_FASTEMBED,
client=FakeEmbedder(384, "mini", "local://fastembed"),
collection=fast_collection,
collection_name="odysseus_rag_fastembed",
model="mini",
url="local://fastembed",
dimension=384,
fingerprint="fast",
)
from src.rag_vector import VectorRAG
rag = VectorRAG.__new__(VectorRAG)
rag._lanes = [fast_lane]
rag._collection = fast_collection
rag._healthy = True
assert rag.delete_by_source(source) == 2
assert legacy_collection.count() == 0
assert custom_collection.count() == 0
assert fast_collection.count() == 0
def test_vector_rag_uses_keyword_fallback_when_all_lanes_query_fail():
collection = FakeCollection("odysseus_rag_fastembed", metadata={"embedding_lane": "fastembed"})
collection.add(
ids=["doc-1"],
embeddings=[[0.0] * 384],
documents=["fallback keyword document"],
metadatas=[{"source": "/tmp/doc.md"}],
)
def fail_query(*_args, **_kwargs):
raise RuntimeError("embedding query down")
collection.query = fail_query
lane = EmbeddingLane(
name=LANE_FASTEMBED,
client=FakeEmbedder(384, "mini", "local://fastembed"),
collection=collection,
collection_name="odysseus_rag_fastembed",
model="mini",
url="local://fastembed",
dimension=384,
fingerprint="fp",
)
from src.rag_vector import VectorRAG
rag = VectorRAG.__new__(VectorRAG)
rag._lanes = [lane]
rag._collection = collection
rag._healthy = True
results = rag.search("fallback keyword", k=3)
assert results[0]["id"] == "doc-1"
assert results[0]["search_type"] == "keyword_fallback"