feat: Add ApifyWrapper class (#10067)

If you look at documentation
https://python.langchain.com/docs/integrations/tools/apify (or the
actual file
https://github.com/langchain-ai/langchain/blob/master/docs/extras/integrations/tools/apify.ipynb
), there's a class `ApifyWrapper` mentioned. It seems it got lost in
some refactoring, i.e. it does not exist in the codebase ATM.

I just propose to add it back.
It would fix issues e.g.
https://github.com/langchain-ai/langchain/issues/8307 or
https://github.com/langchain-ai/langchain/issues/8201

To add, Apify is a wanted integration, e.g. see
https://twitter.com/hwchase17/status/1695490295914545626 or
https://twitter.com/hwchase17/status/1695470765343461756

Lastly, I offer taking ownership of the Apify-related parts of the
codebase, so you can tag me if anything is needed.
This commit is contained in:
Jiří Moravčík 2023-09-01 00:47:44 +02:00 committed by GitHub
parent 02e51f4217
commit 86646ec555
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 196 additions and 0 deletions

View File

@ -4,6 +4,7 @@ Other LangChain classes use **Utilities** to interact with third-part systems
and packages. and packages.
""" """
from langchain.utilities.alpha_vantage import AlphaVantageAPIWrapper from langchain.utilities.alpha_vantage import AlphaVantageAPIWrapper
from langchain.utilities.apify import ApifyWrapper
from langchain.utilities.arxiv import ArxivAPIWrapper from langchain.utilities.arxiv import ArxivAPIWrapper
from langchain.utilities.awslambda import LambdaWrapper from langchain.utilities.awslambda import LambdaWrapper
from langchain.utilities.bash import BashProcess from langchain.utilities.bash import BashProcess
@ -38,6 +39,7 @@ from langchain.utilities.zapier import ZapierNLAWrapper
__all__ = [ __all__ = [
"AlphaVantageAPIWrapper", "AlphaVantageAPIWrapper",
"ApifyWrapper",
"ArxivAPIWrapper", "ArxivAPIWrapper",
"BashProcess", "BashProcess",
"BibtexparserWrapper", "BibtexparserWrapper",

View File

@ -0,0 +1,194 @@
from typing import Any, Callable, Dict, Optional
from langchain.document_loaders import ApifyDatasetLoader
from langchain.document_loaders.base import Document
from langchain.pydantic_v1 import BaseModel, root_validator
from langchain.utils import get_from_dict_or_env
class ApifyWrapper(BaseModel):
"""Wrapper around Apify.
To use, you should have the ``apify-client`` python package installed,
and the environment variable ``APIFY_API_TOKEN`` set with your API key, or pass
`apify_api_token` as a named parameter to the constructor.
"""
apify_client: Any
apify_client_async: Any
@root_validator()
def validate_environment(cls, values: Dict) -> Dict:
"""Validate environment.
Validate that an Apify API token is set and the apify-client
Python package exists in the current environment.
"""
apify_api_token = get_from_dict_or_env(
values, "apify_api_token", "APIFY_API_TOKEN"
)
try:
from apify_client import ApifyClient, ApifyClientAsync
values["apify_client"] = ApifyClient(apify_api_token)
values["apify_client_async"] = ApifyClientAsync(apify_api_token)
except ImportError:
raise ValueError(
"Could not import apify-client Python package. "
"Please install it with `pip install apify-client`."
)
return values
def call_actor(
self,
actor_id: str,
run_input: Dict,
dataset_mapping_function: Callable[[Dict], Document],
*,
build: Optional[str] = None,
memory_mbytes: Optional[int] = None,
timeout_secs: Optional[int] = None,
) -> ApifyDatasetLoader:
"""Run an Actor on the Apify platform and wait for results to be ready.
Args:
actor_id (str): The ID or name of the Actor on the Apify platform.
run_input (Dict): The input object of the Actor that you're trying to run.
dataset_mapping_function (Callable): A function that takes a single
dictionary (an Apify dataset item) and converts it to an
instance of the Document class.
build (str, optional): Optionally specifies the actor build to run.
It can be either a build tag or build number.
memory_mbytes (int, optional): Optional memory limit for the run,
in megabytes.
timeout_secs (int, optional): Optional timeout for the run, in seconds.
Returns:
ApifyDatasetLoader: A loader that will fetch the records from the
Actor run's default dataset.
"""
actor_call = self.apify_client.actor(actor_id).call(
run_input=run_input,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=timeout_secs,
)
return ApifyDatasetLoader(
dataset_id=actor_call["defaultDatasetId"],
dataset_mapping_function=dataset_mapping_function,
)
async def acall_actor(
self,
actor_id: str,
run_input: Dict,
dataset_mapping_function: Callable[[Dict], Document],
*,
build: Optional[str] = None,
memory_mbytes: Optional[int] = None,
timeout_secs: Optional[int] = None,
) -> ApifyDatasetLoader:
"""Run an Actor on the Apify platform and wait for results to be ready.
Args:
actor_id (str): The ID or name of the Actor on the Apify platform.
run_input (Dict): The input object of the Actor that you're trying to run.
dataset_mapping_function (Callable): A function that takes a single
dictionary (an Apify dataset item) and converts it to
an instance of the Document class.
build (str, optional): Optionally specifies the actor build to run.
It can be either a build tag or build number.
memory_mbytes (int, optional): Optional memory limit for the run,
in megabytes.
timeout_secs (int, optional): Optional timeout for the run, in seconds.
Returns:
ApifyDatasetLoader: A loader that will fetch the records from the
Actor run's default dataset.
"""
actor_call = await self.apify_client_async.actor(actor_id).call(
run_input=run_input,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=timeout_secs,
)
return ApifyDatasetLoader(
dataset_id=actor_call["defaultDatasetId"],
dataset_mapping_function=dataset_mapping_function,
)
def call_actor_task(
self,
task_id: str,
task_input: Dict,
dataset_mapping_function: Callable[[Dict], Document],
*,
build: Optional[str] = None,
memory_mbytes: Optional[int] = None,
timeout_secs: Optional[int] = None,
) -> ApifyDatasetLoader:
"""Run a saved Actor task on Apify and wait for results to be ready.
Args:
task_id (str): The ID or name of the task on the Apify platform.
task_input (Dict): The input object of the task that you're trying to run.
Overrides the task's saved input.
dataset_mapping_function (Callable): A function that takes a single
dictionary (an Apify dataset item) and converts it to an
instance of the Document class.
build (str, optional): Optionally specifies the actor build to run.
It can be either a build tag or build number.
memory_mbytes (int, optional): Optional memory limit for the run,
in megabytes.
timeout_secs (int, optional): Optional timeout for the run, in seconds.
Returns:
ApifyDatasetLoader: A loader that will fetch the records from the
task run's default dataset.
"""
task_call = self.apify_client.task(task_id).call(
task_input=task_input,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=timeout_secs,
)
return ApifyDatasetLoader(
dataset_id=task_call["defaultDatasetId"],
dataset_mapping_function=dataset_mapping_function,
)
async def acall_actor_task(
self,
task_id: str,
task_input: Dict,
dataset_mapping_function: Callable[[Dict], Document],
*,
build: Optional[str] = None,
memory_mbytes: Optional[int] = None,
timeout_secs: Optional[int] = None,
) -> ApifyDatasetLoader:
"""Run a saved Actor task on Apify and wait for results to be ready.
Args:
task_id (str): The ID or name of the task on the Apify platform.
task_input (Dict): The input object of the task that you're trying to run.
Overrides the task's saved input.
dataset_mapping_function (Callable): A function that takes a single
dictionary (an Apify dataset item) and converts it to an
instance of the Document class.
build (str, optional): Optionally specifies the actor build to run.
It can be either a build tag or build number.
memory_mbytes (int, optional): Optional memory limit for the run,
in megabytes.
timeout_secs (int, optional): Optional timeout for the run, in seconds.
Returns:
ApifyDatasetLoader: A loader that will fetch the records from the
task run's default dataset.
"""
task_call = await self.apify_client_async.task(task_id).call(
task_input=task_input,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=timeout_secs,
)
return ApifyDatasetLoader(
dataset_id=task_call["defaultDatasetId"],
dataset_mapping_function=dataset_mapping_function,
)