From 943aa01c14f1f6a1ad294c5732c5b443b9cfac3d Mon Sep 17 00:00:00 2001 From: Eugene Yurtsev Date: Fri, 1 Dec 2023 12:10:07 -0500 Subject: [PATCH] Improve indexing performance for Postgres (remote database) for refresh for async API (#14132) This PR speeds up the indexing API on the async path by batching the UID refresh-timestamp updates sent to the SQL record manager (which may be remote): each batch now issues a single `aupdate` call for all already-indexed documents instead of one call per document. --- libs/langchain/langchain/indexes/_api.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/libs/langchain/langchain/indexes/_api.py b/libs/langchain/langchain/indexes/_api.py index cd018fbe77e..5637a12511b 100644 --- a/libs/langchain/langchain/indexes/_api.py +++ b/libs/langchain/langchain/indexes/_api.py @@ -303,18 +303,18 @@ def index( # Filter out documents that already exist in the record store. uids = [] docs_to_index = [] - docs_to_update = [] + uids_to_refresh = [] for hashed_doc, doc_exists in zip(hashed_docs, exists_batch): if doc_exists: - docs_to_update.append(hashed_doc.uid) + uids_to_refresh.append(hashed_doc.uid) continue uids.append(hashed_doc.uid) docs_to_index.append(hashed_doc.to_document()) # Update refresh timestamp - if docs_to_update: - record_manager.update(docs_to_update, time_at_least=index_start_dt) - num_skipped += len(docs_to_update) + if uids_to_refresh: + record_manager.update(uids_to_refresh, time_at_least=index_start_dt) + num_skipped += len(uids_to_refresh) # Be pessimistic and assume that all vector store write will fail. # First write to vector store @@ -503,18 +503,20 @@ async def aindex( # Filter out documents that already exist in the record store. uids: list[str] = [] docs_to_index: list[Document] = [] + uids_to_refresh = [] for hashed_doc, doc_exists in zip(hashed_docs, exists_batch): if doc_exists: - # Must be updated to refresh timestamp.
- await record_manager.aupdate( - [hashed_doc.uid], time_at_least=index_start_dt - ) - num_skipped += 1 + uids_to_refresh.append(hashed_doc.uid) continue uids.append(hashed_doc.uid) docs_to_index.append(hashed_doc.to_document()) + if uids_to_refresh: + # Must be updated to refresh timestamp. + await record_manager.aupdate(uids_to_refresh, time_at_least=index_start_dt) + num_skipped += len(uids_to_refresh) + # Be pessimistic and assume that all vector store write will fail. # First write to vector store if docs_to_index: