feat: Update Google Document AI Parser (#11413)
- **Description:** Code Refactoring, Documentation Improvements for Google Document AI PDF Parser
  - Adds Online (synchronous) processing option.
  - Adds default field mask to limit payload size.
  - Skips Human review by default.
- **Issue:** Fixes #10589

Co-authored-by: Erick Friis <erick@langchain.dev>
commit 09c66fe04f
parent 628cc4cce8
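As context for the change, here is a minimal sketch of the new online (synchronous) path this commit adds, pieced together from the notebook and the `online_process()` method in the diff below. `PROCESSOR_NAME` and `GCS_OUTPUT_PATH` are placeholders; the sample PDF path is the public bucket used in the updated notebook.

```python
# Sketch of the online (synchronous) processing path added by this change.
# PROCESSOR_NAME and GCS_OUTPUT_PATH are placeholders; substitute your own values.
from langchain.document_loaders.blob_loaders import Blob
from langchain.document_loaders.parsers import DocAIParser

PROCESSOR_NAME = "projects/PROJECT_NUMBER/locations/LOCATION/processors/PROCESSOR_ID"
GCS_OUTPUT_PATH = "gs://BUCKET_NAME/FOLDER_PATH"

parser = DocAIParser(
    location="us", processor_name=PROCESSOR_NAME, gcs_output_path=GCS_OUTPUT_PATH
)
blob = Blob(
    path="gs://cloud-samples-data/gen-app-builder/search/alphabet-investor-pdfs/2022Q1_alphabet_earnings_release.pdf"
)

# online_process() calls Document AI synchronously, accepts a field mask to limit
# payload size, and skips human review by default (per this commit).
docs = list(
    parser.online_process(blob, field_mask="text,pages.pageNumber,pages.layout")
)
print(len(docs))  # one Document per page
```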
@@ -2,39 +2,45 @@
  "cells": [
   {
    "cell_type": "markdown",
-   "id": "310fce10-e051-40db-89b0-5b5bb85cd145",
+   "id": "b317191d",
    "metadata": {},
    "source": [
-    "# Document AI\n"
+    "# Google Cloud Document AI\n"
    ]
   },
   {
    "cell_type": "markdown",
-   "id": "f95ac25b-f025-40c3-95b8-77919fc4da7f",
+   "id": "a19e6f94",
    "metadata": {},
    "source": [
-    ">[Document AI](https://cloud.google.com/document-ai/docs/overview) is a `Google Cloud Platform` service to transform unstructured data from documents into structured data, making it easier to understand, analyze, and consume. "
+    "Document AI is a document understanding platform from Google Cloud to transform unstructured data from documents into structured data, making it easier to understand, analyze, and consume.\n",
+    "\n",
+    "Learn more:\n",
+    "\n",
+    "- [Document AI overview](https://cloud.google.com/document-ai/docs/overview)\n",
+    "- [Document AI videos and labs](https://cloud.google.com/document-ai/docs/videos)\n",
+    "- [Try it!](https://cloud.google.com/document-ai/docs/drag-and-drop)\n"
    ]
   },
   {
    "cell_type": "markdown",
-   "id": "275f2193-248f-4565-a872-93a89589cf2b",
+   "id": "184c0af8",
    "metadata": {},
    "source": [
     "The module contains a `PDF` parser based on DocAI from Google Cloud.\n",
     "\n",
-    "You need to install two libraries to use this parser:"
+    "You need to install two libraries to use this parser:\n"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "34132fab-0069-4942-b68b-5b093ccfc92a",
+   "id": "c86b2f59",
    "metadata": {},
    "outputs": [],
    "source": [
-    "!pip install google-cloud-documentai\n",
-    "!pip install google-cloud-documentai-toolbox"
+    "%pip install google-cloud-documentai\n",
+    "%pip install google-cloud-documentai-toolbox\n"
    ]
   },
   {
@@ -42,8 +48,9 @@
    "id": "51946817-798c-4d11-abd6-db2ae53a0270",
    "metadata": {},
    "source": [
-    "First, you need to set up a [`GCS` bucket and create your own OCR processor](https://cloud.google.com/document-ai/docs/create-processor) \n",
-    "The `GCS_OUTPUT_PATH` should be a path to a folder on GCS (starting with `gs://`) and a processor name should look like `projects/PROJECT_NUMBER/locations/LOCATION/processors/PROCESSOR_ID`. You can get it either programmatically or copy from the `Prediction endpoint` section of the `Processor details` tab in the Google Cloud Console."
+    "First, you need to set up a Google Cloud Storage (GCS) bucket and create your own Optical Character Recognition (OCR) processor as described here: https://cloud.google.com/document-ai/docs/create-processor\n",
+    "\n",
+    "The `GCS_OUTPUT_PATH` should be a path to a folder on GCS (starting with `gs://`) and a `PROCESSOR_NAME` should look like `projects/PROJECT_NUMBER/locations/LOCATION/processors/PROCESSOR_ID` or `projects/PROJECT_NUMBER/locations/LOCATION/processors/PROCESSOR_ID/processorVersions/PROCESSOR_VERSION_ID`. You can get it either programmatically or copy from the `Prediction endpoint` section of the `Processor details` tab in the Google Cloud Console.\n"
    ]
   },
   {
@@ -53,9 +60,8 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "PROJECT = \"PUT_SOMETHING_HERE\"\n",
-    "GCS_OUTPUT_PATH = \"PUT_SOMETHING_HERE\"\n",
-    "PROCESSOR_NAME = \"PUT_SOMETHING_HERE\""
+    "GCS_OUTPUT_PATH = \"gs://BUCKET_NAME/FOLDER_PATH\"\n",
+    "PROCESSOR_NAME = \"projects/PROJECT_NUMBER/locations/LOCATION/processors/PROCESSOR_ID\"\n"
    ]
   },
   {
@@ -66,7 +72,7 @@
    "outputs": [],
    "source": [
     "from langchain.document_loaders.blob_loaders import Blob\n",
-    "from langchain.document_loaders.parsers import DocAIParser"
+    "from langchain.document_loaders.parsers import DocAIParser\n"
    ]
   },
   {
@@ -74,7 +80,7 @@
    "id": "fad2bcca-1c0e-4888-b82d-15823ba57e60",
    "metadata": {},
    "source": [
-    "Now, let's create a parser:"
+    "Now, create a `DocAIParser`.\n"
    ]
   },
   {
@@ -84,7 +90,8 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "parser = DocAIParser(location=\"us\", processor_name=PROCESSOR_NAME, gcs_output_path=GCS_OUTPUT_PATH)"
+    "parser = DocAIParser(\n",
+    "    location=\"us\", processor_name=PROCESSOR_NAME, gcs_output_path=GCS_OUTPUT_PATH)\n"
    ]
   },
   {
@@ -92,7 +99,11 @@
    "id": "b8b5a3ff-650a-4ad3-a73a-395f86e4c9e1",
    "metadata": {},
    "source": [
-    "Let's go and parse an Alphabet's take from here: https://abc.xyz/assets/a7/5b/9e5ae0364b12b4c883f3cf748226/goog-exhibit-99-1-q1-2023-19.pdf. Copy it to your GCS bucket first, and adjust the path below."
+    "For this example, you can use an Alphabet earnings report that's uploaded to a public GCS bucket.\n",
+    "\n",
+    "[2022Q1_alphabet_earnings_release.pdf](https://storage.googleapis.com/cloud-samples-data/gen-app-builder/search/alphabet-investor-pdfs/2022Q1_alphabet_earnings_release.pdf)\n",
+    "\n",
+    "Pass the document to the `lazy_parse()` method to\n"
    ]
   },
   {
@@ -102,17 +113,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "blob = Blob(path=\"gs://vertex-pgt/examples/goog-exhibit-99-1-q1-2023-19.pdf\")"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 5,
-   "id": "6ef84fad-2981-456d-a6b4-3a6a1a46d511",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "docs = list(parser.lazy_parse(blob))"
+    "blob = Blob(path=\"gs://cloud-samples-data/gen-app-builder/search/alphabet-investor-pdfs/2022Q1_alphabet_earnings_release.pdf\")\n"
    ]
   },
   {
@@ -120,7 +121,7 @@
    "id": "3f8e4ee1-e07d-4c29-a120-4d56aae91859",
    "metadata": {},
    "source": [
-    "We'll get one document per page, 11 in total:"
+    "We'll get one document per page, 11 in total:\n"
    ]
   },
   {
@@ -138,7 +139,8 @@
     }
    ],
    "source": [
-    "print(len(docs))"
+    "docs = list(parser.lazy_parse(blob))\n",
+    "print(len(docs))\n"
    ]
   },
   {
@@ -146,7 +148,7 @@
    "id": "b104ae56-011b-4abe-ac07-e999c69494c5",
    "metadata": {},
    "source": [
-    "You can run end-to-end parsing of a blob one-by-one. If you have many documents, it might be a better approach to batch them together and maybe even detach parsing from handling the results of parsing."
+    "You can run end-to-end parsing of a blob one-by-one. If you have many documents, it might be a better approach to batch them together and maybe even detach parsing from handling the results of parsing.\n"
    ]
   },
   {
@@ -165,7 +167,7 @@
    ],
    "source": [
     "operations = parser.docai_parse([blob])\n",
-    "print([op.operation.name for op in operations])"
+    "print([op.operation.name for op in operations])\n"
    ]
   },
   {
@@ -173,7 +175,7 @@
    "id": "a2d24d63-c2c7-454c-9df3-2a9cf51309a6",
    "metadata": {},
    "source": [
-    "You can check whether operations are finished:"
+    "You can check whether operations are finished:\n"
    ]
   },
   {
@@ -194,7 +196,7 @@
     }
    ],
    "source": [
-    "parser.is_running(operations)"
+    "parser.is_running(operations)\n"
    ]
   },
   {
@@ -202,7 +204,7 @@
    "id": "602ca0bc-080a-4a4e-a413-0e705aeab189",
    "metadata": {},
    "source": [
-    "And when they're finished, you can parse the results:"
+    "And when they're finished, you can parse the results:\n"
    ]
   },
   {
@@ -223,7 +225,7 @@
     }
    ],
    "source": [
-    "parser.is_running(operations)"
+    "parser.is_running(operations)\n"
    ]
   },
   {
@@ -242,7 +244,7 @@
    ],
    "source": [
     "results = parser.get_results(operations)\n",
-    "print(results[0])"
+    "print(results[0])\n"
    ]
   },
   {
@@ -250,7 +252,7 @@
    "id": "87e5b606-1679-46c7-9577-4cf9bc93a752",
    "metadata": {},
    "source": [
-    "And now we can finally generate Documents from parsed results:"
+    "And now we can finally generate Documents from parsed results:\n"
    ]
   },
   {
@@ -260,7 +262,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "docs = list(parser.parse_from_results(results))"
+    "docs = list(parser.parse_from_results(results))\n"
    ]
   },
   {
@@ -278,7 +280,7 @@
     }
    ],
    "source": [
-    "print(len(docs))"
+    "print(len(docs))\n"
    ]
   }
  ],
@@ -290,7 +292,7 @@
    "uri": "gcr.io/deeplearning-platform-release/base-cpu:m109"
   },
   "kernelspec": {
-   "display_name": "Python 3 (ipykernel)",
+   "display_name": "Python 3",
    "language": "python",
    "name": "python3"
   },
@@ -304,7 +306,7 @@
   "name": "python",
   "nbconvert_exporter": "python",
  "pygments_lexer": "ipython3",
-  "version": "3.10.12"
+  "version": "3.10.11"
  }
 },
 "nbformat": 4,
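The notebook cells above decouple batch parsing from result handling. As a rough end-to-end sketch of that flow (the polling interval is illustrative, and `parser` and `blob` are the objects created in the earlier cells), it looks roughly like this before moving on to the Python parser module diff below:

```python
import time

# Submit the batch to Document AI; this kicks off long-running operations.
operations = parser.docai_parse([blob])
print([op.operation.name for op in operations])

# Poll until the operations finish.
while parser.is_running(operations):
    time.sleep(10)  # polling interval is illustrative

# Fetch the per-document results and turn them into LangChain Documents.
results = parser.get_results(operations)
print(results[0])

docs = list(parser.parse_from_results(results))
print(len(docs))
```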
@@ -1,4 +1,4 @@
-"""Module contains a PDF parser based on DocAI from Google Cloud.
+"""Module contains a PDF parser based on Document AI from Google Cloud.
 
 You need to install two libraries to use this parser:
     pip install google-cloud-documentai
@@ -24,13 +24,19 @@ logger = logging.getLogger(__name__)
 
 @dataclass
 class DocAIParsingResults:
-    """A dataclass to store DocAI parsing results."""
+    """A dataclass to store Document AI parsing results."""
 
     source_path: str
     parsed_path: str
 
 
 class DocAIParser(BaseBlobParser):
+    """`Google Cloud Document AI` parser.
+
+    For a detailed explanation of Document AI, refer to the product documentation.
+    https://cloud.google.com/document-ai/docs/overview
+    """
+
     def __init__(
         self,
         *,
@@ -43,19 +49,16 @@ class DocAIParser(BaseBlobParser):
 
         Args:
            client: a DocumentProcessorServiceClient to use
-            location: a GCP location where a DOcAI parser is located
-            gcs_output_path: a path on GCS to store parsing results
-            processor_name: name of a processor
+            location: a Google Cloud location where a Document AI processor is located
+            gcs_output_path: a path on Google Cloud Storage to store parsing results
+            processor_name: full resource name of a Document AI processor or processor
+                version
 
         You should provide either a client or location (and then a client
         would be instantiated).
         """
-        if client and location:
-            raise ValueError(
-                "You should provide either a client or a location but not both "
-                "of them."
-            )
-        if not client and not location:
+        if bool(client) == bool(location):
            raise ValueError(
                "You must specify either a client or a location to instantiate "
                "a client."
@@ -69,11 +72,11 @@ class DocAIParser(BaseBlobParser):
            try:
                from google.api_core.client_options import ClientOptions
                from google.cloud.documentai import DocumentProcessorServiceClient
-            except ImportError:
+            except ImportError as exc:
                raise ImportError(
                    "documentai package not found, please install it with"
                    " `pip install google-cloud-documentai`"
-                )
+                ) from exc
            options = ClientOptions(
                api_endpoint=f"{location}-documentai.googleapis.com"
            )
@@ -85,11 +88,86 @@ class DocAIParser(BaseBlobParser):
         Args:
             blobs: a Blob to parse
 
-        This is a long-running operations! A recommended way is to batch
-        documents together and use `batch_parse` method.
+        This is a long-running operation. A recommended way is to batch
+        documents together and use the `batch_parse()` method.
         """
         yield from self.batch_parse([blob], gcs_output_path=self._gcs_output_path)
 
+    def online_process(
+        self,
+        blob: Blob,
+        enable_native_pdf_parsing: bool = True,
+        field_mask: Optional[str] = None,
+        page_range: Optional[List[int]] = None,
+    ) -> Iterator[Document]:
+        """Parses a blob lazily using online processing.
+
+        Args:
+            blob: a blob to parse.
+            enable_native_pdf_parsing: enable pdf embedded text extraction
+            field_mask: a comma-separated list of which fields to include in the
+                Document AI response.
+                suggested: "text,pages.pageNumber,pages.layout"
+            page_range: list of page numbers to parse. If `None`,
+                entire document will be parsed.
+        """
+        try:
+            from google.cloud import documentai
+            from google.cloud.documentai_v1.types import (
+                IndividualPageSelector,
+                OcrConfig,
+                ProcessOptions,
+            )
+        except ImportError as exc:
+            raise ImportError(
+                "documentai package not found, please install it with"
+                " `pip install google-cloud-documentai`"
+            ) from exc
+        try:
+            from google.cloud.documentai_toolbox.wrappers.document import (
+                Document as WrappedDocument,
+            )
+        except ImportError as exc:
+            raise ImportError(
+                "documentai_toolbox package not found, please install it with"
+                " `pip install google-cloud-documentai-toolbox`"
+            ) from exc
+        ocr_config = (
+            OcrConfig(enable_native_pdf_parsing=enable_native_pdf_parsing)
+            if enable_native_pdf_parsing
+            else None
+        )
+        individual_page_selector = (
+            IndividualPageSelector(pages=page_range) if page_range else None
+        )
+
+        response = self._client.process_document(
+            documentai.ProcessRequest(
+                name=self._processor_name,
+                gcs_document=documentai.GcsDocument(
+                    gcs_uri=blob.path,
+                    mime_type=blob.mimetype or "application/pdf",
+                ),
+                process_options=ProcessOptions(
+                    ocr_config=ocr_config,
+                    individual_page_selector=individual_page_selector,
+                ),
+                skip_human_review=True,
+                field_mask=field_mask,
+            )
+        )
+        wrapped_document = WrappedDocument.from_documentai_document(response.document)
+        yield from (
+            Document(
+                page_content=page.text,
+                metadata={
+                    "page": page.page_number,
+                    "source": wrapped_document.gcs_input_uri,
+                },
+            )
+            for page in wrapped_document.pages
+        )
+
     def batch_parse(
         self,
         blobs: Sequence[Blob],
@@ -100,13 +178,13 @@ class DocAIParser(BaseBlobParser):
         """Parses a list of blobs lazily.
 
         Args:
-            blobs: a list of blobs to parse
-            gcs_output_path: a path on GCS to store parsing results
-            timeout_sec: a timeout to wait for DocAI to complete, in seconds
+            blobs: a list of blobs to parse.
+            gcs_output_path: a path on Google Cloud Storage to store parsing results.
+            timeout_sec: a timeout to wait for Document AI to complete, in seconds.
             check_in_interval_sec: an interval to wait until next check
                 whether parsing operations have been completed, in seconds
-        This is a long-running operations! A recommended way is to decouple
-        parsing from creating Langchain Documents:
+        This is a long-running operation. A recommended way is to decouple
+        parsing from creating LangChain Documents:
         >>> operations = parser.docai_parse(blobs, gcs_path)
         >>> parser.is_running(operations)
         You can get operations names and save them:
@@ -116,23 +194,22 @@ class DocAIParser(BaseBlobParser):
         >>> results = parser.get_results(operations)
         >>> docs = parser.parse_from_results(results)
         """
-        output_path = gcs_output_path if gcs_output_path else self._gcs_output_path
-        if output_path is None:
-            raise ValueError("An output path on GCS should be provided!")
+        output_path = gcs_output_path or self._gcs_output_path
+        if not output_path:
+            raise ValueError(
+                "An output path on Google Cloud Storage should be provided."
+            )
         operations = self.docai_parse(blobs, gcs_output_path=output_path)
         operation_names = [op.operation.name for op in operations]
         logger.debug(
-            f"Started parsing with DocAI, submitted operations {operation_names}"
+            "Started parsing with Document AI, submitted operations %s", operation_names
         )
-        is_running, time_elapsed = True, 0
-        while is_running:
-            is_running = self.is_running(operations)
-            if not is_running:
-                break
+        time_elapsed = 0
+        while self.is_running(operations):
             time.sleep(check_in_interval_sec)
             time_elapsed += check_in_interval_sec
             if time_elapsed > timeout_sec:
-                raise ValueError(
+                raise TimeoutError(
                     "Timeout exceeded! Check operations " f"{operation_names} later!"
                 )
             logger.debug(".")
@@ -144,32 +221,32 @@ class DocAIParser(BaseBlobParser):
         self, results: List[DocAIParsingResults]
     ) -> Iterator[Document]:
         try:
-            from google.cloud.documentai_toolbox.wrappers.document import _get_shards
-            from google.cloud.documentai_toolbox.wrappers.page import _text_from_layout
-        except ImportError:
+            from google.cloud.documentai_toolbox.utilities.gcs_utilities import (
+                split_gcs_uri,
+            )
+            from google.cloud.documentai_toolbox.wrappers.document import (
+                Document as WrappedDocument,
+            )
+        except ImportError as exc:
             raise ImportError(
                 "documentai_toolbox package not found, please install it with"
                 " `pip install google-cloud-documentai-toolbox`"
-            )
+            ) from exc
         for result in results:
-            output_gcs = result.parsed_path.split("/")
-            gcs_bucket_name = output_gcs[2]
-            gcs_prefix = "/".join(output_gcs[3:]) + "/"
-            shards = _get_shards(gcs_bucket_name, gcs_prefix)
-            docs, page_number = [], 1
-            for shard in shards:
-                for page in shard.pages:
-                    docs.append(
-                        Document(
-                            page_content=_text_from_layout(page.layout, shard.text),
-                            metadata={
-                                "page": page_number,
-                                "source": result.source_path,
-                            },
-                        )
-                    )
-                    page_number += 1
-            yield from docs
+            gcs_bucket_name, gcs_prefix = split_gcs_uri(result.parsed_path)
+            wrapped_document = WrappedDocument.from_gcs(
+                gcs_bucket_name, gcs_prefix, gcs_input_uri=result.source_path
+            )
+            yield from (
+                Document(
+                    page_content=page.text,
+                    metadata={
+                        "page": page.page_number,
+                        "source": wrapped_document.gcs_input_uri,
+                    },
+                )
+                for page in wrapped_document.pages
+            )
 
     def operations_from_names(self, operation_names: List[str]) -> List["Operation"]:
         """Initializes Long-Running Operations from their names."""
@@ -177,116 +254,127 @@ class DocAIParser(BaseBlobParser):
             from google.longrunning.operations_pb2 import (
                 GetOperationRequest,  # type: ignore
             )
-        except ImportError:
+        except ImportError as exc:
             raise ImportError(
-                "documentai package not found, please install it with"
+                "long running operations package not found, please install it with"
                 " `pip install gapic-google-longrunning`"
-            )
+            ) from exc
 
-        operations = []
-        for name in operation_names:
-            request = GetOperationRequest(name=name)
-            operations.append(self._client.get_operation(request=request))
-        return operations
+        return [
+            self._client.get_operation(request=GetOperationRequest(name=name))
+            for name in operation_names
+        ]
 
     def is_running(self, operations: List["Operation"]) -> bool:
-        for op in operations:
-            if not op.done():
-                return True
-        return False
+        return any(not op.done() for op in operations)
 
     def docai_parse(
         self,
         blobs: Sequence[Blob],
         *,
         gcs_output_path: Optional[str] = None,
-        batch_size: int = 4000,
+        processor_name: Optional[str] = None,
+        batch_size: int = 1000,
         enable_native_pdf_parsing: bool = True,
+        field_mask: Optional[str] = None,
     ) -> List["Operation"]:
-        """Runs Google DocAI PDF parser on a list of blobs.
+        """Runs Google Document AI PDF Batch Processing on a list of blobs.
 
         Args:
             blobs: a list of blobs to be parsed
             gcs_output_path: a path (folder) on GCS to store results
+            processor_name: name of a Document AI processor.
             batch_size: amount of documents per batch
             enable_native_pdf_parsing: a config option for the parser
+            field_mask: a comma-separated list of which fields to include in the
+                Document AI response.
+                suggested: "text,pages.pageNumber,pages.layout"
 
-        DocAI has a limit on the amount of documents per batch, that's why split a
-        batch into mini-batches. Parsing is an async long-running operation
-        on Google Cloud and results are stored in a output GCS bucket.
+        Document AI has a 1000 file limit per batch, so batches larger than that need
+        to be split into multiple requests.
+        Batch processing is an async long-running operation
+        and results are stored in a output GCS bucket.
         """
         try:
             from google.cloud import documentai
             from google.cloud.documentai_v1.types import OcrConfig, ProcessOptions
-        except ImportError:
+        except ImportError as exc:
             raise ImportError(
                 "documentai package not found, please install it with"
                 " `pip install google-cloud-documentai`"
-            )
+            ) from exc
 
-        if not self._processor_name:
-            raise ValueError("Processor name is not defined, aborting!")
-        output_path = gcs_output_path if gcs_output_path else self._gcs_output_path
+        output_path = gcs_output_path or self._gcs_output_path
         if output_path is None:
-            raise ValueError("An output path on GCS should be provided!")
+            raise ValueError(
+                "An output path on Google Cloud Storage should be provided."
+            )
+        processor_name = processor_name or self._processor_name
+        if processor_name is None:
+            raise ValueError("A Document AI processor name should be provided.")
 
         operations = []
         for batch in batch_iterate(size=batch_size, iterable=blobs):
-            documents = []
-            for blob in batch:
-                gcs_document = documentai.GcsDocument(
-                    gcs_uri=blob.path, mime_type="application/pdf"
-                )
-                documents.append(gcs_document)
-            gcs_documents = documentai.GcsDocuments(documents=documents)
-
             input_config = documentai.BatchDocumentsInputConfig(
-                gcs_documents=gcs_documents
+                gcs_documents=documentai.GcsDocuments(
+                    documents=[
+                        documentai.GcsDocument(
+                            gcs_uri=blob.path,
+                            mime_type=blob.mimetype or "application/pdf",
+                        )
+                        for blob in batch
+                    ]
+                )
            )
 
-            gcs_output_config = documentai.DocumentOutputConfig.GcsOutputConfig(
-                gcs_uri=output_path, field_mask=None
-            )
             output_config = documentai.DocumentOutputConfig(
-                gcs_output_config=gcs_output_config
+                gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(
+                    gcs_uri=output_path, field_mask=field_mask
+                )
            )
 
-            if enable_native_pdf_parsing:
-                process_options = ProcessOptions(
+            process_options = (
+                ProcessOptions(
                     ocr_config=OcrConfig(
                         enable_native_pdf_parsing=enable_native_pdf_parsing
                     )
                 )
-            else:
-                process_options = ProcessOptions()
-            request = documentai.BatchProcessRequest(
-                name=self._processor_name,
-                input_documents=input_config,
-                document_output_config=output_config,
-                process_options=process_options,
-            )
-            operations.append(self._client.batch_process_documents(request))
+                if enable_native_pdf_parsing
+                else None
+            )
+            operations.append(
+                self._client.batch_process_documents(
+                    documentai.BatchProcessRequest(
+                        name=processor_name,
+                        input_documents=input_config,
+                        document_output_config=output_config,
+                        process_options=process_options,
+                        skip_human_review=True,
+                    )
+                )
+            )
         return operations
 
     def get_results(self, operations: List["Operation"]) -> List[DocAIParsingResults]:
         try:
             from google.cloud.documentai_v1 import BatchProcessMetadata
-        except ImportError:
+        except ImportError as exc:
             raise ImportError(
                 "documentai package not found, please install it with"
                 " `pip install google-cloud-documentai`"
-            )
+            ) from exc
 
-        results = []
-        for op in operations:
-            if isinstance(op.metadata, BatchProcessMetadata):
-                metadata = op.metadata
-            else:
-                metadata = BatchProcessMetadata.deserialize(op.metadata.value)
-            for status in metadata.individual_process_statuses:
-                source = status.input_gcs_source
-                output = status.output_gcs_destination
-                results.append(
-                    DocAIParsingResults(source_path=source, parsed_path=output)
-                )
-        return results
+        return [
+            DocAIParsingResults(
+                source_path=status.input_gcs_source,
+                parsed_path=status.output_gcs_destination,
+            )
+            for op in operations
+            for status in (
+                op.metadata.individual_process_statuses
+                if isinstance(op.metadata, BatchProcessMetadata)
+                else BatchProcessMetadata.deserialize(
+                    op.metadata.value
+                ).individual_process_statuses
+            )
+        ]
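The `batch_parse()` docstring above notes that you can save operation names and resume later. As a small sketch of that resumption path under the same assumptions (`parser` already constructed, `saved_names` is a hypothetical list of operation names you persisted earlier):

```python
# Sketch of resuming from previously saved operation names.
# `saved_names` is a hypothetical list persisted earlier, e.g. to a file or database.
saved_names = [op.operation.name for op in operations]

restored = parser.operations_from_names(saved_names)
if not parser.is_running(restored):
    results = parser.get_results(restored)
    docs = list(parser.parse_from_results(results))
```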