mirror of
https://github.com/hwchase17/langchain.git
synced 2025-06-07 15:36:30 +00:00
``` https://api\.python\.langchain\.com/en/latest/([^/]*)/langchain_([^.]*)\.(.*)\.html([^"]*) https://python.langchain.com/v0.2/api_reference/$2/$1/langchain_$2.$3.html$4 ``` --------- Co-authored-by: Bagatur <baskaryan@gmail.com>
1541 lines
64 KiB
Plaintext
{
|
|
"cells": [
|
|
{
|
|
"cell_type": "raw",
|
|
"id": "0bdb3b97-4989-4237-b43b-5943dbbd8302",
|
|
"metadata": {
|
|
"vscode": {
|
|
"languageId": "raw"
|
|
}
|
|
},
|
|
"source": [
|
|
"---\n",
|
|
"keywords: [stream]\n",
|
|
"---"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "bb7d49db-04d3-4399-bfe1-09f82bbe6015",
|
|
"metadata": {},
|
|
"source": [
|
|
"# How to stream runnables\n",
|
|
"\n",
|
|
":::info Prerequisites\n",
|
|
"\n",
|
|
"This guide assumes familiarity with the following concepts:\n",
|
|
"- [Chat models](/docs/concepts/#chat-models)\n",
|
|
"- [LangChain Expression Language](/docs/concepts/#langchain-expression-language)\n",
|
|
"- [Output parsers](/docs/concepts/#output-parsers)\n",
|
|
"\n",
|
|
":::\n",
|
|
"\n",
|
|
"Streaming is critical in making applications based on LLMs feel responsive to end-users.\n",
|
|
"\n",
|
|
"Important LangChain primitives like [chat models](/docs/concepts/#chat-models), [output parsers](/docs/concepts/#output-parsers), [prompts](/docs/concepts/#prompt-templates), [retrievers](/docs/concepts/#retrievers), and [agents](/docs/concepts/#agents) implement the LangChain [Runnable Interface](/docs/concepts#interface).\n",
|
|
"\n",
|
|
"This interface provides two general approaches to stream content:\n",
|
|
"\n",
|
|
"1. sync `stream` and async `astream`: a **default implementation** of streaming that streams the **final output** from the chain.\n",
|
|
"2. async `astream_events` and async `astream_log`: these provide a way to stream both **intermediate steps** and **final output** from the chain.\n",
|
|
"\n",
|
|
"Let's take a look at both approaches, and try to understand how to use them.\n",
|
|
"\n",
|
|
":::info\n",
|
|
"For a higher-level overview of streaming techniques in LangChain, see [this section of the conceptual guide](/docs/concepts/#streaming).\n",
|
|
":::\n",
|
|
"\n",
|
|
"## Using Stream\n",
|
|
"\n",
|
|
"All `Runnable` objects implement a sync method called `stream` and an async variant called `astream`. \n",
|
|
"\n",
|
|
"These methods are designed to stream the final output in chunks, yielding each chunk as soon as it is available.\n",
|
|
"\n",
|
|
"Streaming is only possible if all steps in the program know how to process an **input stream**; i.e., process an input chunk one at a time, and yield a corresponding output chunk.\n",
|
|
"\n",
|
|
"The complexity of this processing can vary, from straightforward tasks like emitting tokens produced by an LLM, to more challenging ones like streaming parts of JSON results before the entire JSON is complete.\n",
|
|
"\n",
"The best place to start exploring streaming is with the single most important component in LLM apps: the LLMs themselves!\n",
|
|
"\n",
|
|
"### LLMs and Chat Models\n",
|
|
"\n",
"Large language models and their chat variants are the primary bottleneck in LLM-based apps.\n",
|
|
"\n",
|
|
"Large language models can take **several seconds** to generate a complete response to a query. This is far slower than the **~200-300 ms** threshold at which an application feels responsive to an end user.\n",
|
|
"\n",
|
|
"The key strategy to make the application feel more responsive is to show intermediate progress; viz., to stream the output from the model **token by token**.\n",
|
|
"\n",
|
|
"We will show examples of streaming using a chat model. Choose one from the options below:\n",
|
|
"\n",
|
|
"```{=mdx}\n",
|
|
"import ChatModelTabs from \"@theme/ChatModelTabs\";\n",
|
|
"\n",
|
|
"<ChatModelTabs\n",
|
|
" customVarName=\"model\"\n",
|
|
"/>\n",
|
|
"```"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 1,
|
|
"id": "f123bdcb-8c8b-440c-9bbd-aa5ed4e9cd17",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"\n",
|
|
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip is available: \u001b[0m\u001b[31;49m23.2.1\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m24.0\u001b[0m\n",
|
|
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpython -m pip install --upgrade pip\u001b[0m\n",
|
|
"Note: you may need to restart the kernel to use updated packages.\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"# | output: false\n",
|
|
"# | echo: false\n",
|
|
"\n",
|
|
"%pip install -qU langchain langchain_anthropic\n",
|
|
"\n",
|
|
"import os\n",
|
|
"from getpass import getpass\n",
|
|
"\n",
"keys = [\n",
"    \"ANTHROPIC_API_KEY\",\n",
"    \"OPENAI_API_KEY\",\n",
"]\n",
"\n",
"for key in keys:\n",
"    if key not in os.environ:\n",
"        os.environ[key] = getpass(f\"Enter API Key for {key}=?\")\n",
|
|
"\n",
|
|
"\n",
|
|
"from langchain_anthropic import ChatAnthropic\n",
|
|
"\n",
|
|
"model = ChatAnthropic(model=\"claude-3-sonnet-20240229\", temperature=0)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "a2464c57-0e89-4159-b21f-5859a21be658",
|
|
"metadata": {},
|
|
"source": [
|
|
"Let's start with the sync `stream` API:"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 2,
|
|
"id": "8b44dfb2-0749-487a-8918-f8b6b8233093",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"The| sky| appears| blue| during| the| day|.|"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"chunks = []\n",
"for chunk in model.stream(\"what color is the sky?\"):\n",
"    chunks.append(chunk)\n",
"    print(chunk.content, end=\"|\", flush=True)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "8d835b5c-cbb7-41ab-8905-bdc24d515d29",
|
|
"metadata": {},
|
|
"source": [
|
|
"Alternatively, if you're working in an async environment, you may consider using the async `astream` API:"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 3,
|
|
"id": "f180b6a0-0027-4bd8-8bab-fde76e282609",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"The| sky| appears| blue| during| the| day|.|"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"chunks = []\n",
"async for chunk in model.astream(\"what color is the sky?\"):\n",
"    chunks.append(chunk)\n",
"    print(chunk.content, end=\"|\", flush=True)"
|
|
]
|
|
},
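The `async for` loop above relies on the notebook's support for top-level `await`. In a plain Python script the same loop has to live inside a coroutine. Here is a minimal sketch of that pattern using only the standard library; `fake_astream` is a hypothetical stand-in for a model's `astream` method (it is not a LangChain API) so that the example runs without an API key.

```python
import asyncio


async def fake_astream(text):
    """Hypothetical stand-in for `model.astream`: yields one word at a time."""
    for token in text.split(" "):
        # Hand control back to the event loop, as a real network wait would.
        await asyncio.sleep(0)
        yield token


async def collect(text):
    """Consume the async stream chunk by chunk, as the notebook cell does."""
    chunks = []
    async for chunk in fake_astream(text):
        chunks.append(chunk)
    return chunks


# Outside a notebook, start the event loop explicitly.
collected = asyncio.run(collect("The sky appears blue"))
print("|".join(collected))
```

With a real chat model, the body of `collect` would iterate over `model.astream(...)` instead of the stand-in.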
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "66730a87-77d5-40d6-a68f-315121989bd1",
|
|
"metadata": {},
|
|
"source": [
"Let's inspect one of the chunks:"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 4,
|
|
"id": "dade3000-1ac4-4f5c-b5c6-a0217f9f8a6b",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"AIMessageChunk(content='The', id='run-b36bea64-5511-4d7a-b6a3-a07b3db0c8e7')"
|
|
]
|
|
},
|
|
"execution_count": 4,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"chunks[0]"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "a3a47193-2bd1-46bc-9c7e-ea0f6b08c4a5",
|
|
"metadata": {},
|
|
"source": [
|
|
"We got back something called an `AIMessageChunk`. This chunk represents a part of an `AIMessage`.\n",
|
|
"\n",
|
|
"Message chunks are additive by design -- one can simply add them up to get the state of the response so far!"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 5,
|
|
"id": "d3cf5f38-249c-4da0-94e6-5e5203fad52e",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"AIMessageChunk(content='The sky appears blue during', id='run-b36bea64-5511-4d7a-b6a3-a07b3db0c8e7')"
|
|
]
|
|
},
|
|
"execution_count": 5,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4]"
|
|
]
|
|
},
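To make the idea of additive chunks concrete, here is a small standard-library sketch. `TextChunk` is a hypothetical class invented for illustration; it is not LangChain's `AIMessageChunk`, and it only models the one property discussed above: adding two chunks concatenates their content.

```python
from dataclasses import dataclass
from functools import reduce
from operator import add


@dataclass
class TextChunk:
    """Hypothetical stand-in for a message chunk (not LangChain's class)."""

    content: str
    id: str

    def __add__(self, other):
        # Adding chunks concatenates content and keeps the first chunk's id.
        return TextChunk(content=self.content + other.content, id=self.id)


chunks = [TextChunk(token, "run-1") for token in ["The", " sky", " appears", " blue"]]

# Fold the chunks received so far into a single running message.
so_far = reduce(add, chunks)
print(so_far.content)
```

Using `reduce` avoids spelling out `chunks[0] + chunks[1] + ...` by hand when the number of chunks is not known in advance.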
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "59ffbd9a-3b79-44b6-8883-1371f9460c77",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Chains\n",
|
|
"\n",
|
|
"Virtually all LLM applications involve more steps than just a call to a language model.\n",
|
|
"\n",
|
|
"Let's build a simple chain using `LangChain Expression Language` (`LCEL`) that combines a prompt, model and a parser and verify that streaming works.\n",
|
|
"\n",
|
|
"We will use [`StrOutputParser`](https://python.langchain.com/v0.2/api_reference/core/output_parsers/langchain_core.output_parsers.string.StrOutputParser.html) to parse the output from the model. This is a simple parser that extracts the `content` field from an `AIMessageChunk`, giving us the `token` returned by the model.\n",
|
|
"\n",
|
|
":::{.callout-tip}\n",
"LCEL is a *declarative* way to specify a \"program\" by chaining together different LangChain primitives. Chains created using LCEL benefit from an automatic implementation of `stream` and `astream` allowing streaming of the final output. In fact, chains created with LCEL implement the entire standard Runnable interface.\n",
|
|
":::"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 6,
|
|
"id": "a8562ae2-3fd1-4829-9801-a5a732b1798d",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"Here|'s| a| joke| about| a| par|rot|:|\n",
|
|
"\n",
|
|
"A man| goes| to| a| pet| shop| to| buy| a| par|rot|.| The| shop| owner| shows| him| two| stunning| pa|rr|ots| with| beautiful| pl|um|age|.|\n",
|
|
"\n",
|
|
"\"|There|'s| a| talking| par|rot| an|d a| non|-|talking| par|rot|,\"| the| owner| says|.| \"|The| talking| par|rot| costs| $|100|,| an|d the| non|-|talking| par|rot| is| $|20|.\"|\n",
|
|
"\n",
|
|
"The| man| says|,| \"|I|'ll| take| the| non|-|talking| par|rot| at| $|20|.\"|\n",
|
|
"\n",
|
|
"He| pays| an|d leaves| with| the| par|rot|.| As| he|'s| walking| down| the| street|,| the| par|rot| looks| up| at| him| an|d says|,| \"|You| know|,| you| really| are| a| stupi|d man|!\"|\n",
|
|
"\n",
|
|
"The| man| is| stun|ne|d an|d looks| at| the| par|rot| in| dis|bel|ief|.| The| par|rot| continues|,| \"|Yes|,| you| got| r|ippe|d off| big| time|!| I| can| talk| just| as| well| as| that| other| par|rot|,| an|d you| only| pai|d $|20| |for| me|!\"|"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"from langchain_core.output_parsers import StrOutputParser\n",
|
|
"from langchain_core.prompts import ChatPromptTemplate\n",
|
|
"\n",
|
|
"prompt = ChatPromptTemplate.from_template(\"tell me a joke about {topic}\")\n",
|
|
"parser = StrOutputParser()\n",
|
|
"chain = prompt | model | parser\n",
|
|
"\n",
"async for chunk in chain.astream({\"topic\": \"parrot\"}):\n",
"    print(chunk, end=\"|\", flush=True)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "868bc412",
|
|
"metadata": {},
|
|
"source": [
"Note that we're getting streaming output even though we're using `parser` at the end of the chain above. The `parser` operates on each streaming chunk individually. Many of the [LCEL primitives](/docs/how_to#langchain-expression-language-lcel) also support this kind of transform-style passthrough streaming, which can be very convenient when constructing apps.\n",
|
|
"\n",
|
|
"Custom functions can be [designed to return generators](/docs/how_to/functions#streaming), which are able to operate on streams.\n",
|
|
"\n",
|
|
"Certain runnables, like [prompt templates](/docs/how_to#prompt-templates) and [chat models](/docs/how_to#chat-models), cannot process individual chunks and instead aggregate all previous steps. Such runnables can interrupt the streaming process."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "1b399fb4-5e3c-4581-9570-6df9b42b623d",
|
|
"metadata": {},
|
|
"source": [
|
|
":::{.callout-note}\n",
|
|
"The LangChain Expression language allows you to separate the construction of a chain from the mode in which it is used (e.g., sync/async, batch/streaming etc.). If this is not relevant to what you're building, you can also rely on a standard **imperative** programming approach by\n",
"calling `invoke`, `batch` or `stream` on each component individually, assigning the results to variables and then using them downstream as you see fit.\n",
|
|
"\n",
|
|
":::"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "dfff2701-8887-486f-8b3b-eb26383d4bb6",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Working with Input Streams\n",
|
|
"\n",
|
|
"What if you wanted to stream JSON from the output as it was being generated?\n",
|
|
"\n",
"If you were to rely on `json.loads` to parse the partial JSON, the parsing would fail as the partial JSON wouldn't be valid JSON.\n",
"\n",
"You'd likely be at a complete loss as to what to do and claim that it wasn't possible to stream JSON.\n",
"\n",
"Well, turns out there is a way to do it -- the parser needs to operate on the **input stream**, and attempt to \"auto-complete\" the partial JSON into a valid state.\n",
|
|
"\n",
|
|
"Let's see such a parser in action to understand what this means."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 7,
|
|
"id": "5ff63cce-715a-4561-951f-9321c82e8d81",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"{}\n",
|
|
"{'countries': []}\n",
|
|
"{'countries': [{}]}\n",
|
|
"{'countries': [{'name': ''}]}\n",
|
|
"{'countries': [{'name': 'France'}]}\n",
|
|
"{'countries': [{'name': 'France', 'population': 67}]}\n",
|
|
"{'countries': [{'name': 'France', 'population': 67413}]}\n",
|
|
"{'countries': [{'name': 'France', 'population': 67413000}]}\n",
|
|
"{'countries': [{'name': 'France', 'population': 67413000}, {}]}\n",
|
|
"{'countries': [{'name': 'France', 'population': 67413000}, {'name': ''}]}\n",
|
|
"{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain'}]}\n",
|
|
"{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47}]}\n",
|
|
"{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351}]}\n",
|
|
"{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}]}\n",
|
|
"{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {}]}\n",
|
|
"{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': ''}]}\n",
|
|
"{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan'}]}\n",
|
|
"{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125}]}\n",
|
|
"{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125584}]}\n",
|
|
"{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125584000}]}\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"from langchain_core.output_parsers import JsonOutputParser\n",
|
|
"\n",
"chain = (\n",
"    model | JsonOutputParser()\n",
")  # Due to a bug in older versions of LangChain, JsonOutputParser did not stream results from some models\n",
"async for text in chain.astream(\n",
"    \"output a list of the countries france, spain and japan and their populations in JSON format. \"\n",
"    'Use a dict with an outer key of \"countries\" which contains a list of countries. '\n",
"    \"Each country should have the key `name` and `population`\"\n",
"):\n",
"    print(text, flush=True)"
|
|
]
|
|
},
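To get a feel for what "auto-completing" partial JSON means, here is a simplified standard-library sketch. It is not how `JsonOutputParser` is implemented; `complete_partial_json` and `stream_parsed` are hypothetical helpers that close any open strings, lists and objects and then try `json.loads`, skipping prefixes that still cannot be parsed.

```python
import json


def complete_partial_json(text):
    """Close open strings/brackets in a JSON prefix; return None if still invalid."""
    closers = []  # closing brackets still owed, innermost last
    in_string = False
    escaped = False
    for ch in text:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            closers.append("}")
        elif ch == "[":
            closers.append("]")
        elif ch in "}]" and closers:
            closers.pop()
    candidate = text + ('"' if in_string else "") + "".join(reversed(closers))
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        return None  # e.g. the prefix ends right after a key or a colon


def stream_parsed(chunks):
    """Operate on the input stream: yield a new snapshot whenever one parses."""
    buffer, last = "", None
    for chunk in chunks:
        buffer += chunk
        parsed = complete_partial_json(buffer)
        if parsed is not None and parsed != last:
            last = parsed
            yield parsed


tokens = ['{"countries": [', '{"name": "Fra', 'nce", "population": 67', "413000}]}"]
snapshots = list(stream_parsed(tokens))
for snapshot in snapshots:
    print(snapshot)
```

Each snapshot is a valid Python object even though the underlying text is incomplete, which is why intermediate values such as a truncated name or population can appear.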
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "151d4323-a6cf-49be-8779-e8797c5e3b00",
|
|
"metadata": {},
|
|
"source": [
|
|
"Now, let's **break** streaming. We'll use the previous example and append an extraction function at the end that extracts the country names from the finalized JSON.\n",
|
|
"\n",
|
|
":::{.callout-warning}\n",
|
|
"Any steps in the chain that operate on **finalized inputs** rather than on **input streams** can break streaming functionality via `stream` or `astream`.\n",
|
|
":::\n",
|
|
"\n",
|
|
":::{.callout-tip}\n",
|
|
"Later, we will discuss the `astream_events` API which streams results from intermediate steps. This API will stream results from intermediate steps even if the chain contains steps that only operate on **finalized inputs**.\n",
|
|
":::"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 8,
|
|
"id": "d9c90117-9faa-4a01-b484-0db071808d1f",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"['France', 'Spain', 'Japan']|"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"from langchain_core.output_parsers import (\n",
|
|
" JsonOutputParser,\n",
|
|
")\n",
|
|
"\n",
|
|
"\n",
|
|
"# A function that operates on finalized inputs\n",
|
|
"# rather than on an input_stream\n",
"def _extract_country_names(inputs):\n",
"    \"\"\"A function that does not operate on input streams and breaks streaming.\"\"\"\n",
"    if not isinstance(inputs, dict):\n",
"        return \"\"\n",
"\n",
"    if \"countries\" not in inputs:\n",
"        return \"\"\n",
"\n",
"    countries = inputs[\"countries\"]\n",
"\n",
"    if not isinstance(countries, list):\n",
"        return \"\"\n",
"\n",
"    country_names = [\n",
"        country.get(\"name\") for country in countries if isinstance(country, dict)\n",
"    ]\n",
"    return country_names\n",
|
|
"\n",
|
|
"\n",
|
|
"chain = model | JsonOutputParser() | _extract_country_names\n",
|
|
"\n",
"async for text in chain.astream(\n",
"    \"output a list of the countries france, spain and japan and their populations in JSON format. \"\n",
"    'Use a dict with an outer key of \"countries\" which contains a list of countries. '\n",
"    \"Each country should have the key `name` and `population`\"\n",
"):\n",
"    print(text, end=\"|\", flush=True)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "cab6dca2-2027-414d-a196-2db6e3ebb8a5",
|
|
"metadata": {},
|
|
"source": [
|
|
"#### Generator Functions\n",
|
|
"\n",
|
|
"Let's fix the streaming using a generator function that can operate on the **input stream**.\n",
|
|
"\n",
|
|
":::{.callout-tip}\n",
|
|
"A generator function (a function that uses `yield`) allows writing code that operates on **input streams**\n",
|
|
":::"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 9,
|
|
"id": "15984b2b-315a-4119-945b-2a3dabea3082",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"France|Spain|Japan|"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"from langchain_core.output_parsers import JsonOutputParser\n",
|
|
"\n",
|
|
"\n",
"async def _extract_country_names_streaming(input_stream):\n",
"    \"\"\"A function that operates on input streams.\"\"\"\n",
"    country_names_so_far = set()\n",
"\n",
"    async for input in input_stream:\n",
"        if not isinstance(input, dict):\n",
"            continue\n",
"\n",
"        if \"countries\" not in input:\n",
"            continue\n",
"\n",
"        countries = input[\"countries\"]\n",
"\n",
"        if not isinstance(countries, list):\n",
"            continue\n",
"\n",
"        for country in countries:\n",
"            name = country.get(\"name\")\n",
"            if not name:\n",
"                continue\n",
"            if name not in country_names_so_far:\n",
"                yield name\n",
"                country_names_so_far.add(name)\n",
"\n",
"\n",
"chain = model | JsonOutputParser() | _extract_country_names_streaming\n",
"\n",
"async for text in chain.astream(\n",
"    \"output a list of the countries france, spain and japan and their populations in JSON format. \"\n",
"    'Use a dict with an outer key of \"countries\" which contains a list of countries. '\n",
"    \"Each country should have the key `name` and `population`\",\n",
"):\n",
"    print(text, end=\"|\", flush=True)"
|
|
]
|
|
},
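The reason a generator keeps the stream alive is that it hands each result downstream before asking for the next input. The standard-library sketch below records the order of events to show this interleaving. The names `snapshot_source` and `names_streaming` and the sample data are hypothetical, and the synchronous generator is a simplification of the async version above.

```python
log = []


def snapshot_source():
    """Hypothetical upstream step: yields growing snapshots of parsed JSON."""
    snapshots = [
        {"countries": [{"name": "France"}]},
        {"countries": [{"name": "France"}, {"name": "Spain"}]},
        {"countries": [{"name": "France"}, {"name": "Spain"}, {"name": "Japan"}]},
    ]
    for index, snapshot in enumerate(snapshots):
        log.append(f"produced snapshot {index}")
        yield snapshot


def names_streaming(input_stream):
    """Generator that operates on the input stream and yields each new name once."""
    seen = set()
    for snapshot in input_stream:
        for country in snapshot.get("countries", []):
            name = country.get("name")
            if name and name not in seen:
                seen.add(name)
                yield name


for name in names_streaming(snapshot_source()):
    log.append(f"consumed {name}")

print("\n".join(log))
```

The log alternates between "produced" and "consumed" entries. A function that needs the finalized input would instead force every snapshot to be produced before anything is consumed.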
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "d59823f5-9b9a-43c5-a213-34644e2f1d3d",
|
|
"metadata": {},
|
|
"source": [
|
|
":::{.callout-note}\n",
|
|
"Because the code above is relying on JSON auto-completion, you may see partial names of countries (e.g., `Sp` and `Spain`), which is not what one would want for an extraction result!\n",
|
|
"\n",
|
|
"We're focusing on streaming concepts, not necessarily the results of the chains.\n",
|
|
":::"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "6adf65b7-aa47-4321-98c7-a0abe43b833a",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Non-streaming components\n",
|
|
"\n",
|
|
"Some built-in components like Retrievers do not offer any `streaming`. What happens if we try to `stream` them? 🤨"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 10,
|
|
"id": "b9b1c00d-8b44-40d0-9e2b-8a70d238f82b",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"[[Document(page_content='harrison worked at kensho'),\n",
|
|
" Document(page_content='harrison likes spicy food')]]"
|
|
]
|
|
},
|
|
"execution_count": 10,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"from langchain_community.vectorstores import FAISS\n",
|
|
"from langchain_core.output_parsers import StrOutputParser\n",
|
|
"from langchain_core.prompts import ChatPromptTemplate\n",
|
|
"from langchain_core.runnables import RunnablePassthrough\n",
|
|
"from langchain_openai import OpenAIEmbeddings\n",
|
|
"\n",
|
|
"template = \"\"\"Answer the question based only on the following context:\n",
|
|
"{context}\n",
|
|
"\n",
|
|
"Question: {question}\n",
|
|
"\"\"\"\n",
|
|
"prompt = ChatPromptTemplate.from_template(template)\n",
|
|
"\n",
"vectorstore = FAISS.from_texts(\n",
"    [\"harrison worked at kensho\", \"harrison likes spicy food\"],\n",
"    embedding=OpenAIEmbeddings(),\n",
")\n",
|
|
"retriever = vectorstore.as_retriever()\n",
|
|
"\n",
|
|
"chunks = [chunk for chunk in retriever.stream(\"where did harrison work?\")]\n",
|
|
"chunks"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "6fd3e71b-439e-418f-8a8a-5232fba3d9fd",
|
|
"metadata": {},
|
|
"source": [
|
|
"Stream just yielded the final result from that component.\n",
|
|
"\n",
|
|
"This is OK 🥹! Not all components have to implement streaming -- in some cases streaming is either unnecessary, difficult or just doesn't make sense.\n",
|
|
"\n",
|
|
":::{.callout-tip}\n",
"An LCEL chain constructed using non-streaming components will still be able to stream in a lot of cases, with streaming of partial output starting after the last non-streaming step in the chain.\n",
|
|
":::"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 11,
|
|
"id": "957447e6-1e60-41ef-8c10-2654bd9e738d",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
"retrieval_chain = (\n",
"    {\n",
"        \"context\": retriever.with_config(run_name=\"Docs\"),\n",
"        \"question\": RunnablePassthrough(),\n",
"    }\n",
"    | prompt\n",
"    | model\n",
"    | StrOutputParser()\n",
")"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 12,
|
|
"id": "94e50b5d-bf51-4eee-9da0-ee40dd9ce42b",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"Base|d on| the| given| context|,| Harrison| worke|d at| K|ens|ho|.|\n",
|
|
"\n",
|
|
"Here| are| |3| |made| up| sentences| about| this| place|:|\n",
|
|
"\n",
|
|
"1|.| K|ens|ho| was| a| cutting|-|edge| technology| company| known| for| its| innovative| solutions| in| artificial| intelligence| an|d data| analytics|.|\n",
|
|
"\n",
|
|
"2|.| The| modern| office| space| at| K|ens|ho| feature|d open| floor| plans|,| collaborative| work|sp|aces|,| an|d a| vib|rant| atmosphere| that| fos|tere|d creativity| an|d team|work|.|\n",
|
|
"\n",
|
|
"3|.| With| its| prime| location| in| the| heart| of| the| city|,| K|ens|ho| attracte|d top| talent| from| aroun|d the| worl|d,| creating| a| diverse| an|d dynamic| work| environment|.|"
|
|
]
|
|
}
|
|
],
|
|
"source": [
"for chunk in retrieval_chain.stream(\n",
"    \"Where did harrison work? \" \"Write 3 made up sentences about this place.\"\n",
"):\n",
"    print(chunk, end=\"|\", flush=True)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "8657aa4e-3469-4b5b-a09c-60b53a23b1e7",
|
|
"metadata": {},
|
|
"source": [
|
|
"Now that we've seen how `stream` and `astream` work, let's venture into the world of streaming events. 🏞️"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "baceb5c0-d4a4-4b98-8733-80ae4407b62d",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Using Stream Events\n",
|
|
"\n",
|
|
"Event Streaming is a **beta** API. This API may change a bit based on feedback.\n",
|
|
"\n",
|
|
":::{.callout-note}\n",
|
|
"\n",
|
|
"This guide demonstrates the `V2` API and requires langchain-core >= 0.2. For the `V1` API compatible with older versions of LangChain, see [here](https://python.langchain.com/v0.1/docs/expression_language/streaming/#using-stream-events).\n",
|
|
":::"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "61348df9-ec58-401e-be89-68a70042f88e",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import langchain_core\n",
|
|
"\n",
|
|
"langchain_core.__version__"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "52e9e983-bbde-4906-9eca-4ccc06eabd91",
|
|
"metadata": {},
|
|
"source": [
|
|
"For the `astream_events` API to work properly:\n",
|
|
"\n",
|
|
"* Use `async` throughout the code to the extent possible (e.g., async tools etc)\n",
|
|
"* Propagate callbacks if defining custom functions / runnables\n",
|
|
"* Whenever using runnables without LCEL, make sure to call `.astream()` on LLMs rather than `.ainvoke` to force the LLM to stream tokens.\n",
|
|
"* Let us know if anything doesn't work as expected! :)\n",
|
|
"\n",
|
|
"### Event Reference\n",
|
|
"\n",
|
|
"Below is a reference table that shows some events that might be emitted by the various Runnable objects.\n",
|
|
"\n",
|
|
"\n",
|
|
":::{.callout-note}\n",
"When streaming is implemented properly, the inputs to a runnable will not be known until after the input stream has been entirely consumed. This means that `inputs` will often be included only for `end` events rather than for `start` events.\n",
|
|
":::\n",
|
|
"\n",
|
|
"| event | name | chunk | input | output |\n",
|
|
"|----------------------|------------------|---------------------------------|-----------------------------------------------|-------------------------------------------------|\n",
|
|
"| on_chat_model_start | [model name] | | {\"messages\": [[SystemMessage, HumanMessage]]} | |\n",
|
|
"| on_chat_model_stream | [model name] | AIMessageChunk(content=\"hello\") | | |\n",
|
|
"| on_chat_model_end | [model name] | | {\"messages\": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content=\"hello world\") |\n",
|
|
"| on_llm_start | [model name] | | {'input': 'hello'} | |\n",
|
|
"| on_llm_stream | [model name] | 'Hello' | | |\n",
|
|
"| on_llm_end | [model name] | | 'Hello human!' | |\n",
|
|
"| on_chain_start | format_docs | | | |\n",
|
|
"| on_chain_stream | format_docs | \"hello world!, goodbye world!\" | | |\n",
|
|
"| on_chain_end | format_docs | | [Document(...)] | \"hello world!, goodbye world!\" |\n",
|
|
"| on_tool_start | some_tool | | {\"x\": 1, \"y\": \"2\"} | |\n",
|
|
"| on_tool_end | some_tool | | | {\"x\": 1, \"y\": \"2\"} |\n",
|
|
"| on_retriever_start | [retriever name] | | {\"query\": \"hello\"} | |\n",
|
|
"| on_retriever_end | [retriever name] | | {\"query\": \"hello\"} | [Document(...), ..] |\n",
|
|
"| on_prompt_start | [template_name] | | {\"question\": \"hello\"} | |\n",
|
|
"| on_prompt_end | [template_name] | | {\"question\": \"hello\"} | ChatPromptValue(messages: [SystemMessage, ...]) |"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "1f6ec135-3348-4041-8f55-bf3e59b3b2d0",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Chat Model\n",
|
|
"\n",
|
|
"Let's start off by looking at the events produced by a chat model."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 14,
|
|
"id": "c00df46e-7f6b-4e06-8abf-801898c8d57f",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"/home/eugene/src/langchain/libs/core/langchain_core/_api/beta_decorator.py:87: LangChainBetaWarning: This API is in beta and may change in the future.\n",
|
|
" warn_beta(\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"events = []\n",
"async for event in model.astream_events(\"hello\", version=\"v2\"):\n",
"    events.append(event)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "32972939-2995-4b2e-84db-045adb044fad",
|
|
"metadata": {},
|
|
"source": [
|
|
":::{.callout-note}\n",
|
|
"\n",
|
|
"Hey what's that funny version=\"v2\" parameter in the API?! 😾\n",
|
|
"\n",
|
|
"This is a **beta API**, and we're almost certainly going to make some changes to it (in fact, we already have!)\n",
|
|
"\n",
|
|
"This version parameter will allow us to minimize such breaking changes to your code. \n",
|
|
"\n",
|
|
"In short, we are annoying you now, so we don't have to annoy you later.\n",
|
|
"\n",
|
|
"`v2` is only available for langchain-core>=0.2.0.\n",
|
|
"\n",
|
|
":::"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "ad2b8f47-da78-4569-a49a-53a8efaa26bc",
|
|
"metadata": {},
|
|
"source": [
"Let's take a look at a few of the start events and a few of the end events."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 15,
|
|
"id": "ce31b525-f47d-4828-85a7-912ce9f2e79b",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"[{'event': 'on_chat_model_start',\n",
|
|
" 'data': {'input': 'hello'},\n",
|
|
" 'name': 'ChatAnthropic',\n",
|
|
" 'tags': [],\n",
|
|
" 'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',\n",
|
|
" 'metadata': {}},\n",
|
|
" {'event': 'on_chat_model_stream',\n",
|
|
" 'data': {'chunk': AIMessageChunk(content='Hello', id='run-a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3')},\n",
|
|
" 'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',\n",
|
|
" 'name': 'ChatAnthropic',\n",
|
|
" 'tags': [],\n",
|
|
" 'metadata': {}},\n",
|
|
" {'event': 'on_chat_model_stream',\n",
|
|
" 'data': {'chunk': AIMessageChunk(content='!', id='run-a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3')},\n",
|
|
" 'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',\n",
|
|
" 'name': 'ChatAnthropic',\n",
|
|
" 'tags': [],\n",
|
|
" 'metadata': {}}]"
|
|
]
|
|
},
|
|
"execution_count": 15,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"events[:3]"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 16,
|
|
"id": "76cfe826-ee63-4310-ad48-55a95eb3b9d6",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"[{'event': 'on_chat_model_stream',\n",
|
|
" 'data': {'chunk': AIMessageChunk(content='?', id='run-a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3')},\n",
|
|
" 'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',\n",
|
|
" 'name': 'ChatAnthropic',\n",
|
|
" 'tags': [],\n",
|
|
" 'metadata': {}},\n",
|
|
" {'event': 'on_chat_model_end',\n",
|
|
" 'data': {'output': AIMessageChunk(content='Hello! How can I assist you today?', id='run-a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3')},\n",
|
|
" 'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',\n",
|
|
" 'name': 'ChatAnthropic',\n",
|
|
" 'tags': [],\n",
|
|
" 'metadata': {}}]"
|
|
]
|
|
},
|
|
"execution_count": 16,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"events[-2:]"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "98c8f173-e9c7-4c27-81a5-b7c85c12714d",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Chain\n",
|
|
"\n",
|
|
"Let's revisit the example chain that parsed streaming JSON to explore the streaming events API."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 17,
|
|
"id": "4328c56c-a303-427b-b1f2-f354e9af555c",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"chain = (\n",
|
|
" model | JsonOutputParser()\n",
|
|
") # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models\n",
|
|
"\n",
|
|
"events = [\n",
|
|
" event\n",
|
|
" async for event in chain.astream_events(\n",
|
|
" \"output a list of the countries france, spain and japan and their populations in JSON format. \"\n",
|
|
" 'Use a dict with an outer key of \"countries\" which contains a list of countries. '\n",
|
|
" \"Each country should have the key `name` and `population`\",\n",
|
|
" version=\"v2\",\n",
|
|
" )\n",
|
|
"]"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "4cc00b99-a961-4221-a3c7-9d807114bbfb",
|
|
"metadata": {},
|
|
"source": [
|
|
"If you examine at the first few events, you'll notice that there are **3** different start events rather than **2** start events.\n",
|
|
"\n",
|
|
"The three start events correspond to:\n",
|
|
"\n",
|
|
"1. The chain (model + parser)\n",
|
|
"2. The model\n",
|
|
"3. The parser"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 18,
|
|
"id": "8e66ea3d-a450-436a-aaac-d9478abc6c28",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"[{'event': 'on_chain_start',\n",
|
|
" 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key `name` and `population`'},\n",
|
|
" 'name': 'RunnableSequence',\n",
|
|
" 'tags': [],\n",
|
|
" 'run_id': '4765006b-16e2-4b1d-a523-edd9fd64cb92',\n",
|
|
" 'metadata': {}},\n",
|
|
" {'event': 'on_chat_model_start',\n",
|
|
" 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key `name` and `population`')]]}},\n",
|
|
" 'name': 'ChatAnthropic',\n",
|
|
" 'tags': ['seq:step:1'],\n",
|
|
" 'run_id': '0320c234-7b52-4a14-ae4e-5f100949e589',\n",
|
|
" 'metadata': {}},\n",
|
|
" {'event': 'on_chat_model_stream',\n",
|
|
" 'data': {'chunk': AIMessageChunk(content='{', id='run-0320c234-7b52-4a14-ae4e-5f100949e589')},\n",
|
|
" 'run_id': '0320c234-7b52-4a14-ae4e-5f100949e589',\n",
|
|
" 'name': 'ChatAnthropic',\n",
|
|
" 'tags': ['seq:step:1'],\n",
|
|
" 'metadata': {}}]"
|
|
]
|
|
},
|
|
"execution_count": 18,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"events[:3]"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "c8512238-d035-4acd-9248-a8570da064c9",
|
|
"metadata": {},
|
|
"source": [
|
|
"What do you think you'd see if you looked at the last 3 events? what about the middle?"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "c742cfa4-9b03-4a5b-96d9-5fe56e95e3b4",
|
|
"metadata": {},
|
|
"source": [
|
|
"Let's use this API to take output the stream events from the model and the parser. We're ignoring start events, end events and events from the chain."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 19,
|
|
"id": "630c71d6-8d94-4ce0-a78a-f20e90f628df",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"Chat model chunk: '{'\n",
|
|
"Parser chunk: {}\n",
|
|
"Chat model chunk: '\\n '\n",
|
|
"Chat model chunk: '\"'\n",
|
|
"Chat model chunk: 'countries'\n",
|
|
"Chat model chunk: '\":'\n",
|
|
"Chat model chunk: ' ['\n",
|
|
"Parser chunk: {'countries': []}\n",
|
|
"Chat model chunk: '\\n '\n",
|
|
"Chat model chunk: '{'\n",
|
|
"Parser chunk: {'countries': [{}]}\n",
|
|
"Chat model chunk: '\\n '\n",
|
|
"Chat model chunk: '\"'\n",
|
|
"Chat model chunk: 'name'\n",
|
|
"Chat model chunk: '\":'\n",
|
|
"Chat model chunk: ' \"'\n",
|
|
"Parser chunk: {'countries': [{'name': ''}]}\n",
|
|
"Chat model chunk: 'France'\n",
|
|
"Parser chunk: {'countries': [{'name': 'France'}]}\n",
|
|
"Chat model chunk: '\",'\n",
|
|
"Chat model chunk: '\\n '\n",
|
|
"Chat model chunk: '\"'\n",
|
|
"Chat model chunk: 'population'\n",
|
|
"...\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"num_events = 0\n",
|
|
"\n",
|
|
"async for event in chain.astream_events(\n",
|
|
" \"output a list of the countries france, spain and japan and their populations in JSON format. \"\n",
|
|
" 'Use a dict with an outer key of \"countries\" which contains a list of countries. '\n",
|
|
" \"Each country should have the key `name` and `population`\",\n",
|
|
" version=\"v2\",\n",
|
|
"):\n",
|
|
" kind = event[\"event\"]\n",
|
|
" if kind == \"on_chat_model_stream\":\n",
|
|
" print(\n",
|
|
" f\"Chat model chunk: {repr(event['data']['chunk'].content)}\",\n",
|
|
" flush=True,\n",
|
|
" )\n",
|
|
" if kind == \"on_parser_stream\":\n",
|
|
" print(f\"Parser chunk: {event['data']['chunk']}\", flush=True)\n",
|
|
" num_events += 1\n",
|
|
" if num_events > 30:\n",
|
|
" # Truncate the output\n",
|
|
" print(\"...\")\n",
|
|
" break"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "798ea891-997c-454c-bf60-43124f40ee1b",
|
|
"metadata": {},
|
|
"source": [
|
|
"Because both the model and the parser support streaming, we see streaming events from both components in real time! Kind of cool isn't it? 🦜"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "5084148b-bcdc-4373-9caa-6568f03e7b23",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Filtering Events\n",
|
|
"\n",
|
|
"Because this API produces so many events, it is useful to be able to filter on events.\n",
|
|
"\n",
|
|
"You can filter by either component `name`, component `tags` or component `type`.\n",
|
|
"\n",
|
|
"#### By Name"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 20,
|
|
"id": "4f0b581b-be63-4663-baba-c6d2b625cdf9",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"{'event': 'on_parser_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'my_parser', 'tags': ['seq:step:2'], 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'metadata': {}}\n",
|
|
"{'event': 'on_parser_stream', 'data': {'chunk': {}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}\n",
|
|
"{'event': 'on_parser_stream', 'data': {'chunk': {'countries': []}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}\n",
|
|
"{'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}\n",
|
|
"{'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{'name': ''}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}\n",
|
|
"{'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{'name': 'France'}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}\n",
|
|
"{'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}\n",
|
|
"{'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}\n",
|
|
"{'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}\n",
|
|
"{'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}, {}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}\n",
|
|
"{'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}, {'name': ''}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}\n",
|
|
"...\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"chain = model.with_config({\"run_name\": \"model\"}) | JsonOutputParser().with_config(\n",
|
|
" {\"run_name\": \"my_parser\"}\n",
|
|
")\n",
|
|
"\n",
|
|
"max_events = 0\n",
|
|
"async for event in chain.astream_events(\n",
|
|
" \"output a list of the countries france, spain and japan and their populations in JSON format. \"\n",
|
|
" 'Use a dict with an outer key of \"countries\" which contains a list of countries. '\n",
|
|
" \"Each country should have the key `name` and `population`\",\n",
|
|
" version=\"v2\",\n",
|
|
" include_names=[\"my_parser\"],\n",
|
|
"):\n",
|
|
" print(event)\n",
|
|
" max_events += 1\n",
|
|
" if max_events > 10:\n",
|
|
" # Truncate output\n",
|
|
" print(\"...\")\n",
|
|
" break"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "c59d5626-7dba-4eb3-ad81-76c1092c5146",
|
|
"metadata": {},
|
|
"source": [
|
|
"#### By Type"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 21,
|
|
"id": "096cd904-72f0-4ebe-a8b7-d0e730faea7f",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"{'event': 'on_chat_model_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'model', 'tags': ['seq:step:1'], 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\\n ', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\"', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='countries', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\":', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content=' [', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\\n ', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\\n ', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\"', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}\n",
|
|
"...\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"chain = model.with_config({\"run_name\": \"model\"}) | JsonOutputParser().with_config(\n",
|
|
" {\"run_name\": \"my_parser\"}\n",
|
|
")\n",
|
|
"\n",
|
|
"max_events = 0\n",
|
|
"async for event in chain.astream_events(\n",
|
|
" 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key `name` and `population`',\n",
|
|
" version=\"v2\",\n",
|
|
" include_types=[\"chat_model\"],\n",
|
|
"):\n",
|
|
" print(event)\n",
|
|
" max_events += 1\n",
|
|
" if max_events > 10:\n",
|
|
" # Truncate output\n",
|
|
" print(\"...\")\n",
|
|
" break"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "f1ec8dd4-9b5b-4000-b63f-5845bfc5a065",
|
|
"metadata": {},
|
|
"source": [
|
|
"#### By Tags\n",
|
|
"\n",
|
|
":::{.callout-caution}\n",
|
|
"\n",
|
|
"Tags are inherited by child components of a given runnable. \n",
|
|
"\n",
|
|
"If you're using tags to filter, make sure that this is what you want.\n",
|
|
":::"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 22,
|
|
"id": "26bac0d2-76d9-446e-b346-82790236b88d",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"{'event': 'on_chain_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'RunnableSequence', 'tags': ['my_chain'], 'run_id': 'fd68dd64-7a4d-4bdb-a0c2-ee592db0d024', 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_start', 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key `name` and `population`')]]}}, 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'run_id': 'efd3c8af-4be5-4f6c-9327-e3f9865dd1cd', 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', id='run-efd3c8af-4be5-4f6c-9327-e3f9865dd1cd')}, 'run_id': 'efd3c8af-4be5-4f6c-9327-e3f9865dd1cd', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}}\n",
|
|
"{'event': 'on_parser_start', 'data': {}, 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'run_id': 'afde30b9-beac-4b36-b4c7-dbbe423ddcdb', 'metadata': {}}\n",
|
|
"{'event': 'on_parser_stream', 'data': {'chunk': {}}, 'run_id': 'afde30b9-beac-4b36-b4c7-dbbe423ddcdb', 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}}\n",
|
|
"{'event': 'on_chain_stream', 'data': {'chunk': {}}, 'run_id': 'fd68dd64-7a4d-4bdb-a0c2-ee592db0d024', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\\n ', id='run-efd3c8af-4be5-4f6c-9327-e3f9865dd1cd')}, 'run_id': 'efd3c8af-4be5-4f6c-9327-e3f9865dd1cd', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\"', id='run-efd3c8af-4be5-4f6c-9327-e3f9865dd1cd')}, 'run_id': 'efd3c8af-4be5-4f6c-9327-e3f9865dd1cd', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='countries', id='run-efd3c8af-4be5-4f6c-9327-e3f9865dd1cd')}, 'run_id': 'efd3c8af-4be5-4f6c-9327-e3f9865dd1cd', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\":', id='run-efd3c8af-4be5-4f6c-9327-e3f9865dd1cd')}, 'run_id': 'efd3c8af-4be5-4f6c-9327-e3f9865dd1cd', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}}\n",
|
|
"{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content=' [', id='run-efd3c8af-4be5-4f6c-9327-e3f9865dd1cd')}, 'run_id': 'efd3c8af-4be5-4f6c-9327-e3f9865dd1cd', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}}\n",
|
|
"...\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"chain = (model | JsonOutputParser()).with_config({\"tags\": [\"my_chain\"]})\n",
|
|
"\n",
|
|
"max_events = 0\n",
|
|
"async for event in chain.astream_events(\n",
|
|
" 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of \"countries\" which contains a list of countries. Each country should have the key `name` and `population`',\n",
|
|
" version=\"v2\",\n",
|
|
" include_tags=[\"my_chain\"],\n",
|
|
"):\n",
|
|
" print(event)\n",
|
|
" max_events += 1\n",
|
|
" if max_events > 10:\n",
|
|
" # Truncate output\n",
|
|
" print(\"...\")\n",
|
|
" break"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "e05e54c4-61a2-4f6c-aa68-d2b09b5e1d4f",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Non-streaming components\n",
|
|
"\n",
|
|
"Remember how some components don't stream well because they don't operate on **input streams**?\n",
|
|
"\n",
|
|
"While such components can break streaming of the final output when using `astream`, `astream_events` will still yield streaming events from intermediate steps that support streaming!"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 23,
|
|
"id": "0e6451d3-3b11-4a71-ae19-998f4c10180f",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Function that does not support streaming.\n",
|
|
"# It operates on the finalizes inputs rather than\n",
|
|
"# operating on the input stream.\n",
|
|
"def _extract_country_names(inputs):\n",
|
|
" \"\"\"A function that does not operates on input streams and breaks streaming.\"\"\"\n",
|
|
" if not isinstance(inputs, dict):\n",
|
|
" return \"\"\n",
|
|
"\n",
|
|
" if \"countries\" not in inputs:\n",
|
|
" return \"\"\n",
|
|
"\n",
|
|
" countries = inputs[\"countries\"]\n",
|
|
"\n",
|
|
" if not isinstance(countries, list):\n",
|
|
" return \"\"\n",
|
|
"\n",
|
|
" country_names = [\n",
|
|
" country.get(\"name\") for country in countries if isinstance(country, dict)\n",
|
|
" ]\n",
|
|
" return country_names\n",
|
|
"\n",
|
|
"\n",
|
|
"chain = (\n",
|
|
" model | JsonOutputParser() | _extract_country_names\n",
|
|
") # This parser only works with OpenAI right now"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "a972e1a6-80cd-4d59-90a0-73563f1503d4",
|
|
"metadata": {},
|
|
"source": [
|
|
"As expected, the `astream` API doesn't work correctly because `_extract_country_names` doesn't operate on streams."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 24,
|
|
"id": "f9a8fe35-faab-4970-b8c0-5c780845d98a",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"['France', 'Spain', 'Japan']\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"async for chunk in chain.astream(\n",
|
|
" \"output a list of the countries france, spain and japan and their populations in JSON format. \"\n",
|
|
" 'Use a dict with an outer key of \"countries\" which contains a list of countries. '\n",
|
|
" \"Each country should have the key `name` and `population`\",\n",
|
|
"):\n",
|
|
" print(chunk, flush=True)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "b279ea33-54f1-400a-acb1-b8445ccbf1fa",
|
|
"metadata": {},
|
|
"source": [
|
|
"Now, let's confirm that with astream_events we're still seeing streaming output from the model and the parser."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 25,
|
|
"id": "b08215cd-bffa-4e76-aaf3-c52ee34f152c",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"Chat model chunk: '{'\n",
|
|
"Parser chunk: {}\n",
|
|
"Chat model chunk: '\\n '\n",
|
|
"Chat model chunk: '\"'\n",
|
|
"Chat model chunk: 'countries'\n",
|
|
"Chat model chunk: '\":'\n",
|
|
"Chat model chunk: ' ['\n",
|
|
"Parser chunk: {'countries': []}\n",
|
|
"Chat model chunk: '\\n '\n",
|
|
"Chat model chunk: '{'\n",
|
|
"Parser chunk: {'countries': [{}]}\n",
|
|
"Chat model chunk: '\\n '\n",
|
|
"Chat model chunk: '\"'\n",
|
|
"Chat model chunk: 'name'\n",
|
|
"Chat model chunk: '\":'\n",
|
|
"Chat model chunk: ' \"'\n",
|
|
"Parser chunk: {'countries': [{'name': ''}]}\n",
|
|
"Chat model chunk: 'France'\n",
|
|
"Parser chunk: {'countries': [{'name': 'France'}]}\n",
|
|
"Chat model chunk: '\",'\n",
|
|
"Chat model chunk: '\\n '\n",
|
|
"Chat model chunk: '\"'\n",
|
|
"Chat model chunk: 'population'\n",
|
|
"Chat model chunk: '\":'\n",
|
|
"Chat model chunk: ' '\n",
|
|
"Chat model chunk: '67'\n",
|
|
"Parser chunk: {'countries': [{'name': 'France', 'population': 67}]}\n",
|
|
"...\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"num_events = 0\n",
|
|
"\n",
|
|
"async for event in chain.astream_events(\n",
|
|
" \"output a list of the countries france, spain and japan and their populations in JSON format. \"\n",
|
|
" 'Use a dict with an outer key of \"countries\" which contains a list of countries. '\n",
|
|
" \"Each country should have the key `name` and `population`\",\n",
|
|
" version=\"v2\",\n",
|
|
"):\n",
|
|
" kind = event[\"event\"]\n",
|
|
" if kind == \"on_chat_model_stream\":\n",
|
|
" print(\n",
|
|
" f\"Chat model chunk: {repr(event['data']['chunk'].content)}\",\n",
|
|
" flush=True,\n",
|
|
" )\n",
|
|
" if kind == \"on_parser_stream\":\n",
|
|
" print(f\"Parser chunk: {event['data']['chunk']}\", flush=True)\n",
|
|
" num_events += 1\n",
|
|
" if num_events > 30:\n",
|
|
" # Truncate the output\n",
|
|
" print(\"...\")\n",
|
|
" break"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "6e91bdd3-f4a3-4b3c-b21a-26365c6c1566",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Propagating Callbacks\n",
|
|
"\n",
|
|
":::{.callout-caution}\n",
|
|
"If you're using invoking runnables inside your tools, you need to propagate callbacks to the runnable; otherwise, no stream events will be generated.\n",
|
|
":::\n",
|
|
"\n",
|
|
":::{.callout-note}\n",
|
|
"When using `RunnableLambdas` or `@chain` decorator, callbacks are propagated automatically behind the scenes.\n",
|
|
":::"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 26,
|
|
"id": "1854206d-b3a5-4f91-9e00-bccbaebac61f",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'bad_tool', 'tags': [], 'run_id': 'ea900472-a8f7-425d-b627-facdef936ee8', 'metadata': {}}\n",
|
|
"{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '77b01284-0515-48f4-8d7c-eb27c1882f86', 'metadata': {}}\n",
|
|
"{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '77b01284-0515-48f4-8d7c-eb27c1882f86', 'name': 'reverse_word', 'tags': [], 'metadata': {}}\n",
|
|
"{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'ea900472-a8f7-425d-b627-facdef936ee8', 'name': 'bad_tool', 'tags': [], 'metadata': {}}\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"from langchain_core.runnables import RunnableLambda\n",
|
|
"from langchain_core.tools import tool\n",
|
|
"\n",
|
|
"\n",
|
|
"def reverse_word(word: str):\n",
|
|
" return word[::-1]\n",
|
|
"\n",
|
|
"\n",
|
|
"reverse_word = RunnableLambda(reverse_word)\n",
|
|
"\n",
|
|
"\n",
|
|
"@tool\n",
|
|
"def bad_tool(word: str):\n",
|
|
" \"\"\"Custom tool that doesn't propagate callbacks.\"\"\"\n",
|
|
" return reverse_word.invoke(word)\n",
|
|
"\n",
|
|
"\n",
|
|
"async for event in bad_tool.astream_events(\"hello\", version=\"v2\"):\n",
|
|
" print(event)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "23e68a99-7886-465b-8575-116022857469",
|
|
"metadata": {},
|
|
"source": [
|
|
"Here's a re-implementation that does propagate callbacks correctly. You'll notice that now we're getting events from the `reverse_word` runnable as well."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 27,
|
|
"id": "a20a6cb3-bb43-465c-8cfc-0a7349d70968",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'correct_tool', 'tags': [], 'run_id': 'd5ea83b9-9278-49cc-9f1d-aa302d671040', 'metadata': {}}\n",
|
|
"{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '44dafbf4-2f87-412b-ae0e-9f71713810df', 'metadata': {}}\n",
|
|
"{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '44dafbf4-2f87-412b-ae0e-9f71713810df', 'name': 'reverse_word', 'tags': [], 'metadata': {}}\n",
|
|
"{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'd5ea83b9-9278-49cc-9f1d-aa302d671040', 'name': 'correct_tool', 'tags': [], 'metadata': {}}\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"@tool\n",
|
|
"def correct_tool(word: str, callbacks):\n",
|
|
" \"\"\"A tool that correctly propagates callbacks.\"\"\"\n",
|
|
" return reverse_word.invoke(word, {\"callbacks\": callbacks})\n",
|
|
"\n",
|
|
"\n",
|
|
"async for event in correct_tool.astream_events(\"hello\", version=\"v2\"):\n",
|
|
" print(event)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "640daa94-e4fe-4997-ab6e-45120f18b9ee",
|
|
"metadata": {},
|
|
"source": [
|
|
"If you're invoking runnables from within Runnable Lambdas or `@chains`, then callbacks will be passed automatically on your behalf."
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 28,
|
|
"id": "0ac0a3c1-f3a4-4157-b053-4fec8d2e698c",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'metadata': {}}\n",
|
|
"{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '5cf26fc8-840b-4642-98ed-623dda28707a', 'metadata': {}}\n",
|
|
"{'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '5cf26fc8-840b-4642-98ed-623dda28707a', 'name': 'reverse_word', 'tags': [], 'metadata': {}}\n",
|
|
"{'event': 'on_chain_stream', 'data': {'chunk': '43214321'}, 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}\n",
|
|
"{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"from langchain_core.runnables import RunnableLambda\n",
|
|
"\n",
|
|
"\n",
|
|
"async def reverse_and_double(word: str):\n",
|
|
" return await reverse_word.ainvoke(word) * 2\n",
|
|
"\n",
|
|
"\n",
|
|
"reverse_and_double = RunnableLambda(reverse_and_double)\n",
|
|
"\n",
|
|
"await reverse_and_double.ainvoke(\"1234\")\n",
|
|
"\n",
|
|
"async for event in reverse_and_double.astream_events(\"1234\", version=\"v2\"):\n",
|
|
" print(event)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "35a34268-9b3d-4857-b4ed-65d95f4a1293",
|
|
"metadata": {},
|
|
"source": [
|
|
"And with the `@chain` decorator:"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 29,
|
|
"id": "c896bb94-9d10-41ff-8fe2-d6b05b1ed74b",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'metadata': {}}\n",
|
|
"{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '64fc99f0-5d7d-442b-b4f5-4537129f67d1', 'metadata': {}}\n",
|
|
"{'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '64fc99f0-5d7d-442b-b4f5-4537129f67d1', 'name': 'reverse_word', 'tags': [], 'metadata': {}}\n",
|
|
"{'event': 'on_chain_stream', 'data': {'chunk': '43214321'}, 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}\n",
|
|
"{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"from langchain_core.runnables import chain\n",
|
|
"\n",
|
|
"\n",
|
|
"@chain\n",
|
|
"async def reverse_and_double(word: str):\n",
|
|
" return await reverse_word.ainvoke(word) * 2\n",
|
|
"\n",
|
|
"\n",
|
|
"await reverse_and_double.ainvoke(\"1234\")\n",
|
|
"\n",
|
|
"async for event in reverse_and_double.astream_events(\"1234\", version=\"v2\"):\n",
|
|
" print(event)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "2a3efcd9",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Next steps\n",
|
|
"\n",
|
|
"Now you've learned some ways to stream both final outputs and internal steps with LangChain.\n",
|
|
"\n",
|
|
"To learn more, check out the other how-to guides in this section, or the [conceptual guide on Langchain Expression Language](/docs/concepts/#langchain-expression-language/)."
|
|
]
|
|
}
|
|
],
|
|
"metadata": {
|
|
"kernelspec": {
|
|
"display_name": "Python 3 (ipykernel)",
|
|
"language": "python",
|
|
"name": "python3"
|
|
},
|
|
"language_info": {
|
|
"codemirror_mode": {
|
|
"name": "ipython",
|
|
"version": 3
|
|
},
|
|
"file_extension": ".py",
|
|
"mimetype": "text/x-python",
|
|
"name": "python",
|
|
"nbconvert_exporter": "python",
|
|
"pygments_lexer": "ipython3",
|
|
"version": "3.9.1"
|
|
}
|
|
},
|
|
"nbformat": 4,
|
|
"nbformat_minor": 5
|
|
}
|