From a6add89bd4b3b816655d27a5654f6c20068a0a79 Mon Sep 17 00:00:00 2001
From: Rajendra Kadam
Date: Wed, 31 Jul 2024 18:40:28 +0530
Subject: [PATCH] community[minor]: [PebbloSafeLoader] Implement
 content-size-based batching (#24871)

- **Title:** [PebbloSafeLoader] Implement content-size-based batching in the
  classification flow (loader/doc API)
- **Description:**
  - Implemented content-size-based batching in the loader/doc API. The batch
    size is set to 100 KB and is intentionally hard-coded, with no external
    configuration option, to prevent API timeouts.
  - Removed the unused field (pb_id) from doc_metadata.
- **Issue:** NA
- **Dependencies:** NA
- **Add tests and docs:** Updated
---
 .../document_loaders/pebblo.py                  | 40 +++++++++++++----
 .../langchain_community/utilities/pebblo.py    | 43 ++++++++++++++++++-
 .../document_loaders/test_pebblo.py             |  2 -
 3 files changed, 74 insertions(+), 11 deletions(-)

diff --git a/libs/community/langchain_community/document_loaders/pebblo.py b/libs/community/langchain_community/document_loaders/pebblo.py
index 2e31b370cbd..b3bd4475161 100644
--- a/libs/community/langchain_community/document_loaders/pebblo.py
+++ b/libs/community/langchain_community/document_loaders/pebblo.py
@@ -13,6 +13,7 @@ from langchain_core.documents import Document
 from langchain_community.document_loaders.base import BaseLoader
 from langchain_community.utilities.pebblo import (
     APP_DISCOVER_URL,
+    BATCH_SIZE_BYTES,
     CLASSIFIER_URL,
     LOADER_DOC_URL,
     PEBBLO_CLOUD_URL,
@@ -20,6 +21,7 @@ from langchain_community.utilities.pebblo import (
     App,
     Doc,
     IndexedDocument,
+    generate_size_based_batches,
     get_full_path,
     get_loader_full_path,
     get_loader_type,
@@ -68,6 +70,7 @@ class PebbloSafeLoader(BaseLoader):
         self.source_aggregate_size = 0
         self.classifier_url = classifier_url or CLASSIFIER_URL
         self.classifier_location = classifier_location
+        self.batch_size = BATCH_SIZE_BYTES
         self.loader_details = {
             "loader": loader_name,
             "source_path": self.source_path,
@@ -89,15 +92,37 @@ class PebbloSafeLoader(BaseLoader):
             list: Documents fetched from load method of the wrapped `loader`.
         """
         self.docs = self.loader.load()
-        self.docs_with_id = self._index_docs()
-        classified_docs = self._classify_doc(loading_end=True)
-        self._add_pebblo_specific_metadata(classified_docs)
-        if self.load_semantic:
-            self.docs = self._add_semantic_to_docs(classified_docs)
-        else:
-            self.docs = self._unindex_docs()  # type: ignore
+        # Classify docs in batches
+        self.classify_in_batches()
         return self.docs
 
+    def classify_in_batches(self) -> None:
+        """
+        Classify documents in batches.
+        This is to avoid API timeouts when sending large number of documents.
+        Batches are generated based on the page_content size.
+        """
+        batches: List[List[Document]] = generate_size_based_batches(
+            self.docs, self.batch_size
+        )
+
+        processed_docs: List[Document] = []
+
+        total_batches = len(batches)
+        for i, batch in enumerate(batches):
+            is_last_batch: bool = i == total_batches - 1
+            self.docs = batch
+            self.docs_with_id = self._index_docs()
+            classified_docs = self._classify_doc(loading_end=is_last_batch)
+            self._add_pebblo_specific_metadata(classified_docs)
+            if self.load_semantic:
+                batch_processed_docs = self._add_semantic_to_docs(classified_docs)
+            else:
+                batch_processed_docs = self._unindex_docs()
+            processed_docs.extend(batch_processed_docs)
+
+        self.docs = processed_docs
+
     def lazy_load(self) -> Iterator[Document]:
         """Load documents in lazy fashion.
@@ -531,7 +556,6 @@ class PebbloSafeLoader(BaseLoader):
                 "full_path", doc_metadata.get("source", self.source_path)
             )
         )
-        doc_metadata["pb_id"] = doc.pb_id
         doc_metadata["pb_checksum"] = classified_docs.get(doc.pb_id, {}).get(
             "pb_checksum", None
         )
diff --git a/libs/community/langchain_community/utilities/pebblo.py b/libs/community/langchain_community/utilities/pebblo.py
index 568efae7b68..c61ce5bc000 100644
--- a/libs/community/langchain_community/utilities/pebblo.py
+++ b/libs/community/langchain_community/utilities/pebblo.py
@@ -4,7 +4,7 @@ import logging
 import os
 import pathlib
 import platform
-from typing import Optional, Tuple
+from typing import List, Optional, Tuple
 
 from langchain_core.documents import Document
 from langchain_core.env import get_runtime_environment
@@ -20,6 +20,7 @@ PEBBLO_CLOUD_URL = os.getenv("PEBBLO_CLOUD_URL", "https://api.daxa.ai")
 
 LOADER_DOC_URL = "/v1/loader/doc"
 APP_DISCOVER_URL = "/v1/app/discover"
+BATCH_SIZE_BYTES = 100 * 1024  # 100 KB
 
 # Supported loaders for Pebblo safe data loading
 file_loader = [
@@ -301,3 +302,43 @@ def get_ip() -> str:
     except Exception:
         public_ip = socket.gethostbyname("localhost")
     return public_ip
+
+
+def generate_size_based_batches(
+    docs: List[Document], max_batch_size: int = 100 * 1024
+) -> List[List[Document]]:
+    """
+    Generate batches of documents based on page_content size.
+    Args:
+        docs: List of documents to be batched.
+        max_batch_size: Maximum size of each batch in bytes.
+            Defaults to 100 * 1024 (100 KB).
+    Returns:
+        List[List[Document]]: List of batches of documents
+    """
+    batches: List[List[Document]] = []
+    current_batch: List[Document] = []
+    current_batch_size: int = 0
+
+    for doc in docs:
+        # Calculate the size of the document in bytes
+        doc_size: int = len(doc.page_content.encode("utf-8"))
+
+        if doc_size > max_batch_size:
+            # If a single document exceeds the max batch size,
+            # send it as a single batch
+            batches.append([doc])
+        else:
+            if current_batch_size + doc_size > max_batch_size:
+                # If adding this document exceeds the max batch size,
+                # start a new batch
+                batches.append(current_batch)
+                current_batch = []
+                current_batch_size = 0
+
+            # Add document to the current batch
+            current_batch.append(doc)
+            current_batch_size += doc_size
+
+    # Add the last batch if it has documents
+    if current_batch:
+        batches.append(current_batch)
+
+    return batches
diff --git a/libs/community/tests/unit_tests/document_loaders/test_pebblo.py b/libs/community/tests/unit_tests/document_loaders/test_pebblo.py
index d0a71faae7a..2d6256b5044 100644
--- a/libs/community/tests/unit_tests/document_loaders/test_pebblo.py
+++ b/libs/community/tests/unit_tests/document_loaders/test_pebblo.py
@@ -69,7 +69,6 @@ def test_csv_loader_load_valid_data(mocker: MockerFixture) -> None:
             "source": full_file_path,
             "row": 0,
             "full_path": full_file_path,
-            "pb_id": "0",
             # For UT as here we are not calculating checksum
             "pb_checksum": None,
         },
@@ -80,7 +79,6 @@
             "source": full_file_path,
             "row": 1,
             "full_path": full_file_path,
-            "pb_id": "1",
             # For UT as here we are not calculating checksum
             "pb_checksum": None,
         },
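
Reviewer note: the following is a minimal sketch, not part of the patch, showing how `generate_size_based_batches` splits documents once this change is applied. The document sizes are arbitrary illustration values; `BATCH_SIZE_BYTES` is the 100 KB constant introduced above.

```python
# Minimal sketch (assumes this patch is applied, so the helper and the
# constant below are importable). Document sizes are illustration values.
from langchain_core.documents import Document

from langchain_community.utilities.pebblo import (
    BATCH_SIZE_BYTES,
    generate_size_based_batches,
)

# Three ~40 KB documents followed by one ~150 KB document.
docs = [Document(page_content="x" * (40 * 1024)) for _ in range(3)]
docs.append(Document(page_content="y" * (150 * 1024)))

batches = generate_size_based_batches(docs, BATCH_SIZE_BYTES)

for i, batch in enumerate(batches):
    size = sum(len(d.page_content.encode("utf-8")) for d in batch)
    print(f"batch {i}: {len(batch)} doc(s), {size} bytes")

# Expected output:
#   batch 0: 2 doc(s), 81920 bytes   (docs 0 and 1 fit under 100 KB together)
#   batch 1: 1 doc(s), 153600 bytes  (the oversized doc is sent alone)
#   batch 2: 1 doc(s), 40960 bytes   (doc 2 was still pending; flushed at the end)
```

One consequence of this design: a partially filled batch is only flushed when it would overflow or when the input is exhausted, so an oversized document's single-doc batch can be emitted ahead of smaller documents that precede it in the input, and document order across batches may differ from input order.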
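
From the caller's side nothing changes: `PebbloSafeLoader.load()` now routes through `classify_in_batches()` internally. A hypothetical usage sketch follows; the file path, app name, owner, and description are placeholders, not values from this patch.

```python
from langchain_community.document_loaders import CSVLoader
from langchain_community.document_loaders.pebblo import PebbloSafeLoader

# Placeholder values; substitute your own loader and app metadata.
loader = PebbloSafeLoader(
    CSVLoader(file_path="data/records.csv"),
    name="demo-rag-app",        # app name reported to Pebblo
    owner="Data Team",          # app owner
    description="Demo loader",  # app description
)

# load() fetches all documents, then classifies them in 100 KB batches via
# the loader/doc API; loading_end is sent only with the final batch.
docs = loader.load()
print(f"loaded {len(docs)} classified documents")
```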