core[minor]: add new clean up strategy "scoped_full" to indexing (#28505)

~Note that this PR is currently a draft, so I haven't added the change to the `aindex`
function or test code for it yet.
After we agree on the direction, I will add commits.~

`batch_size` is very difficult to choose: a large value (e.g., >10,000) puts load on
the VectorDB and RecordManager, while a small value deletes records unnecessarily and
leads to redundant work, as the `IMPORTANT` section explains.
On the other hand, we can't use `full` cleanup because, in our use case, the loader
returns only a subset of the dataset.

I suspect many people are in the same situation as we are.

So, as one possible solution, I would like to introduce a new argument,
`scoped_full_cleanup`.
This argument is valid only when `cleanup` is `full`. If `True`, the full
cleanup deletes all documents that haven't been updated AND that are
associated with source ids that were seen during indexing. The default is
`False`.

This change is backward compatible.
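
For illustration (not part of the PR), here is a minimal usage sketch of the mode as merged. Note that the merged change exposes this as `cleanup="scoped_full"` (see the diff below) rather than the boolean `scoped_full_cleanup` flag described above; the in-memory record manager, in-memory vector store, and fake embedding are assumptions chosen only to make the snippet self-contained, and the mode requires `langchain-core` >= 0.3.25 per the version note in the diff.

```python
# Hedged sketch of the merged API (cleanup="scoped_full"); the in-memory
# components and fake embedding below are for demonstration only.
from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing import InMemoryRecordManager, index
from langchain_core.vectorstores import InMemoryVectorStore

record_manager = InMemoryRecordManager(namespace="demo")
record_manager.create_schema()
vector_store = InMemoryVectorStore(embedding=DeterministicFakeEmbedding(size=8))

# The loader returns only a *subset* of the corpus (here: source "a.txt").
docs = [
    Document(page_content="chunk 1", metadata={"source": "a.txt"}),
    Document(page_content="chunk 2", metadata={"source": "a.txt"}),
]

result = index(
    docs,
    record_manager,
    vector_store,
    cleanup="scoped_full",   # clean up only sources seen during this run
    source_id_key="source",  # required for scoped_full, as for incremental
)
print(result)  # e.g. {'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
```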

---------

Co-authored-by: Eugene Yurtsev <eugene@langchain.dev>
Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
Keiichi Hirobe 2024-12-14 05:35:25 +09:00 committed by GitHub
parent 4802c31a53
commit 258b3be5ec
3 changed files with 384 additions and 33 deletions


@@ -39,19 +39,20 @@
"| None | ✅ | ✅ | ❌ | ❌ | - |\n",
"| Incremental | ✅ | ✅ | ❌ | ✅ | Continuously |\n",
"| Full | ✅ | ❌ | ✅ | ✅ | At end of indexing |\n",
"| Scoped_Full | ✅ | ✅ | ❌ | ✅ | At end of indexing |\n",
"\n",
"\n",
"`None` does not do any automatic clean up, allowing the user to manually do clean up of old content. \n",
"\n",
"`incremental` and `full` offer the following automated clean up:\n",
"`incremental`, `full` and `scoped_full` offer the following automated clean up:\n",
"\n",
"* If the content of the source document or derived documents has **changed**, both `incremental` or `full` modes will clean up (delete) previous versions of the content.\n",
"* If the source document has been **deleted** (meaning it is not included in the documents currently being indexed), the `full` cleanup mode will delete it from the vector store correctly, but the `incremental` mode will not.\n",
"* If the content of the source document or derived documents has **changed**, all 3 modes will clean up (delete) previous versions of the content.\n",
"* If the source document has been **deleted** (meaning it is not included in the documents currently being indexed), the `full` cleanup mode will delete it from the vector store correctly, but the `incremental` and `scoped_full` mode will not.\n",
"\n",
"When content is mutated (e.g., the source PDF file was revised) there will be a period of time during indexing when both the new and old versions may be returned to the user. This happens after the new content was written, but before the old version was deleted.\n",
"\n",
"* `incremental` indexing minimizes this period of time as it is able to do clean up continuously, as it writes.\n",
"* `full` mode does the clean up after all batches have been written.\n",
"* `full` and `scoped_full` mode does the clean up after all batches have been written.\n",
"\n",
"## Requirements\n",
"\n",
@@ -64,7 +65,7 @@
" \n",
"## Caution\n",
"\n",
"The record manager relies on a time-based mechanism to determine what content can be cleaned up (when using `full` or `incremental` cleanup modes).\n",
"The record manager relies on a time-based mechanism to determine what content can be cleaned up (when using `full` or `incremental` or `scoped_full` cleanup modes).\n",
"\n",
"If two tasks run back-to-back, and the first task finishes before the clock time changes, then the second task may not be able to clean up content.\n",
"\n",

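To make the table and bullets above concrete, here is a hedged, self-contained sketch (again my own, using in-memory components and a fake embedding only so that it runs) of the difference that motivates the new mode: after re-indexing a subset of the corpus, `full` deletes everything the loader did not return, while `scoped_full` only cleans up stale records for the source ids it actually saw.

```python
# Illustrative comparison of full vs scoped_full when the loader returns a
# subset of the corpus. Exact counts assume a recent langchain-core and that
# the record manager's clock advances between the two runs.
from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing import InMemoryRecordManager, index
from langchain_core.vectorstores import InMemoryVectorStore


def fresh_components() -> tuple[InMemoryRecordManager, InMemoryVectorStore]:
    record_manager = InMemoryRecordManager(namespace="demo")
    record_manager.create_schema()
    vector_store = InMemoryVectorStore(embedding=DeterministicFakeEmbedding(size=8))
    return record_manager, vector_store


full_corpus = [
    Document(page_content="doc a v1", metadata={"source": "a.txt"}),
    Document(page_content="doc b v1", metadata={"source": "b.txt"}),
]
subset = [  # second run: only a.txt is re-loaded, with changed content
    Document(page_content="doc a v2", metadata={"source": "a.txt"}),
]

for mode in ("full", "scoped_full"):
    record_manager, vector_store = fresh_components()
    index(full_corpus, record_manager, vector_store, cleanup=mode, source_id_key="source")
    result = index(subset, record_manager, vector_store, cleanup=mode, source_id_key="source")
    # full:        b.txt was not returned, so its record is deleted too -> num_deleted == 2
    # scoped_full: b.txt's source id was not seen, so it survives       -> num_deleted == 1
    print(mode, result["num_deleted"])
```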

@@ -197,7 +197,7 @@ def index(
vector_store: Union[VectorStore, DocumentIndex],
*,
batch_size: int = 100,
cleanup: Literal["incremental", "full", None] = None,
cleanup: Literal["incremental", "full", "scoped_full", None] = None,
source_id_key: Union[str, Callable[[Document], str], None] = None,
cleanup_batch_size: int = 1_000,
force_update: bool = False,
@@ -215,7 +215,7 @@ def index(
are not able to specify the uid of the document.
IMPORTANT:
* if auto_cleanup is set to True, the loader should be returning
* In full mode, the loader should be returning
the entire dataset, and not just a subset of the dataset.
Otherwise, the auto_cleanup will remove documents that it is not
supposed to.
@@ -227,6 +227,11 @@ def index(
chunks, and we index them using a batch size of 5, we'll have 3 batches
all with the same source id. In general, to avoid doing too much
redundant work select as big a batch size as possible.
* The `scoped_full` mode is suitable if determining an appropriate batch size
is challenging or if your data loader cannot return the entire dataset at
once. This mode keeps track of source IDs in memory, which should be fine
for most use cases. If your dataset is large (10M+ docs), you will likely
need to parallelize the indexing process regardless.
Args:
docs_source: Data loader or iterable of documents to index.
@@ -235,16 +240,19 @@ def index(
vector_store: VectorStore or DocumentIndex to index the documents into.
batch_size: Batch size to use when indexing. Default is 100.
cleanup: How to handle clean up of documents. Default is None.
- Incremental: Cleans up all documents that haven't been updated AND
- incremental: Cleans up all documents that haven't been updated AND
that are associated with source ids that were seen
during indexing.
Clean up is done continuously during indexing helping
to minimize the probability of users seeing duplicated
content.
- Full: Delete all documents that have not been returned by the loader
- full: Delete all documents that have not been returned by the loader
during this run of indexing.
Clean up runs after all documents have been indexed.
This means that users may see duplicated content during indexing.
- scoped_full: Similar to Full, but only deletes all documents
that haven't been updated AND that are associated with
source ids that were seen during indexing.
- None: Do not delete any documents.
source_id_key: Optional key that helps identify the original source
of the document. Default is None.
@@ -270,16 +278,22 @@ def index(
ValueError: If vectorstore does not have
"delete" and "add_documents" required methods.
ValueError: If source_id_key is not None, but is not a string or callable.
.. version_modified:: 0.3.25
* Added `scoped_full` cleanup mode.
"""
if cleanup not in {"incremental", "full", None}:
if cleanup not in {"incremental", "full", "scoped_full", None}:
msg = (
f"cleanup should be one of 'incremental', 'full' or None. "
f"cleanup should be one of 'incremental', 'full', 'scoped_full' or None. "
f"Got {cleanup}."
)
raise ValueError(msg)
if cleanup == "incremental" and source_id_key is None:
msg = "Source id key is required when cleanup mode is incremental."
if (cleanup == "incremental" or cleanup == "scoped_full") and source_id_key is None:
msg = (
"Source id key is required when cleanup mode is incremental or scoped_full."
)
raise ValueError(msg)
destination = vector_store # Renaming internally for clarity
@@ -326,6 +340,7 @@ def index(
num_skipped = 0
num_updated = 0
num_deleted = 0
scoped_full_cleanup_source_ids: set[str] = set()
for doc_batch in _batch(batch_size, doc_iterator):
hashed_docs = list(
@@ -338,17 +353,20 @@ def index(
source_id_assigner(doc) for doc in hashed_docs
]
if cleanup == "incremental":
# If the cleanup mode is incremental, source ids are required.
if cleanup == "incremental" or cleanup == "scoped_full":
# source ids are required.
for source_id, hashed_doc in zip(source_ids, hashed_docs):
if source_id is None:
msg = (
"Source ids are required when cleanup mode is incremental. "
f"Source ids are required when cleanup mode is "
f"incremental or scoped_full. "
f"Document that starts with "
f"content: {hashed_doc.page_content[:100]} was not assigned "
f"as source id."
)
raise ValueError(msg)
if cleanup == "scoped_full":
scoped_full_cleanup_source_ids.add(source_id)
# source ids cannot be None after for loop above.
source_ids = cast(Sequence[str], source_ids) # type: ignore[assignment]
@@ -428,9 +446,12 @@ def index(
record_manager.delete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
if cleanup == "full":
if cleanup == "full" or cleanup == "scoped_full":
delete_group_ids: Optional[Sequence[str]] = None
if cleanup == "scoped_full":
delete_group_ids = list(scoped_full_cleanup_source_ids)
while uids_to_delete := record_manager.list_keys(
before=index_start_dt, limit=cleanup_batch_size
group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
):
# First delete from record store.
destination.delete(uids_to_delete)
@@ -459,7 +480,7 @@ async def aindex(
vector_store: Union[VectorStore, DocumentIndex],
*,
batch_size: int = 100,
cleanup: Literal["incremental", "full", None] = None,
cleanup: Literal["incremental", "full", "scoped_full", None] = None,
source_id_key: Union[str, Callable[[Document], str], None] = None,
cleanup_batch_size: int = 1_000,
force_update: bool = False,
@@ -477,10 +498,23 @@ async def aindex(
are not able to specify the uid of the document.
IMPORTANT:
if auto_cleanup is set to True, the loader should be returning
the entire dataset, and not just a subset of the dataset.
Otherwise, the auto_cleanup will remove documents that it is not
supposed to.
* In full mode, the loader should be returning
the entire dataset, and not just a subset of the dataset.
Otherwise, the auto_cleanup will remove documents that it is not
supposed to.
* In incremental mode, if documents associated with a particular
source id appear across different batches, the indexing API
will do some redundant work. This will still result in the
correct end state of the index, but will unfortunately not be
100% efficient. For example, if a given document is split into 15
chunks, and we index them using a batch size of 5, we'll have 3 batches
all with the same source id. In general, to avoid doing too much
redundant work select as big a batch size as possible.
* The `scoped_full` mode is suitable if determining an appropriate batch size
is challenging or if your data loader cannot return the entire dataset at
once. This mode keeps track of source IDs in memory, which should be fine
for most use cases. If your dataset is large (10M+ docs), you will likely
need to parallelize the indexing process regardless.
Args:
docs_source: Data loader or iterable of documents to index.
@@ -489,15 +523,18 @@ async def aindex(
vector_store: VectorStore or DocumentIndex to index the documents into.
batch_size: Batch size to use when indexing. Default is 100.
cleanup: How to handle clean up of documents. Default is None.
- Incremental: Cleans up all documents that haven't been updated AND
- incremental: Cleans up all documents that haven't been updated AND
that are associated with source ids that were seen
during indexing.
Clean up is done continuously during indexing helping
to minimize the probability of users seeing duplicated
content.
- Full: Delete all documents that have not been returned by the loader.
- full: Delete all documents that have not been returned by the loader.
Clean up runs after all documents have been indexed.
This means that users may see duplicated content during indexing.
- scoped_full: Similar to Full, but only deletes all documents
that haven't been updated AND that are associated with
source ids that were seen during indexing.
- None: Do not delete any documents.
source_id_key: Optional key that helps identify the original source
of the document. Default is None.
@@ -523,17 +560,23 @@ async def aindex(
ValueError: If vectorstore does not have
"adelete" and "aadd_documents" required methods.
ValueError: If source_id_key is not None, but is not a string or callable.
.. version_modified:: 0.3.25
* Added `scoped_full` cleanup mode.
"""
if cleanup not in {"incremental", "full", None}:
if cleanup not in {"incremental", "full", "scoped_full", None}:
msg = (
f"cleanup should be one of 'incremental', 'full' or None. "
f"cleanup should be one of 'incremental', 'full', 'scoped_full' or None. "
f"Got {cleanup}."
)
raise ValueError(msg)
if cleanup == "incremental" and source_id_key is None:
msg = "Source id key is required when cleanup mode is incremental."
if (cleanup == "incremental" or cleanup == "scoped_full") and source_id_key is None:
msg = (
"Source id key is required when cleanup mode is incremental or scoped_full."
)
raise ValueError(msg)
destination = vector_store # Renaming internally for clarity
@@ -589,6 +632,7 @@ async def aindex(
num_skipped = 0
num_updated = 0
num_deleted = 0
scoped_full_cleanup_source_ids: set[str] = set()
async for doc_batch in _abatch(batch_size, async_doc_iterator):
hashed_docs = list(
@@ -601,17 +645,20 @@ async def aindex(
source_id_assigner(doc) for doc in hashed_docs
]
if cleanup == "incremental":
if cleanup == "incremental" or cleanup == "scoped_full":
# If the cleanup mode is incremental, source ids are required.
for source_id, hashed_doc in zip(source_ids, hashed_docs):
if source_id is None:
msg = (
"Source ids are required when cleanup mode is incremental. "
f"Source ids are required when cleanup mode is "
f"incremental or scoped_full. "
f"Document that starts with "
f"content: {hashed_doc.page_content[:100]} was not assigned "
f"as source id."
)
raise ValueError(msg)
if cleanup == "scoped_full":
scoped_full_cleanup_source_ids.add(source_id)
# source ids cannot be None after for loop above.
source_ids = cast(Sequence[str], source_ids)
@@ -691,9 +738,12 @@ async def aindex(
await record_manager.adelete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)
if cleanup == "full":
if cleanup == "full" or cleanup == "scoped_full":
delete_group_ids: Optional[Sequence[str]] = None
if cleanup == "scoped_full":
delete_group_ids = list(scoped_full_cleanup_source_ids)
while uids_to_delete := await record_manager.alist_keys(
before=index_start_dt, limit=cleanup_batch_size
group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
):
# First delete from record store.
await destination.adelete(uids_to_delete)

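Because the hunks above interleave removed and added lines, here is a consolidated, hedged reading of the cleanup step that the diff introduces. The helper name `_cleanup_pass` is mine (the real logic lives inline inside `index`/`aindex`); the `RecordManager` methods used (`list_keys`, `delete_keys`) and `VectorStore.delete` are the existing APIs the diff calls.

```python
# Consolidated reading of the cleanup block added above; illustrative, not the
# verbatim merged source.
from typing import Optional, Sequence

from langchain_core.indexing import RecordManager
from langchain_core.vectorstores import VectorStore


def _cleanup_pass(
    cleanup: str,
    record_manager: RecordManager,
    destination: VectorStore,
    scoped_full_cleanup_source_ids: set[str],
    index_start_dt: float,
    cleanup_batch_size: int = 1_000,
) -> int:
    """Delete stale records after all batches have been written."""
    num_deleted = 0
    if cleanup == "full" or cleanup == "scoped_full":
        delete_group_ids: Optional[Sequence[str]] = None
        if cleanup == "scoped_full":
            # Restrict the stale-key scan to source ids seen during this run.
            delete_group_ids = list(scoped_full_cleanup_source_ids)
        while uids_to_delete := record_manager.list_keys(
            group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
        ):
            # As in the diff: delete from the vector store first, then drop
            # the corresponding keys from the record manager.
            destination.delete(uids_to_delete)
            record_manager.delete_keys(uids_to_delete)
            num_deleted += len(uids_to_delete)
    return num_deleted
```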

@@ -364,6 +364,306 @@ async def test_aincremental_fails_with_bad_source_ids(
)
def test_index_simple_delete_scoped_full(
record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
"""Test Indexing with scoped_full strategy."""
loader = ToyLoader(
documents=[
Document(
page_content="This is a test document.",
metadata={"source": "1"},
),
Document(
page_content="This is another document.",
metadata={"source": "1"},
),
Document(
page_content="This is yet another document.",
metadata={"source": "1"},
),
Document(
page_content="This is a test document from another source.",
metadata={"source": "2"},
),
]
)
with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
):
assert index(
loader,
record_manager,
vector_store,
cleanup="scoped_full",
source_id_key="source",
) == {
"num_added": 4,
"num_deleted": 0,
"num_skipped": 0,
"num_updated": 0,
}
with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
):
assert index(
loader,
record_manager,
vector_store,
cleanup="scoped_full",
source_id_key="source",
) == {
"num_added": 0,
"num_deleted": 0,
"num_skipped": 4,
"num_updated": 0,
}
loader = ToyLoader(
documents=[
Document(
page_content="mutated document 1",
metadata={"source": "1"},
),
Document(
page_content="This is another document.", # <-- Same as original
metadata={"source": "1"},
),
]
)
with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp()
):
assert index(
loader,
record_manager,
vector_store,
cleanup="scoped_full",
source_id_key="source",
) == {
"num_added": 1,
"num_deleted": 2,
"num_skipped": 1,
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {
"mutated document 1",
"This is another document.",
"This is a test document from another source.",
}
# Attempt to index again to verify that nothing changes
with patch.object(
record_manager, "get_time", return_value=datetime(2021, 1, 4).timestamp()
):
assert index(
loader,
record_manager,
vector_store,
cleanup="scoped_full",
source_id_key="source",
) == {
"num_added": 0,
"num_deleted": 0,
"num_skipped": 2,
"num_updated": 0,
}
async def test_aindex_simple_delete_scoped_full(
arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
"""Test Indexing with scoped_full strategy."""
loader = ToyLoader(
documents=[
Document(
page_content="This is a test document.",
metadata={"source": "1"},
),
Document(
page_content="This is another document.",
metadata={"source": "1"},
),
Document(
page_content="This is yet another document.",
metadata={"source": "1"},
),
Document(
page_content="This is a test document from another source.",
metadata={"source": "2"},
),
]
)
with patch.object(
arecord_manager, "get_time", return_value=datetime(2021, 1, 1).timestamp()
):
assert await aindex(
loader,
arecord_manager,
vector_store,
cleanup="scoped_full",
source_id_key="source",
) == {
"num_added": 4,
"num_deleted": 0,
"num_skipped": 0,
"num_updated": 0,
}
with patch.object(
arecord_manager, "get_time", return_value=datetime(2021, 1, 2).timestamp()
):
assert await aindex(
loader,
arecord_manager,
vector_store,
cleanup="scoped_full",
source_id_key="source",
) == {
"num_added": 0,
"num_deleted": 0,
"num_skipped": 4,
"num_updated": 0,
}
loader = ToyLoader(
documents=[
Document(
page_content="mutated document 1",
metadata={"source": "1"},
),
Document(
page_content="This is another document.", # <-- Same as original
metadata={"source": "1"},
),
]
)
with patch.object(
arecord_manager, "get_time", return_value=datetime(2021, 1, 3).timestamp()
):
assert await aindex(
loader,
arecord_manager,
vector_store,
cleanup="scoped_full",
source_id_key="source",
) == {
"num_added": 1,
"num_deleted": 2,
"num_skipped": 1,
"num_updated": 0,
}
doc_texts = {
# Ignoring type since doc should be in the store and not a None
vector_store.get_by_ids([uid])[0].page_content # type: ignore
for uid in vector_store.store
}
assert doc_texts == {
"mutated document 1",
"This is another document.",
"This is a test document from another source.",
}
# Attempt to index again to verify that nothing changes
with patch.object(
arecord_manager, "get_time", return_value=datetime(2021, 1, 4).timestamp()
):
assert await aindex(
loader,
arecord_manager,
vector_store,
cleanup="scoped_full",
source_id_key="source",
) == {
"num_added": 0,
"num_deleted": 0,
"num_skipped": 2,
"num_updated": 0,
}
def test_scoped_full_fails_with_bad_source_ids(
record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
"""Test Indexing with scoped_full strategy."""
loader = ToyLoader(
documents=[
Document(
page_content="This is a test document.",
metadata={"source": "1"},
),
Document(
page_content="This is another document.",
metadata={"source": "2"},
),
Document(
page_content="This is yet another document.",
metadata={"source": None},
),
]
)
with pytest.raises(ValueError):
# Should raise an error because no source id function was specified
index(loader, record_manager, vector_store, cleanup="scoped_full")
with pytest.raises(ValueError):
# Should raise an error because one of the documents has a None source id
index(
loader,
record_manager,
vector_store,
cleanup="scoped_full",
source_id_key="source",
)
async def test_ascoped_full_fails_with_bad_source_ids(
arecord_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None:
"""Test Indexing with scoped_full strategy."""
loader = ToyLoader(
documents=[
Document(
page_content="This is a test document.",
metadata={"source": "1"},
),
Document(
page_content="This is another document.",
metadata={"source": "2"},
),
Document(
page_content="This is yet another document.",
metadata={"source": None},
),
]
)
with pytest.raises(ValueError):
# Should raise an error because no source id function was specified
await aindex(loader, arecord_manager, vector_store, cleanup="scoped_full")
with pytest.raises(ValueError):
# Should raise an error because one of the documents has a None source id
await aindex(
loader,
arecord_manager,
vector_store,
cleanup="scoped_full",
source_id_key="source",
)
def test_no_delete(
record_manager: InMemoryRecordManager, vector_store: InMemoryVectorStore
) -> None: