core[patch]: improve index/aindex api when batch_size<n_docs (#25754)

- **Description:** prevent the `index` function from re-indexing an entire
source even when nothing has changed.
- **Issue:** #22135

I worked on a solution to this issue that is a compromise between being
cheap and being fast.
In the previous code, when `batch_size` is smaller than the number of docs
from a given source, almost the entire source is deleted (all documents
from that source except those in the first batch).
My solution deletes documents from the vector store and record manager only
if at least one document from that source has changed.
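
Below is a minimal sketch of the behavior this change targets (not part of the
diff); it assumes the in-memory helpers used by the tests, i.e.
`InMemoryRecordManager`, `InMemoryVectorStore`, and `DeterministicFakeEmbedding`
from `langchain_core`:

```python
from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing import InMemoryRecordManager, index
from langchain_core.vectorstores import InMemoryVectorStore

record_manager = InMemoryRecordManager(namespace="demo")
record_manager.create_schema()
vector_store = InMemoryVectorStore(DeterministicFakeEmbedding(size=8))

# Four docs from a single source, indexed with a batch size smaller than
# the number of docs for that source.
docs = [
    Document(page_content=str(i), metadata={"source": "1"}) for i in range(1, 5)
]

first = index(
    docs,
    record_manager,
    vector_store,
    cleanup="incremental",
    source_id_key="source",
    batch_size=2,
)
# Expected: num_added=4, num_deleted=0, num_skipped=0, num_updated=0
print(first)

# Re-run with identical content. Previously the docs outside the first batch
# were deleted and re-added; with this change the whole source is skipped.
second = index(
    docs,
    record_manager,
    vector_store,
    cleanup="incremental",
    source_id_key="source",
    batch_size=2,
)
# Expected: num_added=0, num_deleted=0, num_skipped=4, num_updated=0
print(second)
```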

Hope this can help!

---------

Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
federico-pisanu 2024-09-30 22:57:41 +02:00 committed by GitHub
parent 7fde2791dc
commit 2538963945
2 changed files with 342 additions and 16 deletions


@@ -386,16 +386,17 @@ def index(
             # mypy isn't good enough to determine that source ids cannot be None
             # here due to a check that's happening above, so we check again.
-            for source_id in source_ids:
-                if source_id is None:
-                    raise AssertionError("Source ids cannot be None here.")
+            if any(source_id is None for source_id in source_ids):
+                raise AssertionError("Source ids cannot be None if cleanup=='incremental'.")
-            _source_ids = cast(Sequence[str], source_ids)
+            indexed_source_ids = cast(
+                Sequence[str], [source_id_assigner(doc) for doc in docs_to_index]
+            )
             uids_to_delete = record_manager.list_keys(
-                group_ids=_source_ids, before=index_start_dt
+                group_ids=indexed_source_ids, before=index_start_dt
             )
-            if uids_to_delete:
+            if indexed_source_ids and uids_to_delete:
                 # Then delete from vector store.
                 destination.delete(uids_to_delete)
                 # First delete from record store.
@@ -626,16 +627,17 @@ async def aindex(
             # mypy isn't good enough to determine that source ids cannot be None
             # here due to a check that's happening above, so we check again.
-            for source_id in source_ids:
-                if source_id is None:
-                    raise AssertionError("Source ids cannot be None here.")
+            if any(source_id is None for source_id in source_ids):
+                raise AssertionError("Source ids cannot be None if cleanup=='incremental'.")
-            _source_ids = cast(Sequence[str], source_ids)
+            indexed_source_ids = cast(
+                Sequence[str], [source_id_assigner(doc) for doc in docs_to_index]
+            )
             uids_to_delete = await record_manager.alist_keys(
-                group_ids=_source_ids, before=index_start_dt
+                group_ids=indexed_source_ids, before=index_start_dt
             )
-            if uids_to_delete:
+            if indexed_source_ids and uids_to_delete:
                 # Then delete from vector store.
                 await destination.adelete(uids_to_delete)
                 # First delete from record store.
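
For the async path, a similar sketch (again assuming the same in-memory
helpers from `langchain_core`); `aindex` applies the same per-batch
optimization:

```python
import asyncio

from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing import InMemoryRecordManager, aindex
from langchain_core.vectorstores import InMemoryVectorStore


async def main() -> None:
    record_manager = InMemoryRecordManager(namespace="demo-async")
    await record_manager.acreate_schema()
    vector_store = InMemoryVectorStore(DeterministicFakeEmbedding(size=8))
    docs = [
        Document(page_content=str(i), metadata={"source": "1"}) for i in range(1, 5)
    ]

    # Two identical runs: the second should skip all four docs instead of
    # deleting and re-adding the ones outside the first batch.
    for _ in range(2):
        result = await aindex(
            docs,
            record_manager,
            vector_store,
            cleanup="incremental",
            source_id_key="source",
            batch_size=2,
        )
        print(result)


asyncio.run(main())
```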


@@ -655,7 +655,7 @@ def test_incremental_indexing_with_batch_size(
)
with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
):
assert index(
loader,
@@ -671,6 +671,16 @@ def test_incremental_indexing_with_batch_size(
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"1", "2", "3", "4"}
with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
):
assert index(
loader,
record_manager,
@@ -693,6 +703,149 @@ def test_incremental_indexing_with_batch_size(
assert doc_texts == {"1", "2", "3", "4"}
def test_incremental_indexing_with_batch_size_with_optimization(
record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
"""Test case when batch_size < num of docs.
Here, we'll verify that an indexing optimization works as expected.
"""
documents = [
Document(
page_content="1",
metadata={"source": "1"},
),
Document(
page_content="2",
metadata={"source": "1"},
),
Document(
page_content="3",
metadata={"source": "1"},
),
Document(
page_content="4",
metadata={"source": "1"},
),
]
with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
):
assert index(
documents,
record_manager,
vector_store,
cleanup="incremental",
source_id_key="source",
batch_size=2,
) == {
"num_added": 4,
"num_deleted": 0,
"num_skipped": 0,
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"1", "2", "3", "4"}
# Mutate content in first batch
doc_first_batch_mutation = [
Document(
page_content="updated 1",
metadata={"source": "1"},
),
Document(
page_content="2",
metadata={"source": "1"},
),
Document(
page_content="3",
metadata={"source": "1"},
),
Document(
page_content="4",
metadata={"source": "1"},
),
]
with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
):
assert index(
doc_first_batch_mutation,
record_manager,
vector_store,
cleanup="incremental",
source_id_key="source",
batch_size=2,
) == {
# Cannot optimize here since the first batch was changed
# So only skipping indexing the document with content "2"
"num_added": 3,
"num_deleted": 3,
"num_skipped": 1,
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"updated 1", "2", "3", "4"}
# Mutate content in second batch
doc_second_batch_mutation = [
Document(
page_content="updated 1", # <-- This was already previously updated
metadata={"source": "1"},
),
Document(
page_content="2",
metadata={"source": "1"},
),
Document(
page_content="3",
metadata={"source": "1"},
),
Document(
page_content="updated 4",
metadata={"source": "1"},
),
]
with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp()
):
assert index(
doc_second_batch_mutation,
record_manager,
vector_store,
cleanup="incremental",
source_id_key="source",
batch_size=2,
) == {
# Skips updating content from the first batch, only updates
# the `updated 4` content
"num_added": 1,
"num_deleted": 1,
"num_skipped": 3,
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"updated 1", "2", "3", "updated 4"}
def test_incremental_delete_with_batch_size(
record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
@@ -719,7 +872,7 @@ def test_incremental_delete_with_batch_size(
)
with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
):
assert index(
loader,
@@ -760,6 +913,13 @@ def test_incremental_delete_with_batch_size(
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"1", "2", "3", "4"}
# Attempt to index again verify that nothing changes
with patch.object(
record_manager, "get_time", return_value=datetime(2022, 1, 3).timestamp()
@@ -789,9 +949,16 @@ def test_incremental_delete_with_batch_size(
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"1", "2", "3", "4"}
# Attempt to index again verify that nothing changes
with patch.object(
record_manager, "get_time", return_value=datetime(2023, 1, 3).timestamp()
record_manager, "get_time", return_value=datetime(2023, 1, 4).timestamp()
):
# Docs with same content
docs = [
@@ -818,9 +985,16 @@ def test_incremental_delete_with_batch_size(
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"1", "2", "3", "4"}
# Try to index with changed docs now
with patch.object(
record_manager, "get_time", return_value=datetime(2024, 1, 3).timestamp()
record_manager, "get_time", return_value=datetime(2024, 1, 5).timestamp()
):
# Docs with same content
docs = [
@@ -846,6 +1020,13 @@ def test_incremental_delete_with_batch_size(
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"changed 1", "changed 2", "3", "4"}
async def test_aincremental_delete(
arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
@@ -1404,3 +1585,146 @@ async def test_aindex_into_document_index(
"num_skipped": 0,
"num_updated": 0,
}
async def test_incremental_aindexing_with_batch_size_with_optimization(
arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
"""Test case when batch_size < num of docs.
Here, we'll verify that an indexing optimization works as expected.
"""
documents = [
Document(
page_content="1",
metadata={"source": "1"},
),
Document(
page_content="2",
metadata={"source": "1"},
),
Document(
page_content="3",
metadata={"source": "1"},
),
Document(
page_content="4",
metadata={"source": "1"},
),
]
with patch.object(
arecord_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
):
assert await aindex(
documents,
arecord_manager,
vector_store,
cleanup="incremental",
source_id_key="source",
batch_size=2,
) == {
"num_added": 4,
"num_deleted": 0,
"num_skipped": 0,
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"1", "2", "3", "4"}
# Mutate content in first batch
doc_first_batch_mutation = [
Document(
page_content="updated 1",
metadata={"source": "1"},
),
Document(
page_content="2",
metadata={"source": "1"},
),
Document(
page_content="3",
metadata={"source": "1"},
),
Document(
page_content="4",
metadata={"source": "1"},
),
]
with patch.object(
arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
):
assert await aindex(
doc_first_batch_mutation,
arecord_manager,
vector_store,
cleanup="incremental",
source_id_key="source",
batch_size=2,
) == {
# Cannot optimize here since the first batch was changed
# So only skipping indexing the document with content "2"
"num_added": 3,
"num_deleted": 3,
"num_skipped": 1,
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"updated 1", "2", "3", "4"}
# Mutate content in second batch
doc_second_batch_mutation = [
Document(
page_content="updated 1", # <-- This was already previously updated
metadata={"source": "1"},
),
Document(
page_content="2",
metadata={"source": "1"},
),
Document(
page_content="3",
metadata={"source": "1"},
),
Document(
page_content="updated 4",
metadata={"source": "1"},
),
]
with patch.object(
arecord_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp()
):
assert await aindex(
doc_second_batch_mutation,
arecord_manager,
vector_store,
cleanup="incremental",
source_id_key="source",
batch_size=2,
) == {
# Skips updating content from the first batch, only updates
# the `updated 4` content
"num_added": 1,
"num_deleted": 1,
"num_skipped": 3,
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"updated 1", "2", "3", "updated 4"}