mirror of
https://github.com/hwchase17/langchain.git
synced 2025-09-07 22:11:51 +00:00
community[minor]: [PebbloSafeLoader] Implement content-size-based batching (#24871)
- **Title:** [PebbloSafeLoader] Implement content-size-based batching in the classification flow (loader/doc API) - **Description:** - Implemented content-size-based batching in the loader/doc API. The batch size is set to 100KB and is intentionally hard-coded, with no external configuration option, to prevent timeouts. - Removed the unused field (pb_id) from doc_metadata - **Issue:** NA - **Dependencies:** NA - **Add tests and docs:** Updated
This commit is contained in:
@@ -13,6 +13,7 @@ from langchain_core.documents import Document
|
||||
from langchain_community.document_loaders.base import BaseLoader
|
||||
from langchain_community.utilities.pebblo import (
|
||||
APP_DISCOVER_URL,
|
||||
BATCH_SIZE_BYTES,
|
||||
CLASSIFIER_URL,
|
||||
LOADER_DOC_URL,
|
||||
PEBBLO_CLOUD_URL,
|
||||
@@ -20,6 +21,7 @@ from langchain_community.utilities.pebblo import (
|
||||
App,
|
||||
Doc,
|
||||
IndexedDocument,
|
||||
generate_size_based_batches,
|
||||
get_full_path,
|
||||
get_loader_full_path,
|
||||
get_loader_type,
|
||||
@@ -68,6 +70,7 @@ class PebbloSafeLoader(BaseLoader):
|
||||
self.source_aggregate_size = 0
|
||||
self.classifier_url = classifier_url or CLASSIFIER_URL
|
||||
self.classifier_location = classifier_location
|
||||
self.batch_size = BATCH_SIZE_BYTES
|
||||
self.loader_details = {
|
||||
"loader": loader_name,
|
||||
"source_path": self.source_path,
|
||||
@@ -89,15 +92,37 @@ class PebbloSafeLoader(BaseLoader):
|
||||
list: Documents fetched from load method of the wrapped `loader`.
|
||||
"""
|
||||
self.docs = self.loader.load()
|
||||
self.docs_with_id = self._index_docs()
|
||||
classified_docs = self._classify_doc(loading_end=True)
|
||||
self._add_pebblo_specific_metadata(classified_docs)
|
||||
if self.load_semantic:
|
||||
self.docs = self._add_semantic_to_docs(classified_docs)
|
||||
else:
|
||||
self.docs = self._unindex_docs() # type: ignore
|
||||
# Classify docs in batches
|
||||
self.classify_in_batches()
|
||||
return self.docs
|
||||
|
||||
def classify_in_batches(self) -> None:
    """Classify the loaded documents one batch at a time.

    Sending the documents in several smaller requests avoids API timeouts
    when a large number of documents has been loaded. Batches are generated
    based on the page_content size, by passing ``self.docs`` and
    ``self.batch_size`` to ``generate_size_based_batches``.

    While a batch is being processed it is assigned to ``self.docs`` and
    ``self.docs_with_id`` is rebuilt from it. Once every batch has been
    handled, ``self.docs`` is replaced with the processed documents of all
    batches, in batch order.
    """
    doc_batches: List[List[Document]] = generate_size_based_batches(
        self.docs, self.batch_size
    )
    final_index = len(doc_batches) - 1
    collected: List[Document] = []

    for position, current_batch in enumerate(doc_batches):
        # Point the instance at the current batch before indexing; the
        # helpers below take no document argument (presumably they read
        # self.docs / self.docs_with_id -- confirm against their bodies).
        self.docs = current_batch
        self.docs_with_id = self._index_docs()

        # Only the final batch is sent with loading_end=True.
        batch_classification = self._classify_doc(
            loading_end=(position == final_index)
        )
        self._add_pebblo_specific_metadata(batch_classification)

        collected += (
            self._add_semantic_to_docs(batch_classification)
            if self.load_semantic
            else self._unindex_docs()
        )

    self.docs = collected
|
||||
|
||||
def lazy_load(self) -> Iterator[Document]:
|
||||
"""Load documents in lazy fashion.
|
||||
|
||||
@@ -531,7 +556,6 @@ class PebbloSafeLoader(BaseLoader):
|
||||
"full_path", doc_metadata.get("source", self.source_path)
|
||||
)
|
||||
)
|
||||
doc_metadata["pb_id"] = doc.pb_id
|
||||
doc_metadata["pb_checksum"] = classified_docs.get(doc.pb_id, {}).get(
|
||||
"pb_checksum", None
|
||||
)
|
||||
|
Reference in New Issue
Block a user