Add call_actor_task to the Apify integration (#6862)

A user has been testing the Apify integration inside langchain and he
was not able to run saved Actor tasks.

This PR adds support for calling saved Actor tasks on the Apify platform
to the existing integration. The structure of very similar to the one of
calling Actors.
This commit is contained in:
Jiří Moravčík 2023-06-29 07:13:47 +02:00 committed by GitHub
parent 99cfe192da
commit a6b40b73e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 85 additions and 1 deletions

View File

@ -50,5 +50,7 @@ class ApifyDatasetLoader(BaseLoader, BaseModel):
def load(self) -> List[Document]:
"""Load documents."""
dataset_items = self.apify_client.dataset(self.dataset_id).list_items().items
dataset_items = (
self.apify_client.dataset(self.dataset_id).list_items(clean=True).items
)
return list(map(self.dataset_mapping_function, dataset_items))

View File

@ -121,3 +121,85 @@ class ApifyWrapper(BaseModel):
dataset_id=actor_call["defaultDatasetId"],
dataset_mapping_function=dataset_mapping_function,
)
def call_actor_task(
self,
task_id: str,
task_input: Dict,
dataset_mapping_function: Callable[[Dict], Document],
*,
build: Optional[str] = None,
memory_mbytes: Optional[int] = None,
timeout_secs: Optional[int] = None,
) -> ApifyDatasetLoader:
"""Run a saved Actor task on Apify and wait for results to be ready.
Args:
task_id (str): The ID or name of the task on the Apify platform.
task_input (Dict): The input object of the task that you're trying to run.
Overrides the task's saved input.
dataset_mapping_function (Callable): A function that takes a single
dictionary (an Apify dataset item) and converts it to an
instance of the Document class.
build (str, optional): Optionally specifies the actor build to run.
It can be either a build tag or build number.
memory_mbytes (int, optional): Optional memory limit for the run,
in megabytes.
timeout_secs (int, optional): Optional timeout for the run, in seconds.
Returns:
ApifyDatasetLoader: A loader that will fetch the records from the
task run's default dataset.
"""
task_call = self.apify_client.task(task_id).call(
task_input=task_input,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=timeout_secs,
)
return ApifyDatasetLoader(
dataset_id=task_call["defaultDatasetId"],
dataset_mapping_function=dataset_mapping_function,
)
async def acall_actor_task(
self,
task_id: str,
task_input: Dict,
dataset_mapping_function: Callable[[Dict], Document],
*,
build: Optional[str] = None,
memory_mbytes: Optional[int] = None,
timeout_secs: Optional[int] = None,
) -> ApifyDatasetLoader:
"""Run a saved Actor task on Apify and wait for results to be ready.
Args:
task_id (str): The ID or name of the task on the Apify platform.
task_input (Dict): The input object of the task that you're trying to run.
Overrides the task's saved input.
dataset_mapping_function (Callable): A function that takes a single
dictionary (an Apify dataset item) and converts it to an
instance of the Document class.
build (str, optional): Optionally specifies the actor build to run.
It can be either a build tag or build number.
memory_mbytes (int, optional): Optional memory limit for the run,
in megabytes.
timeout_secs (int, optional): Optional timeout for the run, in seconds.
Returns:
ApifyDatasetLoader: A loader that will fetch the records from the
task run's default dataset.
"""
task_call = await self.apify_client_async.task(task_id).call(
task_input=task_input,
build=build,
memory_mbytes=memory_mbytes,
timeout_secs=timeout_secs,
)
return ApifyDatasetLoader(
dataset_id=task_call["defaultDatasetId"],
dataset_mapping_function=dataset_mapping_function,
)