diff --git a/libs/core/langchain_core/callbacks/manager.py b/libs/core/langchain_core/callbacks/manager.py index e871cf8a35a..98f66c34536 100644 --- a/libs/core/langchain_core/callbacks/manager.py +++ b/libs/core/langchain_core/callbacks/manager.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import atexit import functools import logging import uuid @@ -314,12 +315,13 @@ def handle_event( # If we try to submit this coroutine to the running loop # we end up in a deadlock, as we'd have gotten here from a # running coroutine, which we cannot interrupt to run this one. - # The solution is to create a new loop in a new thread. - with ThreadPoolExecutor(1) as executor: - executor.submit( - cast("Callable", copy_context().run), _run_coros, coros - ).result() + # The solution is to run the synchronous function on the globally shared + # thread pool executor to avoid blocking the main event loop. + _executor().submit( + cast("Callable", copy_context().run), _run_coros, coros + ).result() else: + # If there's no running loop, we can run the coroutines directly. _run_coros(coros) @@ -2618,3 +2620,19 @@ def dispatch_custom_event( data, run_id=callback_manager.parent_run_id, ) + + +@functools.lru_cache(maxsize=1) +def _executor() -> ThreadPoolExecutor: + # If the user is specifying ASYNC callback handlers to be run from a + # SYNC context, and an event loop is already running, + # we cannot submit the coroutine to the running loop, because it + # would result in a deadlock. Instead we have to schedule them + # on a background thread. To avoid creating & shutting down + # a new executor every time, we use a lazily-created, shared + # executor. If you're using regular langchain parallelism (batch, etc.) + # you'd only ever need 1 worker, but we permit more for now to reduce the chance + # of slowdown if you are mixing with your own executor. 
+ cutie = ThreadPoolExecutor(max_workers=10) + atexit.register(cutie.shutdown, wait=True) + return cutie diff --git a/libs/core/tests/benchmarks/test_async_callbacks.py b/libs/core/tests/benchmarks/test_async_callbacks.py index d07224c375e..b8224a35dd0 100644 --- a/libs/core/tests/benchmarks/test_async_callbacks.py +++ b/libs/core/tests/benchmarks/test_async_callbacks.py @@ -45,12 +45,12 @@ class MyCustomAsyncHandler(AsyncCallbackHandler): @pytest.mark.benchmark -async def test_async_callbacks(benchmark: BenchmarkFixture) -> None: - infinite_cycle = cycle([AIMessage(content=" ".join(["hello", "goodbye"] * 1000))]) +async def test_async_callbacks_in_sync(benchmark: BenchmarkFixture) -> None: + infinite_cycle = cycle([AIMessage(content=" ".join(["hello", "goodbye"] * 500))]) model = GenericFakeChatModel(messages=infinite_cycle) @benchmark - def async_callbacks() -> None: - for _ in range(10): + def sync_callbacks() -> None: + for _ in range(5): for _ in model.stream("meow", {"callbacks": [MyCustomAsyncHandler()]}): pass