community[minor]: Implement DirectoryLoader lazy_load function (#19537)

Thank you for contributing to LangChain!

- [x] **PR title**: "community: Implement DirectoryLoader lazy_load function"

- [x] **Description**: The `lazy_load` function of the `DirectoryLoader`
yields documents one at a time. If the given `loader_cls` of the
`DirectoryLoader` also implements `lazy_load`, it is used to yield each
file's sub-documents lazily; otherwise the loader falls back to the eager
`load()`. (A usage sketch follows this checklist.)

- [x] **Add tests and docs**: If you're adding a new integration, please
include
1. a test for the integration, preferably unit tests that do not rely on
network access:
`libs/community/tests/unit_tests/document_loaders/test_directory_loader.py`
2. an example notebook showing its use, which lives in the
`docs/docs/integrations` directory:
`docs/docs/integrations/document_loaders/directory.ipynb`


- [x] **Lint and test**: Run `make format`, `make lint` and `make test`
from the root of the package(s) you've modified. See contribution
guidelines for more: https://python.langchain.com/docs/contributing/
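
A minimal usage sketch of the new behavior, assuming a hypothetical `./data`
directory of CSV files (`CSVLoader` is the loader class this PR adds to
`FILE_LOADER_TYPE`, and it implements `lazy_load` itself):

```python
from langchain_community.document_loaders import DirectoryLoader
from langchain_community.document_loaders.csv_loader import CSVLoader

# Hypothetical directory of CSV files; CSVLoader emits one Document per row.
loader = DirectoryLoader("./data", glob="**/*.csv", loader_cls=CSVLoader)

# Documents are yielded one at a time instead of being materialized in a list,
# because the underlying CSVLoader also implements lazy_load.
for doc in loader.lazy_load():
    print(doc.metadata["source"], doc.metadata["row"])
```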

Additional guidelines:
- Make sure optional dependencies are imported within a function.
- Please do not add dependencies to pyproject.toml files (even optional
ones) unless they are required for unit tests.
- Most PRs should not touch more than one package.
- Changes should be backwards compatible.
- If you are adding something to community, do not re-import it in
langchain.

If no one reviews your PR within a few days, please @-mention one of
baskaryan, efriis, eyurtsev, hwchase17.

---------

Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
DasDingoCodes authored on 2024-03-29 15:46:52 +01:00; committed by GitHub
parent 6b2b511f68
commit 73eb3f8fd9
3 changed files with 161 additions and 36 deletions

libs/community/langchain_community/document_loaders/directory.py:

```diff
@@ -2,17 +2,18 @@ import concurrent
 import logging
 import random
 from pathlib import Path
-from typing import Any, List, Optional, Sequence, Type, Union
+from typing import Any, Callable, Iterator, List, Optional, Sequence, Type, Union

 from langchain_core.documents import Document

 from langchain_community.document_loaders.base import BaseLoader
+from langchain_community.document_loaders.csv_loader import CSVLoader
 from langchain_community.document_loaders.html_bs import BSHTMLLoader
 from langchain_community.document_loaders.text import TextLoader
 from langchain_community.document_loaders.unstructured import UnstructuredFileLoader

 FILE_LOADER_TYPE = Union[
-    Type[UnstructuredFileLoader], Type[TextLoader], Type[BSHTMLLoader]
+    Type[UnstructuredFileLoader], Type[TextLoader], Type[BSHTMLLoader], Type[CSVLoader]
 ]
 logger = logging.getLogger(__name__)
```
```diff
@@ -111,44 +112,18 @@ class DirectoryLoader(BaseLoader):
         self.randomize_sample = randomize_sample
         self.sample_seed = sample_seed

-    def load_file(
-        self, item: Path, path: Path, docs: List[Document], pbar: Optional[Any]
-    ) -> None:
-        """Load a file.
-
-        Args:
-            item: File path.
-            path: Directory path.
-            docs: List of documents to append to.
-            pbar: Progress bar. Defaults to None.
-
-        """
-        if item.is_file():
-            if _is_visible(item.relative_to(path)) or self.load_hidden:
-                try:
-                    logger.debug(f"Processing file: {str(item)}")
-                    sub_docs = self.loader_cls(str(item), **self.loader_kwargs).load()
-                    docs.extend(sub_docs)
-                except Exception as e:
-                    if self.silent_errors:
-                        logger.warning(f"Error loading file {str(item)}: {e}")
-                    else:
-                        logger.error(f"Error loading file {str(item)}")
-                        raise e
-                finally:
-                    if pbar:
-                        pbar.update(1)
-
     def load(self) -> List[Document]:
         """Load documents."""
+        return list(self.lazy_load())
+
+    def lazy_load(self) -> Iterator[Document]:
+        """Load documents lazily."""
         p = Path(self.path)
         if not p.exists():
             raise FileNotFoundError(f"Directory not found: '{self.path}'")
         if not p.is_dir():
             raise ValueError(f"Expected directory, got file: '{self.path}'")
-        docs: List[Document] = []
         paths = p.rglob(self.glob) if self.recursive else p.glob(self.glob)
         items = [
             path
```
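
The hunk above inverts the old relationship between the two entry points: the
eager `load` is now a thin wrapper that materializes `lazy_load`. A sketch of
the same delegation pattern on a hypothetical custom loader (all names
illustrative, not part of this PR):

```python
from typing import Iterator, List

from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader


class LineLoader(BaseLoader):
    """Hypothetical loader: one Document per line of a text file."""

    def __init__(self, path: str) -> None:
        self.path = path

    def lazy_load(self) -> Iterator[Document]:
        # Work happens per item, as the caller iterates.
        with open(self.path) as f:
            for i, line in enumerate(f):
                yield Document(page_content=line.rstrip("\n"), metadata={"line": i})

    def load(self) -> List[Document]:
        # Same delegation as DirectoryLoader.load in the hunk above.
        return list(self.lazy_load())
```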
```diff
@@ -185,15 +160,62 @@
             )

         if self.use_multithreading:
+            futures = []
             with concurrent.futures.ThreadPoolExecutor(
                 max_workers=self.max_concurrency
             ) as executor:
-                executor.map(lambda i: self.load_file(i, p, docs, pbar), items)
+                for i in items:
+                    futures.append(
+                        executor.submit(
+                            self._lazy_load_file_to_non_generator(self._lazy_load_file),
+                            i,
+                            p,
+                            pbar,
+                        )
+                    )
+                for future in concurrent.futures.as_completed(futures):
+                    yield from future.result()
         else:
             for i in items:
-                self.load_file(i, p, docs, pbar)
+                yield from self._lazy_load_file(i, p, pbar)

         if pbar:
             pbar.close()

-        return docs
+    def _lazy_load_file_to_non_generator(self, func: Callable) -> Callable:
+        def non_generator(item: Path, path: Path, pbar: Optional[Any]) -> List:
+            return [x for x in func(item, path, pbar)]
+
+        return non_generator
+
+    def _lazy_load_file(
+        self, item: Path, path: Path, pbar: Optional[Any]
+    ) -> Iterator[Document]:
+        """Load a file.
+
+        Args:
+            item: File path.
+            path: Directory path.
+            pbar: Progress bar. Defaults to None.
+
+        """
+        if item.is_file():
+            if _is_visible(item.relative_to(path)) or self.load_hidden:
+                try:
+                    logger.debug(f"Processing file: {str(item)}")
+                    loader = self.loader_cls(str(item), **self.loader_kwargs)
+                    try:
+                        for subdoc in loader.lazy_load():
+                            yield subdoc
+                    except NotImplementedError:
+                        for subdoc in loader.load():
+                            yield subdoc
+                except Exception as e:
+                    if self.silent_errors:
+                        logger.warning(f"Error loading file {str(item)}: {e}")
+                    else:
+                        logger.error(f"Error loading file {str(item)}")
+                        raise e
+                finally:
+                    if pbar:
+                        pbar.update(1)
```
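
`_lazy_load_file_to_non_generator` deserves a note: submitting a generator
function to a `ThreadPoolExecutor` would do no work on the pool thread,
because calling a generator function merely creates a generator object; its
body only runs when someone iterates it. Wrapping it so the worker returns a
fully built list forces the file to actually be parsed on the pool thread. A
standalone sketch of that distinction (the `work` function is hypothetical,
not part of this PR):

```python
import concurrent.futures
from typing import Iterator, List


def work(n: int) -> Iterator[int]:
    # Generator: nothing here runs until the result is iterated.
    for i in range(n):
        yield i * i


def materialized(n: int) -> List[int]:
    # Forces the generator to completion inside the worker thread.
    return list(work(n))


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    lazy = executor.submit(work, 3)           # "completes" instantly; no work done
    eager = executor.submit(materialized, 3)  # squares computed on a pool thread

    print(list(lazy.result()))  # iteration (the real work) happens here, in the caller
    print(eager.result())       # [0, 1, 4], already computed by the worker
```

The trade-off of this design is that each file's documents are buffered in
memory before being yielded on the multithreaded path; only the
single-threaded path is fully lazy.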

libs/community/tests/unit_tests/document_loaders/test_directory.py:

```diff
@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import Any, List
+from typing import Any, Iterator, List

 import pytest
 from langchain_core.documents import Document
@@ -35,6 +35,9 @@ class CustomLoader:
         with open(self.path, "r") as f:
             return [Document(page_content=f.read())]

+    def lazy_load(self) -> Iterator[Document]:
+        raise NotImplementedError("CustomLoader does not implement lazy_load()")
+

 def test_exclude_ignores_matching_files(tmp_path: Path) -> None:
     txt_file = tmp_path / "test.txt"
```
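
The `lazy_load` stub added to `CustomLoader` above exercises the fallback
path in `_lazy_load_file`: when the per-file loader raises
`NotImplementedError`, `DirectoryLoader` falls back to the loader's eager
`load`. The same control flow in isolation, with a hypothetical eager-only
loader:

```python
from typing import Iterator, List

from langchain_core.documents import Document


class EagerOnlyLoader:
    """Hypothetical loader that only implements eager loading."""

    def load(self) -> List[Document]:
        return [Document(page_content="eagerly loaded")]

    def lazy_load(self) -> Iterator[Document]:
        raise NotImplementedError


def iter_documents(loader: EagerOnlyLoader) -> Iterator[Document]:
    # Same try/except fallback used by DirectoryLoader._lazy_load_file.
    try:
        yield from loader.lazy_load()
    except NotImplementedError:
        yield from loader.load()


print([doc.page_content for doc in iter_documents(EagerOnlyLoader())])
# ['eagerly loaded']
```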

libs/community/tests/unit_tests/document_loaders/test_directory_loader.py (new file):

@@ -0,0 +1,100 @@
```python
from pathlib import Path

import pytest

from langchain_core.documents import Document

from langchain_community.document_loaders.csv_loader import CSVLoader
from langchain_community.document_loaders.directory import DirectoryLoader


class TestDirectoryLoader:
    # Tests that lazy loading a CSV file with multiple documents is successful.
    def test_directory_loader_lazy_load_single_file_multiple_docs(self) -> None:
        # Setup
        dir_path = self._get_csv_dir_path()
        file_name = "test_nominal.csv"
        file_path = self._get_csv_file_path(file_name)
        expected_docs = [
            Document(
                page_content="column1: value1\ncolumn2: value2\ncolumn3: value3",
                metadata={"source": file_path, "row": 0},
            ),
            Document(
                page_content="column1: value4\ncolumn2: value5\ncolumn3: value6",
                metadata={"source": file_path, "row": 1},
            ),
        ]

        # Assert
        loader = DirectoryLoader(dir_path, glob=file_name, loader_cls=CSVLoader)
        for i, doc in enumerate(loader.lazy_load()):
            assert doc == expected_docs[i]

    # Tests that lazy loading an empty CSV file is handled correctly.
    def test_directory_loader_lazy_load_empty_file(self) -> None:
        # Setup
        dir_path = self._get_csv_dir_path()
        file_name = "test_empty.csv"

        # Assert
        loader = DirectoryLoader(dir_path, glob=file_name, loader_cls=CSVLoader)
        for _ in loader.lazy_load():
            pytest.fail(
                "DirectoryLoader.lazy_load should not yield something for an empty file"
            )

    # Tests that lazy loading multiple CSV files is handled correctly.
    def test_directory_loader_lazy_load_multiple_files(self) -> None:
        # Setup
        dir_path = self._get_csv_dir_path()
        file_name = "test_nominal.csv"
        file_path = self._get_csv_file_path(file_name)
        expected_docs = [
            Document(
                page_content="column1: value1\ncolumn2: value2\ncolumn3: value3",
                metadata={"source": file_path, "row": 0},
            ),
            Document(
                page_content="column1: value4\ncolumn2: value5\ncolumn3: value6",
                metadata={"source": file_path, "row": 1},
            ),
        ]

        file_name = "test_one_col.csv"
        file_path = self._get_csv_file_path(file_name)
        expected_docs += [
            Document(
                page_content="column1: value1",
                metadata={"source": file_path, "row": 0},
            ),
            Document(
                page_content="column1: value2",
                metadata={"source": file_path, "row": 1},
            ),
            Document(
                page_content="column1: value3",
                metadata={"source": file_path, "row": 2},
            ),
        ]

        file_name = "test_one_row.csv"
        file_path = self._get_csv_file_path(file_name)
        expected_docs += [
            Document(
                page_content="column1: value1\ncolumn2: value2\ncolumn3: value3",
                metadata={"source": file_path, "row": 0},
            )
        ]

        # Assert
        loader = DirectoryLoader(dir_path, loader_cls=CSVLoader)
        loaded_docs = []
        for doc in loader.lazy_load():
            assert doc in expected_docs
            loaded_docs.append(doc)
        assert len(loaded_docs) == len(expected_docs)

    # utility functions
    def _get_csv_file_path(self, file_name: str) -> str:
        return str(Path(__file__).resolve().parent / "test_docs" / "csv" / file_name)

    def _get_csv_dir_path(self) -> str:
        return str(Path(__file__).resolve().parent / "test_docs" / "csv")
```
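
A practical payoff of `lazy_load` beyond the constant memory footprint is
early termination, at least on the default single-threaded path. A sketch
with a hypothetical `./data` directory, where consuming two documents stops
further files from ever being opened:

```python
from itertools import islice

from langchain_community.document_loaders import DirectoryLoader
from langchain_community.document_loaders.csv_loader import CSVLoader

loader = DirectoryLoader("./data", glob="**/*.csv", loader_cls=CSVLoader)

# Pull only the first two documents; remaining rows and files are never read,
# because documents are produced on demand by the underlying generators.
first_two = list(islice(loader.lazy_load(), 2))
```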