diff --git a/libs/langchain/langchain/agents/agent.py b/libs/langchain/langchain/agents/agent.py
index e6f9df49398..f37c8e1ab9f 100644
--- a/libs/langchain/langchain/agents/agent.py
+++ b/libs/langchain/langchain/agents/agent.py
@@ -345,6 +345,14 @@ class RunnableAgent(BaseSingleActionAgent):
     """Runnable to call to get agent action."""
     input_keys_arg: List[str] = []
     return_keys_arg: List[str] = []
+    stream_runnable: bool = True
+    """Whether to stream from the runnable or not.
+
+    If True then underlying LLM is invoked in a streaming fashion to make it possible
+    to get access to the individual LLM tokens when using stream_log with the Agent
+    Executor. If False then LLM is invoked in a non-streaming fashion and
+    individual LLM tokens will not be available in stream_log.
+    """
 
     class Config:
         """Configuration for this pydantic object."""
@@ -378,17 +386,21 @@ class RunnableAgent(BaseSingleActionAgent):
             Action specifying what tool to use.
         """
         inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
-        # Use streaming to make sure that the underlying LLM is invoked in a streaming
-        # fashion to make it possible to get access to the individual LLM tokens
-        # when using stream_log with the Agent Executor.
-        # Because the response from the plan is not a generator, we need to
-        # accumulate the output into final output and return that.
         final_output: Any = None
-        for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}):
-            if final_output is None:
-                final_output = chunk
-            else:
-                final_output += chunk
+        if self.stream_runnable:
+            # Use streaming to make sure that the underlying LLM is invoked in a
+            # streaming
+            # fashion to make it possible to get access to the individual LLM tokens
+            # when using stream_log with the Agent Executor.
+            # Because the response from the plan is not a generator, we need to
+            # accumulate the output into final output and return that.
+            for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}):
+                if final_output is None:
+                    final_output = chunk
+                else:
+                    final_output += chunk
+        else:
+            final_output = self.runnable.invoke(inputs, config={"callbacks": callbacks})
 
         return final_output
 
@@ -414,18 +426,24 @@ class RunnableAgent(BaseSingleActionAgent):
         """
         inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
         final_output: Any = None
-        # Use streaming to make sure that the underlying LLM is invoked in a streaming
-        # fashion to make it possible to get access to the individual LLM tokens
-        # when using stream_log with the Agent Executor.
-        # Because the response from the plan is not a generator, we need to
-        # accumulate the output into final output and return that.
-        async for chunk in self.runnable.astream(
-            inputs, config={"callbacks": callbacks}
-        ):
-            if final_output is None:
-                final_output = chunk
-            else:
-                final_output += chunk
+        if self.stream_runnable:
+            # Use streaming to make sure that the underlying LLM is invoked in a
+            # streaming
+            # fashion to make it possible to get access to the individual LLM tokens
+            # when using stream_log with the Agent Executor.
+            # Because the response from the plan is not a generator, we need to
+            # accumulate the output into final output and return that.
+            async for chunk in self.runnable.astream(
+                inputs, config={"callbacks": callbacks}
+            ):
+                if final_output is None:
+                    final_output = chunk
+                else:
+                    final_output += chunk
+        else:
+            final_output = await self.runnable.ainvoke(
+                inputs, config={"callbacks": callbacks}
+            )
 
         return final_output
 
@@ -436,6 +454,14 @@ class RunnableMultiActionAgent(BaseMultiActionAgent):
     """Runnable to call to get agent actions."""
     input_keys_arg: List[str] = []
     return_keys_arg: List[str] = []
+    stream_runnable: bool = True
+    """Whether to stream from the runnable or not.
+
+    If True then underlying LLM is invoked in a streaming fashion to make it possible
+    to get access to the individual LLM tokens when using stream_log with the Agent
+    Executor. If False then LLM is invoked in a non-streaming fashion and
+    individual LLM tokens will not be available in stream_log.
+    """
 
     class Config:
         """Configuration for this pydantic object."""
@@ -477,17 +503,21 @@ class RunnableMultiActionAgent(BaseMultiActionAgent):
             Action specifying what tool to use.
         """
         inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
-        # Use streaming to make sure that the underlying LLM is invoked in a streaming
-        # fashion to make it possible to get access to the individual LLM tokens
-        # when using stream_log with the Agent Executor.
-        # Because the response from the plan is not a generator, we need to
-        # accumulate the output into final output and return that.
        final_output: Any = None
-        for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}):
-            if final_output is None:
-                final_output = chunk
-            else:
-                final_output += chunk
+        if self.stream_runnable:
+            # Use streaming to make sure that the underlying LLM is invoked in a
+            # streaming
+            # fashion to make it possible to get access to the individual LLM tokens
+            # when using stream_log with the Agent Executor.
+            # Because the response from the plan is not a generator, we need to
+            # accumulate the output into final output and return that.
+            for chunk in self.runnable.stream(inputs, config={"callbacks": callbacks}):
+                if final_output is None:
+                    final_output = chunk
+                else:
+                    final_output += chunk
+        else:
+            final_output = self.runnable.invoke(inputs, config={"callbacks": callbacks})
 
         return final_output
 
@@ -512,19 +542,25 @@ class RunnableMultiActionAgent(BaseMultiActionAgent):
             Action specifying what tool to use.
         """
         inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
-        # Use streaming to make sure that the underlying LLM is invoked in a streaming
-        # fashion to make it possible to get access to the individual LLM tokens
-        # when using stream_log with the Agent Executor.
-        # Because the response from the plan is not a generator, we need to
-        # accumulate the output into final output and return that.
         final_output: Any = None
-        async for chunk in self.runnable.astream(
-            inputs, config={"callbacks": callbacks}
-        ):
-            if final_output is None:
-                final_output = chunk
-            else:
-                final_output += chunk
+        if self.stream_runnable:
+            # Use streaming to make sure that the underlying LLM is invoked in a
+            # streaming
+            # fashion to make it possible to get access to the individual LLM tokens
+            # when using stream_log with the Agent Executor.
+            # Because the response from the plan is not a generator, we need to
+            # accumulate the output into final output and return that.
+            async for chunk in self.runnable.astream(
+                inputs, config={"callbacks": callbacks}
+            ):
+                if final_output is None:
+                    final_output = chunk
+                else:
+                    final_output += chunk
+        else:
+            final_output = await self.runnable.ainvoke(
+                inputs, config={"callbacks": callbacks}
+            )
 
         return final_output
 
@@ -977,10 +1013,15 @@ class AgentExecutor(Chain):
         else:
             multi_action = output_type == Union[List[AgentAction], AgentFinish]
 
+        stream_runnable = values.pop("stream_runnable", True)
         if multi_action:
-            values["agent"] = RunnableMultiActionAgent(runnable=agent)
+            values["agent"] = RunnableMultiActionAgent(
+                runnable=agent, stream_runnable=stream_runnable
+            )
         else:
-            values["agent"] = RunnableAgent(runnable=agent)
+            values["agent"] = RunnableAgent(
+                runnable=agent, stream_runnable=stream_runnable
+            )
         return values
 
     def save(self, file_path: Union[Path, str]) -> None:
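A minimal usage sketch (not part of the diff above): the validator hunk in `AgentExecutor` pops `stream_runnable` from the constructor values before building the runnable agent, so the flag can be passed straight to the `AgentExecutor` constructor. The `llm`, `tools`, and `prompt` objects below are assumed placeholders, and `create_react_agent` is just one way to build a runnable agent.

```python
from langchain.agents import AgentExecutor, create_react_agent

# Assumes `llm`, `tools`, and `prompt` are defined elsewhere.
agent = create_react_agent(llm, tools, prompt)

# Default (stream_runnable=True): plan() calls runnable.stream(), so
# astream_log / stream_log on the executor surfaces individual LLM tokens.
streaming_executor = AgentExecutor(agent=agent, tools=tools)

# Opt out: plan() falls back to runnable.invoke() (ainvoke() in aplan()),
# and per-token events will not appear in stream_log.
non_streaming_executor = AgentExecutor(
    agent=agent, tools=tools, stream_runnable=False
)
```

Either way, the streaming path accumulates chunks with `+=` into a single final output, so the value returned from the agent step is unchanged; only the token-level visibility in `stream_log` differs.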