core[patch]: Share executor for async callbacks run in sync context (#30779)

To avoid having to create ephemeral threads, grab the thread lock, etc.
This commit is contained in:
William FH 2025-04-11 10:34:43 -07:00 committed by GitHub
parent fdc2b4bcac
commit 2803a48661
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 27 additions and 9 deletions

View File

@ -3,6 +3,7 @@
from __future__ import annotations
import asyncio
import atexit
import functools
import logging
import uuid
@ -314,12 +315,13 @@ def handle_event(
# If we try to submit this coroutine to the running loop
# we end up in a deadlock, as we'd have gotten here from a
# running coroutine, which we cannot interrupt to run this one.
# The solution is to create a new loop in a new thread.
with ThreadPoolExecutor(1) as executor:
executor.submit(
cast("Callable", copy_context().run), _run_coros, coros
).result()
# The solution is to run the synchronous function on the globally shared
# thread pool executor to avoid blocking the main event loop.
_executor().submit(
cast("Callable", copy_context().run), _run_coros, coros
).result()
else:
# If there's no running loop, we can run the coroutines directly.
_run_coros(coros)
@ -2618,3 +2620,19 @@ def dispatch_custom_event(
data,
run_id=callback_manager.parent_run_id,
)
@functools.lru_cache(maxsize=1)
def _executor() -> ThreadPoolExecutor:
# If the user is specifying ASYNC callback handlers to be run from a
# SYNC context, and an event loop is already running,
# we cannot submit the coroutine to the running loop, because it
# would result in a deadlock. Instead we have to schedule them
# on a background thread. To avoid creating & shutting down
# a new executor every time, we use a lazily-created, shared
# executor. If you're using regular langgchain parallelism (batch, etc.)
# you'd only ever need 1 worker, but we permit more for now to reduce the chance
# of slowdown if you are mixing with your own executor.
cutie = ThreadPoolExecutor(max_workers=10)
atexit.register(cutie.shutdown, wait=True)
return cutie

View File

@ -45,12 +45,12 @@ class MyCustomAsyncHandler(AsyncCallbackHandler):
@pytest.mark.benchmark
async def test_async_callbacks(benchmark: BenchmarkFixture) -> None:
infinite_cycle = cycle([AIMessage(content=" ".join(["hello", "goodbye"] * 1000))])
async def test_async_callbacks_in_sync(benchmark: BenchmarkFixture) -> None:
infinite_cycle = cycle([AIMessage(content=" ".join(["hello", "goodbye"] * 500))])
model = GenericFakeChatModel(messages=infinite_cycle)
@benchmark
def async_callbacks() -> None:
for _ in range(10):
def sync_callbacks() -> None:
for _ in range(5):
for _ in model.stream("meow", {"callbacks": [MyCustomAsyncHandler()]}):
pass