mirror of
https://github.com/hwchase17/langchain.git
synced 2025-09-09 23:12:38 +00:00
community[minor]: Implement Doctran async execution (#22372)
**Description** The DoctranTextTranslator has an async transform function that was not implemented because [the doctran library](https://github.com/psychic-api/doctran) uses a sync version of the `execute` method. - I implemented the `DoctranTextTranslator.atransform_documents()` method using `asyncio.to_thread` to run the function in a separate thread. - I updated the example in the Notebook with the new async version. - The performance improvements can be appreciated when a big document is divided into multiple chunks. Relates to: - Issue #14645: https://github.com/langchain-ai/langchain/issues/14645 - Issue #14437: https://github.com/langchain-ai/langchain/issues/14437 - https://github.com/langchain-ai/langchain/pull/15264 --------- Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
This commit is contained in:
@@ -1,6 +1,10 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import Any, Optional, Sequence
|
||||
|
||||
from langchain_core.documents import BaseDocumentTransformer, Document
|
||||
from langchain_core.runnables.config import run_in_executor
|
||||
from langchain_core.utils import get_from_env
|
||||
|
||||
|
||||
@@ -36,10 +40,58 @@ class DoctranTextTranslator(BaseDocumentTransformer):
|
||||
)
|
||||
self.language = language
|
||||
|
||||
async def _aparse_document(
|
||||
self, doctran: Any, index: int, doc: Document
|
||||
) -> tuple[int, Any]:
|
||||
parsed_doc = await run_in_executor(
|
||||
None, doctran.parse, content=doc.page_content, metadata=doc.metadata
|
||||
)
|
||||
return index, parsed_doc
|
||||
|
||||
async def _atranslate_document(
|
||||
self, index: int, doc: Any, language: str
|
||||
) -> tuple[int, Any]:
|
||||
translated_doc = await run_in_executor(
|
||||
None, lambda: doc.translate(language=language).execute()
|
||||
)
|
||||
return index, translated_doc
|
||||
|
||||
async def atransform_documents(
|
||||
self, documents: Sequence[Document], **kwargs: Any
|
||||
) -> Sequence[Document]:
|
||||
raise NotImplementedError
|
||||
"""Translates text documents using doctran."""
|
||||
try:
|
||||
from doctran import Doctran
|
||||
|
||||
doctran = Doctran(
|
||||
openai_api_key=self.openai_api_key, openai_model=self.openai_api_model
|
||||
)
|
||||
except ImportError:
|
||||
raise ImportError(
|
||||
"Install doctran to use this parser. (pip install doctran)"
|
||||
)
|
||||
|
||||
parse_tasks = [
|
||||
self._aparse_document(doctran, i, doc) for i, doc in enumerate(documents)
|
||||
]
|
||||
parsed_results = await asyncio.gather(*parse_tasks)
|
||||
|
||||
parsed_results.sort(key=lambda x: x[0])
|
||||
doctran_docs = [doc for _, doc in parsed_results]
|
||||
|
||||
translate_tasks = [
|
||||
self._atranslate_document(i, doc, self.language)
|
||||
for i, doc in enumerate(doctran_docs)
|
||||
]
|
||||
translated_results = await asyncio.gather(*translate_tasks)
|
||||
|
||||
translated_results.sort(key=lambda x: x[0])
|
||||
translated_docs = [doc for _, doc in translated_results]
|
||||
|
||||
return [
|
||||
Document(page_content=doc.transformed_content, metadata=doc.metadata)
|
||||
for doc in translated_docs
|
||||
]
|
||||
|
||||
def transform_documents(
|
||||
self, documents: Sequence[Document], **kwargs: Any
|
||||
|
Reference in New Issue
Block a user