langchain-core[patch]: Incremental record manager deletion should be batched (#31206)

**Description:** Before this commit, if a single cleanup batch contained more than 32,766 keys on sqlite3 >= 3.32 (or more than 999 keys on sqlite3 < 3.32), `record_manager.delete_keys()` would fail because the generated query contained too many bound variables.
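As a standalone illustration (plain `sqlite3`, illustrative table name, not LangChain code), a single `DELETE ... WHERE key IN (?, ?, ...)` with one bound parameter per key trips SQLite's variable cap:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE upsertion_record (key TEXT PRIMARY KEY)")

# One placeholder per key: well above both limits (999 for < 3.32, 32766 for >= 3.32).
keys = [f"key-{i}" for i in range(100_000)]
placeholders = ", ".join("?" for _ in keys)
try:
    conn.execute(
        f"DELETE FROM upsertion_record WHERE key IN ({placeholders})", keys
    )
except sqlite3.OperationalError as exc:
    print(exc)  # "too many SQL variables"
```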

This commit batches the delete operation using `cleanup_batch_size`, as is already done for `full` cleanup.
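Conceptually, the fix turns the single unbounded delete into a loop like the simplified sketch below (stand-in callables, not the actual implementation; the real change is in the diff further down):

```python
def batched_cleanup(list_keys, delete_keys, cleanup_batch_size: int = 1000) -> int:
    """Delete matching keys in chunks of at most `cleanup_batch_size`."""
    num_deleted = 0
    # Repeatedly fetch at most `cleanup_batch_size` stale keys and delete them,
    # so no single statement exceeds the backend's bound-variable limit.
    while keys := list_keys(limit=cleanup_batch_size):
        delete_keys(keys)
        num_deleted += len(keys)
    return num_deleted


# Toy usage: pretend the record manager holds 2,500 stale keys.
stale = [f"k{i}" for i in range(2500)]

def fake_delete(keys):
    for k in keys:
        stale.remove(k)

print(batched_cleanup(lambda limit: stale[:limit], fake_delete))  # 2500
```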

Also added unit tests for incremental mode, covering different cleanup batch sizes.
Lope Ramos 2025-05-14 17:38:21 +02:00 committed by GitHub
parent 263c215112
commit b8ae2de169
2 changed files with 105 additions and 10 deletions


@@ -466,10 +466,9 @@ def index(
             _source_ids = cast("Sequence[str]", source_ids)
-            uids_to_delete = record_manager.list_keys(
-                group_ids=_source_ids, before=index_start_dt
-            )
-            if uids_to_delete:
+            while uids_to_delete := record_manager.list_keys(
+                group_ids=_source_ids, before=index_start_dt, limit=cleanup_batch_size
+            ):
                 # Then delete from vector store.
                 _delete(destination, uids_to_delete)
                 # First delete from record store.
# First delete from record store.
@@ -780,10 +779,9 @@ async def aindex(
             _source_ids = cast("Sequence[str]", source_ids)
-            uids_to_delete = await record_manager.alist_keys(
-                group_ids=_source_ids, before=index_start_dt
-            )
-            if uids_to_delete:
+            while uids_to_delete := await record_manager.alist_keys(
+                group_ids=_source_ids, before=index_start_dt, limit=cleanup_batch_size
+            ):
                 # Then delete from vector store.
                 await _adelete(destination, uids_to_delete)
                 # First delete from record store.
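For reference, a hedged usage sketch of the `index()` API this change affects. The in-memory components stand in for a real SQL-backed record manager and vector store, and the import paths assume a recent `langchain-core` layout:

```python
from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing import InMemoryRecordManager, index
from langchain_core.vectorstores import InMemoryVectorStore

record_manager = InMemoryRecordManager(namespace="demo")  # stand-in for a SQL record manager
record_manager.create_schema()
vector_store = InMemoryVectorStore(embedding=DeterministicFakeEmbedding(size=8))

docs = [
    Document(page_content="doc", metadata={"source": str(i)}) for i in range(2000)
]

# Stale keys for sources no longer emitted are now removed in chunks of
# `cleanup_batch_size`, so a SQL-backed record manager never builds a DELETE
# with tens of thousands of bound variables.
result = index(
    docs,
    record_manager,
    vector_store,
    cleanup="incremental",
    source_id_key="source",
    cleanup_batch_size=1000,
)
print(result)  # e.g. {'num_added': 2000, 'num_deleted': 0, 'num_skipped': 0, 'num_updated': 0}
```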


@@ -1882,7 +1882,7 @@ async def test_adeduplication(
     }
-def test_cleanup_with_different_batchsize(
+def test_full_cleanup_with_different_batchsize(
     record_manager: InMemoryRecordManager, vector_store: VectorStore
 ) -> None:
     """Check that we can clean up with different batch size."""
@@ -1919,7 +1919,56 @@ def test_cleanup_with_different_batchsize(
     }
-async def test_async_cleanup_with_different_batchsize(
+def test_incremental_cleanup_with_different_batchsize(
+    record_manager: InMemoryRecordManager, vector_store: VectorStore
+) -> None:
+    """Check that we can clean up with different batch size."""
+    docs = [
+        Document(
+            page_content="This is a test document.",
+            metadata={"source": str(d)},
+        )
+        for d in range(1000)
+    ]
+    assert index(
+        docs,
+        record_manager,
+        vector_store,
+        source_id_key="source",
+        cleanup="incremental",
+    ) == {
+        "num_added": 1000,
+        "num_deleted": 0,
+        "num_skipped": 0,
+        "num_updated": 0,
+    }
+    docs = [
+        Document(
+            page_content="Different doc",
+            metadata={"source": str(d)},
+        )
+        for d in range(1001)
+    ]
+    assert index(
+        docs,
+        record_manager,
+        vector_store,
+        source_id_key="source",
+        cleanup="incremental",
+        cleanup_batch_size=17,
+    ) == {
+        "num_added": 1001,
+        "num_deleted": 1000,
+        "num_skipped": 0,
+        "num_updated": 0,
+    }
+async def test_afull_cleanup_with_different_batchsize(
     arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
 ) -> None:
     """Check that we can clean up with different batch size."""
@@ -1956,6 +2005,54 @@ async def test_async_cleanup_with_different_batchsize(
     }
+async def test_aincremental_cleanup_with_different_batchsize(
+    arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
+) -> None:
+    """Check that we can clean up with different batch size."""
+    docs = [
+        Document(
+            page_content="This is a test document.",
+            metadata={"source": str(d)},
+        )
+        for d in range(1000)
+    ]
+    assert await aindex(
+        docs,
+        arecord_manager,
+        vector_store,
+        source_id_key="source",
+        cleanup="incremental",
+    ) == {
+        "num_added": 1000,
+        "num_deleted": 0,
+        "num_skipped": 0,
+        "num_updated": 0,
+    }
+    docs = [
+        Document(
+            page_content="Different doc",
+            metadata={"source": str(d)},
+        )
+        for d in range(1001)
+    ]
+    assert await aindex(
+        docs,
+        arecord_manager,
+        vector_store,
+        cleanup="incremental",
+        source_id_key="source",
+        cleanup_batch_size=17,
+    ) == {
+        "num_added": 1001,
+        "num_deleted": 1000,
+        "num_skipped": 0,
+        "num_updated": 0,
+    }
 def test_deduplication_v2(
     record_manager: InMemoryRecordManager, vector_store: VectorStore
 ) -> None: