From 86646ec555970e01130994dc75f3a0c5d4e52de9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Morav=C4=8D=C3=ADk?= Date: Fri, 1 Sep 2023 00:47:44 +0200 Subject: [PATCH] feat: Add `ApifyWrapper` class (#10067) If you look at documentation https://python.langchain.com/docs/integrations/tools/apify (or the actual file https://github.com/langchain-ai/langchain/blob/master/docs/extras/integrations/tools/apify.ipynb ), there's a class `ApifyWrapper` mentioned. It seems it got lost in some refactoring, i.e. it does not exist in the codebase ATM. I just propose to add it back. It would fix issues e.g. https://github.com/langchain-ai/langchain/issues/8307 or https://github.com/langchain-ai/langchain/issues/8201 To add, Apify is a wanted integration, e.g. see https://twitter.com/hwchase17/status/1695490295914545626 or https://twitter.com/hwchase17/status/1695470765343461756 Lastly, I offer taking ownership of the Apify-related parts of the codebase, so you can tag me if anything is needed. --- .../langchain/langchain/utilities/__init__.py | 2 + libs/langchain/langchain/utilities/apify.py | 194 ++++++++++++++++++ 2 files changed, 196 insertions(+) create mode 100644 libs/langchain/langchain/utilities/apify.py diff --git a/libs/langchain/langchain/utilities/__init__.py b/libs/langchain/langchain/utilities/__init__.py index 3365c60442b..9f7ebc7dcbe 100644 --- a/libs/langchain/langchain/utilities/__init__.py +++ b/libs/langchain/langchain/utilities/__init__.py @@ -4,6 +4,7 @@ Other LangChain classes use **Utilities** to interact with third-part systems and packages. """ from langchain.utilities.alpha_vantage import AlphaVantageAPIWrapper +from langchain.utilities.apify import ApifyWrapper from langchain.utilities.arxiv import ArxivAPIWrapper from langchain.utilities.awslambda import LambdaWrapper from langchain.utilities.bash import BashProcess @@ -38,6 +39,7 @@ from langchain.utilities.zapier import ZapierNLAWrapper __all__ = [ "AlphaVantageAPIWrapper", + "ApifyWrapper", "ArxivAPIWrapper", "BashProcess", "BibtexparserWrapper", diff --git a/libs/langchain/langchain/utilities/apify.py b/libs/langchain/langchain/utilities/apify.py new file mode 100644 index 00000000000..dd7ddcd01d0 --- /dev/null +++ b/libs/langchain/langchain/utilities/apify.py @@ -0,0 +1,194 @@ +from typing import Any, Callable, Dict, Optional + +from langchain.document_loaders import ApifyDatasetLoader +from langchain.document_loaders.base import Document +from langchain.pydantic_v1 import BaseModel, root_validator +from langchain.utils import get_from_dict_or_env + + +class ApifyWrapper(BaseModel): + """Wrapper around Apify. + To use, you should have the ``apify-client`` python package installed, + and the environment variable ``APIFY_API_TOKEN`` set with your API key, or pass + `apify_api_token` as a named parameter to the constructor. + """ + + apify_client: Any + apify_client_async: Any + + @root_validator() + def validate_environment(cls, values: Dict) -> Dict: + """Validate environment. + Validate that an Apify API token is set and the apify-client + Python package exists in the current environment. + """ + apify_api_token = get_from_dict_or_env( + values, "apify_api_token", "APIFY_API_TOKEN" + ) + + try: + from apify_client import ApifyClient, ApifyClientAsync + + values["apify_client"] = ApifyClient(apify_api_token) + values["apify_client_async"] = ApifyClientAsync(apify_api_token) + except ImportError: + raise ValueError( + "Could not import apify-client Python package. " + "Please install it with `pip install apify-client`." + ) + + return values + + def call_actor( + self, + actor_id: str, + run_input: Dict, + dataset_mapping_function: Callable[[Dict], Document], + *, + build: Optional[str] = None, + memory_mbytes: Optional[int] = None, + timeout_secs: Optional[int] = None, + ) -> ApifyDatasetLoader: + """Run an Actor on the Apify platform and wait for results to be ready. + Args: + actor_id (str): The ID or name of the Actor on the Apify platform. + run_input (Dict): The input object of the Actor that you're trying to run. + dataset_mapping_function (Callable): A function that takes a single + dictionary (an Apify dataset item) and converts it to an + instance of the Document class. + build (str, optional): Optionally specifies the actor build to run. + It can be either a build tag or build number. + memory_mbytes (int, optional): Optional memory limit for the run, + in megabytes. + timeout_secs (int, optional): Optional timeout for the run, in seconds. + Returns: + ApifyDatasetLoader: A loader that will fetch the records from the + Actor run's default dataset. + """ + actor_call = self.apify_client.actor(actor_id).call( + run_input=run_input, + build=build, + memory_mbytes=memory_mbytes, + timeout_secs=timeout_secs, + ) + + return ApifyDatasetLoader( + dataset_id=actor_call["defaultDatasetId"], + dataset_mapping_function=dataset_mapping_function, + ) + + async def acall_actor( + self, + actor_id: str, + run_input: Dict, + dataset_mapping_function: Callable[[Dict], Document], + *, + build: Optional[str] = None, + memory_mbytes: Optional[int] = None, + timeout_secs: Optional[int] = None, + ) -> ApifyDatasetLoader: + """Run an Actor on the Apify platform and wait for results to be ready. + Args: + actor_id (str): The ID or name of the Actor on the Apify platform. + run_input (Dict): The input object of the Actor that you're trying to run. + dataset_mapping_function (Callable): A function that takes a single + dictionary (an Apify dataset item) and converts it to + an instance of the Document class. + build (str, optional): Optionally specifies the actor build to run. + It can be either a build tag or build number. + memory_mbytes (int, optional): Optional memory limit for the run, + in megabytes. + timeout_secs (int, optional): Optional timeout for the run, in seconds. + Returns: + ApifyDatasetLoader: A loader that will fetch the records from the + Actor run's default dataset. + """ + actor_call = await self.apify_client_async.actor(actor_id).call( + run_input=run_input, + build=build, + memory_mbytes=memory_mbytes, + timeout_secs=timeout_secs, + ) + + return ApifyDatasetLoader( + dataset_id=actor_call["defaultDatasetId"], + dataset_mapping_function=dataset_mapping_function, + ) + + def call_actor_task( + self, + task_id: str, + task_input: Dict, + dataset_mapping_function: Callable[[Dict], Document], + *, + build: Optional[str] = None, + memory_mbytes: Optional[int] = None, + timeout_secs: Optional[int] = None, + ) -> ApifyDatasetLoader: + """Run a saved Actor task on Apify and wait for results to be ready. + Args: + task_id (str): The ID or name of the task on the Apify platform. + task_input (Dict): The input object of the task that you're trying to run. + Overrides the task's saved input. + dataset_mapping_function (Callable): A function that takes a single + dictionary (an Apify dataset item) and converts it to an + instance of the Document class. + build (str, optional): Optionally specifies the actor build to run. + It can be either a build tag or build number. + memory_mbytes (int, optional): Optional memory limit for the run, + in megabytes. + timeout_secs (int, optional): Optional timeout for the run, in seconds. + Returns: + ApifyDatasetLoader: A loader that will fetch the records from the + task run's default dataset. + """ + task_call = self.apify_client.task(task_id).call( + task_input=task_input, + build=build, + memory_mbytes=memory_mbytes, + timeout_secs=timeout_secs, + ) + + return ApifyDatasetLoader( + dataset_id=task_call["defaultDatasetId"], + dataset_mapping_function=dataset_mapping_function, + ) + + async def acall_actor_task( + self, + task_id: str, + task_input: Dict, + dataset_mapping_function: Callable[[Dict], Document], + *, + build: Optional[str] = None, + memory_mbytes: Optional[int] = None, + timeout_secs: Optional[int] = None, + ) -> ApifyDatasetLoader: + """Run a saved Actor task on Apify and wait for results to be ready. + Args: + task_id (str): The ID or name of the task on the Apify platform. + task_input (Dict): The input object of the task that you're trying to run. + Overrides the task's saved input. + dataset_mapping_function (Callable): A function that takes a single + dictionary (an Apify dataset item) and converts it to an + instance of the Document class. + build (str, optional): Optionally specifies the actor build to run. + It can be either a build tag or build number. + memory_mbytes (int, optional): Optional memory limit for the run, + in megabytes. + timeout_secs (int, optional): Optional timeout for the run, in seconds. + Returns: + ApifyDatasetLoader: A loader that will fetch the records from the + task run's default dataset. + """ + task_call = await self.apify_client_async.task(task_id).call( + task_input=task_input, + build=build, + memory_mbytes=memory_mbytes, + timeout_secs=timeout_secs, + ) + + return ApifyDatasetLoader( + dataset_id=task_call["defaultDatasetId"], + dataset_mapping_function=dataset_mapping_function, + )