diff --git a/libs/core/langchain_core/runnables/base.py b/libs/core/langchain_core/runnables/base.py index d9ee99555eb..36153a78935 100644 --- a/libs/core/langchain_core/runnables/base.py +++ b/libs/core/langchain_core/runnables/base.py @@ -94,7 +94,7 @@ from langchain_core.tracers.root_listeners import ( AsyncRootListenersTracer, RootListenersTracer, ) -from langchain_core.utils.aiter import aclosing, atee, py_anext +from langchain_core.utils.aiter import aclosing, atee from langchain_core.utils.iter import safetee from langchain_core.utils.pydantic import create_model_v2 @@ -2377,7 +2377,7 @@ class Runnable(ABC, Generic[Input, Output]): # tee the input so we can iterate over it twice input_for_tracing, input_for_transform = atee(inputs, 2) # Start the input iterator to ensure the input Runnable starts before this one - final_input: Input | None = await py_anext(input_for_tracing, None) + final_input: Input | None = await anext(input_for_tracing, None) final_input_supported = True final_output: Output | None = None final_output_supported = True @@ -2417,7 +2417,7 @@ class Runnable(ABC, Generic[Input, Output]): iterator = iterator_ try: while True: - chunk = await coro_with_context(py_anext(iterator), context) + chunk = await coro_with_context(anext(iterator), context) yield chunk if final_output_supported: if final_output is None: @@ -4025,7 +4025,7 @@ class RunnableParallel(RunnableSerializable[Input, dict[str, Any]]): # Wrap in a coroutine to satisfy linter async def get_next_chunk(generator: AsyncIterator) -> Output | None: - return await py_anext(generator) + return await anext(generator) # Start the first iteration of each generator tasks = { diff --git a/libs/core/langchain_core/runnables/fallbacks.py b/libs/core/langchain_core/runnables/fallbacks.py index 0584d64e60c..8d8a3d7524e 100644 --- a/libs/core/langchain_core/runnables/fallbacks.py +++ b/libs/core/langchain_core/runnables/fallbacks.py @@ -28,7 +28,6 @@ from langchain_core.runnables.utils import ( coro_with_context, get_unique_config_specs, ) -from langchain_core.utils.aiter import py_anext if TYPE_CHECKING: from langchain_core.callbacks.manager import AsyncCallbackManagerForChainRun @@ -563,7 +562,7 @@ class RunnableWithFallbacks(RunnableSerializable[Input, Output]): child_config, **kwargs, ) - chunk = await coro_with_context(py_anext(stream), context) + chunk = await coro_with_context(anext(stream), context) except self.exceptions_to_handle as e: first_error = e if first_error is None else first_error last_error = e diff --git a/libs/core/langchain_core/runnables/passthrough.py b/libs/core/langchain_core/runnables/passthrough.py index 740bcab8d24..e20a3076175 100644 --- a/libs/core/langchain_core/runnables/passthrough.py +++ b/libs/core/langchain_core/runnables/passthrough.py @@ -33,7 +33,7 @@ from langchain_core.runnables.utils import ( AddableDict, ConfigurableFieldSpec, ) -from langchain_core.utils.aiter import atee, py_anext +from langchain_core.utils.aiter import atee from langchain_core.utils.iter import safetee from langchain_core.utils.pydantic import create_model_v2 @@ -614,7 +614,7 @@ class RunnableAssign(RunnableSerializable[dict[str, Any], dict[str, Any]]): ) # start map output stream first_map_chunk_task: asyncio.Task = asyncio.create_task( - py_anext(map_output, None), # type: ignore[arg-type] + anext(map_output, None), ) # consume passthrough stream async for chunk in for_passthrough: diff --git a/libs/core/langchain_core/tracers/event_stream.py b/libs/core/langchain_core/tracers/event_stream.py index 16c52fd594c..80ce1b7bf15 100644 --- a/libs/core/langchain_core/tracers/event_stream.py +++ b/libs/core/langchain_core/tracers/event_stream.py @@ -42,7 +42,7 @@ from langchain_core.tracers.log_stream import ( _astream_log_implementation, ) from langchain_core.tracers.memory_stream import _MemoryStream -from langchain_core.utils.aiter import aclosing, py_anext +from langchain_core.utils.aiter import aclosing from langchain_core.utils.uuid import uuid7 if TYPE_CHECKING: @@ -189,7 +189,7 @@ class _AstreamEventsCallbackHandler(AsyncCallbackHandler, _StreamingCallbackHand # atomic check and set tap = self.is_tapped.setdefault(run_id, sentinel) # wait for first chunk - first = await py_anext(output, default=sentinel) + first = await anext(output, sentinel) if first is sentinel: return # get run info diff --git a/libs/core/langchain_core/utils/aiter.py b/libs/core/langchain_core/utils/aiter.py index b196b43aba8..e773cb899cd 100644 --- a/libs/core/langchain_core/utils/aiter.py +++ b/libs/core/langchain_core/utils/aiter.py @@ -26,13 +26,15 @@ from typing import ( from typing_extensions import override +from langchain_core._api.deprecation import deprecated + T = TypeVar("T") _no_default = object() # https://github.com/python/cpython/blob/main/Lib/test/test_asyncgen.py#L54 -# before 3.10, the builtin anext() was not available +@deprecated(since="1.1.2", removal="2.0.0") def py_anext( iterator: AsyncIterator[T], default: T | Any = _no_default ) -> Awaitable[T | Any | None]: