mirror of
https://github.com/hwchase17/langchain.git
synced 2026-06-09 10:17:00 +00:00
chore(infra): merge v1.4 into master (#37350)
This commit is contained in:
@@ -1,13 +1,14 @@
|
||||
"""Validator for LangChain content-block protocol event streams.
|
||||
|
||||
Checks that an event stream emitted by a chat model (via `stream_v2`,
|
||||
Checks that an event stream emitted by a chat model (via `stream_events(version="v3")`,
|
||||
or by the compat bridge's `chunks_to_events` / `message_to_events`)
|
||||
conforms to the protocol lifecycle rules:
|
||||
|
||||
- `message-start` opens and `message-finish` closes the stream.
|
||||
- Content blocks do not interleave: each block runs
|
||||
- Content blocks may interleave: each block index runs
|
||||
`content-block-start` → optional `content-block-delta`s →
|
||||
`content-block-finish` before the next block begins.
|
||||
`content-block-finish`, while other block indices may start or receive
|
||||
deltas before that block finishes.
|
||||
- Wire indices on content-block events are sequential `uint` values
|
||||
starting at 0.
|
||||
- For deltaable block types (`text`, `reasoning`, `tool_call_chunk`,
|
||||
@@ -37,7 +38,7 @@ def assert_valid_event_stream(events: Iterable[Any]) -> None:
|
||||
|
||||
Args:
|
||||
events: Iterable of protocol event dicts (as yielded by
|
||||
`stream_v2` or `chunks_to_events`).
|
||||
`stream_events(version="v3")` or `chunks_to_events`).
|
||||
|
||||
Raises:
|
||||
AssertionError: On the first lifecycle violation found. The
|
||||
@@ -71,7 +72,7 @@ def assert_valid_event_stream(events: Iterable[Any]) -> None:
|
||||
"`message-finish` must be the final event"
|
||||
)
|
||||
|
||||
open_idx: int | None = None
|
||||
open_indices: set[int] = set()
|
||||
expected_next_idx = 0
|
||||
start_events: dict[int, dict[str, Any]] = {}
|
||||
finish_events: dict[int, dict[str, Any]] = {}
|
||||
@@ -83,8 +84,9 @@ def assert_valid_event_stream(events: Iterable[Any]) -> None:
|
||||
assert i == 0, f"duplicate `message-start` at event {i}"
|
||||
continue
|
||||
if ev == "message-finish":
|
||||
assert open_idx is None, (
|
||||
f"`message-finish` while block {open_idx} still open (event {i})"
|
||||
assert not open_indices, (
|
||||
f"`message-finish` while blocks {sorted(open_indices)} "
|
||||
f"still open (event {i})"
|
||||
)
|
||||
continue
|
||||
if ev == "error":
|
||||
@@ -102,36 +104,41 @@ def assert_valid_event_stream(events: Iterable[Any]) -> None:
|
||||
assert idx == expected_next_idx, (
|
||||
f"expected next wire index {expected_next_idx}, got {idx} at event {i}"
|
||||
)
|
||||
assert open_idx is None, (
|
||||
f"content-block-start at idx={idx} while block {open_idx} "
|
||||
f"still open (event {i}); blocks must not interleave"
|
||||
assert idx not in start_events, (
|
||||
f"duplicate content-block-start for idx={idx} at event {i}"
|
||||
)
|
||||
open_idx = idx
|
||||
start_events[idx] = event["content_block"]
|
||||
open_indices.add(idx)
|
||||
start_events[idx] = event.get("content") or event["content_block"]
|
||||
delta_accum[idx] = {}
|
||||
expected_next_idx += 1
|
||||
elif ev == "content-block-delta":
|
||||
idx = event["index"]
|
||||
assert idx == open_idx, (
|
||||
f"content-block-delta at idx={idx} but currently-open block is "
|
||||
f"{open_idx} (event {i})"
|
||||
assert idx in open_indices, (
|
||||
f"content-block-delta at idx={idx} but that block is not open "
|
||||
f"(event {i})"
|
||||
)
|
||||
block = event["content_block"]
|
||||
_accumulate_delta(delta_accum[idx], block)
|
||||
delta = event.get("delta")
|
||||
if delta is None and "content_block" in event:
|
||||
delta = _legacy_block_to_delta(event["content_block"])
|
||||
_accumulate_delta(delta_accum[idx], delta)
|
||||
elif ev == "content-block-finish":
|
||||
idx = event["index"]
|
||||
assert idx == open_idx, (
|
||||
f"content-block-finish at idx={idx} but currently-open block is "
|
||||
f"{open_idx} (event {i})"
|
||||
assert idx in open_indices, (
|
||||
f"content-block-finish at idx={idx} but that block is not open "
|
||||
f"(event {i})"
|
||||
)
|
||||
finish_events[idx] = event["content_block"]
|
||||
open_idx = None
|
||||
assert idx not in finish_events, (
|
||||
f"duplicate content-block-finish for idx={idx} at event {i}"
|
||||
)
|
||||
finish_events[idx] = event.get("content") or event["content_block"]
|
||||
open_indices.remove(idx)
|
||||
else:
|
||||
# Unknown event types are accepted; the CDDL allows extensions.
|
||||
continue
|
||||
|
||||
assert open_idx is None, (
|
||||
f"block {open_idx} still open at end of stream — no content-block-finish"
|
||||
assert not open_indices, (
|
||||
f"blocks {sorted(open_indices)} still open at end of stream — "
|
||||
"no content-block-finish"
|
||||
)
|
||||
missing = set(start_events) - set(finish_events)
|
||||
assert not missing, (
|
||||
@@ -143,21 +150,40 @@ def assert_valid_event_stream(events: Iterable[Any]) -> None:
|
||||
_assert_delta_matches_finish(idx, delta_accum[idx], finish_block)
|
||||
|
||||
|
||||
def _accumulate_delta(accum: dict[str, Any], block: dict[str, Any]) -> None:
|
||||
"""Fold a delta block into the running accumulator for its index."""
|
||||
def _legacy_block_to_delta(block: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Convert the old content-block delta shape to an explicit delta."""
|
||||
btype = block.get("type")
|
||||
if btype not in _DELTAABLE_TYPES:
|
||||
return
|
||||
if btype == "text":
|
||||
accum["text"] = accum.get("text", "") + block.get("text", "")
|
||||
elif btype == "reasoning":
|
||||
accum["reasoning"] = accum.get("reasoning", "") + block.get("reasoning", "")
|
||||
else: # tool_call_chunk / server_tool_call_chunk
|
||||
accum["args"] = accum.get("args", "") + (block.get("args") or "")
|
||||
if block.get("id") is not None:
|
||||
accum["id"] = block["id"]
|
||||
if block.get("name") is not None:
|
||||
accum["name"] = block["name"]
|
||||
return {"type": "text-delta", "text": block.get("text", "")}
|
||||
if btype == "reasoning":
|
||||
return {
|
||||
"type": "reasoning-delta",
|
||||
"reasoning": block.get("reasoning", ""),
|
||||
}
|
||||
if "data" in block:
|
||||
return {"type": "data-delta", "data": block.get("data", "")}
|
||||
return {"type": "block-delta", "fields": block}
|
||||
|
||||
|
||||
def _accumulate_delta(accum: dict[str, Any], delta: dict[str, Any] | None) -> None:
|
||||
"""Fold a delta block into the running accumulator for its index."""
|
||||
if delta is None:
|
||||
return
|
||||
dtype = delta.get("type")
|
||||
if dtype == "text-delta":
|
||||
accum["text"] = accum.get("text", "") + delta.get("text", "")
|
||||
elif dtype == "reasoning-delta":
|
||||
accum["reasoning"] = accum.get("reasoning", "") + delta.get("reasoning", "")
|
||||
elif dtype == "data-delta":
|
||||
accum["data"] = accum.get("data", "") + delta.get("data", "")
|
||||
elif dtype == "block-delta":
|
||||
fields = delta.get("fields")
|
||||
if not isinstance(fields, dict):
|
||||
return
|
||||
btype = fields.get("type")
|
||||
if btype not in _DELTAABLE_TYPES:
|
||||
return
|
||||
accum.update({k: v for k, v in fields.items() if v is not None})
|
||||
|
||||
|
||||
def _assert_delta_matches_finish(
|
||||
@@ -197,6 +223,8 @@ def _assert_delta_matches_finish(
|
||||
except json.JSONDecodeError:
|
||||
parsed = None
|
||||
assert finish_block.get("args") == parsed
|
||||
elif "data" in accum:
|
||||
assert finish_block.get("data") == accum["data"]
|
||||
|
||||
|
||||
__all__ = ["assert_valid_event_stream"]
|
||||
|
||||
Reference in New Issue
Block a user