From 8313c218dacdaa7e456ad5762415e37f101055bc Mon Sep 17 00:00:00 2001 From: Eugene Yurtsev Date: Fri, 10 Nov 2023 13:14:57 -0500 Subject: [PATCH] Add more runnable documentation (#13083) - Adding documentation to the runnable. - Documentation is not organized in the best way for the runnable; i.e., in terms of LCEL vs. other standard methods, will follow up with more edits. --- .../langchain/schema/runnable/base.py | 164 ++++++++++++++++-- 1 file changed, 147 insertions(+), 17 deletions(-) diff --git a/libs/langchain/langchain/schema/runnable/base.py b/libs/langchain/langchain/schema/runnable/base.py index 643bdb1b45d..c9919adc530 100644 --- a/libs/langchain/langchain/schema/runnable/base.py +++ b/libs/langchain/langchain/schema/runnable/base.py @@ -82,18 +82,22 @@ Other = TypeVar("Other") class Runnable(Generic[Input, Output], ABC): """A unit of work that can be invoked, batched, streamed, transformed and composed. - Key methods: + Key Methods + =========== * invoke/ainvoke: Transforms a single input into an output. * batch/abatch: Efficiently transforms multiple inputs into outputs. * stream/astream: Streams output from a single input as it's produced. * astream_log: Streams output and selected intermediate results from an input. - Batch: By default, batch runs invoke() in parallel using a thread pool executor. - Override to optimize batching. + Built-in optimizations: - Async: Methods with "a" suffix are asynchronous. By default, they execute - the sync counterpart using asyncio's thread pool. Override for native async. + * Batch: By default, batch runs invoke() in parallel using a thread pool executor. + Override to optimize batching. + + * Async: Methods with "a" suffix are asynchronous. By default, they execute + the sync counterpart using asyncio's thread pool. + Override for native async. All methods accept an optional config argument, which can be used to configure execution, add tags and metadata for tracing and debugging etc. @@ -101,6 +105,9 @@ class Runnable(Generic[Input, Output], ABC): Runnables expose schematic information about their input, output and config via the input_schema property, the output_schema property and config_schema method. + LCEL and Composition + ==================== + The LangChain Expression Language (LCEL) is a declarative way to compose Runnables into chains. Any chain constructed this way will automatically have sync, async, batch, and streaming support. @@ -115,6 +122,7 @@ class Runnable(Generic[Input, Output], ABC): to each. Construct it using a dict literal within a sequence or by passing a dict to RunnableParallel. + For example, .. code-block:: python @@ -133,6 +141,72 @@ class Runnable(Generic[Input, Output], ABC): 'mul_5': RunnableLambda(lambda x: x * 5) } sequence.invoke(1) # {'mul_2': 4, 'mul_5': 10} + + Standard Methods + ================ + + All Runnables expose additional methods that can be used to modify their behavior + (e.g., add a retry policy, add lifecycle listeners, make them configurable, etc.). + + These methods will work on any Runnable, including Runnable chains constructed + by composing other Runnables. See the individual methods for details. + + For example, + + .. code-block:: python + + from langchain.schema.runnable import RunnableLambda + + import random + + def add_one(x: int) -> int: + return x + 1 + + + def buggy_double(y: int) -> int: + '''Buggy code that will fail 70% of the time''' + if random.random() > 0.3: + print('This code failed, and will probably be retried!') + raise ValueError('Triggered buggy code') + return y * 2 + + sequence = ( + RunnableLambda(add_one) | + RunnableLambda(buggy_double).with_retry( # Retry on failure + stop_after_attempt=10, + wait_exponential_jitter=False + ) + ) + + print(sequence.input_schema.schema()) # Show inferred input schema + print(sequence.output_schema.schema()) # Show inferred output schema + print(sequence.invoke(2)) # invoke the sequence (note the retry above!!) + + Debugging and tracing + ===================== + + As the chains get longer, it can be useful to be able to see intermediate results + to debug and trace the chain. + + You can set the global debug flag to True to enable debug output for all chains: + + .. code-block:: python + + from langchain.globals import set_debug + set_debug(True) + + Alternatively, you can pass existing or custom callbacks to any given chain: + + ... code-block:: python + + from langchain.callbacks.tracers import ConsoleCallbackHandler + + chain.invoke( + ..., + config={'callbacks': [ConsoleCallbackHandler()]} + ) + + For a UI (and much more) checkout LangSmith: https://docs.smith.langchain.com/ """ @property @@ -169,7 +243,20 @@ class Runnable(Generic[Input, Output], ABC): def get_input_schema( self, config: Optional[RunnableConfig] = None ) -> Type[BaseModel]: - """The type of input this runnable accepts specified as a pydantic model.""" + """Get a pydantic model that can be used to validate input to the runnable. + + Runnables that leverage the configurable_fields and configurable_alternatives + methods will have a dynamic input schema that depends on which + configuration the runnable is invoked with. + + This method allows to get an input schema for a specific configuration. + + Args: + config: A config to use when generating the schema. + + Returns: + A pydantic model that can be used to validate input. + """ root_type = self.InputType if inspect.isclass(root_type) and issubclass(root_type, BaseModel): @@ -187,7 +274,20 @@ class Runnable(Generic[Input, Output], ABC): def get_output_schema( self, config: Optional[RunnableConfig] = None ) -> Type[BaseModel]: - """The type of output this runnable produces specified as a pydantic model.""" + """Get a pydantic model that can be used to validate output to the runnable. + + Runnables that leverage the configurable_fields and configurable_alternatives + methods will have a dynamic output schema that depends on which + configuration the runnable is invoked with. + + This method allows to get an output schema for a specific configuration. + + Args: + config: A config to use when generating the schema. + + Returns: + A pydantic model that can be used to validate output. + """ root_type = self.OutputType if inspect.isclass(root_type) and issubclass(root_type, BaseModel): @@ -278,14 +378,28 @@ class Runnable(Generic[Input, Output], ABC): @abstractmethod def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output: - """Transform a single input into an output. Override to implement.""" - ... + """Transform a single input into an output. Override to implement. + + Args: + input: The input to the runnable. + config: A config to use when invoking the runnable. + The config supports standard keys like 'tags', 'metadata' for tracing + purposes, 'max_concurrency' for controlling how much work to do + in parallel, and other keys. Please refer to the RunnableConfig + for more details. + + Returns: + The output of the runnable. + """ async def ainvoke( self, input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any ) -> Output: - """ - Default implementation of ainvoke, which calls invoke in a thread pool. + """Default implementation of ainvoke, calls invoke from a thread. + + The default implementation allows usage of async code even if + the runnable did not implement a native async version of invoke. + Subclasses should override this method if they can run asynchronously. """ return await asyncio.get_running_loop().run_in_executor( @@ -300,9 +414,12 @@ class Runnable(Generic[Input, Output], ABC): return_exceptions: bool = False, **kwargs: Optional[Any], ) -> List[Output]: - """ - Default implementation of batch, which calls invoke N times. - Subclasses should override this method if they can batch more efficiently. + """Default implementation runs invoke in parallel using a thread pool executor. + + The default implementation of batch works well for IO bound runnables. + + Subclasses should override this method if they can batch more efficiently; + e.g., if the underlying runnable uses an API which supports a batch mode. """ if not inputs: return [] @@ -333,9 +450,12 @@ class Runnable(Generic[Input, Output], ABC): return_exceptions: bool = False, **kwargs: Optional[Any], ) -> List[Output]: - """ - Default implementation of abatch, which calls ainvoke N times. - Subclasses should override this method if they can batch more efficiently. + """Default implementation runs ainvoke in parallel using asyncio.gather. + + The default implementation of batch works well for IO bound runnables. + + Subclasses should override this method if they can batch more efficiently; + e.g., if the underlying runnable uses an API which supports a batch mode. """ if not inputs: return [] @@ -682,6 +802,16 @@ class Runnable(Generic[Input, Output], ABC): *, exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,), ) -> RunnableWithFallbacksT[Input, Output]: + """Add fallbacks to a runnable, returning a new Runnable. + + Args: + fallbacks: A sequence of runnables to try if the original runnable fails. + exceptions_to_handle: A tuple of exception types to handle. + + Returns: + A new Runnable that will try the original runnable, and then each + fallback in order, upon failures. + """ from langchain.schema.runnable.fallbacks import RunnableWithFallbacks return RunnableWithFallbacks(