From 73eb3f8fd9f76050a72cd659ffc3729a9d773e03 Mon Sep 17 00:00:00 2001
From: DasDingoCodes <105673014+DasDingoCodes@users.noreply.github.com>
Date: Fri, 29 Mar 2024 15:46:52 +0100
Subject: [PATCH] community[minor]: Implement DirectoryLoader lazy_load function (#19537)

Thank you for contributing to LangChain!

- [x] **PR title**: "community: Implement DirectoryLoader lazy_load function"
- [x] **Description**: The `lazy_load` function of the `DirectoryLoader` yields
  each document separately. If the given `loader_cls` of the `DirectoryLoader`
  also implements `lazy_load`, it is used to yield the subdocuments of each
  file. (A short usage sketch follows the patch below.)
- [x] **Add tests and docs**: If you're adding a new integration, please include
  1. a test for the integration, preferably unit tests that do not rely on
     network access:
     `libs/community/tests/unit_tests/document_loaders/test_directory_loader.py`
  2. an example notebook showing its use. It lives in the `docs/docs/integrations`
     directory: `docs/docs/integrations/document_loaders/directory.ipynb`
- [x] **Lint and test**: Run `make format`, `make lint` and `make test` from the
  root of the package(s) you've modified. See contribution guidelines for more:
  https://python.langchain.com/docs/contributing/

Additional guidelines:
- Make sure optional dependencies are imported within a function.
- Please do not add dependencies to pyproject.toml files (even optional ones)
  unless they are required for unit tests.
- Most PRs should not touch more than one package.
- Changes should be backwards compatible.
- If you are adding something to community, do not re-import it in langchain.

If no one reviews your PR within a few days, please @-mention one of baskaryan,
efriis, eyurtsev, hwchase17.

---------

Co-authored-by: Eugene Yurtsev
---
 .../document_loaders/directory.py             |  92 ++++++++++------
 .../document_loaders/test_directory.py        |   5 +-
 .../document_loaders/test_directory_loader.py | 100 ++++++++++++++++++
 3 files changed, 161 insertions(+), 36 deletions(-)
 create mode 100644 libs/community/tests/unit_tests/document_loaders/test_directory_loader.py

diff --git a/libs/community/langchain_community/document_loaders/directory.py b/libs/community/langchain_community/document_loaders/directory.py
index 3cb2ad1309a..3902b8ae93c 100644
--- a/libs/community/langchain_community/document_loaders/directory.py
+++ b/libs/community/langchain_community/document_loaders/directory.py
@@ -2,17 +2,18 @@ import concurrent
 import logging
 import random
 from pathlib import Path
-from typing import Any, List, Optional, Sequence, Type, Union
+from typing import Any, Callable, Iterator, List, Optional, Sequence, Type, Union
 
 from langchain_core.documents import Document
 
 from langchain_community.document_loaders.base import BaseLoader
+from langchain_community.document_loaders.csv_loader import CSVLoader
 from langchain_community.document_loaders.html_bs import BSHTMLLoader
 from langchain_community.document_loaders.text import TextLoader
 from langchain_community.document_loaders.unstructured import UnstructuredFileLoader
 
 FILE_LOADER_TYPE = Union[
-    Type[UnstructuredFileLoader], Type[TextLoader], Type[BSHTMLLoader]
+    Type[UnstructuredFileLoader], Type[TextLoader], Type[BSHTMLLoader], Type[CSVLoader]
 ]
 logger = logging.getLogger(__name__)
 
@@ -111,44 +112,18 @@ class DirectoryLoader(BaseLoader):
         self.randomize_sample = randomize_sample
         self.sample_seed = sample_seed
 
-    def load_file(
-        self, item: Path, path: Path, docs: List[Document], pbar: Optional[Any]
-    ) -> None:
-        """Load a file.
-
-        Args:
-            item: File path.
-            path: Directory path.
-            docs: List of documents to append to.
-            pbar: Progress bar. Defaults to None.
-
-        """
-        if item.is_file():
-            if _is_visible(item.relative_to(path)) or self.load_hidden:
-                try:
-                    logger.debug(f"Processing file: {str(item)}")
-                    sub_docs = self.loader_cls(str(item), **self.loader_kwargs).load()
-                    docs.extend(sub_docs)
-                except Exception as e:
-                    if self.silent_errors:
-                        logger.warning(f"Error loading file {str(item)}: {e}")
-                    else:
-                        logger.error(f"Error loading file {str(item)}")
-                        raise e
-                finally:
-                    if pbar:
-                        pbar.update(1)
-
     def load(self) -> List[Document]:
         """Load documents."""
+        return list(self.lazy_load())
+
+    def lazy_load(self) -> Iterator[Document]:
+        """Load documents lazily."""
         p = Path(self.path)
         if not p.exists():
             raise FileNotFoundError(f"Directory not found: '{self.path}'")
         if not p.is_dir():
             raise ValueError(f"Expected directory, got file: '{self.path}'")
 
-        docs: List[Document] = []
-
         paths = p.rglob(self.glob) if self.recursive else p.glob(self.glob)
         items = [
             path
@@ -185,15 +160,62 @@ class DirectoryLoader(BaseLoader):
         )
 
         if self.use_multithreading:
+            futures = []
             with concurrent.futures.ThreadPoolExecutor(
                 max_workers=self.max_concurrency
             ) as executor:
-                executor.map(lambda i: self.load_file(i, p, docs, pbar), items)
+                for i in items:
+                    futures.append(
+                        executor.submit(
+                            self._lazy_load_file_to_non_generator(self._lazy_load_file),
+                            i,
+                            p,
+                            pbar,
+                        )
+                    )
+                for future in concurrent.futures.as_completed(futures):
+                    yield future.result()
         else:
             for i in items:
-                self.load_file(i, p, docs, pbar)
+                yield from self._lazy_load_file(i, p, pbar)
 
         if pbar:
             pbar.close()
 
-        return docs
+    def _lazy_load_file_to_non_generator(self, func: Callable) -> Callable:
+        def non_generator(item: Path, path: Path, pbar: Optional[Any]) -> List:
+            return [x for x in func(item, path, pbar)]
+
+        return non_generator
+
+    def _lazy_load_file(
+        self, item: Path, path: Path, pbar: Optional[Any]
+    ) -> Iterator[Document]:
+        """Load a file.
+
+        Args:
+            item: File path.
+            path: Directory path.
+            pbar: Progress bar. Defaults to None.
+
+        """
+        if item.is_file():
+            if _is_visible(item.relative_to(path)) or self.load_hidden:
+                try:
+                    logger.debug(f"Processing file: {str(item)}")
+                    loader = self.loader_cls(str(item), **self.loader_kwargs)
+                    try:
+                        for subdoc in loader.lazy_load():
+                            yield subdoc
+                    except NotImplementedError:
+                        for subdoc in loader.load():
+                            yield subdoc
+                except Exception as e:
+                    if self.silent_errors:
+                        logger.warning(f"Error loading file {str(item)}: {e}")
+                    else:
+                        logger.error(f"Error loading file {str(item)}")
+                        raise e
+                finally:
+                    if pbar:
+                        pbar.update(1)
diff --git a/libs/community/tests/unit_tests/document_loaders/test_directory.py b/libs/community/tests/unit_tests/document_loaders/test_directory.py
index f83e4bc2dfe..9523ebfa26a 100644
--- a/libs/community/tests/unit_tests/document_loaders/test_directory.py
+++ b/libs/community/tests/unit_tests/document_loaders/test_directory.py
@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import Any, List
+from typing import Any, Iterator, List
 
 import pytest
 from langchain_core.documents import Document
@@ -35,6 +35,9 @@ class CustomLoader:
         with open(self.path, "r") as f:
             return [Document(page_content=f.read())]
 
+    def lazy_load(self) -> Iterator[Document]:
+        raise NotImplementedError("CustomLoader does not implement lazy_load()")
+
 
 def test_exclude_ignores_matching_files(tmp_path: Path) -> None:
     txt_file = tmp_path / "test.txt"
diff --git a/libs/community/tests/unit_tests/document_loaders/test_directory_loader.py b/libs/community/tests/unit_tests/document_loaders/test_directory_loader.py
new file mode 100644
index 00000000000..3793878297a
--- /dev/null
+++ b/libs/community/tests/unit_tests/document_loaders/test_directory_loader.py
@@ -0,0 +1,100 @@
+from pathlib import Path
+
+import pytest
+from langchain_core.documents import Document
+
+from langchain_community.document_loaders.csv_loader import CSVLoader
+from langchain_community.document_loaders.directory import DirectoryLoader
+
+
+class TestDirectoryLoader:
+    # Tests that lazy loading a CSV file with multiple documents is successful.
+    def test_directory_loader_lazy_load_single_file_multiple_docs(self) -> None:
+        # Setup
+        dir_path = self._get_csv_dir_path()
+        file_name = "test_nominal.csv"
+        file_path = self._get_csv_file_path(file_name)
+        expected_docs = [
+            Document(
+                page_content="column1: value1\ncolumn2: value2\ncolumn3: value3",
+                metadata={"source": file_path, "row": 0},
+            ),
+            Document(
+                page_content="column1: value4\ncolumn2: value5\ncolumn3: value6",
+                metadata={"source": file_path, "row": 1},
+            ),
+        ]
+
+        # Assert
+        loader = DirectoryLoader(dir_path, glob=file_name, loader_cls=CSVLoader)
+        for i, doc in enumerate(loader.lazy_load()):
+            assert doc == expected_docs[i]
+
+    # Tests that lazy loading an empty CSV file is handled correctly.
+    def test_directory_loader_lazy_load_empty_file(self) -> None:
+        # Setup
+        dir_path = self._get_csv_dir_path()
+        file_name = "test_empty.csv"
+
+        # Assert
+        loader = DirectoryLoader(dir_path, glob=file_name, loader_cls=CSVLoader)
+        for _ in loader.lazy_load():
+            pytest.fail(
+                "DirectoryLoader.lazy_load should not yield something for an empty file"
+            )
+
+    # Tests that lazy loading multiple CSV files is handled correctly.
+    def test_directory_loader_lazy_load_multiple_files(self) -> None:
+        # Setup
+        dir_path = self._get_csv_dir_path()
+        file_name = "test_nominal.csv"
+        file_path = self._get_csv_file_path(file_name)
+        expected_docs = [
+            Document(
+                page_content="column1: value1\ncolumn2: value2\ncolumn3: value3",
+                metadata={"source": file_path, "row": 0},
+            ),
+            Document(
+                page_content="column1: value4\ncolumn2: value5\ncolumn3: value6",
+                metadata={"source": file_path, "row": 1},
+            ),
+        ]
+        file_name = "test_one_col.csv"
+        file_path = self._get_csv_file_path(file_name)
+        expected_docs += [
+            Document(
+                page_content="column1: value1",
+                metadata={"source": file_path, "row": 0},
+            ),
+            Document(
+                page_content="column1: value2",
+                metadata={"source": file_path, "row": 1},
+            ),
+            Document(
+                page_content="column1: value3",
+                metadata={"source": file_path, "row": 2},
+            ),
+        ]
+        file_name = "test_one_row.csv"
+        file_path = self._get_csv_file_path(file_name)
+        expected_docs += [
+            Document(
+                page_content="column1: value1\ncolumn2: value2\ncolumn3: value3",
+                metadata={"source": file_path, "row": 0},
+            )
+        ]
+
+        # Assert
+        loader = DirectoryLoader(dir_path, loader_cls=CSVLoader)
+        loaded_docs = []
+        for doc in loader.lazy_load():
+            assert doc in expected_docs
+            loaded_docs.append(doc)
+        assert len(loaded_docs) == len(expected_docs)
+
+    # utility functions
+    def _get_csv_file_path(self, file_name: str) -> str:
+        return str(Path(__file__).resolve().parent / "test_docs" / "csv" / file_name)
+
+    def _get_csv_dir_path(self) -> str:
+        return str(Path(__file__).resolve().parent / "test_docs" / "csv")
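
A minimal usage sketch of the behaviour described in the PR text above, assuming a local `data/` directory of CSV files (the path and glob are illustrative, not part of the patch):

```python
from langchain_community.document_loaders import DirectoryLoader
from langchain_community.document_loaders.csv_loader import CSVLoader

# Illustrative path/glob; any directory of CSV files works the same way.
loader = DirectoryLoader("data/", glob="*.csv", loader_cls=CSVLoader)

# lazy_load() yields one Document at a time; CSVLoader implements lazy_load,
# so each CSV row is yielded without materializing the whole directory first.
for doc in loader.lazy_load():
    print(doc.metadata["source"], doc.metadata["row"])

# load() is now just list(lazy_load()), so existing callers are unaffected.
docs = loader.load()
```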
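The `_lazy_load_file_to_non_generator` helper exists because submitting a generator function to a `ThreadPoolExecutor` would only hand back an unstarted generator object; wrapping it in a function that drains the generator into a list forces the work to happen inside the worker thread. Below is a self-contained sketch of that pattern, with illustrative names (`squares`, `to_eager`) that are not part of the patch:

```python
import concurrent.futures
from typing import Callable, Iterator, List


def squares(n: int) -> Iterator[int]:
    # Stand-in for a lazy, per-item generator such as _lazy_load_file.
    for i in range(n):
        yield i * i


def to_eager(func: Callable[[int], Iterator[int]]) -> Callable[[int], List[int]]:
    # Same idea as _lazy_load_file_to_non_generator: run the generator to
    # completion inside the worker and return a plain list.
    def eager(n: int) -> List[int]:
        return list(func(n))

    return eager


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(to_eager(squares), n) for n in (3, 5)]
    for future in concurrent.futures.as_completed(futures):
        print(future.result())  # each result is one worker's fully built list
```

As in the patch, each `future.result()` in the multithreaded branch is the realized list for one file, whereas the single-threaded branch uses `yield from` to emit documents one at a time.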
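The `NotImplementedError` fallback in `_lazy_load_file` keeps loaders that only implement `load` working, which is what the `CustomLoader.lazy_load` stub added to `test_directory.py` exercises. A hypothetical loader illustrating the pattern (the class name and file handling here are assumptions, not code from the repository):

```python
from typing import Iterator, List

from langchain_core.documents import Document


class EagerOnlyLoader:
    """Hypothetical loader that only implements load()."""

    def __init__(self, path: str):
        self.path = path

    def load(self) -> List[Document]:
        with open(self.path, "r") as f:
            return [Document(page_content=f.read())]

    def lazy_load(self) -> Iterator[Document]:
        # DirectoryLoader._lazy_load_file catches this and falls back to load().
        raise NotImplementedError("EagerOnlyLoader does not implement lazy_load()")
```

Passing `loader_cls=EagerOnlyLoader` to `DirectoryLoader` therefore still yields documents one by one; within each file they come from the eager `load()` call.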