diff --git a/libs/core/tests/unit_tests/runnables/test_runnable_events.py b/libs/core/tests/unit_tests/runnables/test_runnable_events.py index 042ce728bd2..7394873f695 100644 --- a/libs/core/tests/unit_tests/runnables/test_runnable_events.py +++ b/libs/core/tests/unit_tests/runnables/test_runnable_events.py @@ -1220,6 +1220,108 @@ async def test_event_stream_on_chain_with_tool() -> None: ] +@pytest.mark.xfail(reason="Fix order of callback invocations in RunnableSequence") +async def test_chain_ordering() -> None: + """Test the event stream with a tool.""" + + def foo(a: str) -> str: + return a + + def bar(a: str) -> str: + return a + + chain = RunnableLambda(foo) | RunnableLambda(bar) + iterable = chain.astream_events("q", version="v1") + + events = [] + + for _ in range(10): + try: + next_chunk = await iterable.__anext__() + events.append(next_chunk) + except Exception: + break + + events = _with_nulled_run_id(events) + for event in events: + event["tags"] = sorted(event["tags"]) + + assert events == [ + { + "data": {"input": "q"}, + "event": "on_chain_start", + "metadata": {}, + "name": "RunnableSequence", + "run_id": "", + "tags": [], + }, + { + "data": {}, + "event": "on_chain_start", + "metadata": {}, + "name": "foo", + "run_id": "", + "tags": ["seq:step:1"], + }, + { + "data": {"chunk": "q"}, + "event": "on_chain_stream", + "metadata": {}, + "name": "foo", + "run_id": "", + "tags": ["seq:step:1"], + }, + { + "data": {"input": "q", "output": "q"}, + "event": "on_chain_end", + "metadata": {}, + "name": "foo", + "run_id": "", + "tags": ["seq:step:1"], + }, + { + "data": {}, + "event": "on_chain_start", + "metadata": {}, + "name": "bar", + "run_id": "", + "tags": ["seq:step:2"], + }, + { + "data": {"chunk": "q"}, + "event": "on_chain_stream", + "metadata": {}, + "name": "bar", + "run_id": "", + "tags": ["seq:step:2"], + }, + { + "data": {"chunk": "q"}, + "event": "on_chain_stream", + "metadata": {}, + "name": "RunnableSequence", + "run_id": "", + "tags": [], + }, + { + "data": {"input": "q", "output": "q"}, + "event": "on_chain_end", + "metadata": {}, + "name": "bar", + "run_id": "", + "tags": ["seq:step:2"], + }, + { + "data": {"output": "q"}, + "event": "on_chain_end", + "metadata": {}, + "name": "RunnableSequence", + "run_id": "", + "tags": [], + }, + ] + + async def test_event_stream_with_retry() -> None: """Test the event stream with a tool."""