core[minor]: Add support for DocumentIndex in the index api (#25100)

Support passing a DocumentIndex, in addition to a VectorStore, as the destination in the index and aindex APIs.
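For readers skimming the diff, here is a minimal end-to-end sketch of what this change enables, adapted from the tests added in this commit (the namespace string is arbitrary; all imports are langchain_core paths that appear in the diff below):

from langchain_core.documents import Document
from langchain_core.indexing import InMemoryRecordManager, index
from langchain_core.indexing.in_memory import InMemoryDocumentIndex

record_manager = InMemoryRecordManager(namespace="demo")
record_manager.create_schema()
document_index = InMemoryDocumentIndex()

docs = [
    Document(page_content="This is a test document.", metadata={"source": "1"}),
    Document(page_content="This is another document.", metadata={"source": "2"}),
]

# A DocumentIndex is now accepted where index() previously required a VectorStore.
result = index(docs, record_manager, document_index, cleanup="full")
assert result == {"num_added": 2, "num_deleted": 0, "num_skipped": 0, "num_updated": 0}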
Eugene Yurtsev committed 2024-08-06 15:30:49 -04:00 (committed by GitHub)
parent 264ab96980
commit d283f452cc
3 changed files with 183 additions and 39 deletions

libs/core/langchain_core/indexing/api.py

@@ -27,7 +27,7 @@ from typing import (
 from langchain_core.document_loaders.base import BaseLoader
 from langchain_core.documents import Document
-from langchain_core.indexing.base import RecordManager
+from langchain_core.indexing.base import DocumentIndex, RecordManager
 from langchain_core.pydantic_v1 import root_validator
 from langchain_core.vectorstores import VectorStore
@@ -106,6 +106,7 @@ class _HashedDocument(Document):
     def to_document(self) -> Document:
         """Return a Document object."""
         return Document(
+            id=self.uid,
             page_content=self.page_content,
             metadata=self.metadata,
         )
@@ -195,7 +196,7 @@ class IndexingResult(TypedDict):
 def index(
     docs_source: Union[BaseLoader, Iterable[Document]],
     record_manager: RecordManager,
-    vector_store: VectorStore,
+    vector_store: Union[VectorStore, DocumentIndex],
     *,
     batch_size: int = 100,
     cleanup: Literal["incremental", "full", None] = None,
@@ -232,7 +233,7 @@ def index(
         docs_source: Data loader or iterable of documents to index.
         record_manager: Timestamped set to keep track of which documents were
             updated.
-        vector_store: Vector store to index the documents into.
+        vector_store: VectorStore or DocumentIndex to index the documents into.
         batch_size: Batch size to use when indexing. Default is 100.
         cleanup: How to handle clean up of documents. Default is None.
             - Incremental: Cleans up all documents that haven't been updated AND
@@ -274,19 +275,30 @@ def index(
     if cleanup == "incremental" and source_id_key is None:
         raise ValueError("Source id key is required when cleanup mode is incremental.")
-    # Check that the Vectorstore has required methods implemented
-    methods = ["delete", "add_documents"]
+    destination = vector_store  # Renaming internally for clarity
-    for method in methods:
-        if not hasattr(vector_store, method):
-            raise ValueError(
-                f"Vectorstore {vector_store} does not have required method {method}"
-            )
+    # If it's a vectorstore, let's check if it has the required methods.
+    if isinstance(destination, VectorStore):
+        # Check that the Vectorstore has required methods implemented
+        methods = ["delete", "add_documents"]
-    if type(vector_store).delete == VectorStore.delete:
-        # Checking if the vectorstore has overridden the default delete method
-        # implementation which just raises a NotImplementedError
-        raise ValueError("Vectorstore has not implemented the delete method")
+        for method in methods:
+            if not hasattr(destination, method):
+                raise ValueError(
+                    f"Vectorstore {destination} does not have required method {method}"
+                )
+        if type(destination).delete == VectorStore.delete:
+            # Checking if the vectorstore has overridden the default delete method
+            # implementation which just raises a NotImplementedError
+            raise ValueError("Vectorstore has not implemented the delete method")
+    elif isinstance(destination, DocumentIndex):
+        pass
+    else:
+        raise TypeError(
+            f"Vectorstore should be either a VectorStore or a DocumentIndex. "
+            f"Got {type(destination)}."
+        )
     if isinstance(docs_source, BaseLoader):
         try:
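In short, the write path now dispatches on the destination type: a VectorStore keeps the add_documents/delete contract, while a DocumentIndex is written to via upsert/delete, as the hunks below show. A tiny illustration using the in-memory implementations imported by the tests in this commit (the embedding size is an arbitrary choice for the sketch):

from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing.in_memory import InMemoryDocumentIndex
from langchain_core.vectorstores import InMemoryVectorStore

docs = [Document(page_content="hello", metadata={"source": "1"})]

# DocumentIndex path: a plain upsert, with no ids= or batch_size= kwargs;
# ids travel on the Document objects themselves (see id=self.uid above).
InMemoryDocumentIndex().upsert(docs)

# VectorStore path: the familiar add_documents contract.
InMemoryVectorStore(embedding=DeterministicFakeEmbedding(size=8)).add_documents(docs)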
@@ -354,7 +366,13 @@ def index(
         # Be pessimistic and assume that all vector store write will fail.
         # First write to vector store
         if docs_to_index:
-            vector_store.add_documents(docs_to_index, ids=uids, batch_size=batch_size)
+            if isinstance(destination, VectorStore):
+                destination.add_documents(
+                    docs_to_index, ids=uids, batch_size=batch_size
+                )
+            elif isinstance(destination, DocumentIndex):
+                destination.upsert(docs_to_index)
             num_added += len(docs_to_index) - len(seen_docs)
             num_updated += len(seen_docs)
@@ -384,7 +402,7 @@ def index(
             )
             if uids_to_delete:
                 # Then delete from vector store.
-                vector_store.delete(uids_to_delete)
+                destination.delete(uids_to_delete)
                 # First delete from record store.
                 record_manager.delete_keys(uids_to_delete)
                 num_deleted += len(uids_to_delete)
@@ -394,7 +412,7 @@ def index(
             before=index_start_dt, limit=cleanup_batch_size
         ):
             # First delete from record store.
-            vector_store.delete(uids_to_delete)
+            destination.delete(uids_to_delete)
             # Then delete from record manager.
             record_manager.delete_keys(uids_to_delete)
             num_deleted += len(uids_to_delete)
@@ -417,7 +435,7 @@ async def _to_async_iterator(iterator: Iterable[T]) -> AsyncIterator[T]:
 async def aindex(
     docs_source: Union[BaseLoader, Iterable[Document], AsyncIterator[Document]],
     record_manager: RecordManager,
-    vector_store: VectorStore,
+    vectorstore: Union[VectorStore, DocumentIndex],
     *,
     batch_size: int = 100,
     cleanup: Literal["incremental", "full", None] = None,
@@ -446,7 +464,7 @@ async def aindex(
         docs_source: Data loader or iterable of documents to index.
         record_manager: Timestamped set to keep track of which documents were
             updated.
-        vector_store: Vector store to index the documents into.
+        vectorstore: Vector store or Document Index to index the documents into.
         batch_size: Batch size to use when indexing. Default is 100.
         cleanup: How to handle clean up of documents. Default is None.
             - Incremental: Cleans up all documents that haven't been updated AND
@@ -488,20 +506,31 @@ async def aindex(
     if cleanup == "incremental" and source_id_key is None:
         raise ValueError("Source id key is required when cleanup mode is incremental.")
-    # Check that the Vectorstore has required methods implemented
-    methods = ["adelete", "aadd_documents"]
+    destination = vectorstore  # Renaming internally for clarity
-    for method in methods:
-        if not hasattr(vector_store, method):
-            raise ValueError(
-                f"Vectorstore {vector_store} does not have required method {method}"
-            )
+    # If it's a vectorstore, let's check if it has the required methods.
+    if isinstance(destination, VectorStore):
+        # Check that the Vectorstore has required methods implemented
+        methods = ["adelete", "aadd_documents"]
-    if type(vector_store).adelete == VectorStore.adelete:
-        # Checking if the vectorstore has overridden the default delete method
-        # implementation which just raises a NotImplementedError
-        raise ValueError("Vectorstore has not implemented the delete method")
+        for method in methods:
+            if not hasattr(destination, method):
+                raise ValueError(
+                    f"Vectorstore {destination} does not have required method {method}"
+                )
+        if type(destination).adelete == VectorStore.adelete:
+            # Checking if the vectorstore has overridden the default delete method
+            # implementation which just raises a NotImplementedError
+            raise ValueError("Vectorstore has not implemented the delete method")
+    elif isinstance(destination, DocumentIndex):
+        pass
+    else:
+        raise TypeError(
+            f"Vectorstore should be either a VectorStore or a DocumentIndex. "
+            f"Got {type(destination)}."
+        )
     async_doc_iterator: AsyncIterator[Document]
     if isinstance(docs_source, BaseLoader):
         try:
@@ -577,9 +606,12 @@ async def aindex(
         # Be pessimistic and assume that all vector store write will fail.
         # First write to vector store
         if docs_to_index:
-            await vector_store.aadd_documents(
-                docs_to_index, ids=uids, batch_size=batch_size
-            )
+            if isinstance(destination, VectorStore):
+                await destination.aadd_documents(
+                    docs_to_index, ids=uids, batch_size=batch_size
+                )
+            elif isinstance(destination, DocumentIndex):
+                await destination.aupsert(docs_to_index)
             num_added += len(docs_to_index) - len(seen_docs)
             num_updated += len(seen_docs)
@@ -610,7 +642,7 @@ async def aindex(
             )
             if uids_to_delete:
                 # Then delete from vector store.
-                await vector_store.adelete(uids_to_delete)
+                await destination.adelete(uids_to_delete)
                 # First delete from record store.
                 await record_manager.adelete_keys(uids_to_delete)
                 num_deleted += len(uids_to_delete)
@@ -620,7 +652,7 @@ async def aindex(
             before=index_start_dt, limit=cleanup_batch_size
         ):
             # First delete from record store.
-            await vector_store.adelete(uids_to_delete)
+            await destination.adelete(uids_to_delete)
             # Then delete from record manager.
             await record_manager.adelete_keys(uids_to_delete)
             num_deleted += len(uids_to_delete)
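
The async path mirrors the sync path one-for-one (aadd_documents/aupsert/adelete). A hedged usage sketch, mirroring test_aindex_into_document_index in the test file below:

import asyncio

from langchain_core.documents import Document
from langchain_core.indexing import InMemoryRecordManager, aindex
from langchain_core.indexing.in_memory import InMemoryDocumentIndex


async def main() -> None:
    record_manager = InMemoryRecordManager(namespace="demo")
    await record_manager.acreate_schema()
    document_index = InMemoryDocumentIndex()
    docs = [Document(page_content="This is a test document.", metadata={"source": "1"})]
    # aupsert/adelete are what aindex awaits on the DocumentIndex branch above.
    result = await aindex(docs, record_manager, document_index, cleanup="full")
    assert result["num_added"] == 1


asyncio.run(main())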

libs/core/tests/unit_tests/indexing/test_indexing.py

@@ -16,6 +16,7 @@ from langchain_core.documents import Document
 from langchain_core.embeddings import DeterministicFakeEmbedding
 from langchain_core.indexing import InMemoryRecordManager, aindex, index
 from langchain_core.indexing.api import _abatch, _HashedDocument
+from langchain_core.indexing.in_memory import InMemoryDocumentIndex
 from langchain_core.vectorstores import InMemoryVectorStore, VectorStore
@@ -1284,7 +1285,10 @@ def test_indexing_custom_batch_size(
         index(docs, record_manager, vector_store, batch_size=batch_size)
         args, kwargs = mock_add_documents.call_args
-        assert args == (docs,)
+        doc_with_id = Document(
+            id=ids[0], page_content="This is a test document.", metadata={"source": "1"}
+        )
+        assert args == ([doc_with_id],)
         assert kwargs == {"ids": ids, "batch_size": batch_size}
     finally:
         vector_store.add_documents = original  # type: ignore
@@ -1304,8 +1308,102 @@ async def test_aindexing_custom_batch_size(
     batch_size = 1
     mock_add_documents = AsyncMock()
+    doc_with_id = Document(
+        id=ids[0], page_content="This is a test document.", metadata={"source": "1"}
+    )
     vector_store.aadd_documents = mock_add_documents  # type: ignore
     await aindex(docs, arecord_manager, vector_store, batch_size=batch_size)
     args, kwargs = mock_add_documents.call_args
-    assert args == (docs,)
+    assert args == ([doc_with_id],)
     assert kwargs == {"ids": ids, "batch_size": batch_size}
+
+
+def test_index_into_document_index(record_manager: InMemoryRecordManager) -> None:
+    """Get an in memory index."""
+    document_index = InMemoryDocumentIndex()
+    docs = [
+        Document(
+            page_content="This is a test document.",
+            metadata={"source": "1"},
+        ),
+        Document(
+            page_content="This is another document.",
+            metadata={"source": "2"},
+        ),
+    ]
+
+    assert index(docs, record_manager, document_index, cleanup="full") == {
+        "num_added": 2,
+        "num_deleted": 0,
+        "num_skipped": 0,
+        "num_updated": 0,
+    }
+
+    assert index(docs, record_manager, document_index, cleanup="full") == {
+        "num_added": 0,
+        "num_deleted": 0,
+        "num_skipped": 2,
+        "num_updated": 0,
+    }
+
+    assert index(
+        docs, record_manager, document_index, cleanup="full", force_update=True
+    ) == {
+        "num_added": 0,
+        "num_deleted": 0,
+        "num_skipped": 0,
+        "num_updated": 2,
+    }
+
+    assert index([], record_manager, document_index, cleanup="full") == {
+        "num_added": 0,
+        "num_deleted": 2,
+        "num_skipped": 0,
+        "num_updated": 0,
+    }
+
+
+async def test_aindex_into_document_index(
+    arecord_manager: InMemoryRecordManager,
+) -> None:
+    """Get an in memory index."""
+    document_index = InMemoryDocumentIndex()
+    docs = [
+        Document(
+            page_content="This is a test document.",
+            metadata={"source": "1"},
+        ),
+        Document(
+            page_content="This is another document.",
+            metadata={"source": "2"},
+        ),
+    ]
+
+    assert await aindex(docs, arecord_manager, document_index, cleanup="full") == {
+        "num_added": 2,
+        "num_deleted": 0,
+        "num_skipped": 0,
+        "num_updated": 0,
+    }
+
+    assert await aindex(docs, arecord_manager, document_index, cleanup="full") == {
+        "num_added": 0,
+        "num_deleted": 0,
+        "num_skipped": 2,
+        "num_updated": 0,
+    }
+
+    assert await aindex(
+        docs, arecord_manager, document_index, cleanup="full", force_update=True
+    ) == {
+        "num_added": 0,
+        "num_deleted": 0,
+        "num_skipped": 0,
+        "num_updated": 2,
+    }
+
+    assert await aindex([], arecord_manager, document_index, cleanup="full") == {
+        "num_added": 0,
+        "num_deleted": 2,
+        "num_skipped": 0,
+        "num_updated": 0,
+    }

libs/langchain/tests/unit_tests/indexes/test_indexing.py

@@ -1386,7 +1386,14 @@ def test_indexing_custom_batch_size(
     with patch.object(vector_store, "add_documents") as mock_add_documents:
         index(docs, record_manager, vector_store, batch_size=batch_size)
         args, kwargs = mock_add_documents.call_args
-        assert args == (docs,)
+        docs_with_id = [
+            Document(
+                page_content="This is a test document.",
+                metadata={"source": "1"},
+                id=ids[0],
+            )
+        ]
+        assert args == (docs_with_id,)
         assert kwargs == {"ids": ids, "batch_size": batch_size}
@@ -1407,5 +1414,12 @@ async def test_aindexing_custom_batch_size(
     with patch.object(vector_store, "aadd_documents") as mock_add_documents:
         await aindex(docs, arecord_manager, vector_store, batch_size=batch_size)
         args, kwargs = mock_add_documents.call_args
-        assert args == (docs,)
+        docs_with_id = [
+            Document(
+                page_content="This is a test document.",
+                metadata={"source": "1"},
+                id=ids[0],
+            )
+        ]
+        assert args == (docs_with_id,)
         assert kwargs == {"ids": ids, "batch_size": batch_size}