Added a pdf parser based on DocAI (#9579)
#9578

---------

Co-authored-by: Leonid Kuligin <kuligin@google.com>
Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
parent
adb21782b8
commit
87da56fb1e
283
docs/extras/integrations/document_transformers/docai.ipynb
Normal file
@@ -0,0 +1,283 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "48438efb-9f0d-473b-a91c-9f1e29c2539d",
"metadata": {},
"outputs": [],
"source": [
"from langchain.document_loaders.blob_loaders import Blob\n",
"from langchain.document_loaders.parsers import DocAIParser"
]
},
{
"cell_type": "markdown",
"id": "f95ac25b-f025-40c3-95b8-77919fc4da7f",
"metadata": {},
"source": [
"DocAI is a Google Cloud service that transforms unstructured data from documents into structured data, making it easier to understand, analyze, and consume. You can read more about it here: https://cloud.google.com/document-ai/docs/overview"
]
},
{
"cell_type": "markdown",
"id": "51946817-798c-4d11-abd6-db2ae53a0270",
"metadata": {},
"source": [
"First, you need to set up a GCS bucket and create your own OCR processor as described here: https://cloud.google.com/document-ai/docs/create-processor\n",
"The GCS_OUTPUT_PATH should be a path to a folder on GCS (starting with `gs://`) and a processor name should look like `projects/PROJECT_NUMBER/locations/LOCATION/processors/PROCESSOR_ID`. You can get it either programmatically or copy it from the `Prediction endpoint` section of the `Processor details` tab in the Google Cloud Console."
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "ac85f7f3-3ef6-41d5-920a-b55f2939c202",
"metadata": {},
"outputs": [],
"source": [
"PROJECT = \"PUT_SOMETHING_HERE\"\n",
"GCS_OUTPUT_PATH = \"PUT_SOMETHING_HERE\"\n",
"PROCESSOR_NAME = \"PUT_SOMETHING_HERE\""
]
},
{
"cell_type": "markdown",
"id": "fad2bcca-1c0e-4888-b82d-15823ba57e60",
"metadata": {},
"source": [
"Now, let's create a parser:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "dcc0c65a-86c5-448d-8b21-2e564b1903b7",
"metadata": {},
"outputs": [],
"source": [
"parser = DocAIParser(location=\"us\", processor_name=PROCESSOR_NAME, gcs_output_path=GCS_OUTPUT_PATH)"
]
},
{
"cell_type": "markdown",
"id": "b8b5a3ff-650a-4ad3-a73a-395f86e4c9e1",
"metadata": {},
"source": [
"Let's parse Alphabet's Q1 2023 earnings release from here: https://abc.xyz/assets/a7/5b/9e5ae0364b12b4c883f3cf748226/goog-exhibit-99-1-q1-2023-19.pdf. Copy it to your GCS bucket first, and adjust the path below."
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "373cc18e-a311-4c8d-8180-47e4ade1d2ad",
"metadata": {},
"outputs": [],
"source": [
"blob = Blob(path=\"gs://vertex-pgt/examples/goog-exhibit-99-1-q1-2023-19.pdf\")"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "6ef84fad-2981-456d-a6b4-3a6a1a46d511",
"metadata": {},
"outputs": [],
"source": [
"docs = list(parser.lazy_parse(blob))"
]
},
{
"cell_type": "markdown",
"id": "3f8e4ee1-e07d-4c29-a120-4d56aae91859",
"metadata": {},
"source": [
"We'll get one document per page, 11 in total:"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "343919f5-35d2-47fb-9790-de464649ebdf",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"11\n"
]
}
],
"source": [
"print(len(docs))"
]
},
{
"cell_type": "markdown",
"id": "b104ae56-011b-4abe-ac07-e999c69494c5",
"metadata": {},
"source": [
"You can run end-to-end parsing of blobs one by one, but if you have many documents, it might be better to batch them together and perhaps even decouple parsing from handling the parsing results."
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "9ecc1b99-5cef-47b0-a125-dbb2c41d2224",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['projects/543079149601/locations/us/operations/16447136779727347991']\n"
]
}
],
"source": [
"operations = parser.docai_parse([blob])\n",
"print([op.operation.name for op in operations])"
]
},
{
"cell_type": "markdown",
"id": "a2d24d63-c2c7-454c-9df3-2a9cf51309a6",
"metadata": {},
"source": [
"You can check whether the operations are still running:"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "ab11efb0-e514-4f44-9ba5-3d638a59c9e6",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"parser.is_running(operations)"
]
},
{
"cell_type": "markdown",
"id": "602ca0bc-080a-4a4e-a413-0e705aeab189",
"metadata": {},
"source": [
"And when they're finished, you can parse the results:"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "ec1e6041-bc10-47d4-ba64-d09055c14f27",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"False"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"parser.is_running(operations)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "95d89da4-1c8a-413d-8473-ddd4a39375a5",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"DocAIParsingResults(source_path='gs://vertex-pgt/examples/goog-exhibit-99-1-q1-2023-19.pdf', parsed_path='gs://vertex-pgt/test/run1/16447136779727347991/0')\n"
]
}
],
"source": [
"results = parser.get_results(operations)\n",
"print(results[0])"
]
},
{
"cell_type": "markdown",
"id": "87e5b606-1679-46c7-9577-4cf9bc93a752",
"metadata": {},
"source": [
"And now we can finally generate Documents from the parsed results:"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "08e8878d-889b-41ad-9500-2f772d38782f",
"metadata": {},
"outputs": [],
"source": [
"docs = list(parser.parse_from_results(results))"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "c59525fb-448d-444b-8f12-c4aea791e19b",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"11\n"
]
}
],
"source": [
"print(len(docs))"
]
}
],
"metadata": {
"environment": {
"kernel": "python3",
"name": "common-cpu.m109",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/base-cpu:m109"
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
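The notebook above parses a single blob. For several PDFs at once, the blocking `batch_parse` method defined in the parser below can replace the manual `docai_parse` / `is_running` / `get_results` sequence. A minimal sketch, assuming the same parser setup as in the notebook; the `gs://my-bucket/...` paths are placeholders, not real files:

from langchain.document_loaders.blob_loaders import Blob
from langchain.document_loaders.parsers import DocAIParser

parser = DocAIParser(
    location="us", processor_name=PROCESSOR_NAME, gcs_output_path=GCS_OUTPUT_PATH
)

# Hypothetical input PDFs on GCS; replace with your own bucket and objects.
blobs = [
    Blob(path="gs://my-bucket/reports/report-1.pdf"),
    Blob(path="gs://my-bucket/reports/report-2.pdf"),
]

# batch_parse submits one batch request, polls until DocAI finishes
# (up to timeout_sec), and then yields one Document per page.
docs = list(parser.batch_parse(blobs, timeout_sec=3600, check_in_interval_sec=60))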
libs/langchain/langchain/document_loaders/parsers/__init__.py
@@ -1,4 +1,5 @@
from langchain.document_loaders.parsers.audio import OpenAIWhisperParser
from langchain.document_loaders.parsers.docai import DocAIParser
from langchain.document_loaders.parsers.grobid import GrobidParser
from langchain.document_loaders.parsers.html import BS4HTMLParser
from langchain.document_loaders.parsers.language import LanguageParser
@@ -12,6 +13,7 @@ from langchain.document_loaders.parsers.pdf import (

__all__ = [
    "BS4HTMLParser",
    "DocAIParser",
    "GrobidParser",
    "LanguageParser",
    "OpenAIWhisperParser",
292
libs/langchain/langchain/document_loaders/parsers/docai.py
Normal file
@@ -0,0 +1,292 @@
"""Module contains a PDF parser based on DocAI from Google Cloud.

You need to install two libraries to use this parser:
pip install google-cloud-documentai
pip install google-cloud-documentai-toolbox
"""
import logging
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, Iterator, List, Optional, Sequence

from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseBlobParser
from langchain.document_loaders.blob_loaders import Blob
from langchain.utils.iter import batch_iterate

if TYPE_CHECKING:
    from google.api_core.operation import Operation
    from google.cloud.documentai import DocumentProcessorServiceClient


logger = logging.getLogger(__name__)


@dataclass
class DocAIParsingResults:
    """A dataclass to store DocAI parsing results."""

    source_path: str
    parsed_path: str


class DocAIParser(BaseBlobParser):
    """A PDF parser based on Google Cloud Document AI (DocAI)."""

    def __init__(
        self,
        *,
        client: Optional["DocumentProcessorServiceClient"] = None,
        location: Optional[str] = None,
        gcs_output_path: Optional[str] = None,
        processor_name: Optional[str] = None,
    ):
        """Initializes the parser.

        Args:
            client: a DocumentProcessorServiceClient to use
            location: a GCP location where a DocAI processor is located
            gcs_output_path: a path on GCS to store parsing results
            processor_name: name of a processor

        You should provide either a client or a location (and then a client
        will be instantiated).
        """
        if client and location:
            raise ValueError(
                "You should provide either a client or a location but not both "
                "of them."
            )
        if not client and not location:
            raise ValueError(
                "You must specify either a client or a location to instantiate "
                "a client."
            )

        self._gcs_output_path = gcs_output_path
        self._processor_name = processor_name
        if client:
            self._client = client
        else:
            try:
                from google.api_core.client_options import ClientOptions
                from google.cloud.documentai import DocumentProcessorServiceClient
            except ImportError:
                raise ImportError(
                    "documentai package not found, please install it with"
                    " `pip install google-cloud-documentai`"
                )
            options = ClientOptions(
                api_endpoint=f"{location}-documentai.googleapis.com"
            )
            self._client = DocumentProcessorServiceClient(client_options=options)

    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Parses a blob lazily.

        Args:
            blob: a Blob to parse

        This is a long-running operation! A recommended way is to batch
        documents together and use the `batch_parse` method.
        """
        yield from self.batch_parse([blob], gcs_output_path=self._gcs_output_path)

    def batch_parse(
        self,
        blobs: Sequence[Blob],
        gcs_output_path: Optional[str] = None,
        timeout_sec: int = 3600,
        check_in_interval_sec: int = 60,
    ) -> Iterator[Document]:
        """Parses a list of blobs lazily.

        Args:
            blobs: a list of blobs to parse
            gcs_output_path: a path on GCS to store parsing results
            timeout_sec: a timeout to wait for DocAI to complete, in seconds
            check_in_interval_sec: an interval to wait until the next check
                whether parsing operations have been completed, in seconds

        This is a long-running operation! A recommended way is to decouple
        parsing from creating LangChain Documents:
        >>> operations = parser.docai_parse(blobs, gcs_path)
        >>> parser.is_running(operations)
        You can get operation names and save them:
        >>> names = [op.operation.name for op in operations]
        And when all operations are finished, you can use their results:
        >>> operations = parser.operations_from_names(operation_names)
        >>> results = parser.get_results(operations)
        >>> docs = parser.parse_from_results(results)
        """
        output_path = gcs_output_path if gcs_output_path else self._gcs_output_path
        if output_path is None:
            raise ValueError("An output path on GCS should be provided!")
        operations = self.docai_parse(blobs, gcs_output_path=output_path)
        operation_names = [op.operation.name for op in operations]
        logger.debug(
            f"Started parsing with DocAI, submitted operations {operation_names}"
        )
        is_running, time_elapsed = True, 0
        while is_running:
            is_running = self.is_running(operations)
            if not is_running:
                break
            time.sleep(check_in_interval_sec)
            time_elapsed += check_in_interval_sec
            if time_elapsed > timeout_sec:
                raise ValueError(
                    "Timeout exceeded! Check operations " f"{operation_names} later!"
                )
            logger.debug(".")

        results = self.get_results(operations=operations)
        yield from self.parse_from_results(results)

    def parse_from_results(
        self, results: List[DocAIParsingResults]
    ) -> Iterator[Document]:
        try:
            from google.cloud.documentai_toolbox.wrappers.document import _get_shards
            from google.cloud.documentai_toolbox.wrappers.page import _text_from_layout
        except ImportError:
            raise ImportError(
                "documentai_toolbox package not found, please install it with"
                " `pip install google-cloud-documentai-toolbox`"
            )
        for result in results:
            output_gcs = result.parsed_path.split("/")
            gcs_bucket_name = output_gcs[2]
            gcs_prefix = "/".join(output_gcs[3:]) + "/"
            shards = _get_shards(gcs_bucket_name, gcs_prefix)
            docs, page_number = [], 1
            for shard in shards:
                for page in shard.pages:
                    docs.append(
                        Document(
                            page_content=_text_from_layout(page.layout, shard.text),
                            metadata={
                                "page": page_number,
                                "source": result.source_path,
                            },
                        )
                    )
                    page_number += 1
            yield from docs

    def operations_from_names(self, operation_names: List[str]) -> List["Operation"]:
        """Initializes Long-Running Operations from their names."""
        try:
            from google.longrunning.operations_pb2 import (
                GetOperationRequest,  # type: ignore
            )
        except ImportError:
            raise ImportError(
                "long running operations package not found, please install it with"
                " `pip install gapic-google-longrunning`"
            )

        operations = []
        for name in operation_names:
            request = GetOperationRequest(name=name)
            operations.append(self._client.get_operation(request=request))
        return operations

    def is_running(self, operations: List["Operation"]) -> bool:
        for op in operations:
            if not op.done():
                return True
        return False

    def docai_parse(
        self,
        blobs: Sequence[Blob],
        *,
        gcs_output_path: Optional[str] = None,
        batch_size: int = 4000,
        enable_native_pdf_parsing: bool = True,
    ) -> List["Operation"]:
        """Runs Google DocAI PDF parser on a list of blobs.

        Args:
            blobs: a list of blobs to be parsed
            gcs_output_path: a path (folder) on GCS to store results
            batch_size: amount of documents per batch
            enable_native_pdf_parsing: a config option for the parser

        DocAI has a limit on the number of documents per batch; that's why we
        split the blobs into batches. Parsing is an async long-running operation
        on Google Cloud and results are stored in an output GCS bucket.
        """
        try:
            from google.cloud import documentai
            from google.cloud.documentai_v1.types import OcrConfig, ProcessOptions
        except ImportError:
            raise ImportError(
                "documentai package not found, please install it with"
                " `pip install google-cloud-documentai`"
            )

        if not self._processor_name:
            raise ValueError("Processor name is not defined, aborting!")
        output_path = gcs_output_path if gcs_output_path else self._gcs_output_path
        if output_path is None:
            raise ValueError("An output path on GCS should be provided!")

        operations = []
        for batch in batch_iterate(size=batch_size, iterable=blobs):
            documents = []
            for blob in batch:
                gcs_document = documentai.GcsDocument(
                    gcs_uri=blob.path, mime_type="application/pdf"
                )
                documents.append(gcs_document)
            gcs_documents = documentai.GcsDocuments(documents=documents)

            input_config = documentai.BatchDocumentsInputConfig(
                gcs_documents=gcs_documents
            )

            gcs_output_config = documentai.DocumentOutputConfig.GcsOutputConfig(
                gcs_uri=output_path, field_mask=None
            )
            output_config = documentai.DocumentOutputConfig(
                gcs_output_config=gcs_output_config
            )

            if enable_native_pdf_parsing:
                process_options = ProcessOptions(
                    ocr_config=OcrConfig(
                        enable_native_pdf_parsing=enable_native_pdf_parsing
                    )
                )
            else:
                process_options = ProcessOptions()
            request = documentai.BatchProcessRequest(
                name=self._processor_name,
                input_documents=input_config,
                document_output_config=output_config,
                process_options=process_options,
            )
            operations.append(self._client.batch_process_documents(request))
        return operations

    def get_results(self, operations: List["Operation"]) -> List[DocAIParsingResults]:
        try:
            from google.cloud.documentai_v1 import BatchProcessMetadata
        except ImportError:
            raise ImportError(
                "documentai package not found, please install it with"
                " `pip install google-cloud-documentai`"
            )

        results = []
        for op in operations:
            if isinstance(op.metadata, BatchProcessMetadata):
                metadata = op.metadata
            else:
                metadata = BatchProcessMetadata.deserialize(op.metadata.value)
            for status in metadata.individual_process_statuses:
                source = status.input_gcs_source
                output = status.output_gcs_destination
                results.append(
                    DocAIParsingResults(source_path=source, parsed_path=output)
                )
        return results
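Because batch processing is a long-running operation, the operation names can be persisted and picked up later, possibly from a different process. A minimal sketch based on the `operations_from_names`, `is_running`, `get_results`, and `parse_from_results` methods above; the operation name shown is the placeholder value from the notebook output:

parser = DocAIParser(
    location="us", processor_name=PROCESSOR_NAME, gcs_output_path=GCS_OUTPUT_PATH
)

# Operation names saved from an earlier docai_parse() call.
operation_names = [
    "projects/543079149601/locations/us/operations/16447136779727347991"
]

operations = parser.operations_from_names(operation_names)
if not parser.is_running(operations):
    results = parser.get_results(operations)
    docs = list(parser.parse_from_results(results))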
@@ -5,6 +5,7 @@ def test_parsers_public_api_correct() -> None:
    """Test public API of parsers for breaking changes."""
    assert set(__all__) == {
        "BS4HTMLParser",
        "DocAIParser",
        "GrobidParser",
        "LanguageParser",
        "OpenAIWhisperParser",