mirror of
https://github.com/hwchase17/langchain.git
synced 2025-09-01 19:12:42 +00:00
Add ConcurrentLoader (#7512)
Works just like the GenericLoader but concurrently for those who choose to optimize their workflow. @rlancemartin @eyurtsev --------- Co-authored-by: Harrison Chase <hw.chase.17@gmail.com>
This commit is contained in:
@@ -28,6 +28,7 @@ from langchain.document_loaders.brave_search import BraveSearchLoader
|
||||
from langchain.document_loaders.browserless import BrowserlessLoader
|
||||
from langchain.document_loaders.chatgpt import ChatGPTLoader
|
||||
from langchain.document_loaders.college_confidential import CollegeConfidentialLoader
|
||||
from langchain.document_loaders.concurrent import ConcurrentLoader
|
||||
from langchain.document_loaders.confluence import ConfluenceLoader
|
||||
from langchain.document_loaders.conllu import CoNLLULoader
|
||||
from langchain.document_loaders.csv_loader import CSVLoader, UnstructuredCSVLoader
|
||||
@@ -305,4 +306,5 @@ __all__ = [
|
||||
"XorbitsLoader",
|
||||
"YoutubeAudioLoader",
|
||||
"YoutubeLoader",
|
||||
"ConcurrentLoader",
|
||||
]
|
||||
|
65
libs/langchain/langchain/document_loaders/concurrent.py
Normal file
65
libs/langchain/langchain/document_loaders/concurrent.py
Normal file
@@ -0,0 +1,65 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import concurrent.futures
|
||||
from pathlib import Path
|
||||
from typing import Iterator, Literal, Optional, Sequence, Union
|
||||
|
||||
from langchain.document_loaders.base import BaseBlobParser
|
||||
from langchain.document_loaders.blob_loaders import BlobLoader, FileSystemBlobLoader
|
||||
from langchain.document_loaders.generic import GenericLoader
|
||||
from langchain.document_loaders.parsers.registry import get_parser
|
||||
from langchain.schema import Document
|
||||
|
||||
_PathLike = Union[str, Path]
|
||||
|
||||
DEFAULT = Literal["default"]
|
||||
|
||||
|
||||
class ConcurrentLoader(GenericLoader):
|
||||
"""
|
||||
A generic document loader that loads and parses documents concurrently.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, blob_loader: BlobLoader, blob_parser: BaseBlobParser, num_workers: int = 4
|
||||
) -> None:
|
||||
super().__init__(blob_loader, blob_parser)
|
||||
self.num_workers = num_workers
|
||||
|
||||
def lazy_load(
|
||||
self,
|
||||
) -> Iterator[Document]:
|
||||
"""Load documents lazily with concurrent parsing."""
|
||||
with concurrent.futures.ThreadPoolExecutor(
|
||||
max_workers=self.num_workers
|
||||
) as executor:
|
||||
futures = {
|
||||
executor.submit(self.blob_parser.lazy_parse, blob)
|
||||
for blob in self.blob_loader.yield_blobs()
|
||||
}
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
yield from future.result()
|
||||
|
||||
@classmethod
|
||||
def from_filesystem(
|
||||
cls,
|
||||
path: _PathLike,
|
||||
*,
|
||||
glob: str = "**/[!.]*",
|
||||
suffixes: Optional[Sequence[str]] = None,
|
||||
show_progress: bool = False,
|
||||
parser: Union[DEFAULT, BaseBlobParser] = "default",
|
||||
num_workers: int = 4,
|
||||
) -> ConcurrentLoader:
|
||||
"""
|
||||
Create a concurrent generic document loader using a
|
||||
filesystem blob loader.
|
||||
"""
|
||||
blob_loader = FileSystemBlobLoader(
|
||||
path, glob=glob, suffixes=suffixes, show_progress=show_progress
|
||||
)
|
||||
if isinstance(parser, str):
|
||||
blob_parser = get_parser(parser)
|
||||
else:
|
||||
blob_parser = parser
|
||||
return cls(blob_loader, blob_parser, num_workers)
|
@@ -2,6 +2,7 @@ from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from langchain.document_loaders.concurrent import ConcurrentLoader
|
||||
from langchain.document_loaders.generic import GenericLoader
|
||||
from langchain.document_loaders.parsers import LanguageParser
|
||||
from langchain.text_splitter import Language
|
||||
@@ -131,3 +132,52 @@ def test_language_loader_for_javascript_with_parser_threshold() -> None:
|
||||
docs = loader.load()
|
||||
|
||||
assert len(docs) == 1
|
||||
|
||||
|
||||
def test_concurrent_language_loader_for_javascript_with_parser_threshold() -> None:
|
||||
"""Test JavaScript ConcurrentLoader with parser enabled and below threshold."""
|
||||
file_path = Path(__file__).parent.parent.parent / "examples"
|
||||
loader = ConcurrentLoader.from_filesystem(
|
||||
file_path,
|
||||
glob="hello_world.js",
|
||||
parser=LanguageParser(language=Language.JS, parser_threshold=1000),
|
||||
)
|
||||
docs = loader.load()
|
||||
|
||||
assert len(docs) == 1
|
||||
|
||||
|
||||
def test_concurrent_language_loader_for_python_with_parser_threshold() -> None:
|
||||
"""Test Python ConcurrentLoader with parser enabled and below threshold."""
|
||||
file_path = Path(__file__).parent.parent.parent / "examples"
|
||||
loader = ConcurrentLoader.from_filesystem(
|
||||
file_path,
|
||||
glob="hello_world.py",
|
||||
parser=LanguageParser(language=Language.PYTHON, parser_threshold=1000),
|
||||
)
|
||||
docs = loader.load()
|
||||
|
||||
assert len(docs) == 1
|
||||
|
||||
|
||||
@pytest.mark.skipif(not esprima_installed(), reason="requires esprima package")
|
||||
def test_concurrent_language_loader_for_javascript() -> None:
|
||||
"""Test JavaScript ConcurrentLoader with parser enabled."""
|
||||
file_path = Path(__file__).parent.parent.parent / "examples"
|
||||
loader = ConcurrentLoader.from_filesystem(
|
||||
file_path, glob="hello_world.js", parser=LanguageParser(parser_threshold=5)
|
||||
)
|
||||
docs = loader.load()
|
||||
|
||||
assert len(docs) == 3
|
||||
|
||||
|
||||
def test_concurrent_language_loader_for_python() -> None:
|
||||
"""Test Python ConcurrentLoader with parser enabled."""
|
||||
file_path = Path(__file__).parent.parent.parent / "examples"
|
||||
loader = ConcurrentLoader.from_filesystem(
|
||||
file_path, glob="hello_world.py", parser=LanguageParser(parser_threshold=5)
|
||||
)
|
||||
docs = loader.load()
|
||||
|
||||
assert len(docs) == 2
|
||||
|
Reference in New Issue
Block a user