Improve indexing refresh performance for Postgres (remote database) in the async API (#14132)

This PR speeds up the indexing API on the async path by batching the uid
updates issued to the SQL record manager (which may be remote). Previously,
every already-indexed document triggered its own awaited update, i.e. one
database round trip per document; the uids are now collected and refreshed
in a single call per batch.
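As a rough illustration of why the batching matters, here is a minimal sketch (not LangChain code; the latency figure and the stand-in aupdate helper below are assumptions) comparing one round trip per document against one round trip per batch:

# Minimal sketch of the before/after cost, not the actual LangChain code.
# ROUND_TRIP and the stand-in aupdate() are hypothetical; the point is that
# each record-manager call to a remote database pays one network round trip,
# so N per-document updates cost ~N * latency versus ~1 * latency for a batch.
import asyncio
import time

ROUND_TRIP = 0.05  # assumed 50 ms latency to a remote Postgres


async def aupdate(uids: list[str]) -> None:
    # Stand-in for a record manager update: one round trip per call,
    # regardless of how many uids are in the batch.
    await asyncio.sleep(ROUND_TRIP)


async def per_document(uids: list[str]) -> None:
    # Old async path: one awaited update per already-indexed document.
    for uid in uids:
        await aupdate([uid])


async def batched(uids: list[str]) -> None:
    # New async path: collect uids during the loop, then update once.
    if uids:
        await aupdate(uids)


async def main() -> None:
    uids = [f"doc-{i}" for i in range(100)]
    for fn in (per_document, batched):
        start = time.perf_counter()
        await fn(uids)
        print(f"{fn.__name__}: {time.perf_counter() - start:.2f}s")


asyncio.run(main())

With 100 existing documents and the assumed 50 ms round trip, the per-document path spends roughly 5 s in updates per batch, while the batched path spends roughly 50 ms.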
Eugene Yurtsev 2023-12-01 12:10:07 -05:00 committed by GitHub
parent 528fc76d6a
commit 943aa01c14

@@ -303,18 +303,18 @@ def index(
         # Filter out documents that already exist in the record store.
         uids = []
         docs_to_index = []
-        docs_to_update = []
+        uids_to_refresh = []
         for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
             if doc_exists:
-                docs_to_update.append(hashed_doc.uid)
+                uids_to_refresh.append(hashed_doc.uid)
                 continue
             uids.append(hashed_doc.uid)
             docs_to_index.append(hashed_doc.to_document())

         # Update refresh timestamp
-        if docs_to_update:
-            record_manager.update(docs_to_update, time_at_least=index_start_dt)
-            num_skipped += len(docs_to_update)
+        if uids_to_refresh:
+            record_manager.update(uids_to_refresh, time_at_least=index_start_dt)
+            num_skipped += len(uids_to_refresh)

         # Be pessimistic and assume that all vector store write will fail.
         # First write to vector store
@@ -503,18 +503,20 @@ async def aindex(
         # Filter out documents that already exist in the record store.
         uids: list[str] = []
         docs_to_index: list[Document] = []
+        uids_to_refresh = []
         for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
             if doc_exists:
-                # Must be updated to refresh timestamp.
-                await record_manager.aupdate(
-                    [hashed_doc.uid], time_at_least=index_start_dt
-                )
-                num_skipped += 1
+                uids_to_refresh.append(hashed_doc.uid)
                 continue
             uids.append(hashed_doc.uid)
             docs_to_index.append(hashed_doc.to_document())

+        if uids_to_refresh:
+            # Must be updated to refresh timestamp.
+            await record_manager.aupdate(uids_to_refresh, time_at_least=index_start_dt)
+            num_skipped += len(uids_to_refresh)
+
         # Be pessimistic and assume that all vector store write will fail.
         # First write to vector store
         if docs_to_index:
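
For context, at the SQL level a batched refresh against Postgres can be a single statement over all keys. The sketch below is illustrative only: the table and column names are assumptions, and the real SQLRecordManager issues its updates through SQLAlchemy and also validates the server clock against time_at_least before writing.

# Illustrative only: what a batched timestamp refresh can look like at the
# SQL level against Postgres, using asyncpg. The table and column names
# (upsertion_record, key, updated_at) are assumptions for this sketch.
import asyncio

import asyncpg


async def refresh_uids(dsn: str, uids: list[str]) -> None:
    # One statement and one round trip for the whole batch, instead of
    # len(uids) separate single-row UPDATEs.
    conn = await asyncpg.connect(dsn)
    try:
        await conn.execute(
            "UPDATE upsertion_record"
            " SET updated_at = EXTRACT(EPOCH FROM NOW())"
            " WHERE key = ANY($1::text[])",
            uids,
        )
    finally:
        await conn.close()


# Example (hypothetical DSN):
# asyncio.run(refresh_uids("postgresql://user@host/db", ["uid-1", "uid-2"]))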