diff --git a/docs/docs/how_to/callbacks_custom_events.ipynb b/docs/docs/how_to/callbacks_custom_events.ipynb new file mode 100644 index 00000000000..71a05914bdd --- /dev/null +++ b/docs/docs/how_to/callbacks_custom_events.ipynb @@ -0,0 +1,342 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# How to dispatch custom callback events\n", + "\n", + ":::info Prerequisites\n", + "\n", + "This guide assumes familiarity with the following concepts:\n", + "\n", + "- [Callbacks](/docs/concepts/#callbacks)\n", + "- [Custom callback handlers](/docs/how_to/custom_callbacks)\n", + "- [Astream Events API](/docs/concepts/#astream_events) the `astream_events` method will surface custom callback events.\n", + ":::\n", + "\n", + "In some situations, you may want to dipsatch a custom callback event from within a [Runnable](/docs/concepts/#runnable-interface) so it can be surfaced\n", + "in a custom callback handler or via the [Astream Events API](/docs/concepts/#astream_events).\n", + "\n", + "For example, if you have a long running tool with multiple steps, you can dispatch custom events between the steps and use these custom events to monitor progress.\n", + "You could also surface these custom events to an end user of your application to show them how the current task is progressing.\n", + "\n", + "To dispatch a custom event you need to decide on two attributes for the event: the `name` and the `data`.\n", + "\n", + "| Attribute | Type | Description |\n", + "|-----------|------|----------------------------------------------------------------------------------------------------------|\n", + "| name | str | A user defined name for the event. |\n", + "| data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. |\n", + "\n", + "\n", + ":::{.callout-important}\n", + "* Dispatching custom callback events requires `langchain-core>=0.2.15`.\n", + "* Custom callback events can only be dispatched from within an existing `Runnable`.\n", + "* If using `astream_events`, you must use `version='v2'` to see custom events.\n", + "* Sending or rendering custom callbacks events in LangSmith is not yet supported.\n", + ":::\n", + "\n", + "\n", + ":::caution COMPATIBILITY\n", + "LangChain cannot automatically propagate configuration, including callbacks necessary for astream_events(), to child runnables if you are running async code in python<=3.10. This is a common reason why you may fail to see events being emitted from custom runnables or tools.\n", + "\n", + "If you are running python<=3.10, you will need to manually propagate the `RunnableConfig` object to the child runnable in async environments. For an example of how to manually propagate the config, see the implementation of the `bar` RunnableLambda below.\n", + "\n", + "If you are running python>=3.11, the `RunnableConfig` will automatically propagate to child runnables in async environment. However, it is still a good idea to propagate the `RunnableConfig` manually if your code may run in other Python versions.\n", + ":::" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# | output: false\n", + "# | echo: false\n", + "\n", + "%pip install -qU langchain-core" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Astream Events API\n", + "\n", + "The most useful way to consume custom events is via the [Astream Events API](/docs/concepts/#astream_events).\n", + "\n", + "We can use the `async` `adispatch_custom_event` API to emit custom events in an async setting. \n", + "\n", + "\n", + ":::{.callout-important}\n", + "\n", + "To see custom events via the astream events API, you need to use the newer `v2` API of `astream_events`.\n", + ":::" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'event': 'on_chain_start', 'data': {'input': 'hello world'}, 'name': 'foo', 'tags': [], 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'metadata': {}, 'parent_ids': []}\n", + "{'event': 'on_custom_event', 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'name': 'event1', 'tags': [], 'metadata': {}, 'data': {'x': 'hello world'}, 'parent_ids': []}\n", + "{'event': 'on_custom_event', 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'name': 'event2', 'tags': [], 'metadata': {}, 'data': 5, 'parent_ids': []}\n", + "{'event': 'on_chain_stream', 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'name': 'foo', 'tags': [], 'metadata': {}, 'data': {'chunk': 'hello world'}, 'parent_ids': []}\n", + "{'event': 'on_chain_end', 'data': {'output': 'hello world'}, 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'name': 'foo', 'tags': [], 'metadata': {}, 'parent_ids': []}\n" + ] + } + ], + "source": [ + "from langchain_core.callbacks.manager import (\n", + " adispatch_custom_event,\n", + ")\n", + "from langchain_core.runnables import RunnableLambda\n", + "from langchain_core.runnables.config import RunnableConfig\n", + "\n", + "\n", + "@RunnableLambda\n", + "async def foo(x: str) -> str:\n", + " await adispatch_custom_event(\"event1\", {\"x\": x})\n", + " await adispatch_custom_event(\"event2\", 5)\n", + " return x\n", + "\n", + "\n", + "async for event in foo.astream_events(\"hello world\", version=\"v2\"):\n", + " print(event)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In python <= 3.10, you must propagate the config manually!" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'event': 'on_chain_start', 'data': {'input': 'hello world'}, 'name': 'bar', 'tags': [], 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'metadata': {}, 'parent_ids': []}\n", + "{'event': 'on_custom_event', 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'name': 'event1', 'tags': [], 'metadata': {}, 'data': {'x': 'hello world'}, 'parent_ids': []}\n", + "{'event': 'on_custom_event', 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'name': 'event2', 'tags': [], 'metadata': {}, 'data': 5, 'parent_ids': []}\n", + "{'event': 'on_chain_stream', 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'name': 'bar', 'tags': [], 'metadata': {}, 'data': {'chunk': 'hello world'}, 'parent_ids': []}\n", + "{'event': 'on_chain_end', 'data': {'output': 'hello world'}, 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'name': 'bar', 'tags': [], 'metadata': {}, 'parent_ids': []}\n" + ] + } + ], + "source": [ + "from langchain_core.callbacks.manager import (\n", + " adispatch_custom_event,\n", + ")\n", + "from langchain_core.runnables import RunnableLambda\n", + "from langchain_core.runnables.config import RunnableConfig\n", + "\n", + "\n", + "@RunnableLambda\n", + "async def bar(x: str, config: RunnableConfig) -> str:\n", + " \"\"\"An example that shows how to manually propagate config.\n", + "\n", + " You must do this if you're running python<=3.10.\n", + " \"\"\"\n", + " await adispatch_custom_event(\"event1\", {\"x\": x}, config=config)\n", + " await adispatch_custom_event(\"event2\", 5, config=config)\n", + " return x\n", + "\n", + "\n", + "async for event in bar.astream_events(\"hello world\", version=\"v2\"):\n", + " print(event)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Async Callback Handler\n", + "\n", + "You can also consume the dispatched event via an async callback handler." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Received event event1 with data: {'x': 1}, with tags: ['foo', 'bar'], with metadata: {} and run_id: a62b84be-7afd-4829-9947-7165df1f37d9\n", + "Received event event2 with data: 5, with tags: ['foo', 'bar'], with metadata: {} and run_id: a62b84be-7afd-4829-9947-7165df1f37d9\n" + ] + }, + { + "data": { + "text/plain": [ + "1" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from typing import Any, Dict, List, Optional\n", + "from uuid import UUID\n", + "\n", + "from langchain_core.callbacks import AsyncCallbackHandler\n", + "from langchain_core.callbacks.manager import (\n", + " adispatch_custom_event,\n", + ")\n", + "from langchain_core.runnables import RunnableLambda\n", + "from langchain_core.runnables.config import RunnableConfig\n", + "\n", + "\n", + "class AsyncCustomCallbackHandler(AsyncCallbackHandler):\n", + " async def on_custom_event(\n", + " self,\n", + " name: str,\n", + " data: Any,\n", + " *,\n", + " run_id: UUID,\n", + " tags: Optional[List[str]] = None,\n", + " metadata: Optional[Dict[str, Any]] = None,\n", + " **kwargs: Any,\n", + " ) -> None:\n", + " print(\n", + " f\"Received event {name} with data: {data}, with tags: {tags}, with metadata: {metadata} and run_id: {run_id}\"\n", + " )\n", + "\n", + "\n", + "@RunnableLambda\n", + "async def bar(x: str, config: RunnableConfig) -> str:\n", + " \"\"\"An example that shows how to manually propagate config.\n", + "\n", + " You must do this if you're running python<=3.10.\n", + " \"\"\"\n", + " await adispatch_custom_event(\"event1\", {\"x\": x}, config=config)\n", + " await adispatch_custom_event(\"event2\", 5, config=config)\n", + " return x\n", + "\n", + "\n", + "async_handler = AsyncCustomCallbackHandler()\n", + "await foo.ainvoke(1, {\"callbacks\": [async_handler], \"tags\": [\"foo\", \"bar\"]})" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Sync Callback Handler\n", + "\n", + "Let's see how to emit custom events in a sync environment using `dispatch_custom_event`.\n", + "\n", + "You **must** call `dispatch_custom_event` from within an existing `Runnable`." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Received event event1 with data: {'x': 1}, with tags: ['foo', 'bar'], with metadata: {} and run_id: 27b5ce33-dc26-4b34-92dd-08a89cb22268\n", + "Received event event2 with data: {'x': 1}, with tags: ['foo', 'bar'], with metadata: {} and run_id: 27b5ce33-dc26-4b34-92dd-08a89cb22268\n" + ] + }, + { + "data": { + "text/plain": [ + "1" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from typing import Any, Dict, List, Optional\n", + "from uuid import UUID\n", + "\n", + "from langchain_core.callbacks import BaseCallbackHandler\n", + "from langchain_core.callbacks.manager import (\n", + " dispatch_custom_event,\n", + ")\n", + "from langchain_core.runnables import RunnableLambda\n", + "from langchain_core.runnables.config import RunnableConfig\n", + "\n", + "\n", + "class CustomHandler(BaseCallbackHandler):\n", + " def on_custom_event(\n", + " self,\n", + " name: str,\n", + " data: Any,\n", + " *,\n", + " run_id: UUID,\n", + " tags: Optional[List[str]] = None,\n", + " metadata: Optional[Dict[str, Any]] = None,\n", + " **kwargs: Any,\n", + " ) -> None:\n", + " print(\n", + " f\"Received event {name} with data: {data}, with tags: {tags}, with metadata: {metadata} and run_id: {run_id}\"\n", + " )\n", + "\n", + "\n", + "@RunnableLambda\n", + "def foo(x: int, config: RunnableConfig) -> int:\n", + " dispatch_custom_event(\"event1\", {\"x\": x})\n", + " dispatch_custom_event(\"event2\", {\"x\": x})\n", + " return x\n", + "\n", + "\n", + "handler = CustomHandler()\n", + "foo.invoke(1, {\"callbacks\": [handler], \"tags\": [\"foo\", \"bar\"]})" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Next steps\n", + "\n", + "You've seen how to emit custom events, you can check out the more in depth guide for [astream events](/docs/how_to/streaming/#using-stream-events) which is the easiest way to leverage custom events." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.4" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/docs/docs/how_to/index.mdx b/docs/docs/how_to/index.mdx index 88477cb22c3..6b21b0af5e4 100644 --- a/docs/docs/how_to/index.mdx +++ b/docs/docs/how_to/index.mdx @@ -225,6 +225,7 @@ For in depth how-to guides for agents, please check out [LangGraph](https://lang - [How to: pass callbacks into a module constructor](/docs/how_to/callbacks_constructor) - [How to: create custom callback handlers](/docs/how_to/custom_callbacks) - [How to: use callbacks in async environments](/docs/how_to/callbacks_async) +- [How to: dispatch custom callback events](/docs/how_to/callbacks_custom_events) ### Custom @@ -237,6 +238,7 @@ All of LangChain components can easily be extended to support your own versions. - [How to: write a custom output parser class](/docs/how_to/output_parser_custom) - [How to: create custom callback handlers](/docs/how_to/custom_callbacks) - [How to: define a custom tool](/docs/how_to/custom_tools) +- [How to: dispatch custom callback events](/docs/how_to/callbacks_custom_events) ### Serialization - [How to: save and load LangChain objects](/docs/how_to/serialization) diff --git a/docs/docs/how_to/tool_stream_events.ipynb b/docs/docs/how_to/tool_stream_events.ipynb index 38d2f14bdef..498593e9e63 100644 --- a/docs/docs/how_to/tool_stream_events.ipynb +++ b/docs/docs/how_to/tool_stream_events.ipynb @@ -22,10 +22,11 @@ "\n", "LangChain cannot automatically propagate configuration, including callbacks necessary for `astream_events()`, to child runnables if you are running `async` code in `python<=3.10`. This is a common reason why you may fail to see events being emitted from custom runnables or tools.\n", "\n", - "If you are running `python>=3.11`, configuration will automatically propagate to child runnables in async environments, and you don't need to access the `RunnableConfig` object for that tool as shown in this guide. However, it is still a good idea if your code may run in other Python versions.\n", + "If you are running python<=3.10, you will need to manually propagate the `RunnableConfig` object to the child runnable in async environments. For an example of how to manually propagate the config, see the implementation of the `bar` RunnableLambda below.\n", + "\n", + "If you are running python>=3.11, the `RunnableConfig` will automatically propagate to child runnables in async environment. However, it is still a good idea to propagate the `RunnableConfig` manually if your code may run in older Python versions.\n", "\n", "This guide also requires `langchain-core>=0.2.16`.\n", - "\n", ":::\n", "\n", "Say you have a custom tool that calls a chain that condenses its input by prompting a chat model to return only 10 words, then reversing the output. First, define it in a naive way:\n", @@ -268,6 +269,7 @@ "\n", "- Pass [runtime values to tools](/docs/how_to/tool_runtime)\n", "- Pass [tool results back to a model](/docs/how_to/tool_results_pass_to_model)\n", + "- [Dispatch custom callback events](/docs/how_to/callbacks_custom_events)\n", "\n", "You can also check out some more specific uses of tool calling:\n", "\n", @@ -278,7 +280,7 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, @@ -292,9 +294,9 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.5" + "version": "3.11.4" } }, "nbformat": 4, - "nbformat_minor": 2 + "nbformat_minor": 4 } diff --git a/libs/core/langchain_core/callbacks/base.py b/libs/core/langchain_core/callbacks/base.py index 719cee6151d..946018da0fb 100644 --- a/libs/core/langchain_core/callbacks/base.py +++ b/libs/core/langchain_core/callbacks/base.py @@ -392,7 +392,7 @@ class RunManagerMixin: metadata: The metadata associated with the custom event (includes inherited metadata). - .. versionadded:: 0.2.14 + .. versionadded:: 0.2.15 """ @@ -851,7 +851,7 @@ class AsyncCallbackHandler(BaseCallbackHandler): metadata: The metadata associated with the custom event (includes inherited metadata). - .. versionadded:: 0.2.14 + .. versionadded:: 0.2.15 """ diff --git a/libs/core/langchain_core/callbacks/manager.py b/libs/core/langchain_core/callbacks/manager.py index 96a29dae46d..82163ee4067 100644 --- a/libs/core/langchain_core/callbacks/manager.py +++ b/libs/core/langchain_core/callbacks/manager.py @@ -2341,7 +2341,7 @@ async def adispatch_custom_event( LangChain from automatically propagating the config object on the user's behalf. - .. versionadded:: 0.2.14 + .. versionadded:: 0.2.15 """ from langchain_core.runnables.config import ( ensure_config, @@ -2410,7 +2410,7 @@ def dispatch_custom_event( foo_ = RunnableLambda(foo) foo_.invoke({"a": "1"}, {"callbacks": [CustomCallbackManager()]}) - .. versionadded:: 0.2.14 + .. versionadded:: 0.2.15 """ from langchain_core.runnables.config import ( ensure_config, diff --git a/libs/core/langchain_core/runnables/schema.py b/libs/core/langchain_core/runnables/schema.py index 2695cfc444c..c5ea213a6a2 100644 --- a/libs/core/langchain_core/runnables/schema.py +++ b/libs/core/langchain_core/runnables/schema.py @@ -159,7 +159,7 @@ class StandardStreamEvent(BaseStreamEvent): class CustomStreamEvent(BaseStreamEvent): """Custom stream event created by the user. - .. versionadded:: 0.2.14 + .. versionadded:: 0.2.15 """ # Overwrite the event field to be more specific.