mirror of
https://github.com/hwchase17/langchain.git
synced 2025-07-18 18:53:10 +00:00
core[patch]: throw exception indexing code if deletion fails in vectorstore (#28103)
The delete methods in the VectorStore and DocumentIndex interfaces return a status indicating the result. Therefore, we can assume that their implementations don't throw exceptions but instead return a result indicating whether the delete operations have failed. The current implementation doesn't check the returned value, so I modified it to throw an exception when the operation fails. --------- Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
This commit is contained in:
parent
258b3be5ec
commit
67fd554512
@ -22,6 +22,7 @@ from pydantic import model_validator
|
|||||||
|
|
||||||
from langchain_core.document_loaders.base import BaseLoader
|
from langchain_core.document_loaders.base import BaseLoader
|
||||||
from langchain_core.documents import Document
|
from langchain_core.documents import Document
|
||||||
|
from langchain_core.exceptions import LangChainException
|
||||||
from langchain_core.indexing.base import DocumentIndex, RecordManager
|
from langchain_core.indexing.base import DocumentIndex, RecordManager
|
||||||
from langchain_core.vectorstores import VectorStore
|
from langchain_core.vectorstores import VectorStore
|
||||||
|
|
||||||
@ -175,6 +176,32 @@ def _deduplicate_in_order(
|
|||||||
yield hashed_doc
|
yield hashed_doc
|
||||||
|
|
||||||
|
|
||||||
|
class IndexingException(LangChainException):
|
||||||
|
"""Raised when an indexing operation fails."""
|
||||||
|
|
||||||
|
|
||||||
|
def _delete(
|
||||||
|
vector_store: Union[VectorStore, DocumentIndex],
|
||||||
|
ids: list[str],
|
||||||
|
) -> None:
|
||||||
|
if isinstance(vector_store, VectorStore):
|
||||||
|
delete_ok = vector_store.delete(ids)
|
||||||
|
if delete_ok is not None and delete_ok is False:
|
||||||
|
msg = "The delete operation to VectorStore failed."
|
||||||
|
raise IndexingException(msg)
|
||||||
|
elif isinstance(vector_store, DocumentIndex):
|
||||||
|
delete_response = vector_store.delete(ids)
|
||||||
|
if "num_failed" in delete_response and delete_response["num_failed"] > 0:
|
||||||
|
msg = "The delete operation to DocumentIndex failed."
|
||||||
|
raise IndexingException(msg)
|
||||||
|
else:
|
||||||
|
msg = (
|
||||||
|
f"Vectorstore should be either a VectorStore or a DocumentIndex. "
|
||||||
|
f"Got {type(vector_store)}."
|
||||||
|
)
|
||||||
|
raise TypeError(msg)
|
||||||
|
|
||||||
|
|
||||||
# PUBLIC API
|
# PUBLIC API
|
||||||
|
|
||||||
|
|
||||||
@ -441,7 +468,7 @@ def index(
|
|||||||
)
|
)
|
||||||
if uids_to_delete:
|
if uids_to_delete:
|
||||||
# Then delete from vector store.
|
# Then delete from vector store.
|
||||||
destination.delete(uids_to_delete)
|
_delete(destination, uids_to_delete)
|
||||||
# First delete from record store.
|
# First delete from record store.
|
||||||
record_manager.delete_keys(uids_to_delete)
|
record_manager.delete_keys(uids_to_delete)
|
||||||
num_deleted += len(uids_to_delete)
|
num_deleted += len(uids_to_delete)
|
||||||
@ -454,7 +481,7 @@ def index(
|
|||||||
group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
|
group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
|
||||||
):
|
):
|
||||||
# First delete from record store.
|
# First delete from record store.
|
||||||
destination.delete(uids_to_delete)
|
_delete(destination, uids_to_delete)
|
||||||
# Then delete from record manager.
|
# Then delete from record manager.
|
||||||
record_manager.delete_keys(uids_to_delete)
|
record_manager.delete_keys(uids_to_delete)
|
||||||
num_deleted += len(uids_to_delete)
|
num_deleted += len(uids_to_delete)
|
||||||
@ -474,6 +501,28 @@ async def _to_async_iterator(iterator: Iterable[T]) -> AsyncIterator[T]:
|
|||||||
yield item
|
yield item
|
||||||
|
|
||||||
|
|
||||||
|
async def _adelete(
|
||||||
|
vector_store: Union[VectorStore, DocumentIndex],
|
||||||
|
ids: list[str],
|
||||||
|
) -> None:
|
||||||
|
if isinstance(vector_store, VectorStore):
|
||||||
|
delete_ok = await vector_store.adelete(ids)
|
||||||
|
if delete_ok is not None and delete_ok is False:
|
||||||
|
msg = "The delete operation to VectorStore failed."
|
||||||
|
raise IndexingException(msg)
|
||||||
|
elif isinstance(vector_store, DocumentIndex):
|
||||||
|
delete_response = await vector_store.adelete(ids)
|
||||||
|
if "num_failed" in delete_response and delete_response["num_failed"] > 0:
|
||||||
|
msg = "The delete operation to DocumentIndex failed."
|
||||||
|
raise IndexingException(msg)
|
||||||
|
else:
|
||||||
|
msg = (
|
||||||
|
f"Vectorstore should be either a VectorStore or a DocumentIndex. "
|
||||||
|
f"Got {type(vector_store)}."
|
||||||
|
)
|
||||||
|
raise TypeError(msg)
|
||||||
|
|
||||||
|
|
||||||
async def aindex(
|
async def aindex(
|
||||||
docs_source: Union[BaseLoader, Iterable[Document], AsyncIterator[Document]],
|
docs_source: Union[BaseLoader, Iterable[Document], AsyncIterator[Document]],
|
||||||
record_manager: RecordManager,
|
record_manager: RecordManager,
|
||||||
@ -733,7 +782,7 @@ async def aindex(
|
|||||||
)
|
)
|
||||||
if uids_to_delete:
|
if uids_to_delete:
|
||||||
# Then delete from vector store.
|
# Then delete from vector store.
|
||||||
await destination.adelete(uids_to_delete)
|
await _adelete(destination, uids_to_delete)
|
||||||
# First delete from record store.
|
# First delete from record store.
|
||||||
await record_manager.adelete_keys(uids_to_delete)
|
await record_manager.adelete_keys(uids_to_delete)
|
||||||
num_deleted += len(uids_to_delete)
|
num_deleted += len(uids_to_delete)
|
||||||
@ -746,7 +795,7 @@ async def aindex(
|
|||||||
group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
|
group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
|
||||||
):
|
):
|
||||||
# First delete from record store.
|
# First delete from record store.
|
||||||
await destination.adelete(uids_to_delete)
|
await _adelete(destination, uids_to_delete)
|
||||||
# Then delete from record manager.
|
# Then delete from record manager.
|
||||||
await record_manager.adelete_keys(uids_to_delete)
|
await record_manager.adelete_keys(uids_to_delete)
|
||||||
num_deleted += len(uids_to_delete)
|
num_deleted += len(uids_to_delete)
|
||||||
|
@ -13,7 +13,7 @@ from langchain_core.document_loaders.base import BaseLoader
|
|||||||
from langchain_core.documents import Document
|
from langchain_core.documents import Document
|
||||||
from langchain_core.embeddings import DeterministicFakeEmbedding
|
from langchain_core.embeddings import DeterministicFakeEmbedding
|
||||||
from langchain_core.indexing import InMemoryRecordManager, aindex, index
|
from langchain_core.indexing import InMemoryRecordManager, aindex, index
|
||||||
from langchain_core.indexing.api import _abatch, _HashedDocument
|
from langchain_core.indexing.api import IndexingException, _abatch, _HashedDocument
|
||||||
from langchain_core.indexing.in_memory import InMemoryDocumentIndex
|
from langchain_core.indexing.in_memory import InMemoryDocumentIndex
|
||||||
from langchain_core.vectorstores import InMemoryVectorStore, VectorStore
|
from langchain_core.vectorstores import InMemoryVectorStore, VectorStore
|
||||||
|
|
||||||
@ -287,6 +287,164 @@ async def test_aindex_simple_delete_full(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_index_delete_full_recovery_after_deletion_failure(
|
||||||
|
record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
|
||||||
|
) -> None:
|
||||||
|
"""Indexing some content to confirm it gets added only once."""
|
||||||
|
loader = ToyLoader(
|
||||||
|
documents=[
|
||||||
|
Document(
|
||||||
|
page_content="This is a test document.",
|
||||||
|
),
|
||||||
|
Document(
|
||||||
|
page_content="This is another document.",
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch.object(
|
||||||
|
record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
|
||||||
|
):
|
||||||
|
assert index(loader, record_manager, vector_store, cleanup="full") == {
|
||||||
|
"num_added": 2,
|
||||||
|
"num_deleted": 0,
|
||||||
|
"num_skipped": 0,
|
||||||
|
"num_updated": 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
loader = ToyLoader(
|
||||||
|
documents=[
|
||||||
|
Document(
|
||||||
|
page_content="mutated document 1",
|
||||||
|
),
|
||||||
|
Document(
|
||||||
|
page_content="This is another document.", # <-- Same as original
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch.object(
|
||||||
|
record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
|
||||||
|
),
|
||||||
|
patch.object(vector_store, "delete", return_value=False),
|
||||||
|
pytest.raises(IndexingException),
|
||||||
|
):
|
||||||
|
indexing_result = index(loader, record_manager, vector_store, cleanup="full")
|
||||||
|
|
||||||
|
# At this point, there should be 3 records in both the record manager
|
||||||
|
# and the vector store
|
||||||
|
doc_texts = {
|
||||||
|
# Ignoring type since doc should be in the store and not a None
|
||||||
|
vector_store.get_by_ids([uid])[0].page_content # type: ignore
|
||||||
|
for uid in vector_store.store
|
||||||
|
}
|
||||||
|
assert doc_texts == {
|
||||||
|
"This is a test document.",
|
||||||
|
"mutated document 1",
|
||||||
|
"This is another document.",
|
||||||
|
}
|
||||||
|
|
||||||
|
with patch.object(
|
||||||
|
record_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp()
|
||||||
|
):
|
||||||
|
indexing_result = index(loader, record_manager, vector_store, cleanup="full")
|
||||||
|
doc_texts = {
|
||||||
|
# Ignoring type since doc should be in the store and not a None
|
||||||
|
vector_store.get_by_ids([uid])[0].page_content # type: ignore
|
||||||
|
for uid in vector_store.store
|
||||||
|
}
|
||||||
|
assert doc_texts == {"mutated document 1", "This is another document."}
|
||||||
|
|
||||||
|
assert indexing_result == {
|
||||||
|
"num_added": 0,
|
||||||
|
"num_deleted": 1,
|
||||||
|
"num_skipped": 2,
|
||||||
|
"num_updated": 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def test_aindex_delete_full_recovery_after_deletion_failure(
|
||||||
|
arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
|
||||||
|
) -> None:
|
||||||
|
"""Indexing some content to confirm it gets added only once."""
|
||||||
|
loader = ToyLoader(
|
||||||
|
documents=[
|
||||||
|
Document(
|
||||||
|
page_content="This is a test document.",
|
||||||
|
),
|
||||||
|
Document(
|
||||||
|
page_content="This is another document.",
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch.object(
|
||||||
|
arecord_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
|
||||||
|
):
|
||||||
|
assert await aindex(loader, arecord_manager, vector_store, cleanup="full") == {
|
||||||
|
"num_added": 2,
|
||||||
|
"num_deleted": 0,
|
||||||
|
"num_skipped": 0,
|
||||||
|
"num_updated": 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
loader = ToyLoader(
|
||||||
|
documents=[
|
||||||
|
Document(
|
||||||
|
page_content="mutated document 1",
|
||||||
|
),
|
||||||
|
Document(
|
||||||
|
page_content="This is another document.", # <-- Same as original
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch.object(
|
||||||
|
arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
|
||||||
|
),
|
||||||
|
patch.object(vector_store, "adelete", return_value=False),
|
||||||
|
pytest.raises(IndexingException),
|
||||||
|
):
|
||||||
|
indexing_result = await aindex(
|
||||||
|
loader, arecord_manager, vector_store, cleanup="full"
|
||||||
|
)
|
||||||
|
|
||||||
|
# At this point, there should be 3 records in both the record manager
|
||||||
|
# and the vector store
|
||||||
|
doc_texts = {
|
||||||
|
# Ignoring type since doc should be in the store and not a None
|
||||||
|
vector_store.get_by_ids([uid])[0].page_content # type: ignore
|
||||||
|
for uid in vector_store.store
|
||||||
|
}
|
||||||
|
assert doc_texts == {
|
||||||
|
"This is a test document.",
|
||||||
|
"mutated document 1",
|
||||||
|
"This is another document.",
|
||||||
|
}
|
||||||
|
|
||||||
|
with patch.object(
|
||||||
|
arecord_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp()
|
||||||
|
):
|
||||||
|
indexing_result = await aindex(
|
||||||
|
loader, arecord_manager, vector_store, cleanup="full"
|
||||||
|
)
|
||||||
|
doc_texts = {
|
||||||
|
# Ignoring type since doc should be in the store and not a None
|
||||||
|
vector_store.get_by_ids([uid])[0].page_content # type: ignore
|
||||||
|
for uid in vector_store.store
|
||||||
|
}
|
||||||
|
assert doc_texts == {"mutated document 1", "This is another document."}
|
||||||
|
|
||||||
|
assert indexing_result == {
|
||||||
|
"num_added": 0,
|
||||||
|
"num_deleted": 1,
|
||||||
|
"num_skipped": 2,
|
||||||
|
"num_updated": 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def test_incremental_fails_with_bad_source_ids(
|
def test_incremental_fails_with_bad_source_ids(
|
||||||
record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
|
record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
|
||||||
) -> None:
|
) -> None:
|
||||||
|
Loading…
Reference in New Issue
Block a user