core[patch]: Reverts PR #25754 and adds unit tests (#28702)

I reported the bug 2 weeks ago here:
https://github.com/langchain-ai/langchain/issues/28447

I believe this is a critical bug in the indexer, so I submitted this PR to
revert the change and add unit tests that prevent similar bugs from being
introduced in the future.

@eyurtsev Could you check this?
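
For reviewers, here is a minimal sketch of the scenario the new unit test covers: one source produces two documents, then one of them is removed at the source while the other stays unchanged. The in-memory record manager, in-memory vector store, fake embeddings, and the `"demo"` namespace below are just illustrative stand-ins; the expected counts are what the added test asserts after the revert.

```python
from langchain_core.documents import Document
from langchain_core.embeddings import FakeEmbeddings
from langchain_core.indexing import InMemoryRecordManager, index
from langchain_core.vectorstores import InMemoryVectorStore

record_manager = InMemoryRecordManager(namespace="demo")
record_manager.create_schema()
vector_store = InMemoryVectorStore(embedding=FakeEmbeddings(size=8))

# First run: two documents share the same source id.
docs = [
    Document(page_content="This is a test document.", metadata={"source": "1"}),
    Document(page_content="This is another document.", metadata={"source": "1"}),
]
index(docs, record_manager, vector_store, cleanup="incremental", source_id_key="source")

# Second run: the first document was removed at the source, the second is unchanged.
result = index(
    [Document(page_content="This is another document.", metadata={"source": "1"})],
    record_manager,
    vector_store,
    cleanup="incremental",
    source_id_key="source",
)
# Expected after the revert: the stale document is cleaned up and the unchanged one is
# skipped, i.e. {"num_added": 0, "num_deleted": 1, "num_skipped": 1, "num_updated": 0}.
print(result)
```
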
Commit da28cf1f54 (parent b0a298894d)
Keiichi Hirobe, 2024-12-14 05:13:06 +09:00, committed by GitHub
2 changed files with 94 additions and 306 deletions


@@ -408,18 +408,17 @@ def index(
             # mypy isn't good enough to determine that source ids cannot be None
             # here due to a check that's happening above, so we check again.
-            if any(source_id is None for source_id in source_ids):
-                msg = "Source ids cannot be if cleanup=='incremental'."
-                raise AssertionError(msg)
+            for source_id in source_ids:
+                if source_id is None:
+                    msg = "Source ids cannot be None here."
+                    raise AssertionError(msg)
-            indexed_source_ids = cast(
-                Sequence[str], [source_id_assigner(doc) for doc in docs_to_index]
-            )
+            _source_ids = cast(Sequence[str], source_ids)
             uids_to_delete = record_manager.list_keys(
-                group_ids=indexed_source_ids, before=index_start_dt
+                group_ids=_source_ids, before=index_start_dt
             )
-            if indexed_source_ids and uids_to_delete:
+            if uids_to_delete:
                 # Then delete from vector store.
                 destination.delete(uids_to_delete)
                 # First delete from record store.
@@ -669,18 +668,17 @@ async def aindex(
             # mypy isn't good enough to determine that source ids cannot be None
             # here due to a check that's happening above, so we check again.
-            if any(source_id is None for source_id in source_ids):
-                msg = "Source ids cannot be if cleanup=='incremental'."
-                raise AssertionError(msg)
+            for source_id in source_ids:
+                if source_id is None:
+                    msg = "Source ids cannot be None here."
+                    raise AssertionError(msg)
-            indexed_source_ids = cast(
-                Sequence[str], [source_id_assigner(doc) for doc in docs_to_index]
-            )
+            _source_ids = cast(Sequence[str], source_ids)
             uids_to_delete = await record_manager.alist_keys(
-                group_ids=indexed_source_ids, before=index_start_dt
+                group_ids=_source_ids, before=index_start_dt
             )
-            if indexed_source_ids and uids_to_delete:
+            if uids_to_delete:
                 # Then delete from vector store.
                 await destination.adelete(uids_to_delete)
                 # First delete from record store.
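
For context (this is my reading of the bug, not part of the diff): the removed optimization derives the cleanup group ids from `docs_to_index`, i.e. only from documents that are actually being re-written, so when every document for a source is skipped as unchanged the list is empty and stale records are never deleted. A self-contained toy sketch, with a plain dict standing in for the record manager:

```python
# Toy model of the cleanup step; names mirror the hunks above, nothing here is LangChain code.
records = {"stale-uid": ("1", 1.0)}  # uid -> (group id, updated_at) left over from an earlier run
index_start_dt = 2.0

def list_keys(group_ids, before):
    """Stand-in for record_manager.list_keys(group_ids=..., before=...)."""
    return [uid for uid, (gid, ts) in records.items() if gid in group_ids and ts < before]

source_ids = ["1"]        # source ids of every document returned by the loader
indexed_source_ids = []   # source ids of docs_to_index: empty because the only doc was unchanged

# Removed behavior (#25754): group ids come from docs_to_index, so cleanup is skipped entirely.
print(indexed_source_ids and list_keys(indexed_source_ids, index_start_dt))  # -> []

# Restored behavior: group ids come from all loaded documents, so the stale record is found.
print(list_keys(source_ids, index_start_dt))  # -> ['stale-uid'], which then gets deleted
```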


@@ -544,7 +544,7 @@ def test_incremental_delete(
     )
     with patch.object(
-        record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
+        record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
     ):
         assert index(
             loader,
@@ -630,6 +630,82 @@ def test_incremental_delete(
}


def test_incremental_delete_with_same_source(
record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
"""Test indexing with incremental deletion strategy."""
loader = ToyLoader(
documents=[
Document(
page_content="This is a test document.",
metadata={"source": "1"},
),
Document(
page_content="This is another document.",
metadata={"source": "1"},
),
]
)
with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
):
assert index(
loader,
record_manager,
vector_store,
cleanup="incremental",
source_id_key="source",
) == {
"num_added": 2,
"num_deleted": 0,
"num_skipped": 0,
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"This is another document.", "This is a test document."}
# Delete 1 document and leave 1 document unchanged
loader = ToyLoader(
documents=[
Document(
page_content="This is another document.", # <-- Same as original
metadata={"source": "1"},
),
]
)
with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
):
assert index(
loader,
record_manager,
vector_store,
cleanup="incremental",
source_id_key="source",
) == {
"num_added": 0,
"num_deleted": 1,
"num_skipped": 1,
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {
"This is another document.",
}


def test_incremental_indexing_with_batch_size(
record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
@@ -690,9 +766,9 @@ def test_incremental_indexing_with_batch_size(
             source_id_key="source",
             batch_size=2,
         ) == {
-            "num_added": 0,
-            "num_deleted": 0,
-            "num_skipped": 4,
+            "num_added": 2,
+            "num_deleted": 2,
+            "num_skipped": 2,
             "num_updated": 0,
         }
@@ -704,149 +780,6 @@ def test_incremental_indexing_with_batch_size(
assert doc_texts == {"1", "2", "3", "4"}


def test_incremental_indexing_with_batch_size_with_optimization(
record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
"""Test case when batch_size < num of docs.
Here, we'll verify that an indexing optimization works as expected.
"""
documents = [
Document(
page_content="1",
metadata={"source": "1"},
),
Document(
page_content="2",
metadata={"source": "1"},
),
Document(
page_content="3",
metadata={"source": "1"},
),
Document(
page_content="4",
metadata={"source": "1"},
),
]
with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
):
assert index(
documents,
record_manager,
vector_store,
cleanup="incremental",
source_id_key="source",
batch_size=2,
) == {
"num_added": 4,
"num_deleted": 0,
"num_skipped": 0,
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"1", "2", "3", "4"}
# Mutate content in first batch
doc_first_batch_mutation = [
Document(
page_content="updated 1",
metadata={"source": "1"},
),
Document(
page_content="2",
metadata={"source": "1"},
),
Document(
page_content="3",
metadata={"source": "1"},
),
Document(
page_content="4",
metadata={"source": "1"},
),
]
with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
):
assert index(
doc_first_batch_mutation,
record_manager,
vector_store,
cleanup="incremental",
source_id_key="source",
batch_size=2,
) == {
# Cannot optimize here since the first batch was changed,
# so we only skip indexing the document with content "2"
"num_added": 3,
"num_deleted": 3,
"num_skipped": 1,
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"updated 1", "2", "3", "4"}
# Mutate content in second batch
doc_second_batch_mutation = [
Document(
page_content="updated 1", # <-- This was already previously updated
metadata={"source": "1"},
),
Document(
page_content="2",
metadata={"source": "1"},
),
Document(
page_content="3",
metadata={"source": "1"},
),
Document(
page_content="updated 4",
metadata={"source": "1"},
),
]
with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp()
):
assert index(
doc_second_batch_mutation,
record_manager,
vector_store,
cleanup="incremental",
source_id_key="source",
batch_size=2,
) == {
# Skips updating content from the first batch, only updates
# the `updated 4` content
"num_added": 1,
"num_deleted": 1,
"num_skipped": 3,
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"updated 1", "2", "3", "updated 4"}


def test_incremental_delete_with_batch_size(
record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
@@ -1588,149 +1521,6 @@ async def test_aindex_into_document_index(
}


async def test_incremental_aindexing_with_batch_size_with_optimization(
arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
"""Test case when batch_size < num of docs.
Here, we'll verify that an indexing optimization works as expected.
"""
documents = [
Document(
page_content="1",
metadata={"source": "1"},
),
Document(
page_content="2",
metadata={"source": "1"},
),
Document(
page_content="3",
metadata={"source": "1"},
),
Document(
page_content="4",
metadata={"source": "1"},
),
]
with patch.object(
arecord_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
):
assert await aindex(
documents,
arecord_manager,
vector_store,
cleanup="incremental",
source_id_key="source",
batch_size=2,
) == {
"num_added": 4,
"num_deleted": 0,
"num_skipped": 0,
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"1", "2", "3", "4"}
# Mutate content in first batch
doc_first_batch_mutation = [
Document(
page_content="updated 1",
metadata={"source": "1"},
),
Document(
page_content="2",
metadata={"source": "1"},
),
Document(
page_content="3",
metadata={"source": "1"},
),
Document(
page_content="4",
metadata={"source": "1"},
),
]
with patch.object(
arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
):
assert await aindex(
doc_first_batch_mutation,
arecord_manager,
vector_store,
cleanup="incremental",
source_id_key="source",
batch_size=2,
) == {
# Cannot optimize here since the first batch was changed,
# so we only skip indexing the document with content "2"
"num_added": 3,
"num_deleted": 3,
"num_skipped": 1,
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"updated 1", "2", "3", "4"}
# Mutate content in second batch
doc_second_batch_mutation = [
Document(
page_content="updated 1", # <-- This was already previously updated
metadata={"source": "1"},
),
Document(
page_content="2",
metadata={"source": "1"},
),
Document(
page_content="3",
metadata={"source": "1"},
),
Document(
page_content="updated 4",
metadata={"source": "1"},
),
]
with patch.object(
arecord_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp()
):
assert await aindex(
doc_second_batch_mutation,
arecord_manager,
vector_store,
cleanup="incremental",
source_id_key="source",
batch_size=2,
) == {
# Skips updating content from the first batch, only updates
# the `updated 4` content
"num_added": 1,
"num_deleted": 1,
"num_skipped": 3,
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {"updated 1", "2", "3", "updated 4"}


def test_index_with_upsert_kwargs(
record_manager: InMemoryRecordManager, upserting_vector_store: InMemoryVectorStore
) -> None: