From d08245f70d82cbbc10afcb5f71f97cdcc8a67424 Mon Sep 17 00:00:00 2001 From: Nick Hollon Date: Fri, 22 May 2026 17:27:02 -0400 Subject: [PATCH] feat(langchain): redact streamed PII in flight on `PIIMiddleware` (#37616) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `PIIMiddleware` previously scrubbed detected PII only at the state level via its `after_model` / `before_model` hooks. Consumers reading the live stream — `astream_events(version="v3")` or `run.messages` / `run.tool_calls` / `run.values` — saw the raw model text, the raw tool-call args, the raw tool outputs, and the raw state snapshots until the run finished and the canonical conversation history was written. This change registers a stream transformer ahead of `MessagesTransformer` that redacts every wire surface of an agent run. The transformer holds a sliding lookback buffer (default 128 characters) per `(run_id, content-block index)` so PII patterns that straddle delta boundaries are caught before the safe prefix is released downstream. Anything older than the lookback is run through the configured detector and emitted; the trailing tail stays buffered until a later delta extends it past the cap or the block finishes. `_finalize_block` always re-runs detection over the full block snapshot so the finalized content lands fully redacted even when the in-flight buffer never released a tail (short responses, or PII arriving in the final delta). The `block` strategy is now supported on the streaming path via a buffering mode that withholds every delta until the block resolves — clean blocks release the full text at finalize, PII-bearing blocks zero the wire and let `after_model` / `apply_to_tool_results` raise `PIIDetectionError` on the original state message. Activation is gated on `apply_to_output=True`, matching the existing post-hoc semantics. The middleware's transformer factory is cloned by `StreamMux._make_child` into every subgraph scope, so attaching `PIIMiddleware` at the outer agent also redacts streamed deltas from sub-agents invoked inside tools. ## Tool-call and tools-channel coverage The transformer covers every wire surface of an agent run, not just AI message text: - **Streamed AI text deltas** (`content-block-delta` of type `text-delta`) — lookback machinery, redacted in place. - **Streamed tool-call args** (`content-block-delta` with `tool_call_chunk` / `server_tool_call_chunk` fields) — each delta carries the full cumulative args string; detection runs on the field directly and redacts in place. Verified empirically against `_compat_bridge.py` and the consumer-side `_merge_block_delta_into_store` snapshot-replace semantics. - **Finalized tool-call blocks** (`content-block-finish` with `tool_call` / `server_tool_call` / `invalid_tool_call`) — `args` dict walked recursively and each string leaf redacted. - **Tool execution events on the `tools` channel** — `tool-started.input`, `tool-output-delta`, `tool-finished.output`, `tool-error.message` all run through detection. String deltas use the same lookback machinery as text-deltas keyed by `tool_call_id`; structured payloads walk recursively. - **State snapshots on the `values` channel** — message lists are walked and each message's `.content` is redacted on a fresh copy. Graph state itself stays intact for the state-level enforcer (`apply_to_tool_results` via `before_model`) to act on independently. - **Legacy `(BaseMessage, metadata)` payloads** on the `messages` channel (Python 3.10 path, where `langgraph`'s `ASYNCIO_ACCEPTS_CONTEXT = sys.version_info >= (3, 11)` falls back to a code path that doesn't propagate the streaming callback into the chat model) — `.content` and `AIMessage.tool_calls[*].args` are scrubbed. For `block`, the event's `data` tuple is replaced with an empty-content copy so the original message stays in state for `after_model` to raise on. ## Worth a careful look - `_PIIStreamTransformer._mutate_text_delta` — lookback partition. Anything older than `lookback` characters is released after redaction; the tail stays buffered. Bulletproof against whitespace-permissive detectors (notably `credit_card`, whose regex matches across spaces). - `_PIIStreamTransformer._mutate_tool_call_chunk_delta` — direct in-place redaction of the cumulative args string. No buffer; the wire shape is cumulative-snapshot, the consumer-side merge is replace-not-append. - `_PIIStreamTransformer._mutate_legacy_payload` — the dual path: mutate-in-place for non-`block` (idempotent with `after_model`), replace-with-empty-copy for `block` (keeps original in graph state for `after_model` to raise on). - `_PIIStreamTransformer._redact_value` — the recursive walker. `BaseMessage` branch returns a fresh `.content`-redacted copy via `model_copy(update=...)` — never mutates in place — so tool-output payloads that wrap a `ToolMessage` and message lists in state snapshots flow through cleanly. - The new `transformers` attribute on `PIIMiddleware`: this is what makes `create_agent` pick the factory up. Multiple `PIIMiddleware` instances each register one transformer; ordering is preserved within the `before_builtins` lane. ## Compatibility Bumps `langgraph` to `>=1.2.1` for the `before_builtins` opt-in on `StreamTransformer`. --- .../langchain/agents/middleware/pii.py | 506 ++++- libs/langchain_v1/pyproject.toml | 2 +- .../middleware/implementations/test_pii.py | 1670 ++++++++++++++++- libs/langchain_v1/uv.lock | 8 +- 4 files changed, 2178 insertions(+), 8 deletions(-) diff --git a/libs/langchain_v1/langchain/agents/middleware/pii.py b/libs/langchain_v1/langchain/agents/middleware/pii.py index a00c229a148..c3b99eee8a7 100644 --- a/libs/langchain_v1/langchain/agents/middleware/pii.py +++ b/libs/langchain_v1/langchain/agents/middleware/pii.py @@ -2,9 +2,11 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Literal +from functools import partial +from typing import TYPE_CHECKING, Any, ClassVar, Literal -from langchain_core.messages import AIMessage, AnyMessage, HumanMessage, ToolMessage +from langchain_core.messages import AIMessage, AnyMessage, BaseMessage, HumanMessage, ToolMessage +from langgraph.stream import StreamTransformer from typing_extensions import override from langchain.agents.middleware._redaction import ( @@ -31,6 +33,460 @@ if TYPE_CHECKING: from collections.abc import Callable from langgraph.runtime import Runtime + from langgraph.stream._types import ProtocolEvent + + +_DEFAULT_STREAM_LOOKBACK = 128 +"""Default trailing-buffer size for cross-delta PII detection. + +The transformer always holds the last `lookback` characters in a per-content +block buffer so that PII patterns straddling delta boundaries are detected +before any text is released downstream. 128 comfortably covers the built-in +detectors (the credit-card regex tops out at 19 characters; URLs and emails +are typically well under 100) while bounding first-token latency. +""" + + +class _PIIStreamTransformer(StreamTransformer): + """Mutates `content-block-delta` text on `messages` events in flight. + + Runs before built-in stream transformers so the redacted text is what + every downstream consumer sees — both the main protocol event log and + the `run.messages` projection that `MessagesTransformer` snapshots into. + + Holds a sliding buffer of the most recent text per (run_id, content + block index) so PII patterns that straddle delta boundaries are caught. + Anything older than `lookback` characters is redacted with the resolved + rule's strategy and emitted as the new delta text; the trailing tail + stays in the buffer until a later delta extends it past the cap or the + block's finish event flushes the snapshot. + """ + + before_builtins: ClassVar[bool] = True + required_stream_modes: ClassVar[tuple[str, ...]] = ("messages", "tools", "values") + + def __init__( + self, + scope: tuple[str, ...] = (), + *, + rule: ResolvedRedactionRule, + lookback: int = _DEFAULT_STREAM_LOOKBACK, + ) -> None: + super().__init__(scope) + self._rule = rule + self._lookback = lookback + # Text/reasoning deltas keyed by `(run_id, content_block_index)`. + self._buffers: dict[tuple[str, int], str] = {} + # Tool-output-delta buffers keyed by `tool_call_id`. Held in a + # separate dict so `_drop_run` on the messages channel can't + # sweep active tool-output state. + self._tool_buffers: dict[str, str] = {} + + def init(self) -> dict[str, Any]: + # No projection — this transformer mutates events in place rather + # than building a derived view. + return {} + + def process(self, event: ProtocolEvent) -> bool: + method = event["method"] + if method == "messages": + return self._process_messages_event(event) + if method == "tools": + return self._process_tools_event(event) + if method == "values": + return self._process_values_event(event) + return True + + def _process_values_event(self, event: ProtocolEvent) -> bool: + """Redact the state snapshot on the `values` channel. + + State snapshots emitted between nodes carry the full state dict, + which typically includes the messages list. Walking the snapshot + with `_redact_value` returns a fresh structure where every + message has a redacted copy of its content — the original + objects in graph state remain intact for the state-level + enforcer (`apply_to_tool_results` via `before_model`) to act on + independently when the agent loops back. + """ + data = event["params"].get("data") + if data is None: + return True + event["params"]["data"] = self._redact_value(data) + return True + + def _process_messages_event(self, event: ProtocolEvent) -> bool: + params = event["params"] + data = params.get("data") + if not isinstance(data, tuple) or len(data) != 2: # noqa: PLR2004 + return True + payload, metadata = data + + # Legacy `(BaseMessage, metadata)` shape: the langgraph→langchain + # integration emits this when a model only implements `_generate` + # (or when its `_astream` falls back), producing a single event + # carrying the full message rather than streamed content-block + # deltas. Swap in a redacted copy so the consumer sees scrubbed + # text on the wire while the original stays intact in graph state + # for `after_model` to act on independently. Under `block`, + # `_redact_base_message` raises `PIIDetectionError` via + # `apply_strategy` before we get here. + if isinstance(payload, BaseMessage): + redacted = self._redact_base_message(payload) + if redacted is not payload: + params["data"] = (redacted, metadata) + return True + + if not isinstance(payload, dict): + return True + kind = payload.get("event") + run_id = str(metadata.get("run_id") or "") if metadata else "" + + if kind == "content-block-delta": + self._mutate_delta(payload, run_id) + elif kind == "content-block-finish": + self._finalize_block(payload, run_id) + elif kind in {"message-finish", "error"}: + self._drop_run(run_id) + return True + + def _process_tools_event(self, event: ProtocolEvent) -> bool: + data = event["params"].get("data") + if not isinstance(data, dict): + return True + kind = data.get("event") + tool_call_id = data.get("tool_call_id") + + if kind == "tool-started": + # Tool inputs may be a dict (multi-arg tools), a string + # (single-arg tools — `BaseTool._parse_input` passes the + # raw string through), or a list (array-input tools). + # `_redact_value` handles all three uniformly. + if "input" in data: + data["input"] = self._redact_value(data["input"]) + elif kind == "tool-output-delta": + # Use the tool_call_id as buffer key when present; fall back + # to a None-keyed slot for the rare malformed/custom emitter + # case (the buffer becomes shared but at least redaction runs). + self._mutate_tool_output_delta( + data, tool_call_id if isinstance(tool_call_id, str) else "" + ) + elif kind == "tool-finished": + if "output" in data: + data["output"] = self._redact_value(data["output"]) + if isinstance(tool_call_id, str): + self._tool_buffers.pop(tool_call_id, None) + elif kind == "tool-error": + msg = data.get("message") + if isinstance(msg, str) and msg: + matches = self._rule.detector(msg) + if matches: + data["message"] = apply_strategy(msg, matches, self._rule.strategy) + if isinstance(tool_call_id, str): + self._tool_buffers.pop(tool_call_id, None) + + return True + + def _mutate_tool_output_delta(self, data: dict[str, Any], tool_call_id: str) -> None: + """Redact a `tool-output-delta` payload. + + String deltas go through the same lookback machinery as + text-deltas, keyed by `tool_call_id` in the disjoint + `_tool_buffers` dict so `_drop_run` on the messages channel + can't sweep active tool-output state. + + Structured deltas (dict/list) walk recursively without + buffering — they don't have a position-stable shape across + deltas to buffer against. + """ + delta = data.get("delta") + if isinstance(delta, str): + held = self._tool_buffers.get(tool_call_id, "") + combined = held + delta + + matches = self._rule.detector(combined) + if matches: + # `apply_strategy` raises `PIIDetectionError` under + # `strategy="block"`, failing the run immediately — + # cleaner than withholding deltas until `after_model` + # raises later. + combined = apply_strategy(combined, matches, self._rule.strategy) + + emit_end = max(0, len(combined) - self._lookback) + self._tool_buffers[tool_call_id] = combined[emit_end:] + data["delta"] = combined[:emit_end] + elif isinstance(delta, (dict, list)): + data["delta"] = self._redact_value(delta) + + def _redact_tool_call_list(self, calls: list[Any] | None) -> tuple[list[Any], bool]: + """Walk a list of tool-call (or invalid-tool-call) dicts. + + Returns `(new_list, changed)`. Each element's `args` is run + through `_redact_value` regardless of its type — `tool_call.args` + is a dict, `invalid_tool_call.args` is a raw JSON string, and + `_redact_value` handles both shapes uniformly. If nothing + changed, returns the input list and `changed=False`. + """ + if not calls: + return calls or [], False + new_calls: list[Any] = [] + changed = False + for tc in calls: + if isinstance(tc, dict) and "args" in tc and tc["args"] is not None: + redacted = self._redact_value(tc["args"]) + if redacted != tc["args"]: + new_tc = dict(tc) + new_tc["args"] = redacted + new_calls.append(new_tc) + changed = True + continue + new_calls.append(tc) + return new_calls, changed + + def _redact_value(self, value: Any) -> Any: + """Recursively redact PII in string leaves of a nested structure. + + Returns a new value where every `str` leaf that contains PII has + been replaced (or emptied under `block`). Non-string leaves and + the structure itself are preserved. + + `BaseMessage` payloads (typically `ToolMessage` from + `tool-finished.output`, or any message reached via the `values` + channel) return a fresh copy with `.content` redacted plus + `AIMessage.tool_calls[*].args` / `invalid_tool_calls[*].args` + walked. The original object stays intact for state-level + enforcers (`after_model`, `before_model` with + `apply_to_tool_results`) to act on independently. + + Scope mirrors the pre-streaming state-level surfaces: + `.content` (string or list-of-content-blocks) and `tool_calls` + args. Other message attributes (`additional_kwargs`, + `response_metadata`, `ToolMessage.artifact`) are intentionally + not walked here — they aren't scrubbed in graph state by the + existing hooks, so scrubbing them on the wire would create + a wire/state divergence. + """ + if isinstance(value, str): + if not value: + return value + matches = self._rule.detector(value) + if not matches: + return value + # `apply_strategy` raises `PIIDetectionError` under `block` + # — the run fails immediately rather than buffering until a + # state-level hook can raise. + return apply_strategy(value, matches, self._rule.strategy) + if isinstance(value, BaseMessage): + return self._redact_base_message(value) + if isinstance(value, dict): + return {k: self._redact_value(v) for k, v in value.items()} + if isinstance(value, list): + return [self._redact_value(v) for v in value] + if isinstance(value, tuple): + return tuple(self._redact_value(v) for v in value) + return value + + def _redact_base_message(self, value: BaseMessage) -> BaseMessage: + """Return a fresh copy of `value` with PII-carrying surfaces redacted.""" + update: dict[str, Any] = {} + + content = value.content + if isinstance(content, str) and content: + matches = self._rule.detector(content) + if matches: + update["content"] = apply_strategy(content, matches, self._rule.strategy) + elif isinstance(content, list) and content: + # Structured content-blocks shape: + # `[{"type": "text", "text": "..."}, {"type": "tool_call", ...}, ...]`. + redacted_content = self._redact_value(content) + if redacted_content != content: + update["content"] = redacted_content + + # `AIMessage.tool_calls` and `.invalid_tool_calls` carry PII in + # `args` independently of `.content`. `tool_call.args` is a + # dict; `invalid_tool_call.args` is a raw JSON string — + # `_redact_value` handles both shapes via the recursion. + if isinstance(value, AIMessage): + new_tc_list, tc_changed = self._redact_tool_call_list(value.tool_calls) + if tc_changed: + update["tool_calls"] = new_tc_list + new_inv_list, inv_changed = self._redact_tool_call_list(value.invalid_tool_calls) + if inv_changed: + update["invalid_tool_calls"] = new_inv_list + + if not update: + return value + return value.model_copy(update=update) + + def _mutate_delta(self, payload: dict[str, Any], run_id: str) -> None: + delta = payload.get("delta") + if not isinstance(delta, dict): + return + delta_type = delta.get("type") + if delta_type == "text-delta": + self._mutate_string_field_delta(delta, payload, run_id, "text") + return + if delta_type == "reasoning-delta": + # Reasoning content (chain-of-thought from extended-thinking + # models) is a real PII surface — models echo back + # user-supplied data or synthesize it from context. Run the + # same lookback machinery as text-delta against the + # `reasoning` field. Block indices are unique within a + # message regardless of block type, so the buffer key + # `(run_id, index)` naturally disjoint from text-delta keys. + self._mutate_string_field_delta(delta, payload, run_id, "reasoning") + return + if delta_type == "block-delta": + fields = delta.get("fields") + if isinstance(fields, dict) and fields.get("type") in { + "tool_call_chunk", + "server_tool_call_chunk", + }: + self._mutate_tool_call_chunk_delta(fields) + # Other delta types (`data-delta`, vendor block types) pass + # through. The pre-streaming middleware scrubbed `.content` text + # on state messages only; binary payloads and provider-specific + # block shapes are out of scope for parity with that surface. + + def _mutate_string_field_delta( + self, + delta: dict[str, Any], + payload: dict[str, Any], + run_id: str, + field: str, + ) -> None: + """Apply the lookback-buffer redaction to a string field on a delta. + + Shared by `text-delta` (`field="text"`) and `reasoning-delta` + (`field="reasoning"`). Buffer is keyed by `(run_id, block_index)`; + block indices are unique within a message so different block + types share the same key space without collision. + """ + text = delta.get(field) + if not isinstance(text, str) or not text: + return + index = payload.get("index") + if not isinstance(index, int): + return + + key = (run_id, index) + held = self._buffers.get(key, "") + combined = held + text + + # Run detection on the full accumulated buffer before splitting. + # Detecting only on the about-to-emit prefix would miss matches + # that straddle the lookback boundary — the detector's regex + # needs a complete, boundary-anchored hit, so a truncated prefix + # would fail to match and the partial PII would leak on the + # wire. Under `strategy="block"`, `apply_strategy` raises + # `PIIDetectionError` here, failing the run as soon as PII + # arrives rather than buffering until `after_model`. + matches = self._rule.detector(combined) + if matches: + combined = apply_strategy(combined, matches, self._rule.strategy) + + emit_end = max(0, len(combined) - self._lookback) + self._buffers[key] = combined[emit_end:] + delta[field] = combined[:emit_end] + + def _mutate_tool_call_chunk_delta(self, fields: dict[str, Any]) -> None: + """Redact cumulative tool-call args with lookback withholding. + + Each `tool_call_chunk` `block-delta` event carries the full + accumulated args string (verified against `_compat_bridge.py` + — `delta_source = current` for these block types — and against + the consumer-side `_merge_block_delta_into_store`, which + replaces wholesale rather than appends). + + Detection runs on the full cumulative args so any complete PII + anywhere in the string is redacted before emission. Lookback + withholding then trims the trailing the lookback window characters + from what reaches the consumer — those characters might be the + start of a partial PII match that completes in a future + cumulative delta. The trimmed tail surfaces at `content-block- + finish` where `_finalize_block` redacts the parsed args dict. + + For args that fit within the lookback window (the typical case), + this withholds the entire args string during streaming — the + redacted args dict appears only at finalize. For args that + exceed the lookback window, the safe prefix streams incrementally + as the cumulative state grows. PII that appears more than + the lookback window characters from the cumulative tail in a + delta where it hasn't yet completed can still surface in the + emit prefix — same residual exposure as PII longer than + the lookback window on the text path. The `content-block-finish` + snapshot redaction is the backstop. + """ + args = fields.get("args") + if not isinstance(args, str) or not args: + return + + matches = self._rule.detector(args) + if matches: + # `apply_strategy` raises `PIIDetectionError` under + # `strategy="block"` — the run fails the moment a complete + # PII pattern surfaces in the cumulative args string. + args = apply_strategy(args, matches, self._rule.strategy) + + emit_end = max(0, len(args) - self._lookback) + fields["args"] = args[:emit_end] + + def _finalize_block(self, payload: dict[str, Any], run_id: str) -> None: + index = payload.get("index") + if not isinstance(index, int): + return + key = (run_id, index) + # The finalized block carries the model's original concatenation + # of deltas, not what we emitted on the wire. Re-run detection over + # its full text so the snapshot matches the redacted stream. + content = payload.get("content") + if isinstance(content, dict): + ctype = content.get("type") + if ctype == "text": + self._finalize_string_field(content, "text") + elif ctype == "reasoning": + self._finalize_string_field(content, "reasoning") + elif ( + ctype in {"tool_call", "server_tool_call", "invalid_tool_call"} + and "args" in content + and content["args"] is not None + ): + # `tool_call` / `server_tool_call` args are dicts; + # `invalid_tool_call.args` is the raw unparsed JSON + # string. `_redact_value` handles both shapes. + content["args"] = self._redact_value(content["args"]) + self._buffers.pop(key, None) + + def _finalize_string_field(self, content: dict[str, Any], field: str) -> None: + """Re-redact a string content-block field on `content-block-finish`. + + Used for `text` and `reasoning` content blocks. Under + `strategy="block"` `apply_strategy` raises `PIIDetectionError`, + failing the run immediately. + """ + text = content.get(field) + if not isinstance(text, str) or not text: + return + matches = self._rule.detector(text) + if not matches: + return + content[field] = apply_strategy(text, matches, self._rule.strategy) + + def _drop_run(self, run_id: str) -> None: + # Release any buffered tails for this run_id — content-block-finish + # should have already done so for normal completion, but message-finish + # / error paths need an explicit sweep so abandoned blocks don't + # accumulate in long-lived processes. + stale = [key for key in self._buffers if key[0] == run_id] + for key in stale: + del self._buffers[key] + + def finalize(self) -> None: + self._buffers.clear() + self._tool_buffers.clear() + + def fail(self, err: BaseException) -> None: # noqa: ARG002 + self._buffers.clear() + self._tool_buffers.clear() class PIIMiddleware(AgentMiddleware[AgentState[ResponseT], ContextT, ResponseT]): @@ -133,6 +589,32 @@ class PIIMiddleware(AgentMiddleware[AgentState[ResponseT], ContextT, ResponseT]) * If `None`: Uses built-in detector for the `pii_type` apply_to_input: Whether to check user messages before model call. apply_to_output: Whether to check AI messages after model call. + + When `True`, a stream transformer is also installed so + that every wire surface of an agent run is redacted in + flight: + + * Streamed AI text deltas (`content-block-delta` of type + `text-delta`) + * Streamed tool-call arguments (`content-block-delta` + with `tool_call_chunk` / `server_tool_call_chunk` + fields, plus the finalized `tool_call` content block + on `content-block-finish`) + * Tool execution events on the `tools` channel + (`tool-started.input`, `tool-output-delta`, + `tool-finished.output`, `tool-error.message`) + * State snapshots on the `values` channel — message + lists are walked and each message's `.content` is + redacted on a fresh copy (state itself stays intact + for `before_model` / `after_model` to act on + independently) + + State-level redaction via `after_model` (and + `before_model` with `apply_to_tool_results`) remains the + canonical enforcer; the streaming transformer ensures + consumers reading `astream_events(version="v3")` or + `run.messages` / `run.tool_calls` / `run.values` never + see PII on the wire. apply_to_tool_results: Whether to check tool result messages after tool execution. Raises: @@ -153,6 +635,26 @@ class PIIMiddleware(AgentMiddleware[AgentState[ResponseT], ContextT, ResponseT]) self.strategy = self._resolved_rule.strategy self.detector = self._resolved_rule.detector + # Stream transformer scrubs the streamed surface of the same + # messages that the state-level hooks scrub in graph state. + # Installed whenever any output-side scrubbing is enabled — + # `apply_to_output` covers AI messages (text, tool-call args, + # reasoning), `apply_to_tool_results` covers tool execution + # (the `tools` channel + ToolMessage content on `values` and + # `messages`). For `block` the transformer raises + # `PIIDetectionError` directly from its event handler the + # moment a complete PII pattern is detected, failing the run + # via langgraph's `StreamMux.afail` path. The state-level + # `after_model` / `before_model` hooks remain a backstop for + # non-streaming consumers. + if self.apply_to_output or self.apply_to_tool_results: + self.transformers = ( + partial( + _PIIStreamTransformer, + rule=self._resolved_rule, + ), + ) + @property def name(self) -> str: """Name of the middleware.""" diff --git a/libs/langchain_v1/pyproject.toml b/libs/langchain_v1/pyproject.toml index 483ceb4d0f9..da73e1312a3 100644 --- a/libs/langchain_v1/pyproject.toml +++ b/libs/langchain_v1/pyproject.toml @@ -25,7 +25,7 @@ version = "1.3.1" requires-python = ">=3.10.0,<4.0.0" dependencies = [ "langchain-core>=1.4.0,<2.0.0", - "langgraph>=1.2.0,<1.3.0", + "langgraph>=1.2.1,<1.3.0", "pydantic>=2.7.4,<3.0.0", ] diff --git a/libs/langchain_v1/tests/unit_tests/agents/middleware/implementations/test_pii.py b/libs/langchain_v1/tests/unit_tests/agents/middleware/implementations/test_pii.py index 87389425626..acac3cfc444 100644 --- a/libs/langchain_v1/tests/unit_tests/agents/middleware/implementations/test_pii.py +++ b/libs/langchain_v1/tests/unit_tests/agents/middleware/implementations/test_pii.py @@ -4,15 +4,27 @@ import re from typing import Any import pytest -from langchain_core.messages import AIMessage, HumanMessage, ToolCall, ToolMessage +from langchain_core.language_models.fake_chat_models import GenericFakeChatModel +from langchain_core.messages import ( + AIMessage, + BaseMessage, + HumanMessage, + InvalidToolCall, + ToolCall, + ToolMessage, +) +from langchain_core.tools import tool from langgraph.runtime import Runtime +from langgraph.stream.transformers import MessagesTransformer from langchain.agents import AgentState from langchain.agents.factory import create_agent +from langchain.agents.middleware._redaction import RedactionRule from langchain.agents.middleware.pii import ( PIIDetectionError, PIIMatch, PIIMiddleware, + _PIIStreamTransformer, detect_credit_card, detect_email, detect_ip, @@ -696,3 +708,1659 @@ class TestMultipleMiddleware: content = result["messages"][0].content assert "test@example.com" not in content assert "10.0.0.1" not in content + + +# ============================================================================ +# Stream Transformer Tests +# ============================================================================ + + +def _make_delta_event(text: str, *, index: int = 0, run_id: str = "r1") -> dict[str, Any]: + """Build a `messages` protocol event for a text content-block delta.""" + return { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": ( + { + "event": "content-block-delta", + "index": index, + "delta": {"type": "text-delta", "text": text}, + }, + {"run_id": run_id}, + ), + }, + } + + +def _make_finish_event(text: str, *, index: int = 0, run_id: str = "r1") -> dict[str, Any]: + """Build a `messages` protocol event for content-block-finish on a text block.""" + return { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": ( + { + "event": "content-block-finish", + "index": index, + "content": {"type": "text", "text": text}, + }, + {"run_id": run_id}, + ), + }, + } + + +def _emitted_text(events: list[dict[str, Any]]) -> str: + """Concatenate delta + finalized text the way a streaming consumer would.""" + parts = [] + final_by_index: dict[int, str] = {} + for event in events: + payload = event["params"]["data"][0] + kind = payload.get("event") + if kind == "content-block-delta": + delta = payload["delta"] + if delta.get("type") == "text-delta": + parts.append(delta["text"]) + elif kind == "content-block-finish": + content = payload.get("content", {}) + if content.get("type") == "text": + final_by_index[payload["index"]] = content["text"] + # Concatenated delta stream is what the consumer sees in real time; + # finalized text is the snapshot. Return both via a tuple-like dict. + return "".join(parts), final_by_index # type: ignore[return-value] + + +def _run_transformer(transformer: Any, events: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Feed events through the transformer (mutates in place) and return them.""" + for event in events: + transformer.process(event) + return events + + +class TestPIIStreamTransformer: + """Tests for the in-flight stream transformer.""" + + def test_redact_value_walks_nested_strings(self) -> None: + """`_redact_value` redacts PII in string leaves of nested dict/list.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + value = { + "to": "alice@example.com", + "cc": ["bob@example.com", "no-pii"], + "nested": {"hidden": "user mail: charlie@example.com"}, + "count": 3, + "flag": True, + } + redacted = transformer._redact_value(value) + + assert redacted == { + "to": "[REDACTED_EMAIL]", + "cc": ["[REDACTED_EMAIL]", "no-pii"], + "nested": {"hidden": "user mail: [REDACTED_EMAIL]"}, + "count": 3, + "flag": True, + } + + def test_redact_value_block_strategy_raises(self) -> None: + """Under `block`, `_redact_value` raises on the first matching leaf.""" + rule = RedactionRule(pii_type="email", strategy="block").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + value = {"to": "alice@example.com", "subject": "clean"} + with pytest.raises(PIIDetectionError): + transformer._redact_value(value) + + def test_redact_value_passthrough_for_clean_input(self) -> None: + """No PII anywhere → returns an equal value.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + value = {"a": "hello", "b": [1, 2, "world"], "c": None} + redacted = transformer._redact_value(value) + assert redacted == value + + def test_redact_value_walks_structured_message_content(self) -> None: + """`_redact_value` walks list-typed `.content` (content-blocks shape).""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + msg = AIMessage( + content=[ + {"type": "text", "text": "Reach me at alice@example.com"}, + {"type": "reasoning", "reasoning": "User mentioned bob@example.com"}, + ], + id="m1", + ) + redacted = transformer._redact_value(msg) + + # Original untouched. + assert msg.content[0]["text"] == "Reach me at alice@example.com" + # Redacted copy walked every block. + assert "alice@example.com" not in redacted.content[0]["text"] + assert "[REDACTED_EMAIL]" in redacted.content[0]["text"] + assert "bob@example.com" not in redacted.content[1]["reasoning"] + assert "[REDACTED_EMAIL]" in redacted.content[1]["reasoning"] + + def test_redact_value_walks_ai_message_tool_calls(self) -> None: + """`_redact_value` redacts `AIMessage.tool_calls[*].args` even with empty content. + + The legacy `(AIMessage, metadata)` payload path on the `messages` + channel mutates `tool_calls` in place before the `values` event + fires, but on the v3 streaming path the state's AIMessage is + assembled by langgraph without going through that mutation. + `_redact_value` is what scrubs the message when the `values` + snapshot walks it — it must not stop at empty content. + """ + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + msg = AIMessage( + content="", + tool_calls=[ + ToolCall(name="send_email", args={"to": "alice@example.com"}, id="c1"), + ], + id="m1", + ) + redacted = transformer._redact_value(msg) + + # Original message stays intact for state-level enforcers. + assert msg.tool_calls[0]["args"] == {"to": "alice@example.com"} + # The returned copy has scrubbed tool_calls and is a fresh object. + assert redacted is not msg + assert redacted.tool_calls[0]["args"] == {"to": "[REDACTED_EMAIL]"} + + def test_redact_value_walks_both_content_and_tool_calls(self) -> None: + """Content and tool_calls both carry PII — both get scrubbed in one pass.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + msg = AIMessage( + content="Reaching out from bob@example.com", + tool_calls=[ + ToolCall(name="send_email", args={"to": "alice@example.com"}, id="c1"), + ], + id="m1", + ) + redacted = transformer._redact_value(msg) + + assert "bob@example.com" not in redacted.content + assert "[REDACTED_EMAIL]" in redacted.content + assert redacted.tool_calls[0]["args"] == {"to": "[REDACTED_EMAIL]"} + + def test_reasoning_delta_uses_lookback(self) -> None: + """`reasoning-delta` events go through the same lookback as text-delta.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule, lookback=32) + + events = [ + { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": ( + { + "event": "content-block-delta", + "index": 0, + "delta": { + "type": "reasoning-delta", + "reasoning": "User mentioned alice@example.com", + }, + }, + {"run_id": "r1"}, + ), + }, + }, + { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": ( + { + "event": "content-block-finish", + "index": 0, + "content": { + "type": "reasoning", + "reasoning": "User mentioned alice@example.com", + }, + }, + {"run_id": "r1"}, + ), + }, + }, + ] + for e in events: + transformer.process(e) + + # The finalize snapshot is redacted. The delta itself may emit + # nothing (entire string fits within lookback) — the consumer's + # ChatModelStream reconciles against the finalize content. + finalized = events[1]["params"]["data"][0]["content"]["reasoning"] + assert "alice@example.com" not in finalized + assert "[REDACTED_EMAIL]" in finalized + + def test_reasoning_delta_block_strategy_raises(self) -> None: + """`block` raises immediately when reasoning content contains PII.""" + rule = RedactionRule(pii_type="email", strategy="block").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + event = { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": ( + { + "event": "content-block-delta", + "index": 0, + "delta": { + "type": "reasoning-delta", + "reasoning": "User mentioned alice@example.com", + }, + }, + {"run_id": "r1"}, + ), + }, + } + with pytest.raises(PIIDetectionError): + transformer.process(event) + + def test_tool_call_args_short_args_withheld_during_streaming(self) -> None: + """Tool-call args shorter than `stream_lookback` are withheld on each chunk. + + The redacted args dict surfaces on `content-block-finish` via + `_finalize_block` — the consumer's `tool_calls_proj` replaces + wholesale on each chunk, so this is equivalent to "no args + during streaming, redacted dict at finalize". + """ + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) # default lookback=128 + + events = [ + { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": ( + { + "event": "content-block-delta", + "index": 0, + "delta": { + "type": "block-delta", + "fields": { + "type": "tool_call_chunk", + "id": "call_1", + "name": "send_email", + "args": '{"to": "alice@example.com"}', + }, + }, + }, + {"run_id": "r1"}, + ), + }, + }, + ] + _run_transformer(transformer, events) + + fields = events[0]["params"]["data"][0]["delta"]["fields"] + # 27 chars < lookback (128), so emit_end = 0 — nothing reaches the wire. + assert fields["args"] == "" + + def test_tool_call_args_long_args_emit_safe_prefix(self) -> None: + """Args longer than `stream_lookback` stream the redacted safe prefix. + + Detection runs on the full cumulative args, so any complete PII + anywhere in the string is redacted before emission. The trailing + `stream_lookback` characters are withheld — they might be the + start of a partial PII match that completes in a future delta. + """ + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule, lookback=8) + + # 50-char args with PII near the start; emit_end = 50 - 8 = 42. + args = '{"to": "alice@example.com", "subject": "hi"}' + events = [ + { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": ( + { + "event": "content-block-delta", + "index": 0, + "delta": { + "type": "block-delta", + "fields": { + "type": "tool_call_chunk", + "id": "call_1", + "name": "send_email", + "args": args, + }, + }, + }, + {"run_id": "r1"}, + ), + }, + }, + ] + _run_transformer(transformer, events) + + emitted = events[0]["params"]["data"][0]["delta"]["fields"]["args"] + # The full cumulative args was detected and redacted before + # truncation, so the prefix that lands on the wire has no PII. + assert "alice@example.com" not in emitted + assert "[REDACTED_EMAIL]" in emitted + + def test_tool_call_args_partial_pii_across_chunks_withheld(self) -> None: + """Cumulative chunks that grow into PII don't leak intermediate states. + + Regression for Corridor's partial-exposure window: a model + streaming `{"to": "alice@` then `{"to": "alice@example.com"}` + would expose the partial first chunk under the old in-place + redaction. With lookback withholding, the partial chunk is + below the lookback threshold and never reaches the wire. + """ + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) # default lookback=128 + + chunks = [ + '{"to": "alice@', + '{"to": "alice@example', + '{"to": "alice@example.com"}', + ] + events = [ + { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": ( + { + "event": "content-block-delta", + "index": 0, + "delta": { + "type": "block-delta", + "fields": { + "type": "tool_call_chunk", + "id": "c1", + "name": "send_email", + "args": c, + }, + }, + }, + {"run_id": "r1"}, + ), + }, + } + for c in chunks + ] + _run_transformer(transformer, events) + + # None of the intermediate accumulation states reach the wire — + # all chunks are below the 128-char lookback, so emit_end = 0. + for e in events: + args_on_wire = e["params"]["data"][0]["delta"]["fields"]["args"] + assert "alice@" not in args_on_wire + assert "alice" not in args_on_wire + assert args_on_wire == "" + + def test_finalize_block_redacts_tool_call_args(self) -> None: + """`content-block-finish` with type=tool_call walks args dict.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + events = [ + { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": ( + { + "event": "content-block-finish", + "index": 0, + "content": { + "type": "tool_call", + "id": "call_1", + "name": "send_email", + "args": {"to": "alice@example.com", "subject": "clean"}, + }, + }, + {"run_id": "r1"}, + ), + }, + }, + ] + _run_transformer(transformer, events) + args = events[0]["params"]["data"][0]["content"]["args"] + assert args == {"to": "[REDACTED_EMAIL]", "subject": "clean"} + + def test_finalize_block_raises_on_tool_call_args_under_block(self) -> None: + """`block` raises when the finalized tool-call args contain PII.""" + rule = RedactionRule(pii_type="email", strategy="block").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + event = { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": ( + { + "event": "content-block-finish", + "index": 0, + "content": { + "type": "tool_call", + "id": "call_1", + "name": "send_email", + "args": {"to": "alice@example.com"}, + }, + }, + {"run_id": "r1"}, + ), + }, + } + with pytest.raises(PIIDetectionError): + transformer.process(event) + + def test_legacy_payload_redacts_tool_call_args(self) -> None: + """`(AIMessage, metadata)` shape redacts tool_calls[].args.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + msg = AIMessage( + content="", + tool_calls=[ + ToolCall(name="send_email", args={"to": "alice@example.com"}, id="c1"), + ], + id="m1", + ) + event: dict[str, Any] = { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": (msg, {"run_id": "r1"}), + }, + } + transformer.process(event) + out_msg = event["params"]["data"][0] + assert out_msg.tool_calls[0]["args"] == {"to": "[REDACTED_EMAIL]"} + + def test_legacy_payload_block_raises_when_tool_call_has_pii(self) -> None: + """`block` + legacy AIMessage with PII in tool_calls → raises immediately.""" + rule = RedactionRule(pii_type="email", strategy="block").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + msg = AIMessage( + content="", + tool_calls=[ + ToolCall(name="send_email", args={"to": "alice@example.com"}, id="c1"), + ], + id="m1", + ) + event: dict[str, Any] = { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": (msg, {"run_id": "r1"}), + }, + } + with pytest.raises(PIIDetectionError): + transformer.process(event) + + def test_legacy_payload_redacts_tool_message_content(self) -> None: + """`(ToolMessage, metadata)` payload redacts `.content`.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + msg = ToolMessage(content="Result: alice@example.com", tool_call_id="c1", id="m1") + event: dict[str, Any] = { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": (msg, {"run_id": "r1"}), + }, + } + transformer.process(event) + out_msg = event["params"]["data"][0] + assert "alice@example.com" not in out_msg.content + assert "[REDACTED_EMAIL]" in out_msg.content + + def test_tools_in_required_stream_modes(self) -> None: + """The transformer subscribes to both `messages` and `tools`.""" + assert "tools" in _PIIStreamTransformer.required_stream_modes + assert "messages" in _PIIStreamTransformer.required_stream_modes + + def test_process_tools_event_passes_through(self) -> None: + """Tools events route to the new handler without error.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + event = { + "type": "event", + "method": "tools", + "params": { + "namespace": [], + "timestamp": 0, + "data": { + "event": "tool-started", + "tool_call_id": "c1", + "tool_name": "echo", + "input": {}, + }, + }, + } + assert transformer.process(event) is True + + def test_tool_started_string_input_redacted(self) -> None: + """Single-argument tools pass `input` as a raw string.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + event = { + "type": "event", + "method": "tools", + "params": { + "namespace": [], + "timestamp": 0, + "data": { + "event": "tool-started", + "tool_call_id": "c1", + "tool_name": "echo", + "input": "user mail is alice@example.com", + }, + }, + } + transformer.process(event) + out = event["params"]["data"]["input"] + assert "alice@example.com" not in out + assert "[REDACTED_EMAIL]" in out + + def test_tool_started_list_input_redacted(self) -> None: + """Array-input tools pass `input` as a list.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + event = { + "type": "event", + "method": "tools", + "params": { + "namespace": [], + "timestamp": 0, + "data": { + "event": "tool-started", + "tool_call_id": "c1", + "tool_name": "fanout", + "input": ["bob@example.com", "clean"], + }, + }, + } + transformer.process(event) + assert event["params"]["data"]["input"] == ["[REDACTED_EMAIL]", "clean"] + + def test_drop_run_does_not_sweep_tool_buffers(self) -> None: + """`_drop_run` on the messages channel must not wipe tool buffers.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule, lookback=64) + + # Put a tool-output buffer entry in place. + transformer.process( + { + "type": "event", + "method": "tools", + "params": { + "namespace": [], + "timestamp": 0, + "data": { + "event": "tool-output-delta", + "tool_call_id": "c1", + "delta": "partial", + }, + }, + } + ) + assert "c1" in transformer._tool_buffers + + # An errant message-finish with no run_id used to sweep all + # `("", *)` buffer keys when tool buffers shared the dict. Now + # `_drop_run` only touches `_buffers`, so tool entries are + # structurally isolated. + transformer.process( + { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": ({"event": "message-finish"}, {}), + }, + } + ) + assert "c1" in transformer._tool_buffers + + def test_finalize_invalid_tool_call_redacts_string_args(self) -> None: + """`invalid_tool_call.args` is a raw JSON string, not a dict.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + events = [ + { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": ( + { + "event": "content-block-finish", + "index": 0, + "content": { + "type": "invalid_tool_call", + "id": "c1", + "name": "send_email", + "args": '{"to": "alice@example.com", "bad json', + "error": "Unterminated string", + }, + }, + {"run_id": "r1"}, + ), + }, + }, + ] + _run_transformer(transformer, events) + out = events[0]["params"]["data"][0]["content"]["args"] + assert "alice@example.com" not in out + assert "[REDACTED_EMAIL]" in out + + def test_redact_value_walks_invalid_tool_calls(self) -> None: + """`AIMessage.invalid_tool_calls` go through the same recursion.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + msg = AIMessage( + content="", + invalid_tool_calls=[ + InvalidToolCall( + name="send_email", + args='{"to": "alice@example.com"} BROKEN', + id="c1", + error="parse failed", + ), + ], + id="m1", + ) + redacted = transformer._redact_value(msg) + assert "alice@example.com" not in redacted.invalid_tool_calls[0]["args"] + assert "[REDACTED_EMAIL]" in redacted.invalid_tool_calls[0]["args"] + assert "alice@example.com" in msg.invalid_tool_calls[0]["args"] + + def test_tool_started_input_is_redacted(self) -> None: + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + event = { + "type": "event", + "method": "tools", + "params": { + "namespace": [], + "timestamp": 0, + "data": { + "event": "tool-started", + "tool_call_id": "c1", + "tool_name": "send_email", + "input": {"to": "alice@example.com", "subject": "clean"}, + }, + }, + } + transformer.process(event) + assert event["params"]["data"]["input"] == { + "to": "[REDACTED_EMAIL]", + "subject": "clean", + } + + def test_tool_output_delta_string_uses_lookback(self) -> None: + """String tool-output deltas get the lookback redaction same as text.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule, lookback=64) + + events = [ + { + "type": "event", + "method": "tools", + "params": { + "namespace": [], + "timestamp": 0, + "data": { + "event": "tool-output-delta", + "tool_call_id": "c1", + "delta": "Result: alice", + }, + }, + }, + { + "type": "event", + "method": "tools", + "params": { + "namespace": [], + "timestamp": 0, + "data": { + "event": "tool-output-delta", + "tool_call_id": "c1", + "delta": "@example.com end", + }, + }, + }, + ] + for e in events: + transformer.process(e) + streamed = events[0]["params"]["data"]["delta"] + events[1]["params"]["data"]["delta"] + assert "alice@example.com" not in streamed + + def test_tool_output_delta_dict_walks_strings(self) -> None: + """Structured tool-output deltas redact each string leaf in place.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + event = { + "type": "event", + "method": "tools", + "params": { + "namespace": [], + "timestamp": 0, + "data": { + "event": "tool-output-delta", + "tool_call_id": "c1", + "delta": {"row": {"email": "alice@example.com"}, "ok": True}, + }, + }, + } + transformer.process(event) + assert event["params"]["data"]["delta"] == { + "row": {"email": "[REDACTED_EMAIL]"}, + "ok": True, + } + + def test_tool_finished_output_redacted(self) -> None: + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + event = { + "type": "event", + "method": "tools", + "params": { + "namespace": [], + "timestamp": 0, + "data": { + "event": "tool-finished", + "tool_call_id": "c1", + "output": {"email": "alice@example.com", "clean": "yes"}, + }, + }, + } + transformer.process(event) + assert event["params"]["data"]["output"] == { + "email": "[REDACTED_EMAIL]", + "clean": "yes", + } + + def test_tool_error_message_redacted(self) -> None: + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + event = { + "type": "event", + "method": "tools", + "params": { + "namespace": [], + "timestamp": 0, + "data": { + "event": "tool-error", + "tool_call_id": "c1", + "message": "failed for alice@example.com", + }, + }, + } + transformer.process(event) + assert "alice@example.com" not in event["params"]["data"]["message"] + assert "[REDACTED_EMAIL]" in event["params"]["data"]["message"] + + def test_tool_finished_clears_per_tool_buffer(self) -> None: + """tool-finished drops the lookback buffer for that tool_call_id.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule, lookback=64) + + delta_event = { + "type": "event", + "method": "tools", + "params": { + "namespace": [], + "timestamp": 0, + "data": { + "event": "tool-output-delta", + "tool_call_id": "c1", + "delta": "partial text", + }, + }, + } + transformer.process(delta_event) + assert "c1" in transformer._tool_buffers + + finish_event = { + "type": "event", + "method": "tools", + "params": { + "namespace": [], + "timestamp": 0, + "data": { + "event": "tool-finished", + "tool_call_id": "c1", + "output": "done", + }, + }, + } + transformer.process(finish_event) + assert "c1" not in transformer._tool_buffers + + def test_tool_output_delta_block_strategy_raises(self) -> None: + """`block` raises immediately on PII in a `tool-output-delta`.""" + rule = RedactionRule(pii_type="email", strategy="block").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + event = { + "type": "event", + "method": "tools", + "params": { + "namespace": [], + "timestamp": 0, + "data": { + "event": "tool-output-delta", + "tool_call_id": "c1", + "delta": "Result: alice@example.com", + }, + }, + } + with pytest.raises(PIIDetectionError): + transformer.process(event) + + def test_tool_call_args_block_strategy_raises(self) -> None: + """`block` raises immediately when streamed tool-call args contain PII.""" + rule = RedactionRule(pii_type="email", strategy="block").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + event = { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": ( + { + "event": "content-block-delta", + "index": 0, + "delta": { + "type": "block-delta", + "fields": { + "type": "tool_call_chunk", + "id": "call_1", + "name": "send_email", + "args": '{"to": "alice@example.com"}', + }, + }, + }, + {"run_id": "r1"}, + ), + }, + } + with pytest.raises(PIIDetectionError): + transformer.process(event) + + def test_pii_fully_inside_one_delta_is_redacted_on_finalize(self) -> None: + """A delta shorter than `lookback` is held until finalize redacts the snapshot.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + events = [ + _make_delta_event("Reach me at alice@example.com tomorrow."), + _make_finish_event("Reach me at alice@example.com tomorrow."), + ] + _run_transformer(transformer, events) + streamed, finals = _emitted_text(events) # type: ignore[misc] + + # The raw email never reaches the wire — the delta is held in the + # lookback buffer and the finalize snapshot is the redacted text. + assert "alice@example.com" not in streamed + assert "alice@example.com" not in finals[0] + assert "[REDACTED_EMAIL]" in finals[0] + + def test_pii_split_across_deltas_is_caught(self) -> None: + """Email split mid-string across deltas should still be redacted in stream.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule, lookback=64) + + events = [ + _make_delta_event("Hi, contact alice"), + _make_delta_event("@example.com when ready"), + _make_finish_event("Hi, contact alice@example.com when ready"), + ] + _run_transformer(transformer, events) + streamed, finals = _emitted_text(events) # type: ignore[misc] + + # The held-buffer should have prevented the raw email from being + # released until detection ran over the concatenation. + assert "alice@example.com" not in streamed + assert "alice@example.com" not in finals[0] + + def test_pii_straddling_lookback_boundary_is_caught(self) -> None: + r"""PII whose start falls in the safe prefix and end in the held tail. + + When `len(combined) > lookback`, the safe/tail split lands inside + the buffer. Detecting only on the about-to-emit prefix misses + PII that straddles the boundary — the regex's `\b...\b` anchors + require a complete match, so a truncated prefix produces no + detection and the partial PII would leak raw. The transformer + must run detection on the full accumulated buffer first. + """ + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule, lookback=32) + + # Single 50-char delta with a 30-char email at position 0. + # `safe_end = 50 - 32 = 18` falls inside the email, so the old + # logic would emit `"alice@longerdomain"` raw and hold the rest. + email = "alice@longerdomain.example.com" + text = email + "x" * 20 + events = [ + _make_delta_event(text), + _make_finish_event(text), + ] + _run_transformer(transformer, events) + streamed, finals = _emitted_text(events) # type: ignore[misc] + + # No prefix of the email reaches the wire. + assert email not in streamed + assert "alice@longerdomain" not in streamed + assert "alice@" not in streamed + # And the finalized snapshot is fully redacted. + assert email not in finals[0] + assert "[REDACTED_EMAIL]" in finals[0] + + def test_credit_card_split_across_whitespace_is_caught(self) -> None: + """Card with whitespace separators must not leak across deltas.""" + rule = RedactionRule(pii_type="credit_card").resolve() + transformer = _PIIStreamTransformer(rule=rule, lookback=64) + + events = [ + _make_delta_event("Card: 5425 "), + _make_delta_event("2334 3010 9903 next"), + _make_finish_event("Card: 5425 2334 3010 9903 next"), + ] + _run_transformer(transformer, events) + streamed, finals = _emitted_text(events) # type: ignore[misc] + + # No prefix of the card may reach the wire — the lookback buffer + # holds whitespace-separated groups until detection runs over the + # full concatenation. + assert "5425 2334 3010 9903" not in streamed + assert "5425" not in streamed + assert "5425 2334 3010 9903" not in finals[0] + assert "[REDACTED_CREDIT_CARD]" in finals[0] + + def test_no_transformer_when_neither_output_nor_tool_results_apply(self) -> None: + """The transformer is gated on either output- or tool-result scrubbing.""" + middleware = PIIMiddleware("email", apply_to_output=False, apply_to_tool_results=False) + assert middleware.transformers == () + + def test_transformer_installed_for_tool_results_only(self) -> None: + """`apply_to_tool_results=True` alone installs the stream transformer. + + Stream consumers see the `tools` channel and `ToolMessage` + payloads on the `messages` / `values` channels before the + state-level `before_model` enforcer runs. Without the + transformer those surfaces would leak raw tool-result PII. + """ + middleware = PIIMiddleware("email", apply_to_output=False, apply_to_tool_results=True) + assert len(middleware.transformers) == 1 + + def test_block_strategy_installs_buffering_stream_transformer(self) -> None: + """`block` + output streaming installs a buffering transformer. + + `after_model` is still the canonical blocker. The transformer's + job is to make sure no PII reaches the streamed surface before + that hook can raise. + """ + middleware = PIIMiddleware("email", strategy="block", apply_to_output=True) + assert len(middleware.transformers) == 1 + + def test_block_strategy_raises_on_first_pii_detection(self) -> None: + """Stream PII under `block`: raises immediately when the pattern completes.""" + rule = RedactionRule(pii_type="email", strategy="block").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + # First delta carries an incomplete email — detection won't match. + clean_event = _make_delta_event("Reach me at alice") + transformer.process(clean_event) # no raise yet + + # Second delta completes the email — detection fires, raises. + completing_event = _make_delta_event("@example.com soon") + with pytest.raises(PIIDetectionError): + transformer.process(completing_event) + + def test_block_strategy_releases_full_text_at_finalize_when_clean(self) -> None: + """Stream clean text under `block`: deltas empty, finalize is the full text.""" + rule = RedactionRule(pii_type="email", strategy="block").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + events = [ + _make_delta_event("Hello there, "), + _make_delta_event("how are you?"), + _make_finish_event("Hello there, how are you?"), + ] + _run_transformer(transformer, events) + streamed, finals = _emitted_text(events) # type: ignore[misc] + + # Deltas hold everything back; the finalize event carries the + # whole block at once. `ChatModelStream._resolve_block_text` + # turns this into a single trailing delta for the consumer's + # `msg.text` projection. + assert streamed == "" + assert finals[0] == "Hello there, how are you?" + + def test_finalize_block_redacts_full_text_even_if_stream_redaction_partial( + self, + ) -> None: + """content-block-finish always re-redacts the finalized text snapshot.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + # Stream the email in a single delta WITHOUT trailing whitespace, + # so the in-stream lookback might not redact it yet — finalize must + # still produce a redacted snapshot. + events = [ + _make_delta_event("alice@example.com"), + _make_finish_event("alice@example.com"), + ] + _run_transformer(transformer, events) + _, finals = _emitted_text(events) # type: ignore[misc] + + assert "alice@example.com" not in finals[0] + assert "[REDACTED_EMAIL]" in finals[0] + + def test_buffers_isolated_by_run_id(self) -> None: + """Two concurrent runs share the transformer instance but not buffer state.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule, lookback=64) + + events = [ + _make_delta_event("Hi alice", run_id="run-A"), + _make_delta_event("Bob's addr is bob", run_id="run-B"), + _make_delta_event("@example.com soon", run_id="run-A"), + _make_delta_event("@other.com soon", run_id="run-B"), + _make_finish_event("Hi alice@example.com soon", run_id="run-A"), + _make_finish_event("Bob's addr is bob@other.com soon", run_id="run-B"), + ] + _run_transformer(transformer, events) + + run_a = "".join( + e["params"]["data"][0]["delta"]["text"] + for e in events + if e["params"]["data"][0].get("event") == "content-block-delta" + and e["params"]["data"][1].get("run_id") == "run-A" + ) + run_b = "".join( + e["params"]["data"][0]["delta"]["text"] + for e in events + if e["params"]["data"][0].get("event") == "content-block-delta" + and e["params"]["data"][1].get("run_id") == "run-B" + ) + # Splits would have leaked if buffers crossed run_ids. + assert "alice@example.com" not in run_a + assert "alice@example.com" not in run_b + assert "bob@other.com" not in run_a + assert "bob@other.com" not in run_b + + def test_message_finish_drops_buffers(self) -> None: + """Abandoned blocks (no content-block-finish) should still release memory.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule, lookback=64) + + _run_transformer( + transformer, + [_make_delta_event("partial text without finish")], + ) + assert ("r1", 0) in transformer._buffers + + # message-finish for the run wipes any (run-id, *) entries. + message_finish_event = { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": ({"event": "message-finish"}, {"run_id": "r1"}), + }, + } + transformer.process(message_finish_event) + assert ("r1", 0) not in transformer._buffers + + def test_finalize_clears_all_state(self) -> None: + """Mux close should be safe — finalize wipes any held buffers.""" + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule, lookback=64) + _run_transformer(transformer, [_make_delta_event("hanging text")]) + assert transformer._buffers + transformer.finalize() + assert transformer._buffers == {} + + def test_long_pii_exceeding_lookback_still_caught_on_finalize(self) -> None: + """Patterns longer than `lookback` may slip past the in-stream cap. + + The finalize snapshot is always redacted in full regardless. + """ + # Choose an absurdly small lookback so a normal email exceeds it. + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule, lookback=4) + + events = [ + _make_delta_event("hello alice@example.com goodbye"), + _make_finish_event("hello alice@example.com goodbye"), + ] + _run_transformer(transformer, events) + _, finals = _emitted_text(events) # type: ignore[misc] + # The finalized snapshot always re-runs detection over the full text. + assert "alice@example.com" not in finals[0] + + def test_data_delta_passes_through_untouched(self) -> None: + """`data-delta` (binary/base64 payloads) is not a typical PII surface. + + Regex detection on base64 strings would produce false positives + and rarely catches real PII. Users with structured-data PII + concerns should attach a custom detector that understands their + payload format. + """ + rule = RedactionRule(pii_type="email").resolve() + transformer = _PIIStreamTransformer(rule=rule) + + data_event = { + "type": "event", + "method": "messages", + "params": { + "namespace": [], + "timestamp": 0, + "data": ( + { + "event": "content-block-delta", + "index": 0, + "delta": { + "type": "data-delta", + "data": "YWxpY2VAZXhhbXBsZS5jb20=", # base64 of email + }, + }, + {"run_id": "r1"}, + ), + }, + } + transformer.process(data_event) + assert data_event["params"]["data"][0]["delta"]["data"] == "YWxpY2VAZXhhbXBsZS5jb20=" + + def test_transformer_registered_before_messages_transformer_on_agent(self) -> None: + """The PIIMiddleware transformer must run before MessagesTransformer. + + Otherwise the built-in `messages` projection snapshots the original + text before redaction, defeating the whole point of the in-flight + path. + """ + model = FakeToolCallingModel() + agent = create_agent(model, [], middleware=[PIIMiddleware("email", apply_to_output=True)]) + + run = agent.stream_events({"messages": [HumanMessage("hi")]}, version="v3") + transformers = run._mux._transformers # type: ignore[attr-defined] + + pii_idx = next( + i for i, t in enumerate(transformers) if isinstance(t, _PIIStreamTransformer) + ) + messages_idx = next( + i for i, t in enumerate(transformers) if isinstance(t, MessagesTransformer) + ) + assert pii_idx < messages_idx, ( + "PIIStreamTransformer must be registered before MessagesTransformer " + "so it can mutate delta.text before the messages projection snapshots it" + ) + + # Drain to close cleanly. + list(run.tool_calls) + + +class TestPIIStreamingEndToEnd: + """End-to-end tests with a real streaming chat model wired into create_agent.""" + + @pytest.mark.anyio + async def test_streamed_messages_projection_is_redacted(self) -> None: + """Iterating `run.messages` should yield text with PII already redacted. + + Drives a `GenericFakeChatModel` (which actually streams content via + `_stream` / `_astream` and produces `content-block-delta` protocol + events) through `create_agent` and asserts the `.text` projection + of each `ChatModelStream` does not contain the original PII. + """ + model = GenericFakeChatModel( + messages=iter([AIMessage(content="Reach me at alice@example.com today.")]) + ) + agent = create_agent(model, [], middleware=[PIIMiddleware("email", apply_to_output=True)]) + + run = await agent.astream_events({"messages": [HumanMessage("hi")]}, version="v3") + + collected_text = "" + async for msg in run.messages: + async for chunk in msg.text: + collected_text += chunk + + assert "alice@example.com" not in collected_text + assert "[REDACTED_EMAIL]" in collected_text + + @pytest.mark.anyio + async def test_main_event_log_carries_redacted_deltas(self) -> None: + """Raw protocol events on the main log must not leak the original PII. + + Iterates the run as a raw protocol event stream (the same surface + external consumers see via `stream_events(version="v3")`) and + asserts every `content-block-delta` event's `delta.text` is + already redacted. + """ + model = GenericFakeChatModel( + messages=iter([AIMessage(content="Reach me at alice@example.com today.")]) + ) + agent = create_agent(model, [], middleware=[PIIMiddleware("email", apply_to_output=True)]) + + run = await agent.astream_events({"messages": [HumanMessage("hi")]}, version="v3") + + surfaces: list[str] = [] + async for event in run: + if event.get("method") != "messages": + continue + data = event["params"].get("data") + if not isinstance(data, tuple) or len(data) != 2: + continue + payload = data[0] + # v3 protocol-event shape: dict with `event` discriminator. + if isinstance(payload, dict): + kind = payload.get("event") + if kind == "content-block-delta": + delta = payload.get("delta") or {} + if delta.get("type") == "text-delta": + surfaces.append(delta.get("text", "")) + elif kind == "content-block-finish": + content = payload.get("content") or {} + if content.get("type") == "text": + surfaces.append(content.get("text", "")) + # Legacy `(BaseMessage, metadata)` shape: message carries text + # directly on `.content`. The langgraph→langchain adapter falls + # back to this when `_astream` isn't streaming chunks. + elif isinstance(payload, BaseMessage): + content = payload.content + if isinstance(content, str): + surfaces.append(content) + + # Every observed text surface — deltas, finalized snapshots, or + # legacy whole-message payloads — must already be redacted. + for text in surfaces: + assert "alice@example.com" not in text + # And the redaction marker actually shows up somewhere. + assert any("[REDACTED_EMAIL]" in text for text in surfaces) + + @pytest.mark.anyio + async def test_block_strategy_emits_no_pii_and_run_raises(self) -> None: + """`block` + `apply_to_output=True` + streaming. + + Closes the bypass where the transformer was skipped for `block`, + leaving plaintext deltas to reach the consumer before + `after_model` raised. The buffering transformer now keeps every + wire surface empty and `after_model` raises on the original + message in state. + """ + model = GenericFakeChatModel( + messages=iter([AIMessage(content="Email me at alice@example.com.")]) + ) + agent = create_agent( + model, + [], + middleware=[PIIMiddleware("email", strategy="block", apply_to_output=True)], + ) + + collected = "" + + async def drain() -> None: + nonlocal collected + run = await agent.astream_events({"messages": [HumanMessage("hi")]}, version="v3") + async for msg in run.messages: + async for chunk in msg.text: + collected += chunk + + with pytest.raises(PIIDetectionError): + await drain() + + # No characters of the PII surface ever reach the consumer — the + # raised error is the only signal that something was blocked. + assert "alice@example.com" not in collected + assert "alice" not in collected + + @pytest.mark.anyio + async def test_tool_call_args_redacted_end_to_end(self) -> None: + """Tool-call args containing PII don't reach the consumer.""" + + @tool + def echo(text: str) -> str: + """Echo.""" + return f"echo: {text}" + + model = FakeToolCallingModel( + tool_calls=[ + [ToolCall(name="echo", args={"text": "ping alice@example.com"}, id="c1")], + [], + ] + ) + agent = create_agent( + model, + [echo], + middleware=[PIIMiddleware("email", apply_to_output=True)], + ) + + surfaces: list[str] = [] + run = await agent.astream_events({"messages": [HumanMessage("hi")]}, version="v3") + async for event in run: + if not isinstance(event, dict): + continue + data = event.get("params", {}).get("data") + if isinstance(data, tuple) and len(data) == 2: + p = data[0] + if isinstance(p, BaseMessage): + surfaces.append(str(p.content)) + surfaces.extend( + str(tc.get("args")) for tc in getattr(p, "tool_calls", None) or [] + ) + elif isinstance(p, dict): + if p.get("event") == "content-block-finish": + c = p.get("content") or {} + if c.get("type") == "text": + surfaces.append(str(c.get("text", ""))) + elif c.get("type") in {"tool_call", "server_tool_call"}: + surfaces.append(str(c.get("args"))) + elif p.get("event") == "content-block-delta": + d = p.get("delta") or {} + if d.get("type") == "block-delta": + f = d.get("fields") or {} + if f.get("type") in { + "tool_call_chunk", + "server_tool_call_chunk", + }: + surfaces.append(str(f.get("args", ""))) + elif isinstance(data, dict): + e = data.get("event") + if e == "tool-started": + surfaces.append(str(data.get("input"))) + elif e == "tool-output-delta": + surfaces.append(str(data.get("delta"))) + elif e == "tool-finished": + surfaces.append(str(data.get("output"))) + elif e == "tool-error": + surfaces.append(str(data.get("message", ""))) + + for s in surfaces: + assert "alice@example.com" not in s, f"PII leaked on surface: {s!r}" + assert any("[REDACTED_EMAIL]" in s for s in surfaces), ( + f"redaction marker not observed; surfaces={surfaces}" + ) + + @pytest.mark.anyio + async def test_tool_output_redacted_end_to_end(self) -> None: + """Tool output containing PII is redacted on every stream surface.""" + + @tool + def lookup_user(user_id: str) -> str: + """Look up a user — returns PII.""" + return f"User {user_id}: alice@example.com" + + model = FakeToolCallingModel( + tool_calls=[ + [ToolCall(name="lookup_user", args={"user_id": "u1"}, id="c1")], + [], + ] + ) + agent = create_agent( + model, + [lookup_user], + middleware=[ + PIIMiddleware( + "email", + apply_to_input=True, + apply_to_output=True, + apply_to_tool_results=True, + ) + ], + ) + + surfaces: list[str] = [] + run = await agent.astream_events({"messages": [HumanMessage("hi")]}, version="v3") + async for event in run: + if not isinstance(event, dict): + continue + data = event.get("params", {}).get("data") + if isinstance(data, tuple) and len(data) == 2: + p = data[0] + if isinstance(p, BaseMessage): + surfaces.append(str(p.content)) + elif isinstance(p, dict): + surfaces.append(repr(p)) + elif isinstance(data, dict): + surfaces.append(repr(data)) + + for s in surfaces: + assert "alice@example.com" not in s, f"tool output leaked on surface: {s!r}" + + @pytest.mark.anyio + async def test_block_raises_on_tool_output_pii(self) -> None: + """`block` + tool output with PII → run raises, no leak on the wire.""" + + @tool + def lookup_user(user_id: str) -> str: + """Look up a user — returns PII.""" + return f"User {user_id}: alice@example.com" + + model = FakeToolCallingModel( + tool_calls=[ + [ToolCall(name="lookup_user", args={"user_id": "u1"}, id="c1")], + [], + ] + ) + agent = create_agent( + model, + [lookup_user], + middleware=[ + PIIMiddleware( + "email", + strategy="block", + apply_to_input=True, + apply_to_output=True, + apply_to_tool_results=True, + ) + ], + ) + + collected: list[str] = [] + + async def drain() -> None: + run = await agent.astream_events({"messages": [HumanMessage("hi")]}, version="v3") + async for event in run: + if isinstance(event, dict): + data = event.get("params", {}).get("data") + if isinstance(data, tuple) and len(data) == 2: + p = data[0] + if isinstance(p, BaseMessage): + collected.append(str(p.content)) + elif isinstance(p, dict): + collected.append(repr(p)) + elif isinstance(data, dict): + collected.append(repr(data)) + + with pytest.raises(PIIDetectionError): + await drain() + + for s in collected: + assert "alice@example.com" not in s, f"PII leaked under block: {s!r}" + + @pytest.mark.anyio + async def test_block_fails_all_projections_cleanly(self) -> None: + """Block raise propagates to every projection cursor via `afail`. + + Verifies the langgraph cleanup chain: `process()` raises → + `_apump_next` catches and calls `_mux.afail(err)` → every + registered channel (`messages` log, `tool_calls` log, `values` + log, raw event log) is failed with the error → every inner + ChatModelStream / ToolCallStream the transformers were tracking + gets `fail()` called. + """ + + @tool + def lookup(user_id: str) -> str: + """Look up.""" + return f"User {user_id}: alice@example.com" + + def _agent() -> Any: + model = FakeToolCallingModel( + tool_calls=[ + [ToolCall(name="lookup", args={"user_id": "u1"}, id="c1")], + [], + ] + ) + return create_agent( + model, + [lookup], + middleware=[ + PIIMiddleware( + "email", + strategy="block", + apply_to_output=True, + apply_to_tool_results=True, + ) + ], + ) + + # Raw events log + run = await _agent().astream_events({"messages": [HumanMessage("hi")]}, version="v3") + with pytest.raises(PIIDetectionError): + async for _event in run: + pass + + # `run.messages` projection — inner ChatModelStreams also fail. + run = await _agent().astream_events({"messages": [HumanMessage("hi")]}, version="v3") + + async def drain_messages() -> None: + async for msg in run.messages: + async for _chunk in msg.text: + pass + + with pytest.raises(PIIDetectionError): + await drain_messages() + + # `run.values` projection. + run = await _agent().astream_events({"messages": [HumanMessage("hi")]}, version="v3") + with pytest.raises(PIIDetectionError): + async for _snapshot in run.values: + pass + + @pytest.mark.anyio + async def test_subgraph_redaction_via_create_agent_in_tool(self) -> None: + """A sub-agent invoked inside a tool inherits the parent's transformer. + + `StreamMux._make_child` clones the factory list down to every + subgraph scope, so a fresh `_PIIStreamTransformer` runs at the + sub-agent's mini-mux too. This is the supported pattern: attach + `PIIMiddleware` to the outer agent and every nested model call + — including those run by sub-agents inside tools — gets redacted + in flight by its own scoped instance of the transformer. + """ + inner_model = GenericFakeChatModel( + messages=iter([AIMessage(content="Hi bob@example.com, here is data.")]) + ) + # No PII middleware on the inner agent — the outer's transformer + # factory propagates down to the subgraph scope. + inner_agent = create_agent(inner_model, []) + + @tool + def delegate(query: str) -> str: + """Hand the query off to the inner agent.""" + result = inner_agent.invoke({"messages": [HumanMessage(query)]}) + return str(result["messages"][-1].content) + + outer_model = FakeToolCallingModel( + tool_calls=[ + [{"name": "delegate", "args": {"query": "hi"}, "id": "tc1"}], + [], + ] + ) + outer_agent = create_agent( + outer_model, + [delegate], + middleware=[PIIMiddleware("email", apply_to_output=True)], + ) + + run = await outer_agent.astream_events({"messages": [HumanMessage("go")]}, version="v3") + + seen_email_in_deltas = False + seen_email_in_finalized = False + seen_redaction = False + async for event in run: + if event.get("method") != "messages": + continue + data = event["params"].get("data") + if not isinstance(data, tuple) or len(data) != 2: + continue + payload = data[0] + if not isinstance(payload, dict): + continue + kind = payload.get("event") + if kind == "content-block-delta": + delta = payload.get("delta") or {} + if delta.get("type") == "text-delta": + text = delta.get("text", "") + if "bob@example.com" in text: + seen_email_in_deltas = True + if "[REDACTED_EMAIL]" in text: + seen_redaction = True + elif kind == "content-block-finish": + content = payload.get("content") or {} + if content.get("type") == "text": + text = content.get("text", "") + if "bob@example.com" in text: + seen_email_in_finalized = True + if "[REDACTED_EMAIL]" in text: + seen_redaction = True + + assert not seen_email_in_deltas, ( + "raw PII leaked through a subgraph's streamed deltas — child " + "mini-mux did not inherit the outer transformer factory" + ) + assert not seen_email_in_finalized, ( + "raw PII leaked through a subgraph's content-block-finish snapshot" + ) + assert seen_redaction, "transformer never fired at the subgraph scope" diff --git a/libs/langchain_v1/uv.lock b/libs/langchain_v1/uv.lock index 8fb691088fb..1c3db839e08 100644 --- a/libs/langchain_v1/uv.lock +++ b/libs/langchain_v1/uv.lock @@ -2025,7 +2025,7 @@ requires-dist = [ { name = "langchain-perplexity", marker = "extra == 'perplexity'" }, { name = "langchain-together", marker = "extra == 'together'" }, { name = "langchain-xai", marker = "extra == 'xai'" }, - { name = "langgraph", specifier = ">=1.2.0,<1.3.0" }, + { name = "langgraph", specifier = ">=1.2.1,<1.3.0" }, { name = "pydantic", specifier = ">=2.7.4,<3.0.0" }, ] provides-extras = ["community", "anthropic", "openai", "azure-ai", "google-vertexai", "google-genai", "fireworks", "ollama", "together", "mistralai", "huggingface", "groq", "aws", "baseten", "deepseek", "xai", "perplexity"] @@ -2603,7 +2603,7 @@ wheels = [ [[package]] name = "langgraph" -version = "1.2.0" +version = "1.2.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "langchain-core" }, @@ -2613,9 +2613,9 @@ dependencies = [ { name = "pydantic" }, { name = "xxhash" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/58/61/d5d25e783035aa307d289b37e082258a6061c0fb4caa4a284f3bf1e87169/langgraph-1.2.0.tar.gz", hash = "sha256:4a9baaf62afc5d5f63144a50095140a34b9aa9b7cea695d25326d564775348e7", size = 690248, upload-time = "2026-05-12T03:46:39.164Z" } +sdist = { url = "https://files.pythonhosted.org/packages/1c/8e/34a57e338a319e3b32c1bd183c2a9a04f7f35d683d3f3d8f597f6eacbc4e/langgraph-1.2.1.tar.gz", hash = "sha256:28314f844678d9d307cbd63e7b48b0145bf17177d84b40ee2921061e07b6f966", size = 693750, upload-time = "2026-05-21T18:33:07.478Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/f6/e8/e3304ac0015c2bdb04ad9785e4ed65c788855ce7857ce6104dd2f5d322db/langgraph-1.2.0-py3-none-any.whl", hash = "sha256:03fd5895a8d4b70db1ff63ebc3bacead29dd20cd794a8b1a483e7ec9018f7a65", size = 234262, upload-time = "2026-05-12T03:46:37.971Z" }, + { url = "https://files.pythonhosted.org/packages/73/8c/313912e26866893bd15be9b4ea3442dc86f69270b0ad01a4961d1eba7118/langgraph-1.2.1-py3-none-any.whl", hash = "sha256:5cc4020de8f1e2a048d773f6e9128646a2af8c68a8067ab9cab177a2fcc8d221", size = 235317, upload-time = "2026-05-21T18:33:05.687Z" }, ] [[package]]