diff --git a/libs/experimental/langchain_experimental/plan_and_execute/agent_executor.py b/libs/experimental/langchain_experimental/plan_and_execute/agent_executor.py index 8a519637a8f..dbebfe2b63c 100644 --- a/libs/experimental/langchain_experimental/plan_and_execute/agent_executor.py +++ b/libs/experimental/langchain_experimental/plan_and_execute/agent_executor.py @@ -1,6 +1,9 @@ from typing import Any, Dict, List, Optional -from langchain.callbacks.manager import CallbackManagerForChainRun +from langchain.callbacks.manager import ( + AsyncCallbackManagerForChainRun, + CallbackManagerForChainRun, +) from langchain.chains.base import Chain from pydantic import Field @@ -58,3 +61,35 @@ class PlanAndExecute(Chain): ) self.step_container.add_step(step, response) return {self.output_key: self.step_container.get_final_response()} + + async def _acall( + self, + inputs: Dict[str, Any], + run_manager: Optional[AsyncCallbackManagerForChainRun] = None, + ) -> Dict[str, Any]: + plan = await self.planner.aplan( + inputs, + callbacks=run_manager.get_child() if run_manager else None, + ) + if run_manager: + await run_manager.on_text(str(plan), verbose=self.verbose) + for step in plan.steps: + _new_inputs = { + "previous_steps": self.step_container, + "current_step": step, + "objective": inputs[self.input_key], + } + new_inputs = {**_new_inputs, **inputs} + response = await self.executor.astep( + new_inputs, + callbacks=run_manager.get_child() if run_manager else None, + ) + if run_manager: + await run_manager.on_text( + f"*****\n\nStep: {step.value}", verbose=self.verbose + ) + await run_manager.on_text( + f"\n\nResponse: {response.response}", verbose=self.verbose + ) + self.step_container.add_step(step, response) + return {self.output_key: self.step_container.get_final_response()} diff --git a/libs/langchain/langchain/experimental/plan_and_execute/agent_executor.py b/libs/langchain/langchain/experimental/plan_and_execute/agent_executor.py index a4780a96609..d01d8b7517a 100644 --- a/libs/langchain/langchain/experimental/plan_and_execute/agent_executor.py +++ b/libs/langchain/langchain/experimental/plan_and_execute/agent_executor.py @@ -2,7 +2,10 @@ from typing import Any, Dict, List, Optional from pydantic import Field -from langchain.callbacks.manager import CallbackManagerForChainRun +from langchain.callbacks.manager import ( + AsyncCallbackManagerForChainRun, + CallbackManagerForChainRun, +) from langchain.chains.base import Chain from langchain.experimental.plan_and_execute.executors.base import BaseExecutor from langchain.experimental.plan_and_execute.planners.base import BasePlanner @@ -63,3 +66,35 @@ class PlanAndExecute(Chain): ) self.step_container.add_step(step, response) return {self.output_key: self.step_container.get_final_response()} + + async def _acall( + self, + inputs: Dict[str, Any], + run_manager: Optional[AsyncCallbackManagerForChainRun] = None, + ) -> Dict[str, Any]: + plan = await self.planner.aplan( + inputs, + callbacks=run_manager.get_child() if run_manager else None, + ) + if run_manager: + await run_manager.on_text(str(plan), verbose=self.verbose) + for step in plan.steps: + _new_inputs = { + "previous_steps": self.step_container, + "current_step": step, + "objective": inputs[self.input_key], + } + new_inputs = {**_new_inputs, **inputs} + response = await self.executor.astep( + new_inputs, + callbacks=run_manager.get_child() if run_manager else None, + ) + if run_manager: + await run_manager.on_text( + f"*****\n\nStep: {step.value}", verbose=self.verbose + ) + await run_manager.on_text( + f"\n\nResponse: {response.response}", verbose=self.verbose + ) + self.step_container.add_step(step, response) + return {self.output_key: self.step_container.get_final_response()}