mirror of
https://github.com/hwchase17/langchain.git
synced 2025-06-25 08:03:39 +00:00
core[patch]: Correctly order parent ids in astream events (from root to immediate parent), add defensive check for cycles (#22637)
This PR makes two changes: 1. Fixes the order of parent IDs to be from root to immediate parent 2. Adds a simple defensive check for cycles
This commit is contained in:
parent
835926153b
commit
28f744c1f5
@ -118,13 +118,20 @@ class _AstreamEventsCallbackHandler(AsyncCallbackHandler, _StreamingCallbackHand
|
||||
def _get_parent_ids(self, run_id: UUID) -> List[str]:
|
||||
"""Get the parent IDs of a run (non-recursively) cast to strings."""
|
||||
parent_ids = []
|
||||
parent_id = self.parent_map[run_id]
|
||||
|
||||
while parent_id is not None:
|
||||
parent_ids.append(str(parent_id))
|
||||
parent_id = self.parent_map[parent_id]
|
||||
while parent_id := self.parent_map.get(run_id):
|
||||
str_parent_id = str(parent_id)
|
||||
if str_parent_id in parent_ids:
|
||||
raise AssertionError(
|
||||
f"Parent ID {parent_id} is already in the parent_ids list. "
|
||||
f"This should never happen."
|
||||
)
|
||||
parent_ids.append(str_parent_id)
|
||||
run_id = parent_id
|
||||
|
||||
return parent_ids
|
||||
# Return the parent IDs in reverse order, so that the first
|
||||
# parent ID is the root and the last ID is the immediate parent.
|
||||
return parent_ids[::-1]
|
||||
|
||||
def _send(self, event: StreamEvent, event_type: str) -> None:
|
||||
"""Send an event to the stream."""
|
||||
|
@ -2033,8 +2033,8 @@ async def test_parent_run_id_assignment() -> None:
|
||||
"metadata": {},
|
||||
"name": "grandchild",
|
||||
"parent_ids": [
|
||||
"00000000-0000-0000-0000-000000000008",
|
||||
"00000000-0000-0000-0000-000000000007",
|
||||
"00000000-0000-0000-0000-000000000008",
|
||||
],
|
||||
"run_id": "00000000-0000-0000-0000-000000000009",
|
||||
"tags": [],
|
||||
@ -2045,8 +2045,8 @@ async def test_parent_run_id_assignment() -> None:
|
||||
"metadata": {},
|
||||
"name": "grandchild",
|
||||
"parent_ids": [
|
||||
"00000000-0000-0000-0000-000000000008",
|
||||
"00000000-0000-0000-0000-000000000007",
|
||||
"00000000-0000-0000-0000-000000000008",
|
||||
],
|
||||
"run_id": "00000000-0000-0000-0000-000000000009",
|
||||
"tags": [],
|
||||
@ -2081,6 +2081,42 @@ async def test_parent_run_id_assignment() -> None:
|
||||
]
|
||||
|
||||
|
||||
async def test_bad_parent_ids() -> None:
|
||||
"""Test handling of situation where a run id is duplicated in the run tree."""
|
||||
|
||||
# Type ignores in the code below need to be investigated.
|
||||
# Looks like a typing issue when using RunnableLambda as a decorator
|
||||
# with async functions.
|
||||
@RunnableLambda # type: ignore
|
||||
async def child(x: str) -> str:
|
||||
return x
|
||||
|
||||
@RunnableLambda # type: ignore
|
||||
async def parent(x: str, config: RunnableConfig) -> str:
|
||||
config["run_id"] = uuid.UUID(int=7)
|
||||
return await child.ainvoke(x, config) # type: ignore
|
||||
|
||||
bond = uuid.UUID(int=7)
|
||||
events = await _collect_events(
|
||||
parent.astream_events("hello", {"run_id": bond}, version="v2"),
|
||||
with_nulled_ids=False,
|
||||
)
|
||||
# Includes only a partial list of events since the run ID gets duplicated
|
||||
# between parent and child run ID and the callback handler throws an exception.
|
||||
# The exception does not get bubbled up to the user.
|
||||
assert events == [
|
||||
{
|
||||
"data": {"input": "hello"},
|
||||
"event": "on_chain_start",
|
||||
"metadata": {},
|
||||
"name": "parent",
|
||||
"parent_ids": [],
|
||||
"run_id": "00000000-0000-0000-0000-000000000007",
|
||||
"tags": [],
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
async def test_runnable_generator() -> None:
|
||||
"""Test async events from sync lambda."""
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user