langchain/docs/docs/how_to/callbacks_custom_events.ipynb
Eugene Yurtsev 4466caadba
concepts: update llm stub page and re-link (#27567)
Update text llm stub page and re-link content
2024-10-22 23:03:36 -04:00

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# How to dispatch custom callback events\n",
"\n",
":::info Prerequisites\n",
"\n",
"This guide assumes familiarity with the following concepts:\n",
"\n",
"- [Callbacks](/docs/concepts/callbacks)\n",
"- [Custom callback handlers](/docs/how_to/custom_callbacks)\n",
"- [Astream Events API](/docs/concepts/streaming/#astream_events): the `astream_events` method surfaces custom callback events.\n",
":::\n",
"\n",
"In some situations, you may want to dispatch a custom callback event from within a [Runnable](/docs/concepts/runnables) so it can be surfaced\n",
"in a custom callback handler or via the [Astream Events API](/docs/concepts/streaming/#astream_events).\n",
"\n",
"For example, if you have a long-running tool with multiple steps, you can dispatch custom events between the steps and use these custom events to monitor progress.\n",
"You could also surface these custom events to an end user of your application to show them how the current task is progressing.\n",
"\n",
"To dispatch a custom event you need to decide on two attributes for the event: the `name` and the `data`.\n",
"\n",
"| Attribute | Type | Description |\n",
"|-----------|------|----------------------------------------------------------------------------------------------------------|\n",
"| name | str | A user-defined name for the event. |\n",
"| data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. |\n",
"\n",
"\n",
":::important\n",
"* Dispatching custom callback events requires `langchain-core>=0.2.15`.\n",
"* Custom callback events can only be dispatched from within an existing `Runnable`.\n",
"* If using `astream_events`, you must use `version='v2'` to see custom events.\n",
"* Sending or rendering custom callback events in LangSmith is not yet supported.\n",
":::\n",
"\n",
"\n",
":::caution COMPATIBILITY\n",
"LangChain cannot automatically propagate configuration, including callbacks necessary for `astream_events()`, to child runnables if you are running async code in Python <= 3.10. This is a common reason why you may fail to see events being emitted from custom runnables or tools.\n",
"\n",
"If you are running Python <= 3.10, you will need to manually propagate the `RunnableConfig` object to the child runnable in async environments. For an example of how to manually propagate the config, see the implementation of the `bar` RunnableLambda below.\n",
"\n",
"If you are running Python >= 3.11, the `RunnableConfig` will automatically propagate to child runnables in async environments. However, it is still a good idea to propagate the `RunnableConfig` manually if your code may run in other Python versions.\n",
":::"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# | output: false\n",
"# | echo: false\n",
"\n",
"%pip install -qU langchain-core"
]
},
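{
"cell_type": "markdown",
"metadata": {},
"source": [
"The table above suggests keeping `data` JSON serializable. As a minimal sketch, and not part of the LangChain API, the hypothetical helper `is_json_serializable` below uses the standard library's `json.dumps` to check a payload before you dispatch it."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"from typing import Any\n",
"\n",
"\n",
"def is_json_serializable(data: Any) -> bool:\n",
"    \"\"\"Hypothetical helper: return True if `data` can be encoded by json.dumps.\"\"\"\n",
"    try:\n",
"        json.dumps(data)\n",
"    except (TypeError, ValueError):\n",
"        # TypeError: unsupported type; ValueError: e.g. circular reference.\n",
"        return False\n",
"    return True\n",
"\n",
"\n",
"print(is_json_serializable({\"x\": \"hello world\"}))  # True\n",
"print(is_json_serializable({\"x\": object()}))  # False"
]
},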
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Astream Events API\n",
"\n",
"The most useful way to consume custom events is via the [Astream Events API](/docs/concepts/streaming/#astream_events).\n",
"\n",
"We can use the `async` `adispatch_custom_event` API to emit custom events in an async setting.\n",
"\n",
"\n",
":::important\n",
"\n",
"To see custom events via the Astream Events API, you need to use the newer `v2` API of `astream_events`.\n",
":::"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'event': 'on_chain_start', 'data': {'input': 'hello world'}, 'name': 'foo', 'tags': [], 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'metadata': {}, 'parent_ids': []}\n",
"{'event': 'on_custom_event', 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'name': 'event1', 'tags': [], 'metadata': {}, 'data': {'x': 'hello world'}, 'parent_ids': []}\n",
"{'event': 'on_custom_event', 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'name': 'event2', 'tags': [], 'metadata': {}, 'data': 5, 'parent_ids': []}\n",
"{'event': 'on_chain_stream', 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'name': 'foo', 'tags': [], 'metadata': {}, 'data': {'chunk': 'hello world'}, 'parent_ids': []}\n",
"{'event': 'on_chain_end', 'data': {'output': 'hello world'}, 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'name': 'foo', 'tags': [], 'metadata': {}, 'parent_ids': []}\n"
]
}
],
"source": [
"from langchain_core.callbacks.manager import (\n",
"    adispatch_custom_event,\n",
")\n",
"from langchain_core.runnables import RunnableLambda\n",
"from langchain_core.runnables.config import RunnableConfig\n",
"\n",
"\n",
"@RunnableLambda\n",
"async def foo(x: str) -> str:\n",
"    await adispatch_custom_event(\"event1\", {\"x\": x})\n",
"    await adispatch_custom_event(\"event2\", 5)\n",
"    return x\n",
"\n",
"\n",
"async for event in foo.astream_events(\"hello world\", version=\"v2\"):\n",
"    print(event)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In Python <= 3.10, you must propagate the config manually, as the `bar` runnable below does by passing `config` to each dispatch call."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'event': 'on_chain_start', 'data': {'input': 'hello world'}, 'name': 'bar', 'tags': [], 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'metadata': {}, 'parent_ids': []}\n",
"{'event': 'on_custom_event', 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'name': 'event1', 'tags': [], 'metadata': {}, 'data': {'x': 'hello world'}, 'parent_ids': []}\n",
"{'event': 'on_custom_event', 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'name': 'event2', 'tags': [], 'metadata': {}, 'data': 5, 'parent_ids': []}\n",
"{'event': 'on_chain_stream', 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'name': 'bar', 'tags': [], 'metadata': {}, 'data': {'chunk': 'hello world'}, 'parent_ids': []}\n",
"{'event': 'on_chain_end', 'data': {'output': 'hello world'}, 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'name': 'bar', 'tags': [], 'metadata': {}, 'parent_ids': []}\n"
]
}
],
"source": [
"from langchain_core.callbacks.manager import (\n",
"    adispatch_custom_event,\n",
")\n",
"from langchain_core.runnables import RunnableLambda\n",
"from langchain_core.runnables.config import RunnableConfig\n",
"\n",
"\n",
"@RunnableLambda\n",
"async def bar(x: str, config: RunnableConfig) -> str:\n",
"    \"\"\"An example that shows how to manually propagate config.\n",
"\n",
"    You must do this if you're running python<=3.10.\n",
"    \"\"\"\n",
"    await adispatch_custom_event(\"event1\", {\"x\": x}, config=config)\n",
"    await adispatch_custom_event(\"event2\", 5, config=config)\n",
"    return x\n",
"\n",
"\n",
"async for event in bar.astream_events(\"hello world\", version=\"v2\"):\n",
"    print(event)"
]
},
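{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you only care about the custom events, for example to show progress to an end user, you can filter the stream on the `event` key. The sketch below reuses the `bar` runnable from the previous cell and relies only on the `event`, `name`, and `data` keys visible in the output above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"async for event in bar.astream_events(\"hello world\", version=\"v2\"):\n",
"    # Skip the built-in chain events and keep only the custom ones.\n",
"    if event[\"event\"] == \"on_custom_event\":\n",
"        print(f\"{event['name']}: {event['data']}\")"
]
},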
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Async Callback Handler\n",
"\n",
"You can also consume the dispatched event via an async callback handler."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Received event event1 with data: {'x': 1}, with tags: ['foo', 'bar'], with metadata: {} and run_id: a62b84be-7afd-4829-9947-7165df1f37d9\n",
"Received event event2 with data: 5, with tags: ['foo', 'bar'], with metadata: {} and run_id: a62b84be-7afd-4829-9947-7165df1f37d9\n"
]
},
{
"data": {
"text/plain": [
"1"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from typing import Any, Dict, List, Optional\n",
"from uuid import UUID\n",
"\n",
"from langchain_core.callbacks import AsyncCallbackHandler\n",
"from langchain_core.callbacks.manager import (\n",
"    adispatch_custom_event,\n",
")\n",
"from langchain_core.runnables import RunnableLambda\n",
"from langchain_core.runnables.config import RunnableConfig\n",
"\n",
"\n",
"class AsyncCustomCallbackHandler(AsyncCallbackHandler):\n",
"    async def on_custom_event(\n",
"        self,\n",
"        name: str,\n",
"        data: Any,\n",
"        *,\n",
"        run_id: UUID,\n",
"        tags: Optional[List[str]] = None,\n",
"        metadata: Optional[Dict[str, Any]] = None,\n",
"        **kwargs: Any,\n",
"    ) -> None:\n",
"        print(\n",
"            f\"Received event {name} with data: {data}, with tags: {tags}, with metadata: {metadata} and run_id: {run_id}\"\n",
"        )\n",
"\n",
"\n",
"@RunnableLambda\n",
"async def bar(x: str, config: RunnableConfig) -> str:\n",
"    \"\"\"An example that shows how to manually propagate config.\n",
"\n",
"    You must do this if you're running python<=3.10.\n",
"    \"\"\"\n",
"    await adispatch_custom_event(\"event1\", {\"x\": x}, config=config)\n",
"    await adispatch_custom_event(\"event2\", 5, config=config)\n",
"    return x\n",
"\n",
"\n",
"async_handler = AsyncCustomCallbackHandler()\n",
"await bar.ainvoke(1, {\"callbacks\": [async_handler], \"tags\": [\"foo\", \"bar\"]})"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Sync Callback Handler\n",
"\n",
"Let's see how to emit custom events in a sync environment using `dispatch_custom_event`.\n",
"\n",
"You **must** call `dispatch_custom_event` from within an existing `Runnable`."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Received event event1 with data: {'x': 1}, with tags: ['foo', 'bar'], with metadata: {} and run_id: 27b5ce33-dc26-4b34-92dd-08a89cb22268\n",
"Received event event2 with data: {'x': 1}, with tags: ['foo', 'bar'], with metadata: {} and run_id: 27b5ce33-dc26-4b34-92dd-08a89cb22268\n"
]
},
{
"data": {
"text/plain": [
"1"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from typing import Any, Dict, List, Optional\n",
"from uuid import UUID\n",
"\n",
"from langchain_core.callbacks import BaseCallbackHandler\n",
"from langchain_core.callbacks.manager import (\n",
"    dispatch_custom_event,\n",
")\n",
"from langchain_core.runnables import RunnableLambda\n",
"from langchain_core.runnables.config import RunnableConfig\n",
"\n",
"\n",
"class CustomHandler(BaseCallbackHandler):\n",
"    def on_custom_event(\n",
"        self,\n",
"        name: str,\n",
"        data: Any,\n",
"        *,\n",
"        run_id: UUID,\n",
"        tags: Optional[List[str]] = None,\n",
"        metadata: Optional[Dict[str, Any]] = None,\n",
"        **kwargs: Any,\n",
"    ) -> None:\n",
"        print(\n",
"            f\"Received event {name} with data: {data}, with tags: {tags}, with metadata: {metadata} and run_id: {run_id}\"\n",
"        )\n",
"\n",
"\n",
"@RunnableLambda\n",
"def foo(x: int, config: RunnableConfig) -> int:\n",
"    dispatch_custom_event(\"event1\", {\"x\": x})\n",
"    dispatch_custom_event(\"event2\", {\"x\": x})\n",
"    return x\n",
"\n",
"\n",
"handler = CustomHandler()\n",
"foo.invoke(1, {\"callbacks\": [handler], \"tags\": [\"foo\", \"bar\"]})"
]
},
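{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sketch of the progress-monitoring use case mentioned in the introduction, the runnable below dispatches one event per step. The step names, the `progress` event name, and the payload keys are hypothetical; the cell reuses `handler` from the previous cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain_core.callbacks.manager import dispatch_custom_event\n",
"from langchain_core.runnables import RunnableLambda\n",
"\n",
"\n",
"@RunnableLambda\n",
"def multi_step(x: int) -> int:\n",
"    # Hypothetical step names standing in for the stages of a long-running tool.\n",
"    steps = [\"load\", \"transform\", \"save\"]\n",
"    for i, step in enumerate(steps, start=1):\n",
"        # Do the real work for this step here, then report progress.\n",
"        dispatch_custom_event(\n",
"            \"progress\", {\"step\": step, \"done\": i, \"total\": len(steps)}\n",
"        )\n",
"    return x\n",
"\n",
"\n",
"multi_step.invoke(1, {\"callbacks\": [handler]})"
]
},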
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Next steps\n",
"\n",
"You've seen how to emit custom events. Next, check out the more in-depth guide for [astream events](/docs/how_to/streaming/#using-stream-events), which is the easiest way to leverage custom events."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.4"
}
},
"nbformat": 4,
"nbformat_minor": 4
}