diff --git a/docs/extras/integrations/document_transformers/docai.ipynb b/docs/extras/integrations/document_transformers/docai.ipynb new file mode 100644 index 00000000000..8cf81ff6789 --- /dev/null +++ b/docs/extras/integrations/document_transformers/docai.ipynb @@ -0,0 +1,283 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "48438efb-9f0d-473b-a91c-9f1e29c2539d", + "metadata": {}, + "outputs": [], + "source": [ + "from langchain.document_loaders.blob_loaders import Blob\n", + "from langchain.document_loaders.parsers import DocAIParser" + ] + }, + { + "cell_type": "markdown", + "id": "f95ac25b-f025-40c3-95b8-77919fc4da7f", + "metadata": {}, + "source": [ + "DocAI is a Google Cloud platform that transforms unstructured data from documents into structured data, making it easier to understand, analyze, and consume. You can read more about it here: https://cloud.google.com/document-ai/docs/overview" + ] + }, + { + "cell_type": "markdown", + "id": "51946817-798c-4d11-abd6-db2ae53a0270", + "metadata": {}, + "source": [ + "First, you need to set up a GCS bucket and create your own OCR processor as described here: https://cloud.google.com/document-ai/docs/create-processor\n", + "The `GCS_OUTPUT_PATH` should be a path to a folder on GCS (starting with `gs://`), and the processor name should look like `projects/PROJECT_NUMBER/locations/LOCATION/processors/PROCESSOR_ID`. You can get it either programmatically or copy it from the `Prediction endpoint` section of the `Processor details` tab in the Google Cloud Console." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "ac85f7f3-3ef6-41d5-920a-b55f2939c202", + "metadata": {}, + "outputs": [], + "source": [ + "PROJECT = \"PUT_SOMETHING_HERE\"\n", + "GCS_OUTPUT_PATH = \"PUT_SOMETHING_HERE\"\n", + "PROCESSOR_NAME = \"PUT_SOMETHING_HERE\"" + ] + }, + { + "cell_type": "markdown", + "id": "fad2bcca-1c0e-4888-b82d-15823ba57e60", + "metadata": {}, + "source": [ + "Now, let's create a parser:" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "dcc0c65a-86c5-448d-8b21-2e564b1903b7", + "metadata": {}, + "outputs": [], + "source": [ + "parser = DocAIParser(location=\"us\", processor_name=PROCESSOR_NAME, gcs_output_path=GCS_OUTPUT_PATH)" + ] + }, + { + "cell_type": "markdown", + "id": "b8b5a3ff-650a-4ad3-a73a-395f86e4c9e1", + "metadata": {}, + "source": [ + "Let's parse one of Alphabet's earnings reports from here: https://abc.xyz/assets/a7/5b/9e5ae0364b12b4c883f3cf748226/goog-exhibit-99-1-q1-2023-19.pdf. Copy it to your GCS bucket first, and adjust the path below."
+ ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "373cc18e-a311-4c8d-8180-47e4ade1d2ad", + "metadata": {}, + "outputs": [], + "source": [ + "blob = Blob(path=\"gs://vertex-pgt/examples/goog-exhibit-99-1-q1-2023-19.pdf\")" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "6ef84fad-2981-456d-a6b4-3a6a1a46d511", + "metadata": {}, + "outputs": [], + "source": [ + "docs = list(parser.lazy_parse(blob))" + ] + }, + { + "cell_type": "markdown", + "id": "3f8e4ee1-e07d-4c29-a120-4d56aae91859", + "metadata": {}, + "source": [ + "We'll get one document per page, 11 in total:" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "343919f5-35d2-47fb-9790-de464649ebdf", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "11\n" + ] + } + ], + "source": [ + "print(len(docs))" + ] + }, + { + "cell_type": "markdown", + "id": "b104ae56-011b-4abe-ac07-e999c69494c5", + "metadata": {}, + "source": [ + "You can parse blobs end-to-end one by one, but if you have many documents, it may be better to batch them together and perhaps even to decouple parsing from handling the parsing results." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "9ecc1b99-5cef-47b0-a125-dbb2c41d2224", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "['projects/543079149601/locations/us/operations/16447136779727347991']\n" + ] + } + ], + "source": [ + "operations = parser.docai_parse([blob])\n", + "print([op.operation.name for op in operations])" + ] + }, + { + "cell_type": "markdown", + "id": "a2d24d63-c2c7-454c-9df3-2a9cf51309a6", + "metadata": {}, + "source": [ + "You can check whether operations are finished:" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "ab11efb0-e514-4f44-9ba5-3d638a59c9e6", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "parser.is_running(operations)" + ] + }, + { + "cell_type": "markdown", + "id": "602ca0bc-080a-4a4e-a413-0e705aeab189", + "metadata": {}, + "source": [ + "And when they're finished, you can fetch the results:" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "ec1e6041-bc10-47d4-ba64-d09055c14f27", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "False" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "parser.is_running(operations)" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "95d89da4-1c8a-413d-8473-ddd4a39375a5", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "DocAIParsingResults(source_path='gs://vertex-pgt/examples/goog-exhibit-99-1-q1-2023-19.pdf', parsed_path='gs://vertex-pgt/test/run1/16447136779727347991/0')\n" + ] + } + ], + "source": [ + "results = parser.get_results(operations)\n", + "print(results[0])" + ] + }, + { + "cell_type": "markdown", + "id": "87e5b606-1679-46c7-9577-4cf9bc93a752", + "metadata": {}, + "source": [ + "And now we can finally generate Documents from the parsed results:" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "08e8878d-889b-41ad-9500-2f772d38782f", + "metadata": {}, + "outputs": [], + "source": [ + "docs = list(parser.parse_from_results(results))" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": 
"c59525fb-448d-444b-8f12-c4aea791e19b", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "11\n" + ] + } + ], + "source": [ + "print(len(docs))" + ] + } + ], + "metadata": { + "environment": { + "kernel": "python3", + "name": "common-cpu.m109", + "type": "gcloud", + "uri": "gcr.io/deeplearning-platform-release/base-cpu:m109" + }, + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/libs/langchain/langchain/document_loaders/parsers/__init__.py b/libs/langchain/langchain/document_loaders/parsers/__init__.py index 5d4843e9abe..e2233e5cc6d 100644 --- a/libs/langchain/langchain/document_loaders/parsers/__init__.py +++ b/libs/langchain/langchain/document_loaders/parsers/__init__.py @@ -1,4 +1,5 @@ from langchain.document_loaders.parsers.audio import OpenAIWhisperParser +from langchain.document_loaders.parsers.docai import DocAIParser from langchain.document_loaders.parsers.grobid import GrobidParser from langchain.document_loaders.parsers.html import BS4HTMLParser from langchain.document_loaders.parsers.language import LanguageParser @@ -12,6 +13,7 @@ from langchain.document_loaders.parsers.pdf import ( __all__ = [ "BS4HTMLParser", + "DocAIParser", "GrobidParser", "LanguageParser", "OpenAIWhisperParser", diff --git a/libs/langchain/langchain/document_loaders/parsers/docai.py b/libs/langchain/langchain/document_loaders/parsers/docai.py new file mode 100644 index 00000000000..dd6913ac6ea --- /dev/null +++ b/libs/langchain/langchain/document_loaders/parsers/docai.py @@ -0,0 +1,292 @@ +"""Module contains a PDF parser based on DocAI from Google Cloud. + +You need to install two libraries to use this parser: +pip install google-cloud-documentai +pip install google-cloud-documentai-toolbox +""" +import logging +import time +from dataclasses import dataclass +from typing import TYPE_CHECKING, Iterator, List, Optional, Sequence + +from langchain.docstore.document import Document +from langchain.document_loaders.base import BaseBlobParser +from langchain.document_loaders.blob_loaders import Blob +from langchain.utils.iter import batch_iterate + +if TYPE_CHECKING: + from google.api_core.operation import Operation + from google.cloud.documentai import DocumentProcessorServiceClient + + +logger = logging.getLogger(__name__) + + +@dataclass +class DocAIParsingResults: + """A dataclass to store DocAI parsing results.""" + + source_path: str + parsed_path: str + + +class DocAIParser(BaseBlobParser): + def __init__( + self, + *, + client: Optional["DocumentProcessorServiceClient"] = None, + location: Optional[str] = None, + gcs_output_path: Optional[str] = None, + processor_name: Optional[str] = None, + ): + """Initializes the parser. + + Args: + client: a DocumentProcessorServiceClient to use + location: a GCP location where a DOcAI parser is located + gcs_output_path: a path on GCS to store parsing results + processor_name: name of a processor + + You should provide either a client or location (and then a client + would be instantiated). + """ + if client and location: + raise ValueError( + "You should provide either a client or a location but not both " + "of them." 
+ ) + if not client and not location: + raise ValueError( + "You must specify either a client or a location to instantiate " + "a client." + ) + + self._gcs_output_path = gcs_output_path + self._processor_name = processor_name + if client: + self._client = client + else: + try: + from google.api_core.client_options import ClientOptions + from google.cloud.documentai import DocumentProcessorServiceClient + except ImportError: + raise ImportError( + "documentai package not found, please install it with" + " `pip install google-cloud-documentai`" + ) + options = ClientOptions( + api_endpoint=f"{location}-documentai.googleapis.com" + ) + self._client = DocumentProcessorServiceClient(client_options=options) + + def lazy_parse(self, blob: Blob) -> Iterator[Document]: + """Parses a blob lazily. + + Args: + blob: a Blob to parse + + This is a long-running operation! It is recommended to batch + documents together and use the `batch_parse` method. + """ + yield from self.batch_parse([blob], gcs_output_path=self._gcs_output_path) + + def batch_parse( + self, + blobs: Sequence[Blob], + gcs_output_path: Optional[str] = None, + timeout_sec: int = 3600, + check_in_interval_sec: int = 60, + ) -> Iterator[Document]: + """Parses a list of blobs lazily. + + Args: + blobs: a list of blobs to parse + gcs_output_path: a path on GCS to store parsing results + timeout_sec: a timeout to wait for DocAI to complete, in seconds + check_in_interval_sec: an interval to wait before the next check + on whether parsing operations have completed, in seconds + This is a long-running operation! A recommended way is to decouple + parsing from creating LangChain Documents: + >>> operations = parser.docai_parse(blobs, gcs_output_path=gcs_path) + >>> parser.is_running(operations) + You can get operation names and save them: + >>> names = [op.operation.name for op in operations] + And when all operations are finished, you can use their results: + >>> operations = parser.operations_from_names(names) + >>> results = parser.get_results(operations) + >>> docs = parser.parse_from_results(results) + """ + output_path = gcs_output_path if gcs_output_path else self._gcs_output_path + if output_path is None: + raise ValueError("An output path on GCS should be provided!") + operations = self.docai_parse(blobs, gcs_output_path=output_path) + operation_names = [op.operation.name for op in operations] + logger.debug( + f"Started parsing with DocAI, submitted operations {operation_names}" + ) + is_running, time_elapsed = True, 0 + while is_running: + is_running = self.is_running(operations) + if not is_running: + break + time.sleep(check_in_interval_sec) + time_elapsed += check_in_interval_sec + if time_elapsed > timeout_sec: + raise ValueError( + "Timeout exceeded! Check operations " f"{operation_names} later!"
+ ) + logger.debug(".") + + results = self.get_results(operations=operations) + yield from self.parse_from_results(results) + + def parse_from_results( + self, results: List[DocAIParsingResults] + ) -> Iterator[Document]: + try: + from google.cloud.documentai_toolbox.wrappers.document import _get_shards + from google.cloud.documentai_toolbox.wrappers.page import _text_from_layout + except ImportError: + raise ImportError( + "documentai_toolbox package not found, please install it with" + " `pip install google-cloud-documentai-toolbox`" + ) + for result in results: + output_gcs = result.parsed_path.split("/") + gcs_bucket_name = output_gcs[2] + gcs_prefix = "/".join(output_gcs[3:]) + "/" + shards = _get_shards(gcs_bucket_name, gcs_prefix) + docs, page_number = [], 1 + for shard in shards: + for page in shard.pages: + docs.append( + Document( + page_content=_text_from_layout(page.layout, shard.text), + metadata={ + "page": page_number, + "source": result.source_path, + }, + ) + ) + page_number += 1 + yield from docs + + def operations_from_names(self, operation_names: List[str]) -> List["Operation"]: + """Initializes Long-Running Operations from their names.""" + try: + from google.longrunning.operations_pb2 import ( + GetOperationRequest, # type: ignore + ) + except ImportError: + raise ImportError( + "documentai package not found, please install it with" + " `pip install gapic-google-longrunning`" + ) + + operations = [] + for name in operation_names: + request = GetOperationRequest(name=name) + operations.append(self._client.get_operation(request=request)) + return operations + + def is_running(self, operations: List["Operation"]) -> bool: + for op in operations: + if not op.done(): + return True + return False + + def docai_parse( + self, + blobs: Sequence[Blob], + *, + gcs_output_path: Optional[str] = None, + batch_size: int = 4000, + enable_native_pdf_parsing: bool = True, + ) -> List["Operation"]: + """Runs Google DocAI PDF parser on a list of blobs. + + Args: + blobs: a list of blobs to be parsed + gcs_output_path: a path (folder) on GCS to store results + batch_size: amount of documents per batch + enable_native_pdf_parsing: a config option for the parser + + DocAI has a limit on the amount of documents per batch, that's why split a + batch into mini-batches. Parsing is an async long-running operation + on Google Cloud and results are stored in a output GCS bucket. 
+ """ + try: + from google.cloud import documentai + from google.cloud.documentai_v1.types import OcrConfig, ProcessOptions + except ImportError: + raise ImportError( + "documentai package not found, please install it with" + " `pip install google-cloud-documentai`" + ) + + if not self._processor_name: + raise ValueError("Processor name is not defined, aborting!") + output_path = gcs_output_path if gcs_output_path else self._gcs_output_path + if output_path is None: + raise ValueError("An output path on GCS should be provided!") + + operations = [] + for batch in batch_iterate(size=batch_size, iterable=blobs): + documents = [] + for blob in batch: + gcs_document = documentai.GcsDocument( + gcs_uri=blob.path, mime_type="application/pdf" + ) + documents.append(gcs_document) + gcs_documents = documentai.GcsDocuments(documents=documents) + + input_config = documentai.BatchDocumentsInputConfig( + gcs_documents=gcs_documents + ) + + gcs_output_config = documentai.DocumentOutputConfig.GcsOutputConfig( + gcs_uri=output_path, field_mask=None + ) + output_config = documentai.DocumentOutputConfig( + gcs_output_config=gcs_output_config + ) + + if enable_native_pdf_parsing: + process_options = ProcessOptions( + ocr_config=OcrConfig( + enable_native_pdf_parsing=enable_native_pdf_parsing + ) + ) + else: + process_options = ProcessOptions() + request = documentai.BatchProcessRequest( + name=self._processor_name, + input_documents=input_config, + document_output_config=output_config, + process_options=process_options, + ) + operations.append(self._client.batch_process_documents(request)) + return operations + + def get_results(self, operations: List["Operation"]) -> List[DocAIParsingResults]: + try: + from google.cloud.documentai_v1 import BatchProcessMetadata + except ImportError: + raise ImportError( + "documentai package not found, please install it with" + " `pip install google-cloud-documentai`" + ) + + results = [] + for op in operations: + if isinstance(op.metadata, BatchProcessMetadata): + metadata = op.metadata + else: + metadata = BatchProcessMetadata.deserialize(op.metadata.value) + for status in metadata.individual_process_statuses: + source = status.input_gcs_source + output = status.output_gcs_destination + results.append( + DocAIParsingResults(source_path=source, parsed_path=output) + ) + return results diff --git a/libs/langchain/tests/unit_tests/document_loaders/parsers/test_public_api.py b/libs/langchain/tests/unit_tests/document_loaders/parsers/test_public_api.py index 84f2db36bcd..f1037064b08 100644 --- a/libs/langchain/tests/unit_tests/document_loaders/parsers/test_public_api.py +++ b/libs/langchain/tests/unit_tests/document_loaders/parsers/test_public_api.py @@ -5,6 +5,7 @@ def test_parsers_public_api_correct() -> None: """Test public API of parsers for breaking changes.""" assert set(__all__) == { "BS4HTMLParser", + "DocAIParser", "GrobidParser", "LanguageParser", "OpenAIWhisperParser",