chore(core): use anext and deprecate py_anext (#34211)

LangChain uses Python 3.10+ so `py_anext` isn't needed anymore.

---------

Co-authored-by: Mason Daugherty <mason@langchain.dev>
This commit is contained in:
Christophe Bornet
2025-12-08 15:50:40 +01:00
committed by GitHub
parent 9875ffbabc
commit bb71f53585
5 changed files with 12 additions and 11 deletions

View File

@@ -94,7 +94,7 @@ from langchain_core.tracers.root_listeners import (
AsyncRootListenersTracer,
RootListenersTracer,
)
from langchain_core.utils.aiter import aclosing, atee, py_anext
from langchain_core.utils.aiter import aclosing, atee
from langchain_core.utils.iter import safetee
from langchain_core.utils.pydantic import create_model_v2
@@ -2377,7 +2377,7 @@ class Runnable(ABC, Generic[Input, Output]):
# tee the input so we can iterate over it twice
input_for_tracing, input_for_transform = atee(inputs, 2)
# Start the input iterator to ensure the input Runnable starts before this one
final_input: Input | None = await py_anext(input_for_tracing, None)
final_input: Input | None = await anext(input_for_tracing, None)
final_input_supported = True
final_output: Output | None = None
final_output_supported = True
@@ -2417,7 +2417,7 @@ class Runnable(ABC, Generic[Input, Output]):
iterator = iterator_
try:
while True:
chunk = await coro_with_context(py_anext(iterator), context)
chunk = await coro_with_context(anext(iterator), context)
yield chunk
if final_output_supported:
if final_output is None:
@@ -4025,7 +4025,7 @@ class RunnableParallel(RunnableSerializable[Input, dict[str, Any]]):
# Wrap in a coroutine to satisfy linter
async def get_next_chunk(generator: AsyncIterator) -> Output | None:
return await py_anext(generator)
return await anext(generator)
# Start the first iteration of each generator
tasks = {

View File

@@ -28,7 +28,6 @@ from langchain_core.runnables.utils import (
coro_with_context,
get_unique_config_specs,
)
from langchain_core.utils.aiter import py_anext
if TYPE_CHECKING:
from langchain_core.callbacks.manager import AsyncCallbackManagerForChainRun
@@ -563,7 +562,7 @@ class RunnableWithFallbacks(RunnableSerializable[Input, Output]):
child_config,
**kwargs,
)
chunk = await coro_with_context(py_anext(stream), context)
chunk = await coro_with_context(anext(stream), context)
except self.exceptions_to_handle as e:
first_error = e if first_error is None else first_error
last_error = e

View File

@@ -33,7 +33,7 @@ from langchain_core.runnables.utils import (
AddableDict,
ConfigurableFieldSpec,
)
from langchain_core.utils.aiter import atee, py_anext
from langchain_core.utils.aiter import atee
from langchain_core.utils.iter import safetee
from langchain_core.utils.pydantic import create_model_v2
@@ -614,7 +614,7 @@ class RunnableAssign(RunnableSerializable[dict[str, Any], dict[str, Any]]):
)
# start map output stream
first_map_chunk_task: asyncio.Task = asyncio.create_task(
py_anext(map_output, None), # type: ignore[arg-type]
anext(map_output, None),
)
# consume passthrough stream
async for chunk in for_passthrough:

View File

@@ -42,7 +42,7 @@ from langchain_core.tracers.log_stream import (
_astream_log_implementation,
)
from langchain_core.tracers.memory_stream import _MemoryStream
from langchain_core.utils.aiter import aclosing, py_anext
from langchain_core.utils.aiter import aclosing
from langchain_core.utils.uuid import uuid7
if TYPE_CHECKING:
@@ -189,7 +189,7 @@ class _AstreamEventsCallbackHandler(AsyncCallbackHandler, _StreamingCallbackHand
# atomic check and set
tap = self.is_tapped.setdefault(run_id, sentinel)
# wait for first chunk
first = await py_anext(output, default=sentinel)
first = await anext(output, sentinel)
if first is sentinel:
return
# get run info

View File

@@ -26,13 +26,15 @@ from typing import (
from typing_extensions import override
from langchain_core._api.deprecation import deprecated
T = TypeVar("T")
_no_default = object()
# https://github.com/python/cpython/blob/main/Lib/test/test_asyncgen.py#L54
# before 3.10, the builtin anext() was not available
@deprecated(since="1.1.2", removal="2.0.0")
def py_anext(
iterator: AsyncIterator[T], default: T | Any = _no_default
) -> Awaitable[T | Any | None]: