diff --git a/libs/core/langchain_core/tracers/memory_stream.py b/libs/core/langchain_core/tracers/memory_stream.py index 871b5ca932e..24559194ff4 100644 --- a/libs/core/langchain_core/tracers/memory_stream.py +++ b/libs/core/langchain_core/tracers/memory_stream.py @@ -37,7 +37,11 @@ class _SendStream(Generic[T]): def send_nowait(self, item: T) -> None: """Schedule the item to be written to the queue using the original loop.""" - self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, item) + try: + self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, item) + except RuntimeError: + if not self._reader_loop.is_closed(): + raise # Raise the exception if the loop is not closed async def aclose(self) -> None: """Schedule the done object write the queue using the original loop.""" @@ -45,7 +49,11 @@ class _SendStream(Generic[T]): def close(self) -> None: """Schedule the done object write the queue using the original loop.""" - self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, self._done) + try: + self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, self._done) + except RuntimeError: + if not self._reader_loop.is_closed(): + raise # Raise the exception if the loop is not closed class _ReceiveStream(Generic[T]): diff --git a/libs/core/tests/unit_tests/tracers/test_memory_stream.py b/libs/core/tests/unit_tests/tracers/test_memory_stream.py index 85d16986827..cbdd4d125c7 100644 --- a/libs/core/tests/unit_tests/tracers/test_memory_stream.py +++ b/libs/core/tests/unit_tests/tracers/test_memory_stream.py @@ -112,6 +112,24 @@ async def test_queue_for_streaming_via_sync_call() -> None: ), f"delta_time: {delta_time}" +def test_send_to_closed_stream() -> None: + """Test that sending to a closed stream doesn't raise an error. + + We may want to handle this in a better way in the future. + """ + event_loop = asyncio.get_event_loop() + channel = _MemoryStream[str](event_loop) + writer = channel.get_send_stream() + # send with an open even loop + writer.send_nowait("hello") + event_loop.close() + writer.send_nowait("hello") + # now close the loop + event_loop.close() + writer.close() + writer.send_nowait("hello") + + async def test_closed_stream() -> None: reader_loop = asyncio.get_event_loop() channel = _MemoryStream[str](reader_loop)