From ad77fa15eec4dae431171b7e8c13b8c1f9edec98 Mon Sep 17 00:00:00 2001 From: Timothy Date: Wed, 27 Mar 2024 00:12:24 +0000 Subject: [PATCH] community[patch]: Adding try-except block for GCSDirectoryLoader (#19591) - **Description:** Implemented try-except block for `GCSDirectoryLoader`. Reason: Users processing large number of unstructured files in a folder may experience many different errors. A try-exception block is added to capture these errors. A new argument `use_try_except=True` is added to enable *silent failure* so that error caused by processing one file does not break the whole function. - **Issue:** N/A - **Dependencies:** no new dependencies - **Twitter handle:** timothywong731 --------- Co-authored-by: Bagatur --- .../google_cloud_storage_directory.ipynb | 36 ++++++++++++++++--- .../document_loaders/gcs_directory.py | 23 ++++++++++++ 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/docs/docs/integrations/document_loaders/google_cloud_storage_directory.ipynb b/docs/docs/integrations/document_loaders/google_cloud_storage_directory.ipynb index 0c3d674fd9f..ee00a82569c 100644 --- a/docs/docs/integrations/document_loaders/google_cloud_storage_directory.ipynb +++ b/docs/docs/integrations/document_loaders/google_cloud_storage_directory.ipynb @@ -126,17 +126,40 @@ ] }, { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "id": "f9c0734f", "metadata": {}, + "source": [ + "## Continue on failure to load a single file\n", + "Files in a GCS bucket may cause errors during processing. Enable the `continue_on_failure=True` argument to allow silent failure. This means failure to process a single file will not break the function, it will log a warning instead. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3d774795", + "metadata": {}, "outputs": [], - "source": [] + "source": [ + "loader = GCSDirectoryLoader(\n", + " project_name=\"aist\", bucket=\"testing-hwc\", continue_on_failure=True\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0d15f536", + "metadata": {}, + "outputs": [], + "source": [ + "loader.load()" + ] } ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "Python 3.10.6 64-bit", "language": "python", "name": "python3" }, @@ -151,6 +174,11 @@ "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.6" + }, + "vscode": { + "interpreter": { + "hash": "5f90d085fc70553c85f15dd96b84c64a94d58988a621c9dbc38cac6a7e6079b3" + } } }, "nbformat": 4, diff --git a/libs/community/langchain_community/document_loaders/gcs_directory.py b/libs/community/langchain_community/document_loaders/gcs_directory.py index d6f5b61c0db..3414ec3b670 100644 --- a/libs/community/langchain_community/document_loaders/gcs_directory.py +++ b/libs/community/langchain_community/document_loaders/gcs_directory.py @@ -1,3 +1,4 @@ +import logging from typing import Callable, List, Optional from langchain_core.documents import Document @@ -6,6 +7,8 @@ from langchain_community.document_loaders.base import BaseLoader from langchain_community.document_loaders.gcs_file import GCSFileLoader from langchain_community.utilities.vertexai import get_client_info +logger = logging.getLogger(__name__) + class GCSDirectoryLoader(BaseLoader): """Load from GCS directory.""" @@ -16,6 +19,7 @@ class GCSDirectoryLoader(BaseLoader): bucket: str, prefix: str = "", loader_func: Optional[Callable[[str], BaseLoader]] = None, + continue_on_failure: bool = False, ): """Initialize with bucket and key name. @@ -26,11 +30,15 @@ class GCSDirectoryLoader(BaseLoader): loader_func: A loader function that instantiates a loader based on a file_path argument. If nothing is provided, the GCSFileLoader would use its default loader. + continue_on_failure: To use try-except block for each file within the GCS + directory. If set to `True`, then failure to process a file will not + cause an error. """ self.project_name = project_name self.bucket = bucket self.prefix = prefix self._loader_func = loader_func + self.continue_on_failure = continue_on_failure def load(self) -> List[Document]: """Load documents.""" @@ -55,4 +63,19 @@ class GCSDirectoryLoader(BaseLoader): self.project_name, self.bucket, blob.name, loader_func=self._loader_func ) docs.extend(loader.load()) + # Use the try-except block here + try: + loader = GCSFileLoader( + self.project_name, + self.bucket, + blob.name, + loader_func=self._loader_func, + ) + docs.extend(loader.load()) + except Exception as e: + if self.continue_on_failure: + logger.warning(f"Problem processing blob {blob.name}, message: {e}") + continue + else: + raise e return docs