docs: streaming concept docs (#27516)

Streaming concept docs
Eugene Yurtsev 2024-10-21 16:18:08 -04:00 committed by GitHub
parent aa0b25cb2a
commit 606225e774


@@ -5,86 +5,187 @@
* [Chat Models](/docs/concepts/chat_models)
:::
**Streaming** is crucial for enhancing the responsiveness of applications built on [LLMs](/docs/concepts/chat_models). By displaying output progressively, even before a complete response is ready, streaming significantly improves user experience (UX), particularly when dealing with the latency of LLMs.

## Overview

Streaming is critical in making applications based on [LLMs](/docs/concepts/chat_models) feel responsive to end-users. Generating full responses from [LLMs](/docs/concepts/chat_models) often incurs a delay of several seconds, which becomes more noticeable in complex applications with multiple model calls. Fortunately, LLMs generate responses iteratively, allowing for intermediate results to be displayed as they are produced. By streaming these intermediate outputs, LangChain enables smoother UX in LLM-powered apps and offers built-in support for streaming at the core of its design.

In this guide, we'll discuss streaming in LLM applications and explore how LangChain's streaming APIs facilitate real-time output from various components in your application.

## What to stream in LLM applications

In applications involving LLMs, several types of data can be streamed to improve user experience by reducing perceived latency and increasing transparency. These include:
### 1. Streaming LLM outputs
The most common and critical data to stream is the output generated by the LLM itself. LLMs often take time to generate full responses, and by streaming the output in real-time, users can see partial results as they are produced. This provides immediate feedback and helps reduce the wait time for users.
### 2. Streaming pipeline or workflow progress
Beyond just streaming LLM output, it's useful to stream progress through more complex workflows or pipelines, giving users a sense of how the application is progressing overall. This could include:
- **In LangGraph Workflows:**
With [LangGraph](/docs/concepts/langgraph), workflows are composed of nodes and edges that represent various steps. Streaming here involves tracking changes to the **graph state** as individual **nodes** request updates. This allows for more granular monitoring of which node in the workflow is currently active, giving real-time updates about the status of the workflow as it progresses through different stages.
- **In LCEL Pipelines:**
Streaming updates from an [LCEL](/docs/concepts/lcel) pipeline involves capturing progress from individual **sub-runnables**. For example, as different steps or components of the pipeline execute, you can stream which sub-runnable is currently running, providing real-time insight into the overall pipeline's progress.
Streaming pipeline or workflow progress is essential in providing users with a clear picture of where the application is in the execution process.
### 3. Streaming custom data
In some cases, you may need to stream **custom data** that goes beyond the information provided by the pipeline or workflow structure. This custom information is injected within a specific step in the workflow, whether that step is a tool or a LangGraph node. For example, you could stream updates about what a tool is doing in real-time or the progress through a LangGraph node. This granular data, which is emitted directly from within the step, provides more detailed insights into the execution of the workflow and is especially useful in complex processes where more visibility is needed.
## Streaming APIs
LangChain has two main APIs for streaming output in real-time. These APIs are supported by any component that implements the [Runnable Interface](/docs/concepts/runnables), including [LLMs](/docs/concepts/chat_models), [compiled LangGraph graphs](https://langchain-ai.github.io/langgraph/concepts/low_level/), and any Runnable generated with [LCEL](/docs/concepts/lcel).
1. sync [stream](https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.Runnable.html#langchain_core.runnables.base.Runnable.stream) and async [astream](https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.Runnable.html#langchain_core.runnables.base.Runnable.astream): Use these to stream outputs from individual Runnables (e.g., a chat model) as they are generated, or to stream any workflow created with LangGraph.
2. The async-only [astream_events](https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.Runnable.html#langchain_core.runnables.base.Runnable.astream_events): Use this API to get access to custom events and intermediate outputs from LLM applications built entirely with [LCEL](/docs/concepts/lcel). Note that this API is available, but not needed, when working with LangGraph.
:::note
In addition, there is a **legacy** async [astream_log](https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.base.Runnable.html#langchain_core.runnables.base.Runnable.astream_log) API. This API is not recommended for new projects, as it is more complex and less feature-rich than the other streaming APIs.
:::

### `stream()` and `astream()`
<span data-heading-keywords="stream"></span>
The `stream()` method returns an iterator that yields chunks of output synchronously as they are produced. You can use a `for` loop to process each chunk in real-time. For example, when using an LLM, this allows the output to be streamed incrementally as it is generated, reducing the wait time for users.
The type of chunk yielded by the `stream()` and `astream()` methods depends on the component being streamed. For example, when streaming from an [LLM](/docs/concepts/chat_models), each chunk will be an [AIMessageChunk](/docs/concepts/messages#aimessagechunk); for other components, the chunk type may differ. The iterator yields these chunks as they are produced. For example:
```python
for chunk in component.stream(some_input):
    # IMPORTANT: Keep the processing of each chunk as efficient as possible.
    # While you're processing the current chunk, the upstream component is
    # waiting to produce the next one. For example, if working with LangGraph,
    # graph execution is paused while the current chunk is being processed.
    # In extreme cases, this could even result in timeouts (e.g., when LLM
    # outputs are streamed from an API that has a timeout).
    print(chunk)
```
The [asynchronous version](/docs/concepts/async), `astream()`, works similarly but is designed for non-blocking workflows. You can use it in asynchronous code to achieve the same real-time streaming behavior.
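For instance, a minimal sketch reusing the hypothetical `component` and `some_input` from the example above, run under `asyncio`:

```python
import asyncio

async def main() -> None:
    # Same pattern as stream(), but non-blocking: the event loop can
    # schedule other work while waiting for the next chunk.
    async for chunk in component.astream(some_input):
        print(chunk)

asyncio.run(main())
```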
#### Usage with chat models
When using `stream()` or `astream()` with chat models, the output is streamed as [AIMessageChunks](/docs/concepts/messages#aimessagechunk) as it is generated by the LLM. This allows you to present or process the LLM's output incrementally as it's being produced, which is particularly useful in interactive applications or interfaces.
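For example (the model name is illustrative; any chat model integration streams the same way):

```python
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-sonnet-20240229")

for chunk in model.stream("what color is the sky?"):
    # Each chunk is an AIMessageChunk; print its text as soon as it arrives.
    print(chunk.content, end="|", flush=True)
```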
#### Usage with LangGraph
[LangGraph](/docs/concepts/langgraph) compiled graphs are [Runnables](/docs/concepts/runnables) and support the standard streaming APIs.
When using the `stream` and `astream` methods with LangGraph, you can specify **one or more** [streaming modes](https://langchain-ai.github.io/langgraph/reference/types/#langgraph.types.StreamMode), which allow you to control the type of output that is streamed. The available streaming modes are listed below, with a short sketch after the list:
- **"values"**: Emit all values of the [state](https://langchain-ai.github.io/langgraph/concepts/low_level/) for each step.
- **"updates"**: Emit only the node name(s) and updates that were returned by the node(s) after each step.
- **"debug"**: Emit debug events for each step.
- **"messages"**: Emit LLM [messages](/docs/concepts/messages) [token-by-token](/docs/concepts/tokens).
- **"custom"**: Emit custom output witten using [LangGraph's StreamWriter](https://langchain-ai.github.io/langgraph/reference/types/#langgraph.types.StreamWriter).
For more information, please see:
* [LangGraph streaming conceptual guide](https://langchain-ai.github.io/langgraph/concepts/streaming/) for an overview of how to stream when working with LangGraph.
* [LangGraph streaming how-to guides](https://langchain-ai.github.io/langgraph/how-tos/#streaming) for specific examples of streaming in LangGraph.
#### Usage with LCEL
If you compose multiple Runnables using [LangChain's Expression Language (LCEL)](/docs/concepts/lcel), the `stream()` and `astream()` methods will, by convention, stream the output of the last step in the chain. This allows the final processed result to be streamed incrementally. **LCEL** tries to optimize streaming latency in pipelines such that the streaming results from the last step are available as soon as possible.
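A minimal sketch (the component choices are illustrative and mirror the chain used in the `astream_events` example below):

```python
from langchain_anthropic import ChatAnthropic
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

chain = (
    ChatPromptTemplate.from_template("tell me a joke about {topic}")
    | ChatAnthropic(model="claude-3-sonnet-20240229")
    | StrOutputParser()
)

# Only the output of the final step (the parsed string) is streamed.
for chunk in chain.stream({"topic": "parrot"}):
    print(chunk, end="|", flush=True)
```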
### `astream_events`
<span data-heading-keywords="astream_events,stream_events,stream events"></span>
:::tip
Use the `astream_events` API to access custom data and intermediate outputs from LLM applications built entirely with [LCEL](/docs/concepts/lcel).
While this API is available for use with [LangGraph](/docs/concepts/langgraph) as well, it is usually not necessary when working with LangGraph, as the `stream` and `astream` methods provide comprehensive streaming capabilities for LangGraph graphs.
:::
For chains constructed using **LCEL**, the `.stream()` method only streams the output of the final step from the chain. This might be sufficient for some applications, but as you build more complex chains of several LLM calls together, you may want to use the intermediate values of the chain alongside the final output. For example, you may want to return sources alongside the final generation when building a chat-over-documents app.
There are ways to do this [using callbacks](/docs/concepts/#callbacks-1), or by constructing your chain in such a way that it passes intermediate
values to the end with something like chained [`.assign()`](/docs/how_to/passthrough/) calls, but LangChain also includes an
`.astream_events()` method that combines the flexibility of callbacks with the ergonomics of `.stream()`. When called, it returns an iterator
which yields [various types of events](/docs/how_to/streaming/#event-reference) that you can filter and process according
to the needs of your project.
Here's one small example that prints just events containing streamed chat model output:
```python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-sonnet-20240229")
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for event in chain.astream_events({"topic": "parrot"}, version="v2"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(event, end="|", flush=True)
```
You can roughly think of it as an iterator over callback events (though the format differs) - and you can use it on almost all LangChain components!

See [this guide](/docs/how_to/streaming/#using-stream-events) for more detailed information on how to use `.astream_events()`, including a table listing available events.
## Writing custom data to the stream
To write custom data to the stream, you will need to choose one of the following methods based on the component you are working with:

1. LangGraph's [StreamWriter](https://langchain-ai.github.io/langgraph/reference/types/#langgraph.types.StreamWriter) can be used to write custom data that will surface through the **stream** and **astream** APIs when working with LangGraph. **Important:** this is a LangGraph feature, so it is not available when working with pure LCEL. See [how to stream custom data](https://langchain-ai.github.io/langgraph/how-tos/streaming-content/) for more information.
2. [dispatch_custom_event](https://python.langchain.com/api_reference/core/callbacks/langchain_core.callbacks.manager.dispatch_custom_event.html#) / [adispatch_custom_event](https://python.langchain.com/api_reference/core/callbacks/langchain_core.callbacks.manager.adispatch_custom_event.html) can be used to write custom data that will be surfaced through the **astream_events** API. See [how to dispatch custom callback events](https://python.langchain.com/docs/how_to/callbacks_custom_events/#astream-events-api) for more information. A minimal sketch follows this list.
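For the second method, a minimal sketch (the event name, payload, and `slow_task` runnable are illustrative):

```python
import asyncio

from langchain_core.callbacks.manager import adispatch_custom_event
from langchain_core.runnables import RunnableLambda

@RunnableLambda
async def slow_task(user_input: str) -> str:
    # Surface progress to astream_events consumers mid-execution.
    # (In async Python < 3.10, the RunnableConfig must be propagated
    # manually for the event to be associated with the current run.)
    await adispatch_custom_event("task_progress", {"status": "halfway done"})
    return f"processed: {user_input}"

async def main() -> None:
    async for event in slow_task.astream_events("hello", version="v2"):
        if event["event"] == "on_custom_event":
            print(event["name"], event["data"])

asyncio.run(main())
```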
## "Auto-Streaming" Chat Models
LangChain simplifies streaming from [chat models](/docs/concepts/chat_models) by automatically enabling streaming mode in certain cases, even when you're not explicitly calling the streaming methods. This is particularly useful when you use the non-streaming `invoke` method but still want to stream the entire application, including intermediate results from the chat model.
### How It Works
### "Auto-Streaming" Chat Models When you call the `invoke` (or `ainvoke`) method on a chat model, LangChain will automatically switch to streaming mode if it detects that you are trying to stream the overall application.
Under the hood, it'll have `invoke` (or `ainvoke`) use the `stream` (or `astream`) method to generate its output. The result of the invocation will be the same as far as the code that was using `invoke` is concerned; however, while the chat model is being streamed, LangChain will take care of invoking `on_llm_new_token` events in LangChain's [callback system](/docs/concepts/callbacks). These callback events allow LangGraph `stream`/`astream` and `astream_events` to surface the chat model's output in real-time.
Example:

```python
def node(state):
    ...
    # The code below uses the invoke method, but LangChain will
    # automatically switch to streaming mode when it detects that
    # the overall application is being streamed.
    ai_message = model.invoke(state["messages"])
    ...

for chunk in compiled_graph.stream(..., stream_mode="messages"):
    ...
```
## Async Programming
LangChain offers both synchronous (sync) and asynchronous (async) versions of many of its methods. The async methods are typically prefixed with an "a" (e.g., `ainvoke`, `astream`). When writing async code, it's crucial to consistently use these asynchronous methods to ensure non-blocking behavior and optimal performance.
If streaming data fails to appear in real-time, please ensure that you are using the correct async methods for your workflow.
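For example, a common pitfall is consuming an async-only API synchronously. A minimal sketch of the correct pattern, reusing the `chain` from the `astream_events` example above:

```python
import asyncio

# Wrong: astream_events is async-only, so a plain for-loop will not work.
# for event in chain.astream_events({"topic": "parrot"}, version="v2"): ...

async def main() -> None:
    # Right: consume the event stream with `async for` inside a coroutine.
    async for event in chain.astream_events({"topic": "parrot"}, version="v2"):
        print(event["event"])

asyncio.run(main())
```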
Please review the [async programming in LangChain guide](/docs/concepts/async) for more information on writing async code with LangChain.
## Related Resources
Please see the following resources for specific examples of streaming in LangChain:
* [LangGraph conceptual guide on streaming](https://langchain-ai.github.io/langgraph/concepts/streaming/)
* [LangGraph streaming how-to guides](https://langchain-ai.github.io/langgraph/how-tos/#streaming)
* [How to stream runnables](/docs/how_to/streaming/): This how-to guide goes over common streaming patterns with LangChain components (e.g., chat models) and with [LCEL](/docs/concepts/lcel).
* [How to stream chat models](/docs/how_to/chat_streaming/)
* [How to stream tool calls](/docs/how_to/tool_streaming/)
For writing custom data to the stream, please see the following resources:
* If using LangGraph, see [how to stream custom data](https://langchain-ai.github.io/langgraph/how-tos/streaming-content/).
* If using LCEL, see [how to dispatch custom callback events](https://python.langchain.com/docs/how_to/callbacks_custom_events/#astream-events-api).