From af4d711a2f2ebcaa524af34abb0262dfdac9678a Mon Sep 17 00:00:00 2001 From: Eugene Yurtsev Date: Fri, 10 Apr 2026 10:47:54 -0400 Subject: [PATCH] chore(core): reduce streaming metadata / perf (#36588) - looking into reducing streaming metadata / perfm --------- Co-authored-by: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com> --- libs/core/Makefile | 2 +- libs/core/langchain_core/callbacks/manager.py | 55 +- libs/core/langchain_core/runnables/config.py | 41 +- libs/core/langchain_core/tracers/core.py | 8 +- libs/core/langchain_core/tracers/langchain.py | 55 ++ .../tests/unit_tests/runnables/test_config.py | 142 +++- .../unit_tests/runnables/test_runnable.py | 2 +- .../runnables/test_tracing_interops.py | 701 +++++++++++++++++- .../unit_tests/tracers/test_langchain.py | 205 +++++ libs/langchain_v1/Makefile | 4 +- .../chat_models/test_chat_models.py | 10 +- 11 files changed, 1195 insertions(+), 30 deletions(-) diff --git a/libs/core/Makefile b/libs/core/Makefile index 4f0e1e20221..4df60bc02c4 100644 --- a/libs/core/Makefile +++ b/libs/core/Makefile @@ -17,7 +17,7 @@ test tests: -u LANGSMITH_API_KEY \ -u LANGSMITH_TRACING \ -u LANGCHAIN_PROJECT \ - uv run --group test pytest -n auto $(PYTEST_EXTRA) --disable-socket --allow-unix-socket $(TEST_FILE) + uv run --group test pytest -n auto --benchmark-disable $(PYTEST_EXTRA) --disable-socket --allow-unix-socket $(TEST_FILE) test_watch: env \ diff --git a/libs/core/langchain_core/callbacks/manager.py b/libs/core/langchain_core/callbacks/manager.py index 31aa2ac156f..19f66f6d46e 100644 --- a/libs/core/langchain_core/callbacks/manager.py +++ b/libs/core/langchain_core/callbacks/manager.py @@ -7,7 +7,7 @@ import atexit import functools import logging from abc import ABC, abstractmethod -from collections.abc import Callable +from collections.abc import Callable, Mapping from concurrent.futures import ThreadPoolExecutor from contextlib import asynccontextmanager, contextmanager from contextvars import copy_context @@ -1614,6 +1614,9 @@ class CallbackManager(BaseCallbackManager): local_tags: list[str] | None = None, inheritable_metadata: dict[str, Any] | None = None, local_metadata: dict[str, Any] | None = None, + *, + langsmith_inheritable_metadata: Mapping[str, Any] | None = None, + langsmith_inheritable_tags: list[str] | None = None, ) -> CallbackManager: """Configure the callback manager. @@ -1625,6 +1628,10 @@ class CallbackManager(BaseCallbackManager): local_tags: The local tags. inheritable_metadata: The inheritable metadata. local_metadata: The local metadata. + langsmith_inheritable_metadata: Default inheritable metadata applied + to any `LangChainTracer` handlers via `set_defaults`. + langsmith_inheritable_tags: Default inheritable tags applied to any + `LangChainTracer` handlers via `set_defaults`. Returns: The configured callback manager. @@ -1638,6 +1645,8 @@ class CallbackManager(BaseCallbackManager): inheritable_metadata, local_metadata, verbose=verbose, + langsmith_inheritable_metadata=langsmith_inheritable_metadata, + langsmith_inheritable_tags=langsmith_inheritable_tags, ) @@ -2134,6 +2143,9 @@ class AsyncCallbackManager(BaseCallbackManager): local_tags: list[str] | None = None, inheritable_metadata: dict[str, Any] | None = None, local_metadata: dict[str, Any] | None = None, + *, + langsmith_inheritable_metadata: Mapping[str, Any] | None = None, + langsmith_inheritable_tags: list[str] | None = None, ) -> AsyncCallbackManager: """Configure the async callback manager. @@ -2145,6 +2157,10 @@ class AsyncCallbackManager(BaseCallbackManager): local_tags: The local tags. inheritable_metadata: The inheritable metadata. local_metadata: The local metadata. + langsmith_inheritable_metadata: Default inheritable metadata applied + to any `LangChainTracer` handlers via `set_defaults`. + langsmith_inheritable_tags: Default inheritable tags applied to any + `LangChainTracer` handlers via `set_defaults`. Returns: The configured async callback manager. @@ -2158,6 +2174,8 @@ class AsyncCallbackManager(BaseCallbackManager): inheritable_metadata, local_metadata, verbose=verbose, + langsmith_inheritable_metadata=langsmith_inheritable_metadata, + langsmith_inheritable_tags=langsmith_inheritable_tags, ) @@ -2304,6 +2322,8 @@ def _configure( local_metadata: dict[str, Any] | None = None, *, verbose: bool = False, + langsmith_inheritable_metadata: Mapping[str, Any] | None = None, + langsmith_inheritable_tags: list[str] | None = None, ) -> T: """Configure the callback manager. @@ -2316,6 +2336,10 @@ def _configure( inheritable_metadata: The inheritable metadata. local_metadata: The local metadata. verbose: Whether to enable verbose mode. + langsmith_inheritable_metadata: Default inheritable metadata applied to + any `LangChainTracer` handlers via `set_defaults`. + langsmith_inheritable_tags: Default inheritable tags applied to any + `LangChainTracer` handlers via `set_defaults`. Raises: RuntimeError: If `LANGCHAIN_TRACING` is set but `LANGCHAIN_TRACING_V2` is not. @@ -2387,8 +2411,6 @@ def _configure( if inheritable_metadata or local_metadata: callback_manager.add_metadata(inheritable_metadata or {}) callback_manager.add_metadata(local_metadata or {}, inherit=False) - if tracing_metadata: - callback_manager.add_metadata(tracing_metadata.copy()) if tracing_tags: callback_manager.add_tags(tracing_tags.copy()) @@ -2440,6 +2462,7 @@ def _configure( else tracing_context["client"] ), tags=tracing_tags, + metadata=tracing_metadata, ) callback_manager.add_handler(handler) except Exception as e: @@ -2479,6 +2502,32 @@ def _configure( for handler in callback_manager.handlers ): callback_manager.add_handler(var_handler, inheritable) + + if tracing_metadata: + langsmith_inheritable_metadata = { + **tracing_metadata, + **(langsmith_inheritable_metadata or {}), + } + + if langsmith_inheritable_metadata or langsmith_inheritable_tags: + callback_manager.handlers = [ + handler.copy_with_metadata_defaults( + metadata=langsmith_inheritable_metadata, + tags=langsmith_inheritable_tags, + ) + if isinstance(handler, LangChainTracer) + else handler + for handler in callback_manager.handlers + ] + callback_manager.inheritable_handlers = [ + handler.copy_with_metadata_defaults( + metadata=langsmith_inheritable_metadata, + tags=langsmith_inheritable_tags, + ) + if isinstance(handler, LangChainTracer) + else handler + for handler in callback_manager.inheritable_handlers + ] return callback_manager diff --git a/libs/core/langchain_core/runnables/config.py b/libs/core/langchain_core/runnables/config.py index b538ff9fa05..5bae0488370 100644 --- a/libs/core/langchain_core/runnables/config.py +++ b/libs/core/langchain_core/runnables/config.py @@ -138,6 +138,28 @@ COPIABLE_KEYS = [ "configurable", ] + +# Users are expected to use the `context` API with a context object +# (which does not get traced) +CONFIGURABLE_TO_TRACING_METADATA_EXCLUDED_KEYS = frozenset(("api_key",)) + + +def _get_langsmith_inheritable_metadata_from_config( + config: RunnableConfig, +) -> dict[str, Any] | None: + """Get LangSmith-only inheritable metadata defaults derived from config.""" + configurable = config.get("configurable") or {} + metadata = { + key: value + for key, value in configurable.items() + if not key.startswith("__") + and isinstance(value, (str, int, float, bool)) + and key not in config.get("metadata", {}) + and key not in CONFIGURABLE_TO_TRACING_METADATA_EXCLUDED_KEYS + } + return metadata or None + + DEFAULT_RECURSION_LIMIT = 25 @@ -264,14 +286,11 @@ def ensure_config(config: RunnableConfig | None = None) -> RunnableConfig: for k, v in config.items(): if k not in CONFIG_KEYS and v is not None: empty["configurable"][k] = v - for key, value in empty.get("configurable", {}).items(): - if ( - not key.startswith("__") - and isinstance(value, (str, int, float, bool)) - and key not in empty["metadata"] - and key != "api_key" - ): - empty["metadata"][key] = value + if ( + isinstance(model := empty.get("configurable", {}).get("model"), str) + and "model" not in empty["metadata"] + ): + empty["metadata"]["model"] = model return empty @@ -508,6 +527,9 @@ def get_callback_manager_for_config(config: RunnableConfig) -> CallbackManager: inheritable_callbacks=config.get("callbacks"), inheritable_tags=config.get("tags"), inheritable_metadata=config.get("metadata"), + langsmith_inheritable_metadata=_get_langsmith_inheritable_metadata_from_config( + config + ), ) @@ -526,6 +548,9 @@ def get_async_callback_manager_for_config( inheritable_callbacks=config.get("callbacks"), inheritable_tags=config.get("tags"), inheritable_metadata=config.get("metadata"), + langsmith_inheritable_metadata=_get_langsmith_inheritable_metadata_from_config( + config + ), ) diff --git a/libs/core/langchain_core/tracers/core.py b/libs/core/langchain_core/tracers/core.py index bbfb5492d85..5d08a03b46f 100644 --- a/libs/core/langchain_core/tracers/core.py +++ b/libs/core/langchain_core/tracers/core.py @@ -51,6 +51,8 @@ class _TracerCore(ABC): _schema_format: Literal[ "original", "streaming_events", "original+chat" ] = "original", + run_map: dict[str, Run] | None = None, + order_map: dict[UUID, tuple[UUID, str]] | None = None, **kwargs: Any, ) -> None: """Initialize the tracer. @@ -70,6 +72,8 @@ class _TracerCore(ABC): streaming events. - `'original+chat'` is a format that is the same as `'original'` except it does NOT raise an attribute error `on_chat_model_start` + run_map: Optional shared map of run ID to run. + order_map: Optional shared map of run ID to trace ordering data. **kwargs: Additional keyword arguments that will be passed to the superclass. """ @@ -77,10 +81,10 @@ class _TracerCore(ABC): self._schema_format = _schema_format # For internal use only API will change. - self.run_map: dict[str, Run] = {} + self.run_map = run_map if run_map is not None else {} """Map of run ID to run. Cleared on run end.""" - self.order_map: dict[UUID, tuple[UUID, str]] = {} + self.order_map = order_map if order_map is not None else {} """Map of run ID to (trace_id, dotted_order). Cleared when tracer GCed.""" @abstractmethod diff --git a/libs/core/langchain_core/tracers/langchain.py b/libs/core/langchain_core/tracers/langchain.py index 2a2b5ce0f62..a0abe155396 100644 --- a/libs/core/langchain_core/tracers/langchain.py +++ b/libs/core/langchain_core/tracers/langchain.py @@ -27,6 +27,8 @@ from langchain_core.tracers.base import BaseTracer from langchain_core.tracers.schemas import Run if TYPE_CHECKING: + from collections.abc import Mapping + from langchain_core.messages import BaseMessage from langchain_core.outputs import ChatGenerationChunk, GenerationChunk @@ -124,6 +126,8 @@ class LangChainTracer(BaseTracer): project_name: str | None = None, client: Client | None = None, tags: list[str] | None = None, + *, + metadata: Mapping[str, str] | None = None, **kwargs: Any, ) -> None: """Initialize the LangChain tracer. @@ -139,6 +143,9 @@ class LangChainTracer(BaseTracer): tags: The tags. Defaults to an empty list. + metadata: Additional metadata to include if it isn't already in the run. + + Defaults to None. **kwargs: Additional keyword arguments. """ super().__init__(**kwargs) @@ -150,6 +157,39 @@ class LangChainTracer(BaseTracer): self.tags = tags or [] self.latest_run: Run | None = None self.run_has_token_event_map: dict[str, bool] = {} + self.tracing_metadata: dict[str, str] | None = ( + dict(metadata) if metadata is not None else None + ) + + def copy_with_metadata_defaults( + self, + *, + metadata: Mapping[str, str] | None = None, + tags: list[str] | None = None, + ) -> LangChainTracer: + """Return a new tracer with merged tracer-only defaults.""" + base_metadata = self.tracing_metadata + if metadata is None: + merged_metadata = dict(base_metadata) if base_metadata is not None else None + elif base_metadata is None: + merged_metadata = dict(metadata) + else: + merged_metadata = dict(base_metadata) + for key, value in metadata.items(): + if key not in merged_metadata: + merged_metadata[key] = value + + merged_tags = sorted(set(self.tags + tags)) if tags else self.tags + + return self.__class__( + example_id=self.example_id, + project_name=self.project_name, + client=self.client, + tags=merged_tags, + metadata=merged_metadata, + run_map=self.run_map, + order_map=self.order_map, + ) def _start_trace(self, run: Run) -> None: if self.project_name: @@ -263,6 +303,7 @@ class LangChainTracer(BaseTracer): try: run.extra["runtime"] = get_runtime_environment() run.tags = self._get_tags(run) + _patch_missing_metadata(self, run) if run.ls_client is not self.client: run.ls_client = self.client run.post() @@ -398,3 +439,17 @@ class LangChainTracer(BaseTracer): """Wait for the given futures to complete.""" if self.client is not None: self.client.flush() + + +def _patch_missing_metadata(self: LangChainTracer, run: Run) -> None: + if not self.tracing_metadata: + return + metadata = run.metadata + patched = None + for k, v in self.tracing_metadata.items(): + if k not in metadata: + if patched is None: + # Copy on first miss to avoid mutating the shared dict. + patched = {**metadata} + run.extra["metadata"] = patched + patched[k] = v diff --git a/libs/core/tests/unit_tests/runnables/test_config.py b/libs/core/tests/unit_tests/runnables/test_config.py index 5897879d90a..9a4716e8269 100644 --- a/libs/core/tests/unit_tests/runnables/test_config.py +++ b/libs/core/tests/unit_tests/runnables/test_config.py @@ -16,6 +16,7 @@ from langchain_core.callbacks.streaming_stdout import StreamingStdOutCallbackHan from langchain_core.runnables import RunnableBinding, RunnablePassthrough from langchain_core.runnables.config import ( RunnableConfig, + _get_langsmith_inheritable_metadata_from_config, _set_config_context, ensure_config, merge_configs, @@ -61,7 +62,7 @@ def test_ensure_config() -> None: assert config["configurable"] is not arg["configurable"] assert config == { "tags": ["tag1", "tag2"], - "metadata": {"foo": "bar", "baz": "qux", "something": "else"}, + "metadata": {"foo": "bar"}, "callbacks": [arg["callbacks"][0]], "recursion_limit": 100, "configurable": {"baz": "qux", "something": "else"}, @@ -71,6 +72,145 @@ def test_ensure_config() -> None: } +def test_ensure_config_copies_model_to_metadata() -> None: + config = ensure_config( + { + "configurable": { + "thread_id": "th-123", + "checkpoint_id": "ckpt-1", + "checkpoint_ns": "ns-1", + "task_id": "task-1", + "run_id": "run-456", + "assistant_id": "asst-789", + "graph_id": "graph-0", + "model": "gpt-4o", + "user_id": "uid-1", + "cron_id": "cron-1", + "langgraph_auth_user_id": "user-1", + "some_api_key": "opaque-token", + "custom_setting": {"nested": True}, + "none_value": None, + }, + "metadata": {"nooverride": 18}, + } + ) + + assert config["metadata"] == {"nooverride": 18, "model": "gpt-4o"} + assert config["configurable"] == { + "thread_id": "th-123", + "checkpoint_id": "ckpt-1", + "checkpoint_ns": "ns-1", + "task_id": "task-1", + "run_id": "run-456", + "assistant_id": "asst-789", + "graph_id": "graph-0", + "model": "gpt-4o", + "user_id": "uid-1", + "cron_id": "cron-1", + "langgraph_auth_user_id": "user-1", + "some_api_key": "opaque-token", + "custom_setting": {"nested": True}, + "none_value": None, + } + + +def test_ensure_config_metadata_is_not_overridden_by_configurable_model() -> None: + config = ensure_config( + { + "configurable": { + "model": "from-configurable", + "run_id": None, + "checkpoint_ns": "from-configurable", + }, + "metadata": { + "model": "from-metadata", + "run_id": "from-metadata", + "checkpoint_ns": "from-metadata", + }, + } + ) + + assert config["metadata"] == { + "model": "from-metadata", + "run_id": "from-metadata", + "checkpoint_ns": "from-metadata", + } + assert config["configurable"] == { + "model": "from-configurable", + "run_id": None, + "checkpoint_ns": "from-configurable", + } + + +def test_ensure_config_copies_top_level_model_to_metadata() -> None: + config = ensure_config( + cast( + "RunnableConfig", + { + "model": "gpt-4o", + "metadata": {"nooverride": 18}, + }, + ) + ) + + assert config["metadata"] == {"nooverride": 18, "model": "gpt-4o"} + assert config["configurable"] == {"model": "gpt-4o"} + + +def test_get_langsmith_inheritable_metadata_from_config_uses_previous_copy_rules() -> ( + None +): + config = ensure_config( + cast( + "RunnableConfig", + { + "something": "else", + "metadata": { + "foo": "bar", + "model": "from-metadata", + "checkpoint_ns": "from-metadata", + }, + "configurable": { + "baz": "qux", + "thread_id": "th-123", + "checkpoint_id": "ckpt-1", + "checkpoint_ns": "from-configurable", + "task_id": "task-1", + "run_id": "run-456", + "assistant_id": "asst-789", + "graph_id": "graph-0", + "model": "from-configurable", + "user_id": "uid-1", + "cron_id": "cron-1", + "langgraph_auth_user_id": "user-1", + "api_key": "should-not-propagate", + "__secret_key": "should-not-propagate", + "temperature": 0.5, + "streaming": True, + "custom_setting": {"nested": True}, + "none_value": None, + }, + }, + ) + ) + + assert _get_langsmith_inheritable_metadata_from_config(config) == { + "something": "else", + "baz": "qux", + "thread_id": "th-123", + "checkpoint_id": "ckpt-1", + "task_id": "task-1", + "run_id": "run-456", + "assistant_id": "asst-789", + "graph_id": "graph-0", + "user_id": "uid-1", + "cron_id": "cron-1", + "langgraph_auth_user_id": "user-1", + "temperature": 0.5, + "streaming": True, + } + + async def test_merge_config_callbacks() -> None: manager: RunnableConfig = { "callbacks": CallbackManager(handlers=[StdOutCallbackHandler()]) diff --git a/libs/core/tests/unit_tests/runnables/test_runnable.py b/libs/core/tests/unit_tests/runnables/test_runnable.py index b6848c2cd85..da648b00312 100644 --- a/libs/core/tests/unit_tests/runnables/test_runnable.py +++ b/libs/core/tests/unit_tests/runnables/test_runnable.py @@ -1162,7 +1162,7 @@ async def test_with_config_metadata_passthrough(mocker: MockerFixture) -> None: "callbacks": None, "recursion_limit": 25, "configurable": {"hello": "there", "__secret_key": "nahnah"}, - "metadata": {"hello": "there", "bye": "now"}, + "metadata": {"bye": "now"}, }, ) spy.reset_mock() diff --git a/libs/core/tests/unit_tests/runnables/test_tracing_interops.py b/libs/core/tests/unit_tests/runnables/test_tracing_interops.py index b5fae902f7e..c163e024ad5 100644 --- a/libs/core/tests/unit_tests/runnables/test_tracing_interops.py +++ b/libs/core/tests/unit_tests/runnables/test_tracing_interops.py @@ -1,7 +1,10 @@ from __future__ import annotations +import asyncio +import concurrent.futures import json import sys +import threading import uuid from inspect import isasyncgenfunction from typing import TYPE_CHECKING, Any, Literal @@ -12,13 +15,15 @@ from langsmith import Client, RunTree, get_current_run_tree, traceable from langsmith.run_helpers import tracing_context from langsmith.utils import get_env_var +from langchain_core.callbacks.base import BaseCallbackHandler +from langchain_core.callbacks.manager import CallbackManager from langchain_core.runnables.base import RunnableLambda, RunnableParallel from langchain_core.tracers.langchain import LangChainTracer if TYPE_CHECKING: - from collections.abc import AsyncGenerator, Callable, Coroutine, Generator + from collections.abc import AsyncGenerator, Callable, Coroutine, Generator, Mapping - from langchain_core.callbacks import BaseCallbackHandler + from langchain_core.runnables.config import RunnableConfig def _get_posts(client: Client) -> list[dict[str, Any]]: @@ -43,12 +48,15 @@ def _get_posts(client: Client) -> list[dict[str, Any]]: def _create_tracer_with_mocked_client( project_name: str | None = None, tags: list[str] | None = None, + metadata: Mapping[str, str] | None = None, ) -> LangChainTracer: mock_session = MagicMock() mock_client_ = Client( session=mock_session, api_key="test", auto_batch_tracing=False ) - return LangChainTracer(client=mock_client_, project_name=project_name, tags=tags) + return LangChainTracer( + client=mock_client_, project_name=project_name, tags=tags, metadata=metadata + ) def test_tracing_context() -> None: @@ -75,6 +83,38 @@ def test_tracing_context() -> None: assert all(post["session_name"] == project_name for post in posts) +def test_inheritable_metadata_respects_explicit_metadata_with_tracing_context() -> None: + """Tracer defaults fill missing keys while run metadata keeps precedence.""" + tracer = _create_tracer_with_mocked_client() + + @RunnableLambda + def my_func(x: int) -> int: + return x + + callbacks = CallbackManager.configure( + inheritable_callbacks=[tracer], + langsmith_inheritable_metadata={ + "tenant": "from_tracer", + "shared": "from_tracer", + }, + ) + with tracing_context(enabled=True, client=tracer.client): + my_func.invoke( + 1, + { + "callbacks": callbacks, + "metadata": {"shared": "from_run", "explicit": "from_run"}, + }, + ) + + posts = _get_posts(tracer.client) + assert len(posts) == 1 + metadata = posts[0].get("extra", {}).get("metadata", {}) + assert metadata["tenant"] == "from_tracer" + assert metadata["shared"] == "from_run" + assert metadata["explicit"] == "from_run" + + def test_config_traceable_handoff() -> None: if hasattr(get_env_var, "cache_clear"): get_env_var.cache_clear() # type: ignore[attr-defined] @@ -466,7 +506,10 @@ def test_tree_is_constructed(parent_type: Literal["ls", "lc"]) -> None: ): collected: dict[str, RunTree] = {} - def collect_run(run: RunTree) -> None: + def collect_langsmith_run(run: RunTree) -> None: + collected[str(run.id)] = run + + def collect_tracer_run(_: LangChainTracer, run: RunTree) -> None: collected[str(run.id)] = run if parent_type == "ls": @@ -476,7 +519,8 @@ def test_tree_is_constructed(parent_type: Literal["ls", "lc"]) -> None: return child.invoke("foo") assert ( - parent(langsmith_extra={"on_end": collect_run, "run_id": rid}) == "foo" + parent(langsmith_extra={"on_end": collect_langsmith_run, "run_id": rid}) + == "foo" ) assert collected @@ -487,9 +531,10 @@ def test_tree_is_constructed(parent_type: Literal["ls", "lc"]) -> None: return child.invoke("foo") tracer = LangChainTracer() - tracer._persist_run = collect_run # type: ignore[method-assign] - - assert parent.invoke(..., {"run_id": rid, "callbacks": [tracer]}) == "foo" # type: ignore[attr-defined] + with patch.object(LangChainTracer, "_persist_run", new=collect_tracer_run): + assert ( + parent.invoke(..., {"run_id": rid, "callbacks": [tracer]}) == "foo" # type: ignore[attr-defined] + ) run = collected.get(str(rid)) assert run is not None @@ -508,3 +553,643 @@ def test_tree_is_constructed(parent_type: Literal["ls", "lc"]) -> None: assert "afoo" in kitten_run.tags # type: ignore[operator] assert grandchild_run is not None assert kitten_run.dotted_order.startswith(grandchild_run.dotted_order) + + +class TestTracerMetadataThroughInvoke: + """Tests for tracer metadata merging through invoke calls.""" + + def test_tracer_metadata_applied_to_all_runs(self) -> None: + """Tracer metadata appears on every run when no config metadata is set.""" + tracer = _create_tracer_with_mocked_client( + metadata={"env": "prod", "service": "api"} + ) + + @RunnableLambda + def child(x: int) -> int: + return x + 1 + + @RunnableLambda + def parent(x: int) -> int: + return child.invoke(x) + + parent.invoke(1, {"callbacks": [tracer]}) + + posts = _get_posts(tracer.client) + assert len(posts) == 2 + for post in posts: + md = post.get("extra", {}).get("metadata", {}) + assert md.get("env") == "prod", f"run {post['name']} missing env" + assert md.get("service") == "api", f"run {post['name']} missing service" + + def test_config_metadata_takes_precedence(self) -> None: + """Config metadata wins over tracer metadata for overlapping keys.""" + tracer = _create_tracer_with_mocked_client( + metadata={"env": "prod", "tracer_only": "yes"} + ) + + @RunnableLambda + def my_func(x: int) -> int: + return x + + my_func.invoke( + 1, + { + "callbacks": [tracer], + "metadata": {"env": "staging", "config_only": "yes"}, + }, + ) + + posts = _get_posts(tracer.client) + assert len(posts) == 1 + md = posts[0].get("extra", {}).get("metadata", {}) + # Config wins for overlapping key + assert md["env"] == "staging" + # Both non-overlapping keys are present + assert md["tracer_only"] == "yes" + assert md["config_only"] == "yes" + + def test_nested_calls_inherit_config_metadata(self) -> None: + """Child runs inherit config metadata; tracer metadata fills gaps.""" + tracer = _create_tracer_with_mocked_client( + metadata={"tracer_key": "tracer_val"} + ) + + @RunnableLambda + def child(x: int) -> int: + return x + 1 + + @RunnableLambda + def parent(x: int) -> int: + return child.invoke(x) + + parent.invoke( + 1, + { + "callbacks": [tracer], + "metadata": {"config_key": "config_val"}, + }, + ) + + posts = _get_posts(tracer.client) + assert len(posts) == 2 + name_to_md = { + post["name"]: post.get("extra", {}).get("metadata", {}) for post in posts + } + # Both parent and child should have config metadata (inherited) + # and tracer metadata (patched in) + for name, md in name_to_md.items(): + assert md.get("config_key") == "config_val", f"{name} missing config_key" + assert md.get("tracer_key") == "tracer_val", f"{name} missing tracer_key" + + def test_tracer_metadata_not_applied_to_sibling_handlers(self) -> None: + """Tracer metadata is not applied to other callback handlers. + + `_patch_missing_metadata` copies the metadata dict before patching, + so the callback manager's shared metadata dict is not mutated. + Other handlers should only see config metadata, not tracer metadata. + """ + tracer = _create_tracer_with_mocked_client( + metadata={"tracer_key": "tracer_val"} + ) + + received_metadata: list[dict[str, Any]] = [] + + class MetadataCapture(BaseCallbackHandler): + """Callback handler that records metadata from chain events.""" + + def on_chain_start(self, *_args: Any, **kwargs: Any) -> None: + received_metadata.append(dict(kwargs.get("metadata", {}))) + + capture = MetadataCapture() + + @RunnableLambda + def my_func(x: int) -> int: + return x + + my_func.invoke( + 1, + { + "callbacks": [tracer, capture], + "metadata": {"shared_key": "shared_val"}, + }, + ) + + assert len(received_metadata) >= 1 + for md in received_metadata: + assert md["shared_key"] == "shared_val" + assert "tracer_key" not in md + + # But the posted run DOES have tracer metadata + posts = _get_posts(tracer.client) + assert len(posts) >= 1 + for post in posts: + post_md = post.get("extra", {}).get("metadata", {}) + assert post_md["shared_key"] == "shared_val" + assert post_md["tracer_key"] == "tracer_val" + + def test_tracer_metadata_with_no_config_metadata(self) -> None: + """When no config metadata is set, tracer metadata is the sole source.""" + tracer = _create_tracer_with_mocked_client( + metadata={"only_from_tracer": "value"} + ) + + @RunnableLambda + def my_func(x: int) -> int: + return x + + my_func.invoke(1, {"callbacks": [tracer]}) + + posts = _get_posts(tracer.client) + assert len(posts) == 1 + md = posts[0].get("extra", {}).get("metadata", {}) + assert md["only_from_tracer"] == "value" + + def test_empty_tracer_metadata_does_not_interfere(self) -> None: + """Tracer with no metadata does not interfere with config metadata.""" + tracer = _create_tracer_with_mocked_client(metadata=None) + + @RunnableLambda + def my_func(x: int) -> int: + return x + + my_func.invoke( + 1, + {"callbacks": [tracer], "metadata": {"config_key": "config_val"}}, + ) + + posts = _get_posts(tracer.client) + assert len(posts) == 1 + md = posts[0].get("extra", {}).get("metadata", {}) + assert md["config_key"] == "config_val" + + +def test_inheritable_metadata_nested_runs_preserve_parent_child_shape() -> None: + """Concurrent nested runs keep parent-child linkage within each invocation.""" + tracer = _create_tracer_with_mocked_client() + barrier = threading.Barrier(2) + + @RunnableLambda + def child(x: int) -> int: + barrier.wait() + return x + 1 + + @RunnableLambda + def parent(x: int) -> int: + return child.invoke(x) + + def invoke_for_tenant(tenant: str, value: int) -> int: + callbacks = CallbackManager.configure( + inheritable_callbacks=[tracer], + langsmith_inheritable_metadata={"tenant": tenant}, + ) + return parent.invoke(value, {"callbacks": callbacks}) + + threads = [ + threading.Thread(target=invoke_for_tenant, args=("alpha", 1)), + threading.Thread(target=invoke_for_tenant, args=("beta", 2)), + ] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + posts = _get_posts(tracer.client) + assert len(posts) == 4 + parents = [post for post in posts if post["name"] == "parent"] + children = [post for post in posts if post["name"] == "child"] + assert len(parents) == 2 + assert len(children) == 2 + parent_ids = {parent["id"] for parent in parents} + assert {child["parent_run_id"] for child in children} == parent_ids + assert { + post.get("extra", {}).get("metadata", {}).get("tenant") for post in posts + } == { + "alpha", + "beta", + } + + +def test_inheritable_metadata_parallel_children_keep_tenant_isolation() -> None: + """Concurrent roots with parallel child runs keep tenant metadata isolated.""" + tracer = _create_tracer_with_mocked_client() + barrier = threading.Barrier(4) + + @RunnableLambda + def add_one(x: int) -> int: + barrier.wait() + return x + 1 + + @RunnableLambda + def add_two(x: int) -> int: + barrier.wait() + return x + 2 + + parallel = RunnableParallel(first=add_one, second=add_two) + + def invoke_for_tenant(tenant: str, value: int) -> dict[str, int]: + callbacks = CallbackManager.configure( + inheritable_callbacks=[tracer], + langsmith_inheritable_metadata={"tenant": tenant}, + ) + return parallel.invoke(value, {"callbacks": callbacks}) + + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + list(executor.map(invoke_for_tenant, ["alpha", "beta"], [1, 2])) + + posts = _get_posts(tracer.client) + assert len(posts) == 6 + assert { + post.get("extra", {}).get("metadata", {}).get("tenant") for post in posts + } == { + "alpha", + "beta", + } + posts_by_trace: dict[str, list[dict[str, Any]]] = {} + for post in posts: + posts_by_trace.setdefault(post["trace_id"], []).append(post) + assert len(posts_by_trace) == 2 + assert all(len(trace_posts) == 3 for trace_posts in posts_by_trace.values()) + + +@pytest.mark.skipif( + sys.version_info < (3, 11), reason="Asyncio context vars require Python 3.11+" +) +async def test_langsmith_inheritable_metadata_mixed_sync_async_managers_isolated() -> ( + None +): + """Sync and async manager configure paths can overlap without metadata sharing.""" + tracer = _create_tracer_with_mocked_client() + + @RunnableLambda + async def async_runnable(x: int) -> int: + await asyncio.sleep(0) + return x + 1 + + @RunnableLambda + def sync_runnable(x: int) -> int: + return x + 1 + + async def run_sync() -> int: + callbacks = CallbackManager.configure( + inheritable_callbacks=[tracer], + langsmith_inheritable_metadata={"path": "sync"}, + ) + return await asyncio.to_thread( + sync_runnable.invoke, 1, {"callbacks": callbacks} + ) + + async def run_async() -> int: + callbacks = CallbackManager.configure( + inheritable_callbacks=[tracer], + langsmith_inheritable_metadata={"path": "async"}, + ) + return await async_runnable.ainvoke(1, {"callbacks": callbacks}) + + await asyncio.gather(run_sync(), run_async()) + + posts = _get_posts(tracer.client) + assert len(posts) == 2 + assert { + post.get("extra", {}).get("metadata", {}).get("path") for post in posts + } == { + "sync", + "async", + } + + +class TestLangsmithInheritableTracingDefaultsInConfigure: + """Tests for LangSmith inheritable tracing defaults in configure.""" + + def test_langsmith_inheritable_metadata_applied_via_configure(self) -> None: + """langsmith_inheritable_metadata flows to a copied tracer.""" + tracer = _create_tracer_with_mocked_client() + cm = CallbackManager.configure( + inheritable_callbacks=[tracer], + langsmith_inheritable_metadata={"env": "prod", "service": "api"}, + ) + lc_tracers = [h for h in cm.handlers if isinstance(h, LangChainTracer)] + assert len(lc_tracers) == 1 + assert lc_tracers[0] is not tracer + assert lc_tracers[0].tracing_metadata == {"env": "prod", "service": "api"} + assert tracer.tracing_metadata is None + + def test_langsmith_inheritable_metadata_does_not_overwrite_tracer_metadata( + self, + ) -> None: + """Tracer metadata takes precedence over langsmith_inheritable_metadata.""" + tracer = _create_tracer_with_mocked_client(metadata={"env": "staging"}) + cm = CallbackManager.configure( + inheritable_callbacks=[tracer], + langsmith_inheritable_metadata={"env": "prod", "service": "api"}, + ) + lc_tracer = next(h for h in cm.handlers if isinstance(h, LangChainTracer)) + assert tracer.tracing_metadata == {"env": "staging"} + assert lc_tracer.tracing_metadata == {"env": "staging", "service": "api"} + + def test_tracing_context_metadata_merged_into_langsmith_inheritable_metadata( + self, + ) -> None: + """Tracing-context metadata merges into tracer defaults. + + LangSmith metadata keeps precedence on collisions. + """ + tracer = _create_tracer_with_mocked_client() + with tracing_context( + enabled=True, + client=tracer.client, + metadata={"trace_only": "value", "shared": "trace"}, + ): + cm = CallbackManager.configure( + inheritable_callbacks=[tracer], + langsmith_inheritable_metadata={ + "shared": "langsmith", + "tenant": "alpha", + }, + ) + + lc_tracer = next(h for h in cm.handlers if isinstance(h, LangChainTracer)) + assert lc_tracer.tracing_metadata == { + "trace_only": "value", + "shared": "langsmith", + "tenant": "alpha", + } + + def test_langsmith_inheritable_metadata_end_to_end(self) -> None: + """langsmith_inheritable_metadata in configure propagates to posted runs.""" + tracer = _create_tracer_with_mocked_client() + + @RunnableLambda + def my_func(x: int) -> int: + return x + + # Use langsmith_inheritable_metadata through the config callbacks path + cm = CallbackManager.configure( + inheritable_callbacks=[tracer], + langsmith_inheritable_metadata={"env": "prod"}, + ) + my_func.invoke(1, {"callbacks": cm}) + + posts = _get_posts(tracer.client) + assert len(posts) == 1 + md = posts[0].get("extra", {}).get("metadata", {}) + assert md["env"] == "prod" + + def test_runnable_config_copies_configurable_values_to_tracing_metadata( + self, + ) -> None: + tracer = _create_tracer_with_mocked_client() + + @RunnableLambda + def my_func(x: int) -> int: + return x + + config: RunnableConfig = { + "callbacks": [tracer], + "metadata": { + "something": "else", + "checkpoint_ns": "from-metadata", + "model": "from-metadata", + }, + "configurable": { + "thread_id": "th-123", + "checkpoint_id": "ckpt-1", + "checkpoint_ns": "from-configurable", + "task_id": "task-1", + "run_id": "run-456", + "assistant_id": "asst-789", + "graph_id": "graph-0", + "model": "from-configurable", + "user_id": "uid-1", + "cron_id": "cron-1", + "langgraph_auth_user_id": "user-1", + "api_key": "should-not-propagate", + "__secret_key": "should-not-propagate", + "temperature": 0.5, + "streaming": True, + "custom_setting": {"nested": True}, + "none_value": None, + }, + } + my_func.invoke(1, config) + + posts = _get_posts(tracer.client) + assert len(posts) == 1 + md = posts[0].get("extra", {}).get("metadata", {}) + assert { + key: md[key] + for key in ( + "something", + "thread_id", + "checkpoint_id", + "task_id", + "run_id", + "assistant_id", + "graph_id", + "user_id", + "cron_id", + "langgraph_auth_user_id", + "temperature", + "streaming", + "model", + "checkpoint_ns", + ) + } == { + "something": "else", + "thread_id": "th-123", + "checkpoint_id": "ckpt-1", + "task_id": "task-1", + "run_id": "run-456", + "assistant_id": "asst-789", + "graph_id": "graph-0", + "user_id": "uid-1", + "cron_id": "cron-1", + "langgraph_auth_user_id": "user-1", + "temperature": 0.5, + "streaming": True, + "model": "from-metadata", + "checkpoint_ns": "from-metadata", + } + assert "api_key" not in md + assert "__secret_key" not in md + assert "custom_setting" not in md + assert "none_value" not in md + + def test_langsmith_inheritable_metadata_does_not_affect_non_tracer_handlers( + self, + ) -> None: + """langsmith_inheritable_metadata only applies to tracers.""" + tracer = _create_tracer_with_mocked_client() + + received_metadata: list[dict[str, Any]] = [] + + class MetadataCapture(BaseCallbackHandler): + def on_chain_start(self, *_args: Any, **kwargs: Any) -> None: + received_metadata.append(dict(kwargs.get("metadata", {}))) + + capture = MetadataCapture() + cm = CallbackManager.configure( + inheritable_callbacks=[tracer, capture], + langsmith_inheritable_metadata={"tracer_only": "yes"}, + ) + + @RunnableLambda + def my_func(x: int) -> int: + return x + + my_func.invoke(1, {"callbacks": cm}) + + # Non-tracer handler should NOT see langsmith_inheritable_metadata + assert len(received_metadata) >= 1 + for md in received_metadata: + assert "tracer_only" not in md + + # But the tracer's posted runs SHOULD have it + posts = _get_posts(tracer.client) + assert len(posts) >= 1 + for post in posts: + post_md = post.get("extra", {}).get("metadata", {}) + assert post_md["tracer_only"] == "yes" + + def test_no_langsmith_inheritable_metadata_is_noop(self) -> None: + """Passing langsmith_inheritable_metadata=None does not alter tracer state.""" + tracer = _create_tracer_with_mocked_client() + cm = CallbackManager.configure( + inheritable_callbacks=[tracer], + langsmith_inheritable_metadata=None, + ) + lc_tracer = next(h for h in cm.handlers if isinstance(h, LangChainTracer)) + assert lc_tracer is tracer + assert tracer.tracing_metadata is None + + def test_langsmith_inheritable_tags_applied_via_configure(self) -> None: + """langsmith_inheritable_tags flow to a copied tracer.""" + tracer = _create_tracer_with_mocked_client() + tracer.tags = ["existing"] + cm = CallbackManager.configure( + inheritable_callbacks=[tracer], + langsmith_inheritable_tags=["tenant:alpha", "existing"], + ) + lc_tracer = next(h for h in cm.handlers if isinstance(h, LangChainTracer)) + assert lc_tracer is not tracer + assert lc_tracer.tags == ["existing", "tenant:alpha"] + assert tracer.tags == ["existing"] + + def test_inheritable_tags_do_not_affect_non_tracer_handlers(self) -> None: + """langsmith_inheritable_tags only apply to tracers.""" + tracer = _create_tracer_with_mocked_client() + + received_tags: list[list[str]] = [] + + class TagCapture(BaseCallbackHandler): + def on_chain_start(self, *_args: Any, **kwargs: Any) -> None: + received_tags.append(list(kwargs.get("tags", []))) + + capture = TagCapture() + cm = CallbackManager.configure( + inheritable_callbacks=[tracer, capture], + langsmith_inheritable_tags=["tracer-only"], + ) + + @RunnableLambda + def my_func(x: int) -> int: + return x + + my_func.invoke(1, {"callbacks": cm}) + + assert received_tags + assert all("tracer-only" not in tags for tags in received_tags) + + posts = _get_posts(tracer.client) + assert posts + assert all("tracer-only" in post.get("tags", []) for post in posts) + + def test_langsmith_inheritable_metadata_copies_handlers_without_mutating_original( + self, + ) -> None: + """Configured manager copies tracers and leaves the original unchanged.""" + tracer = _create_tracer_with_mocked_client() + cm = CallbackManager.configure( + inheritable_callbacks=[tracer], + langsmith_inheritable_metadata={"env": "prod"}, + ) + handler_tracer = next(h for h in cm.handlers if isinstance(h, LangChainTracer)) + inheritable_tracer = next( + h for h in cm.inheritable_handlers if isinstance(h, LangChainTracer) + ) + assert handler_tracer is not tracer + assert inheritable_tracer is not tracer + assert tracer.tracing_metadata is None + + def test_langsmith_inheritable_metadata_configure_isolated_per_manager( + self, + ) -> None: + """Separate configure calls keep tracer-only defaults isolated.""" + tracer = _create_tracer_with_mocked_client() + alpha_manager = CallbackManager.configure( + inheritable_callbacks=[tracer], + langsmith_inheritable_metadata={"tenant": "alpha"}, + ) + beta_manager = CallbackManager.configure( + inheritable_callbacks=[tracer], + langsmith_inheritable_metadata={"tenant": "beta"}, + ) + + alpha_tracer = next( + handler + for handler in alpha_manager.handlers + if isinstance(handler, LangChainTracer) + ) + beta_tracer = next( + handler + for handler in beta_manager.handlers + if isinstance(handler, LangChainTracer) + ) + + assert tracer.tracing_metadata is None + assert alpha_tracer is not tracer + assert beta_tracer is not tracer + assert alpha_tracer is not beta_tracer + assert alpha_tracer.tracing_metadata == {"tenant": "alpha"} + assert beta_tracer.tracing_metadata == {"tenant": "beta"} + assert alpha_tracer.run_map is tracer.run_map + assert beta_tracer.run_map is tracer.run_map + assert alpha_tracer.order_map is tracer.order_map + assert beta_tracer.order_map is tracer.order_map + + def test_inheritable_metadata_concurrent_invocations_remain_isolated( + self, + ) -> None: + """Parallel invocations through copied tracers keep metadata separated.""" + tracer = _create_tracer_with_mocked_client() + barrier = threading.Barrier(2) + + @traceable + def traced_leaf(x: int) -> int: + barrier.wait() + return x + + @RunnableLambda + def my_func(x: int) -> int: + return traced_leaf(x) + + def invoke_for_tenant(tenant: str, value: int) -> int: + callbacks = CallbackManager.configure( + inheritable_callbacks=[tracer], + langsmith_inheritable_metadata={"tenant": tenant}, + ) + return my_func.invoke(value, {"callbacks": callbacks}) + + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + list(executor.map(invoke_for_tenant, ["alpha", "beta"], [1, 2])) + + posts = _get_posts(tracer.client) + assert len(posts) == 4 + assert {post["name"] for post in posts} == {"my_func", "traced_leaf"} + my_func_posts = [post for post in posts if post["name"] == "my_func"] + assert len(my_func_posts) == 2 + assert { + post.get("extra", {}).get("metadata", {}).get("tenant") + for post in my_func_posts + } == {"alpha", "beta"} + assert tracer.run_map == {} + assert len(tracer.order_map) == 2 diff --git a/libs/core/tests/unit_tests/tracers/test_langchain.py b/libs/core/tests/unit_tests/tracers/test_langchain.py index d2a83e655a6..0e02c05986a 100644 --- a/libs/core/tests/unit_tests/tracers/test_langchain.py +++ b/libs/core/tests/unit_tests/tracers/test_langchain.py @@ -1,3 +1,4 @@ +import concurrent.futures import threading import time import unittest.mock @@ -15,6 +16,7 @@ from langchain_core.outputs import ChatGeneration, LLMResult from langchain_core.tracers.langchain import ( LangChainTracer, _get_usage_metadata_from_generations, + _patch_missing_metadata, ) from langchain_core.tracers.schemas import Run @@ -696,3 +698,206 @@ def test_on_chain_error_updates_when_not_defers_inputs() -> None: # Should call update (PATCH), not persist (POST) for normal inputs assert not persist_called assert update_called + + +class TestPatchMissingMetadata: + """Tests for `_patch_missing_metadata` and tracer metadata behavior.""" + + @staticmethod + def _make_tracer( + metadata: dict[str, str] | None = None, + ) -> LangChainTracer: + client = unittest.mock.MagicMock(spec=Client) + client.tracing_queue = None + return LangChainTracer(client=client, metadata=metadata) + + @staticmethod + def _make_run( + metadata: dict[str, Any] | None = None, + ) -> Run: + return Run( + id=uuid.uuid4(), + name="test", + inputs={}, + run_type="chain", + extra={"metadata": metadata or {}}, + ) + + def test_adds_metadata_when_run_has_none(self) -> None: + """Tracer metadata fills in when the run has no matching keys.""" + tracer = self._make_tracer(metadata={"env": "prod", "service": "api"}) + run = self._make_run() + + _patch_missing_metadata(tracer, run) + + assert run.metadata["env"] == "prod" + assert run.metadata["service"] == "api" + + def test_does_not_overwrite_existing_keys(self) -> None: + """Config metadata takes precedence over tracer metadata.""" + tracer = self._make_tracer(metadata={"env": "prod", "service": "api"}) + run = self._make_run(metadata={"env": "staging"}) + + _patch_missing_metadata(tracer, run) + + assert run.metadata["env"] == "staging" + assert run.metadata["service"] == "api" + + def test_noop_when_tracer_has_no_metadata(self) -> None: + """No-op when the tracer has no metadata configured.""" + tracer = self._make_tracer(metadata=None) + run = self._make_run(metadata={"existing": "value"}) + + _patch_missing_metadata(tracer, run) + + assert run.metadata == {"existing": "value"} + + def test_noop_when_all_keys_already_present(self) -> None: + """No-op when every tracer key already exists in the run.""" + tracer = self._make_tracer(metadata={"env": "prod"}) + run = self._make_run(metadata={"env": "dev"}) + + _patch_missing_metadata(tracer, run) + + assert run.metadata == {"env": "dev"} + + def test_merges_disjoint_keys(self) -> None: + """Disjoint keys from tracer and config are all present after patching.""" + tracer = self._make_tracer(metadata={"tracer_key": "tracer_val"}) + run = self._make_run(metadata={"config_key": "config_val"}) + + _patch_missing_metadata(tracer, run) + + assert run.metadata == { + "tracer_key": "tracer_val", + "config_key": "config_val", + } + + def test_persist_run_single_applies_tracer_metadata(self) -> None: + """End-to-end: `_persist_run_single` calls `_patch_missing_metadata`.""" + tracer = self._make_tracer(metadata={"env": "prod"}) + run_id = UUID("9d878ab3-e5ca-4218-aef6-44cbdc90160a") + tracer.on_chain_start( + {"name": "test_chain"}, + {"input": "hello"}, + run_id=run_id, + ) + run = tracer.run_map[str(run_id)] + + with unittest.mock.patch.object(Run, "post"): + tracer._persist_run_single(run) + + assert run.metadata.get("env") == "prod" + + def test_persist_run_single_config_metadata_wins(self) -> None: + """Config metadata is not overwritten by tracer metadata during persist.""" + tracer = self._make_tracer(metadata={"env": "prod", "extra": "from_tracer"}) + run_id = UUID("9d878ab3-e5ca-4218-aef6-44cbdc90160b") + tracer.on_chain_start( + {"name": "test_chain"}, + {"input": "hello"}, + run_id=run_id, + metadata={"env": "staging"}, + ) + run = tracer.run_map[str(run_id)] + + with unittest.mock.patch.object(Run, "post"): + tracer._persist_run_single(run) + + assert run.metadata["env"] == "staging" + assert run.metadata["extra"] == "from_tracer" + + +class TestTracerMetadataCloning: + """Tests for LangChainTracer metadata cloning helpers.""" + + @staticmethod + def _make_tracer( + metadata: dict[str, str] | None = None, + ) -> LangChainTracer: + client = unittest.mock.MagicMock(spec=Client) + client.tracing_queue = None + return LangChainTracer(client=client, metadata=metadata) + + def test_copy_with_metadata_defaults_copies_configuration(self) -> None: + """Copied tracer keeps stable configuration but not identity.""" + tracer = self._make_tracer(metadata={"env": "staging"}) + tracer.project_name = "project" + tracer.tags = ["tag"] + + copied = tracer.copy_with_metadata_defaults(metadata={"service": "api"}) + + assert copied is not tracer + assert copied.client is tracer.client + assert copied.project_name == "project" + assert copied.tags == ["tag"] + assert copied.tags is tracer.tags + assert copied.tracing_metadata == {"env": "staging", "service": "api"} + assert copied.run_map is tracer.run_map + assert copied.order_map is tracer.order_map + assert copied.run_has_token_event_map == {} + + def test_copy_with_metadata_defaults_does_not_mutate_original(self) -> None: + """Metadata-default cloning leaves the source tracer unchanged.""" + tracer = self._make_tracer(metadata={"env": "staging"}) + + copied = tracer.copy_with_metadata_defaults(metadata={"service": "api"}) + + assert tracer.tracing_metadata == {"env": "staging"} + assert copied.tracing_metadata == {"env": "staging", "service": "api"} + + def test_copy_with_metadata_defaults_none_preserves_configuration(self) -> None: + """Copying without new metadata preserves metadata and shared run state.""" + tracer = self._make_tracer(metadata={"env": "staging"}) + copied = tracer.copy_with_metadata_defaults(metadata=None) + + assert copied is not tracer + assert copied.tracing_metadata == {"env": "staging"} + assert copied.run_map is tracer.run_map + assert copied.order_map is tracer.order_map + + def test_copy_with_metadata_defaults_threadsafe(self) -> None: + """Concurrent metadata-default copies do not mutate each other or the source.""" + tracer = self._make_tracer(metadata={"env": "staging"}) + + def copy_for_service(service: str) -> dict[str, str]: + copied = tracer.copy_with_metadata_defaults(metadata={"service": service}) + assert copied is not tracer + return copied.tracing_metadata or {} + + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + metadata_values = list(executor.map(copy_for_service, ["api", "worker"])) + + assert tracer.tracing_metadata == {"env": "staging"} + assert {metadata["service"] for metadata in metadata_values} == { + "api", + "worker", + } + assert all(metadata["env"] == "staging" for metadata in metadata_values) + + def test_copy_with_metadata_defaults_threadsafe_with_existing_shared_state( + self, + ) -> None: + """Concurrent copies preserve pre-populated shared run state.""" + tracer = self._make_tracer(metadata={"env": "staging"}) + run_id = uuid.uuid4() + tracer.run_map["existing"] = unittest.mock.MagicMock() + tracer.order_map[run_id] = (run_id, f"prefix.{run_id}") + + def copy_for_service(service: str) -> LangChainTracer: + copied = tracer.copy_with_metadata_defaults(metadata={"service": service}) + assert copied.run_map is tracer.run_map + assert copied.order_map is tracer.order_map + return copied + + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + copied_tracers = list(executor.map(copy_for_service, ["api", "worker"])) + + assert tracer.run_map.keys() == {"existing"} + assert tracer.order_map == {run_id: (run_id, f"prefix.{run_id}")} + copied_services = { + copied.tracing_metadata["service"] + for copied in copied_tracers + if copied.tracing_metadata is not None + } + assert copied_services == {"api", "worker"} diff --git a/libs/langchain_v1/Makefile b/libs/langchain_v1/Makefile index cb14bbaba32..9367f4422ee 100644 --- a/libs/langchain_v1/Makefile +++ b/libs/langchain_v1/Makefile @@ -38,13 +38,13 @@ coverage_agents: --cov-report=html:htmlcov \ test: - make start_services && LANGGRAPH_TEST_FAST=0 uv run --no-sync --active --group test pytest -n auto $(PYTEST_EXTRA) --disable-socket --allow-unix-socket $(TEST_FILE) --cov-report term-missing:skip-covered --snapshot-update; \ + make start_services && LANGGRAPH_TEST_FAST=0 uv run --no-sync --active --group test pytest -n auto $(PYTEST_EXTRA) --benchmark-disable --disable-socket --allow-unix-socket $(TEST_FILE) --cov-report term-missing:skip-covered --snapshot-update; \ EXIT_CODE=$$?; \ make stop_services; \ exit $$EXIT_CODE test_fast: - LANGGRAPH_TEST_FAST=1 uv run --group test pytest -n auto $(PYTEST_EXTRA) --disable-socket --allow-unix-socket $(TEST_FILE) + LANGGRAPH_TEST_FAST=1 uv run --group test pytest -n auto $(PYTEST_EXTRA) --benchmark-disable --disable-socket --allow-unix-socket $(TEST_FILE) benchmark: uv run --group test pytest tests/benchmarks/test_create_agent.py -m benchmark diff --git a/libs/langchain_v1/tests/unit_tests/chat_models/test_chat_models.py b/libs/langchain_v1/tests/unit_tests/chat_models/test_chat_models.py index e51f8d3d5d8..590bad566ba 100644 --- a/libs/langchain_v1/tests/unit_tests/chat_models/test_chat_models.py +++ b/libs/langchain_v1/tests/unit_tests/chat_models/test_chat_models.py @@ -1,5 +1,5 @@ import os -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from unittest import mock import pytest @@ -171,7 +171,7 @@ def test_configurable() -> None: for method in ("get_num_tokens", "get_num_tokens_from_messages"): assert hasattr(model_with_config, method) - assert model_with_config.model_dump() == { # type: ignore[attr-defined] + expected: dict[str, Any] = { "name": None, "bound": { "name": None, @@ -234,6 +234,7 @@ def test_configurable() -> None: "custom_input_type": None, "custom_output_type": None, } + assert model_with_config.model_dump() == expected # type: ignore[attr-defined] @pytest.mark.requires("langchain_openai", "langchain_anthropic") @@ -297,7 +298,7 @@ def test_configurable_with_default() -> None: assert model_with_config.model == "claude-sonnet-4-5-20250929" # type: ignore[attr-defined] - assert model_with_config.model_dump() == { # type: ignore[attr-defined] + expected: dict[str, Any] = { "name": None, "bound": { "name": None, @@ -332,7 +333,7 @@ def test_configurable_with_default() -> None: "config": { "callbacks": None, "configurable": {}, - "metadata": {"bar_model": "claude-sonnet-4-5-20250929"}, + "metadata": {}, "recursion_limit": 25, "tags": ["foo"], }, @@ -340,6 +341,7 @@ def test_configurable_with_default() -> None: "custom_input_type": None, "custom_output_type": None, } + assert model_with_config.model_dump() == expected # type: ignore[attr-defined] prompt = ChatPromptTemplate.from_messages([("system", "foo")]) chain = prompt | model_with_config assert isinstance(chain, RunnableSequence)