mirror of
https://github.com/hwchase17/langchain.git
synced 2025-05-08 00:28:47 +00:00
180 lines
7.2 KiB
Plaintext
180 lines
7.2 KiB
Plaintext
{
|
|
"cells": [
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"# How to use callbacks in async environments\n",
|
|
"\n",
|
|
":::info Prerequisites\n",
|
|
"\n",
|
|
"This guide assumes familiarity with the following concepts:\n",
|
|
"\n",
|
|
"- [Callbacks](/docs/concepts/callbacks)\n",
|
|
"- [Custom callback handlers](/docs/how_to/custom_callbacks)\n",
|
|
":::\n",
|
|
"\n",
|
|
"If you are planning to use the async APIs, it is recommended to use and extend [`AsyncCallbackHandler`](https://python.langchain.com/api_reference/core/callbacks/langchain_core.callbacks.base.AsyncCallbackHandler.html) to avoid blocking the event loop.\n",
|
|
"\n",
|
|
"\n",
|
|
":::warning\n",
|
|
"If you use a sync `CallbackHandler` while using an async method to run your LLM / Chain / Tool / Agent, it will still work. However, under the hood, it will be called with [`run_in_executor`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor) which can cause issues if your `CallbackHandler` is not thread-safe.\n",
|
|
":::\n",
|
|
"\n",
|
|
":::danger\n",
|
|
"\n",
|
|
"If you're on `python<=3.10`, you need to remember to propagate `config` or `callbacks` when invoking other runnables from within a `RunnableLambda`, `RunnableGenerator` or `@tool`. If you do not do this,\n",
|
|
"the callbacks will not be propagated to the child runnables being invoked.\n",
|
|
":::"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# | output: false\n",
|
|
"# | echo: false\n",
|
|
"\n",
|
|
"%pip install -qU langchain langchain_anthropic\n",
|
|
"\n",
|
|
"import getpass\n",
|
|
"import os\n",
|
|
"\n",
|
|
"os.environ[\"ANTHROPIC_API_KEY\"] = getpass.getpass()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 2,
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"zzzz....\n",
|
|
"Hi! I just woke up. Your llm is starting\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: Here\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: 's\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: a\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: little\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: joke\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: for\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: you\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: :\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: \n",
|
|
"\n",
|
|
"Why\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: can\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: 't\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: a\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: bicycle\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: stan\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: d up\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: by\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: itself\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: ?\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: Because\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: it\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: 's\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: two\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: -\n",
|
|
"Sync handler being called in a `thread_pool_executor`: token: tire\n",
|
|
"zzzz....\n",
|
|
"Hi! I just woke up. Your llm is ending\n"
|
|
]
|
|
},
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"LLMResult(generations=[[ChatGeneration(text=\"Here's a little joke for you:\\n\\nWhy can't a bicycle stand up by itself? Because it's two-tire\", message=AIMessage(content=\"Here's a little joke for you:\\n\\nWhy can't a bicycle stand up by itself? Because it's two-tire\", id='run-8afc89e8-02c0-4522-8480-d96977240bd4-0'))]], llm_output={}, run=[RunInfo(run_id=UUID('8afc89e8-02c0-4522-8480-d96977240bd4'))])"
|
|
]
|
|
},
|
|
"execution_count": 2,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"import asyncio\n",
|
|
"from typing import Any, Dict, List\n",
|
|
"\n",
|
|
"from langchain_anthropic import ChatAnthropic\n",
|
|
"from langchain_core.callbacks import AsyncCallbackHandler, BaseCallbackHandler\n",
|
|
"from langchain_core.messages import HumanMessage\n",
|
|
"from langchain_core.outputs import LLMResult\n",
|
|
"\n",
|
|
"\n",
|
|
"class MyCustomSyncHandler(BaseCallbackHandler):\n",
|
|
" def on_llm_new_token(self, token: str, **kwargs) -> None:\n",
|
|
" print(f\"Sync handler being called in a `thread_pool_executor`: token: {token}\")\n",
|
|
"\n",
|
|
"\n",
|
|
"class MyCustomAsyncHandler(AsyncCallbackHandler):\n",
|
|
" \"\"\"Async callback handler that can be used to handle callbacks from langchain.\"\"\"\n",
|
|
"\n",
|
|
" async def on_llm_start(\n",
|
|
" self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any\n",
|
|
" ) -> None:\n",
|
|
" \"\"\"Run when the LLM starts running.\"\"\"\n",
|
|
" print(\"zzzz....\")\n",
|
|
" await asyncio.sleep(0.3)\n",
|
|
" class_name = serialized[\"name\"]\n",
|
|
" print(\"Hi! I just woke up. Your llm is starting\")\n",
|
|
"\n",
|
|
" async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:\n",
|
|
" \"\"\"Run when the LLM ends running.\"\"\"\n",
|
|
" print(\"zzzz....\")\n",
|
|
" await asyncio.sleep(0.3)\n",
|
|
" print(\"Hi! I just woke up. Your llm is ending\")\n",
|
|
"\n",
|
|
"\n",
|
|
"# To enable streaming, we pass in `streaming=True` to the ChatModel constructor\n",
|
|
"# Additionally, we pass in a list with our custom handler\n",
|
|
"chat = ChatAnthropic(\n",
|
|
" model=\"claude-3-sonnet-20240229\",\n",
|
|
" max_tokens=25,\n",
|
|
" streaming=True,\n",
|
|
" callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()],\n",
|
|
")\n",
|
|
"\n",
|
|
"await chat.agenerate([[HumanMessage(content=\"Tell me a joke\")]])"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Next steps\n",
|
|
"\n",
|
|
"You've now learned how to use callbacks in async environments.\n",
|
|
"\n",
|
|
"Next, check out the other how-to guides in this section, such as [how to attach callbacks to a runnable](/docs/how_to/callbacks_attach)."
|
|
]
|
|
}
|
|
],
|
|
"metadata": {
|
|
"kernelspec": {
|
|
"display_name": "Python 3 (ipykernel)",
|
|
"language": "python",
|
|
"name": "python3"
|
|
},
|
|
"language_info": {
|
|
"codemirror_mode": {
|
|
"name": "ipython",
|
|
"version": 3
|
|
},
|
|
"file_extension": ".py",
|
|
"mimetype": "text/x-python",
|
|
"name": "python",
|
|
"nbconvert_exporter": "python",
|
|
"pygments_lexer": "ipython3",
|
|
"version": "3.9.6"
|
|
}
|
|
},
|
|
"nbformat": 4,
|
|
"nbformat_minor": 4
|
|
}
|