mirror of
https://github.com/hwchase17/langchain.git
synced 2025-08-12 14:23:58 +00:00
Docs: Add how to dispatch custom callback events (#24278)
* Add how-to guide for dispatching custom callback events. * Add links from index to the how to guide * Add link from streaming from within a tool * Update versionadded to correct release https://github.com/langchain-ai/langchain/releases/tag/langchain-core%3D%3D0.2.15
This commit is contained in:
parent
dd7938ace8
commit
616196c620
342
docs/docs/how_to/callbacks_custom_events.ipynb
Normal file
342
docs/docs/how_to/callbacks_custom_events.ipynb
Normal file
@ -0,0 +1,342 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# How to dispatch custom callback events\n",
|
||||
"\n",
|
||||
":::info Prerequisites\n",
|
||||
"\n",
|
||||
"This guide assumes familiarity with the following concepts:\n",
|
||||
"\n",
|
||||
"- [Callbacks](/docs/concepts/#callbacks)\n",
|
||||
"- [Custom callback handlers](/docs/how_to/custom_callbacks)\n",
|
||||
"- [Astream Events API](/docs/concepts/#astream_events) the `astream_events` method will surface custom callback events.\n",
|
||||
":::\n",
|
||||
"\n",
|
||||
"In some situations, you may want to dipsatch a custom callback event from within a [Runnable](/docs/concepts/#runnable-interface) so it can be surfaced\n",
|
||||
"in a custom callback handler or via the [Astream Events API](/docs/concepts/#astream_events).\n",
|
||||
"\n",
|
||||
"For example, if you have a long running tool with multiple steps, you can dispatch custom events between the steps and use these custom events to monitor progress.\n",
|
||||
"You could also surface these custom events to an end user of your application to show them how the current task is progressing.\n",
|
||||
"\n",
|
||||
"To dispatch a custom event you need to decide on two attributes for the event: the `name` and the `data`.\n",
|
||||
"\n",
|
||||
"| Attribute | Type | Description |\n",
|
||||
"|-----------|------|----------------------------------------------------------------------------------------------------------|\n",
|
||||
"| name | str | A user defined name for the event. |\n",
|
||||
"| data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. |\n",
|
||||
"\n",
|
||||
"\n",
|
||||
":::{.callout-important}\n",
|
||||
"* Dispatching custom callback events requires `langchain-core>=0.2.15`.\n",
|
||||
"* Custom callback events can only be dispatched from within an existing `Runnable`.\n",
|
||||
"* If using `astream_events`, you must use `version='v2'` to see custom events.\n",
|
||||
"* Sending or rendering custom callbacks events in LangSmith is not yet supported.\n",
|
||||
":::\n",
|
||||
"\n",
|
||||
"\n",
|
||||
":::caution COMPATIBILITY\n",
|
||||
"LangChain cannot automatically propagate configuration, including callbacks necessary for astream_events(), to child runnables if you are running async code in python<=3.10. This is a common reason why you may fail to see events being emitted from custom runnables or tools.\n",
|
||||
"\n",
|
||||
"If you are running python<=3.10, you will need to manually propagate the `RunnableConfig` object to the child runnable in async environments. For an example of how to manually propagate the config, see the implementation of the `bar` RunnableLambda below.\n",
|
||||
"\n",
|
||||
"If you are running python>=3.11, the `RunnableConfig` will automatically propagate to child runnables in async environment. However, it is still a good idea to propagate the `RunnableConfig` manually if your code may run in other Python versions.\n",
|
||||
":::"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# | output: false\n",
|
||||
"# | echo: false\n",
|
||||
"\n",
|
||||
"%pip install -qU langchain-core"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Astream Events API\n",
|
||||
"\n",
|
||||
"The most useful way to consume custom events is via the [Astream Events API](/docs/concepts/#astream_events).\n",
|
||||
"\n",
|
||||
"We can use the `async` `adispatch_custom_event` API to emit custom events in an async setting. \n",
|
||||
"\n",
|
||||
"\n",
|
||||
":::{.callout-important}\n",
|
||||
"\n",
|
||||
"To see custom events via the astream events API, you need to use the newer `v2` API of `astream_events`.\n",
|
||||
":::"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 3,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"{'event': 'on_chain_start', 'data': {'input': 'hello world'}, 'name': 'foo', 'tags': [], 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'metadata': {}, 'parent_ids': []}\n",
|
||||
"{'event': 'on_custom_event', 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'name': 'event1', 'tags': [], 'metadata': {}, 'data': {'x': 'hello world'}, 'parent_ids': []}\n",
|
||||
"{'event': 'on_custom_event', 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'name': 'event2', 'tags': [], 'metadata': {}, 'data': 5, 'parent_ids': []}\n",
|
||||
"{'event': 'on_chain_stream', 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'name': 'foo', 'tags': [], 'metadata': {}, 'data': {'chunk': 'hello world'}, 'parent_ids': []}\n",
|
||||
"{'event': 'on_chain_end', 'data': {'output': 'hello world'}, 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'name': 'foo', 'tags': [], 'metadata': {}, 'parent_ids': []}\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"from langchain_core.callbacks.manager import (\n",
|
||||
" adispatch_custom_event,\n",
|
||||
")\n",
|
||||
"from langchain_core.runnables import RunnableLambda\n",
|
||||
"from langchain_core.runnables.config import RunnableConfig\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"@RunnableLambda\n",
|
||||
"async def foo(x: str) -> str:\n",
|
||||
" await adispatch_custom_event(\"event1\", {\"x\": x})\n",
|
||||
" await adispatch_custom_event(\"event2\", 5)\n",
|
||||
" return x\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"async for event in foo.astream_events(\"hello world\", version=\"v2\"):\n",
|
||||
" print(event)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"In python <= 3.10, you must propagate the config manually!"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 7,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"{'event': 'on_chain_start', 'data': {'input': 'hello world'}, 'name': 'bar', 'tags': [], 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'metadata': {}, 'parent_ids': []}\n",
|
||||
"{'event': 'on_custom_event', 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'name': 'event1', 'tags': [], 'metadata': {}, 'data': {'x': 'hello world'}, 'parent_ids': []}\n",
|
||||
"{'event': 'on_custom_event', 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'name': 'event2', 'tags': [], 'metadata': {}, 'data': 5, 'parent_ids': []}\n",
|
||||
"{'event': 'on_chain_stream', 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'name': 'bar', 'tags': [], 'metadata': {}, 'data': {'chunk': 'hello world'}, 'parent_ids': []}\n",
|
||||
"{'event': 'on_chain_end', 'data': {'output': 'hello world'}, 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'name': 'bar', 'tags': [], 'metadata': {}, 'parent_ids': []}\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"from langchain_core.callbacks.manager import (\n",
|
||||
" adispatch_custom_event,\n",
|
||||
")\n",
|
||||
"from langchain_core.runnables import RunnableLambda\n",
|
||||
"from langchain_core.runnables.config import RunnableConfig\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"@RunnableLambda\n",
|
||||
"async def bar(x: str, config: RunnableConfig) -> str:\n",
|
||||
" \"\"\"An example that shows how to manually propagate config.\n",
|
||||
"\n",
|
||||
" You must do this if you're running python<=3.10.\n",
|
||||
" \"\"\"\n",
|
||||
" await adispatch_custom_event(\"event1\", {\"x\": x}, config=config)\n",
|
||||
" await adispatch_custom_event(\"event2\", 5, config=config)\n",
|
||||
" return x\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"async for event in bar.astream_events(\"hello world\", version=\"v2\"):\n",
|
||||
" print(event)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Async Callback Handler\n",
|
||||
"\n",
|
||||
"You can also consume the dispatched event via an async callback handler."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 8,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"Received event event1 with data: {'x': 1}, with tags: ['foo', 'bar'], with metadata: {} and run_id: a62b84be-7afd-4829-9947-7165df1f37d9\n",
|
||||
"Received event event2 with data: 5, with tags: ['foo', 'bar'], with metadata: {} and run_id: a62b84be-7afd-4829-9947-7165df1f37d9\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"1"
|
||||
]
|
||||
},
|
||||
"execution_count": 8,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"from typing import Any, Dict, List, Optional\n",
|
||||
"from uuid import UUID\n",
|
||||
"\n",
|
||||
"from langchain_core.callbacks import AsyncCallbackHandler\n",
|
||||
"from langchain_core.callbacks.manager import (\n",
|
||||
" adispatch_custom_event,\n",
|
||||
")\n",
|
||||
"from langchain_core.runnables import RunnableLambda\n",
|
||||
"from langchain_core.runnables.config import RunnableConfig\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"class AsyncCustomCallbackHandler(AsyncCallbackHandler):\n",
|
||||
" async def on_custom_event(\n",
|
||||
" self,\n",
|
||||
" name: str,\n",
|
||||
" data: Any,\n",
|
||||
" *,\n",
|
||||
" run_id: UUID,\n",
|
||||
" tags: Optional[List[str]] = None,\n",
|
||||
" metadata: Optional[Dict[str, Any]] = None,\n",
|
||||
" **kwargs: Any,\n",
|
||||
" ) -> None:\n",
|
||||
" print(\n",
|
||||
" f\"Received event {name} with data: {data}, with tags: {tags}, with metadata: {metadata} and run_id: {run_id}\"\n",
|
||||
" )\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"@RunnableLambda\n",
|
||||
"async def bar(x: str, config: RunnableConfig) -> str:\n",
|
||||
" \"\"\"An example that shows how to manually propagate config.\n",
|
||||
"\n",
|
||||
" You must do this if you're running python<=3.10.\n",
|
||||
" \"\"\"\n",
|
||||
" await adispatch_custom_event(\"event1\", {\"x\": x}, config=config)\n",
|
||||
" await adispatch_custom_event(\"event2\", 5, config=config)\n",
|
||||
" return x\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"async_handler = AsyncCustomCallbackHandler()\n",
|
||||
"await foo.ainvoke(1, {\"callbacks\": [async_handler], \"tags\": [\"foo\", \"bar\"]})"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Sync Callback Handler\n",
|
||||
"\n",
|
||||
"Let's see how to emit custom events in a sync environment using `dispatch_custom_event`.\n",
|
||||
"\n",
|
||||
"You **must** call `dispatch_custom_event` from within an existing `Runnable`."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 5,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"Received event event1 with data: {'x': 1}, with tags: ['foo', 'bar'], with metadata: {} and run_id: 27b5ce33-dc26-4b34-92dd-08a89cb22268\n",
|
||||
"Received event event2 with data: {'x': 1}, with tags: ['foo', 'bar'], with metadata: {} and run_id: 27b5ce33-dc26-4b34-92dd-08a89cb22268\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"1"
|
||||
]
|
||||
},
|
||||
"execution_count": 5,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"from typing import Any, Dict, List, Optional\n",
|
||||
"from uuid import UUID\n",
|
||||
"\n",
|
||||
"from langchain_core.callbacks import BaseCallbackHandler\n",
|
||||
"from langchain_core.callbacks.manager import (\n",
|
||||
" dispatch_custom_event,\n",
|
||||
")\n",
|
||||
"from langchain_core.runnables import RunnableLambda\n",
|
||||
"from langchain_core.runnables.config import RunnableConfig\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"class CustomHandler(BaseCallbackHandler):\n",
|
||||
" def on_custom_event(\n",
|
||||
" self,\n",
|
||||
" name: str,\n",
|
||||
" data: Any,\n",
|
||||
" *,\n",
|
||||
" run_id: UUID,\n",
|
||||
" tags: Optional[List[str]] = None,\n",
|
||||
" metadata: Optional[Dict[str, Any]] = None,\n",
|
||||
" **kwargs: Any,\n",
|
||||
" ) -> None:\n",
|
||||
" print(\n",
|
||||
" f\"Received event {name} with data: {data}, with tags: {tags}, with metadata: {metadata} and run_id: {run_id}\"\n",
|
||||
" )\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"@RunnableLambda\n",
|
||||
"def foo(x: int, config: RunnableConfig) -> int:\n",
|
||||
" dispatch_custom_event(\"event1\", {\"x\": x})\n",
|
||||
" dispatch_custom_event(\"event2\", {\"x\": x})\n",
|
||||
" return x\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"handler = CustomHandler()\n",
|
||||
"foo.invoke(1, {\"callbacks\": [handler], \"tags\": [\"foo\", \"bar\"]})"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"## Next steps\n",
|
||||
"\n",
|
||||
"You've seen how to emit custom events, you can check out the more in depth guide for [astream events](/docs/how_to/streaming/#using-stream-events) which is the easiest way to leverage custom events."
|
||||
]
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3 (ipykernel)",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.11.4"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 4
|
||||
}
|
@ -225,6 +225,7 @@ For in depth how-to guides for agents, please check out [LangGraph](https://lang
|
||||
- [How to: pass callbacks into a module constructor](/docs/how_to/callbacks_constructor)
|
||||
- [How to: create custom callback handlers](/docs/how_to/custom_callbacks)
|
||||
- [How to: use callbacks in async environments](/docs/how_to/callbacks_async)
|
||||
- [How to: dispatch custom callback events](/docs/how_to/callbacks_custom_events)
|
||||
|
||||
### Custom
|
||||
|
||||
@ -237,6 +238,7 @@ All of LangChain components can easily be extended to support your own versions.
|
||||
- [How to: write a custom output parser class](/docs/how_to/output_parser_custom)
|
||||
- [How to: create custom callback handlers](/docs/how_to/custom_callbacks)
|
||||
- [How to: define a custom tool](/docs/how_to/custom_tools)
|
||||
- [How to: dispatch custom callback events](/docs/how_to/callbacks_custom_events)
|
||||
|
||||
### Serialization
|
||||
- [How to: save and load LangChain objects](/docs/how_to/serialization)
|
||||
|
@ -22,10 +22,11 @@
|
||||
"\n",
|
||||
"LangChain cannot automatically propagate configuration, including callbacks necessary for `astream_events()`, to child runnables if you are running `async` code in `python<=3.10`. This is a common reason why you may fail to see events being emitted from custom runnables or tools.\n",
|
||||
"\n",
|
||||
"If you are running `python>=3.11`, configuration will automatically propagate to child runnables in async environments, and you don't need to access the `RunnableConfig` object for that tool as shown in this guide. However, it is still a good idea if your code may run in other Python versions.\n",
|
||||
"If you are running python<=3.10, you will need to manually propagate the `RunnableConfig` object to the child runnable in async environments. For an example of how to manually propagate the config, see the implementation of the `bar` RunnableLambda below.\n",
|
||||
"\n",
|
||||
"If you are running python>=3.11, the `RunnableConfig` will automatically propagate to child runnables in async environment. However, it is still a good idea to propagate the `RunnableConfig` manually if your code may run in older Python versions.\n",
|
||||
"\n",
|
||||
"This guide also requires `langchain-core>=0.2.16`.\n",
|
||||
"\n",
|
||||
":::\n",
|
||||
"\n",
|
||||
"Say you have a custom tool that calls a chain that condenses its input by prompting a chat model to return only 10 words, then reversing the output. First, define it in a naive way:\n",
|
||||
@ -268,6 +269,7 @@
|
||||
"\n",
|
||||
"- Pass [runtime values to tools](/docs/how_to/tool_runtime)\n",
|
||||
"- Pass [tool results back to a model](/docs/how_to/tool_results_pass_to_model)\n",
|
||||
"- [Dispatch custom callback events](/docs/how_to/callbacks_custom_events)\n",
|
||||
"\n",
|
||||
"You can also check out some more specific uses of tool calling:\n",
|
||||
"\n",
|
||||
@ -278,7 +280,7 @@
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3",
|
||||
"display_name": "Python 3 (ipykernel)",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
@ -292,9 +294,9 @@
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.10.5"
|
||||
"version": "3.11.4"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
"nbformat_minor": 4
|
||||
}
|
||||
|
@ -392,7 +392,7 @@ class RunManagerMixin:
|
||||
metadata: The metadata associated with the custom event
|
||||
(includes inherited metadata).
|
||||
|
||||
.. versionadded:: 0.2.14
|
||||
.. versionadded:: 0.2.15
|
||||
"""
|
||||
|
||||
|
||||
@ -851,7 +851,7 @@ class AsyncCallbackHandler(BaseCallbackHandler):
|
||||
metadata: The metadata associated with the custom event
|
||||
(includes inherited metadata).
|
||||
|
||||
.. versionadded:: 0.2.14
|
||||
.. versionadded:: 0.2.15
|
||||
"""
|
||||
|
||||
|
||||
|
@ -2341,7 +2341,7 @@ async def adispatch_custom_event(
|
||||
LangChain from automatically propagating the config object on the user's
|
||||
behalf.
|
||||
|
||||
.. versionadded:: 0.2.14
|
||||
.. versionadded:: 0.2.15
|
||||
"""
|
||||
from langchain_core.runnables.config import (
|
||||
ensure_config,
|
||||
@ -2410,7 +2410,7 @@ def dispatch_custom_event(
|
||||
foo_ = RunnableLambda(foo)
|
||||
foo_.invoke({"a": "1"}, {"callbacks": [CustomCallbackManager()]})
|
||||
|
||||
.. versionadded:: 0.2.14
|
||||
.. versionadded:: 0.2.15
|
||||
"""
|
||||
from langchain_core.runnables.config import (
|
||||
ensure_config,
|
||||
|
@ -159,7 +159,7 @@ class StandardStreamEvent(BaseStreamEvent):
|
||||
class CustomStreamEvent(BaseStreamEvent):
|
||||
"""Custom stream event created by the user.
|
||||
|
||||
.. versionadded:: 0.2.14
|
||||
.. versionadded:: 0.2.15
|
||||
"""
|
||||
|
||||
# Overwrite the event field to be more specific.
|
||||
|
Loading…
Reference in New Issue
Block a user