diff --git a/libs/langchain/langchain/schema/runnable/base.py b/libs/langchain/langchain/schema/runnable/base.py index 30ca2d816c0..8a429f62e68 100644 --- a/libs/langchain/langchain/schema/runnable/base.py +++ b/libs/langchain/langchain/schema/runnable/base.py @@ -1946,8 +1946,41 @@ class RunnableGenerator(Runnable[Input, Output]): class RunnableLambda(Runnable[Input, Output]): - """ - A runnable that runs a callable. + """RunnableLambda converts a python callable into a Runnable. + + Wrapping a callable in a RunnableLambda makes the callable usable + within either a sync or async context. + + RunnableLambda can be composed as any other Runnable and provides + seamless integration with LangChain tracing. + + Examples: + + .. code-block:: python + + # This is a RunnableLambda + from langchain.schema.runnable import RunnableLambda + + def add_one(x: int) -> int: + return x + 1 + + runnable = RunnableLambda(add_one) + + runnable.invoke(1) # returns 2 + runnable.batch([1, 2, 3]) # returns [2, 3, 4] + + # Async is supported by default by delegating to the sync implementation + await runnable.ainvoke(1) # returns 2 + await runnable.abatch([1, 2, 3]) # returns [2, 3, 4] + + + # Alternatively, can provide both synd and sync implementations + async def add_one_async(x: int) -> int: + return x + 1 + + runnable = RunnableLambda(add_one, afunc=add_one_async) + runnable.invoke(1) # Uses add_one + await runnable.ainvoke(1) # Uses add_one_async """ def __init__( @@ -1955,10 +1988,25 @@ class RunnableLambda(Runnable[Input, Output]): func: Union[Callable[[Input], Output], Callable[[Input], Awaitable[Output]]], afunc: Optional[Callable[[Input], Awaitable[Output]]] = None, ) -> None: + """Create a RunnableLambda from a callable, and async callable or both. + + Accepts both sync and async variants to allow providing efficient + implementations for sync and async execution. + + Args: + func: Either sync or async callable + afunc: An async callable that takes an input and returns an output. + """ if afunc is not None: self.afunc = afunc if inspect.iscoroutinefunction(func): + if afunc is not None: + raise TypeError( + "Func was provided as a coroutine function, but afunc was " + "also provided. If providing both, func should be a regular " + "function to avoid ambiguity." + ) self.afunc = func elif callable(func): self.func = cast(Callable[[Input], Output], func) @@ -1970,6 +2018,7 @@ class RunnableLambda(Runnable[Input, Output]): @property def InputType(self) -> Any: + """The type of the input to this runnable.""" func = getattr(self, "func", None) or getattr(self, "afunc") try: params = inspect.signature(func).parameters @@ -1983,6 +2032,7 @@ class RunnableLambda(Runnable[Input, Output]): @property def input_schema(self) -> Type[BaseModel]: + """The pydantic schema for the input to this runnable.""" func = getattr(self, "func", None) or getattr(self, "afunc") if isinstance(func, itemgetter): @@ -2010,6 +2060,7 @@ class RunnableLambda(Runnable[Input, Output]): @property def OutputType(self) -> Any: + """The type of the output of this runnable as a type annotation.""" func = getattr(self, "func", None) or getattr(self, "afunc") try: sig = inspect.signature(func) @@ -2033,6 +2084,7 @@ class RunnableLambda(Runnable[Input, Output]): return False def __repr__(self) -> str: + """A string representation of this runnable.""" return f"RunnableLambda({get_lambda_source(self.func) or '...'})" def _invoke( @@ -2106,6 +2158,7 @@ class RunnableLambda(Runnable[Input, Output]): config: Optional[RunnableConfig] = None, **kwargs: Optional[Any], ) -> Output: + """Invoke this runnable synchronously.""" if hasattr(self, "func"): return self._call_with_config( self._invoke, @@ -2124,6 +2177,7 @@ class RunnableLambda(Runnable[Input, Output]): config: Optional[RunnableConfig] = None, **kwargs: Optional[Any], ) -> Output: + """Invoke this runnable asynchronously.""" if hasattr(self, "afunc"): return await self._acall_with_config( self._ainvoke,