core docstrings tracers update (#24211)

Added missed docstrings. Formatted docstrings to the consistent form.
This commit is contained in:
Leonid Ganeline 2024-07-15 08:37:09 -07:00 committed by GitHub
parent 36ee083753
commit cacdf96f9c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 423 additions and 61 deletions

View File

@ -62,7 +62,21 @@ class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
name: Optional[str] = None, name: Optional[str] = None,
**kwargs: Any, **kwargs: Any,
) -> Run: ) -> Run:
"""Start a trace for an LLM run.""" """Start a trace for an LLM run.
Args:
serialized: The serialized model.
messages: The messages to start the chat with.
run_id: The run ID.
tags: The tags for the run. Defaults to None.
parent_run_id: The parent run ID. Defaults to None.
metadata: The metadata for the run. Defaults to None.
name: The name of the run.
**kwargs: Additional arguments.
Returns:
The run.
"""
chat_model_run = self._create_chat_model_run( chat_model_run = self._create_chat_model_run(
serialized=serialized, serialized=serialized,
messages=messages, messages=messages,
@ -89,7 +103,21 @@ class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
name: Optional[str] = None, name: Optional[str] = None,
**kwargs: Any, **kwargs: Any,
) -> Run: ) -> Run:
"""Start a trace for an LLM run.""" """Start a trace for an LLM run.
Args:
serialized: The serialized model.
prompts: The prompts to start the LLM with.
run_id: The run ID.
tags: The tags for the run. Defaults to None.
parent_run_id: The parent run ID. Defaults to None.
metadata: The metadata for the run. Defaults to None.
name: The name of the run.
**kwargs: Additional arguments.
Returns:
The run.
"""
llm_run = self._create_llm_run( llm_run = self._create_llm_run(
serialized=serialized, serialized=serialized,
prompts=prompts, prompts=prompts,
@ -113,7 +141,18 @@ class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
parent_run_id: Optional[UUID] = None, parent_run_id: Optional[UUID] = None,
**kwargs: Any, **kwargs: Any,
) -> Run: ) -> Run:
"""Run on new LLM token. Only available when streaming is enabled.""" """Run on new LLM token. Only available when streaming is enabled.
Args:
token: The token.
chunk: The chunk. Defaults to None.
run_id: The run ID.
parent_run_id: The parent run ID. Defaults to None.
**kwargs: Additional arguments.
Returns:
The run.
"""
# "chat_model" is only used for the experimental new streaming_events format. # "chat_model" is only used for the experimental new streaming_events format.
# This change should not affect any existing tracers. # This change should not affect any existing tracers.
llm_run = self._llm_run_with_token_event( llm_run = self._llm_run_with_token_event(
@ -133,6 +172,16 @@ class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
run_id: UUID, run_id: UUID,
**kwargs: Any, **kwargs: Any,
) -> Run: ) -> Run:
"""Run on retry.
Args:
retry_state: The retry state.
run_id: The run ID.
**kwargs: Additional arguments.
Returns:
The run.
"""
llm_run = self._llm_run_with_retry_event( llm_run = self._llm_run_with_retry_event(
retry_state=retry_state, retry_state=retry_state,
run_id=run_id, run_id=run_id,
@ -140,7 +189,16 @@ class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
return llm_run return llm_run
def on_llm_end(self, response: LLMResult, *, run_id: UUID, **kwargs: Any) -> Run: def on_llm_end(self, response: LLMResult, *, run_id: UUID, **kwargs: Any) -> Run:
"""End a trace for an LLM run.""" """End a trace for an LLM run.
Args:
response: The response.
run_id: The run ID.
**kwargs: Additional arguments.
Returns:
The run.
"""
# "chat_model" is only used for the experimental new streaming_events format. # "chat_model" is only used for the experimental new streaming_events format.
# This change should not affect any existing tracers. # This change should not affect any existing tracers.
llm_run = self._complete_llm_run( llm_run = self._complete_llm_run(
@ -158,7 +216,16 @@ class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
run_id: UUID, run_id: UUID,
**kwargs: Any, **kwargs: Any,
) -> Run: ) -> Run:
"""Handle an error for an LLM run.""" """Handle an error for an LLM run.
Args:
error: The error.
run_id: The run ID.
**kwargs: Additional arguments.
Returns:
The run.
"""
# "chat_model" is only used for the experimental new streaming_events format. # "chat_model" is only used for the experimental new streaming_events format.
# This change should not affect any existing tracers. # This change should not affect any existing tracers.
llm_run = self._errored_llm_run( llm_run = self._errored_llm_run(
@ -182,7 +249,22 @@ class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
name: Optional[str] = None, name: Optional[str] = None,
**kwargs: Any, **kwargs: Any,
) -> Run: ) -> Run:
"""Start a trace for a chain run.""" """Start a trace for a chain run.
Args:
serialized: The serialized chain.
inputs: The inputs for the chain.
run_id: The run ID.
tags: The tags for the run. Defaults to None.
parent_run_id: The parent run ID. Defaults to None.
metadata: The metadata for the run. Defaults to None.
run_type: The type of the run. Defaults to None.
name: The name of the run.
**kwargs: Additional arguments.
Returns:
The run.
"""
chain_run = self._create_chain_run( chain_run = self._create_chain_run(
serialized=serialized, serialized=serialized,
inputs=inputs, inputs=inputs,
@ -206,7 +288,17 @@ class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
inputs: Optional[Dict[str, Any]] = None, inputs: Optional[Dict[str, Any]] = None,
**kwargs: Any, **kwargs: Any,
) -> Run: ) -> Run:
"""End a trace for a chain run.""" """End a trace for a chain run.
Args:
outputs: The outputs for the chain.
run_id: The run ID.
inputs: The inputs for the chain. Defaults to None.
**kwargs: Additional arguments.
Returns:
The run.
"""
chain_run = self._complete_chain_run( chain_run = self._complete_chain_run(
outputs=outputs, outputs=outputs,
run_id=run_id, run_id=run_id,
@ -225,7 +317,17 @@ class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
run_id: UUID, run_id: UUID,
**kwargs: Any, **kwargs: Any,
) -> Run: ) -> Run:
"""Handle an error for a chain run.""" """Handle an error for a chain run.
Args:
error: The error.
inputs: The inputs for the chain. Defaults to None.
run_id: The run ID.
**kwargs: Additional arguments.
Returns:
The run.
"""
chain_run = self._errored_chain_run( chain_run = self._errored_chain_run(
error=error, error=error,
run_id=run_id, run_id=run_id,
@ -249,7 +351,22 @@ class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
inputs: Optional[Dict[str, Any]] = None, inputs: Optional[Dict[str, Any]] = None,
**kwargs: Any, **kwargs: Any,
) -> Run: ) -> Run:
"""Start a trace for a tool run.""" """Start a trace for a tool run.
Args:
serialized: The serialized tool.
input_str: The input string.
run_id: The run ID.
tags: The tags for the run. Defaults to None.
parent_run_id: The parent run ID. Defaults to None.
metadata: The metadata for the run. Defaults to None.
name: The name of the run.
inputs: The inputs for the tool.
**kwargs: Additional arguments.
Returns:
The run.
"""
tool_run = self._create_tool_run( tool_run = self._create_tool_run(
serialized=serialized, serialized=serialized,
input_str=input_str, input_str=input_str,
@ -266,7 +383,16 @@ class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
return tool_run return tool_run
def on_tool_end(self, output: Any, *, run_id: UUID, **kwargs: Any) -> Run: def on_tool_end(self, output: Any, *, run_id: UUID, **kwargs: Any) -> Run:
"""End a trace for a tool run.""" """End a trace for a tool run.
Args:
output: The output for the tool.
run_id: The run ID.
**kwargs: Additional arguments.
Returns:
The run.
"""
tool_run = self._complete_tool_run( tool_run = self._complete_tool_run(
output=output, output=output,
run_id=run_id, run_id=run_id,
@ -283,7 +409,16 @@ class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
run_id: UUID, run_id: UUID,
**kwargs: Any, **kwargs: Any,
) -> Run: ) -> Run:
"""Handle an error for a tool run.""" """Handle an error for a tool run.
Args:
error: The error.
run_id: The run ID.
**kwargs: Additional arguments.
Returns:
The run.
"""
tool_run = self._errored_tool_run( tool_run = self._errored_tool_run(
error=error, error=error,
run_id=run_id, run_id=run_id,
@ -304,7 +439,21 @@ class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
name: Optional[str] = None, name: Optional[str] = None,
**kwargs: Any, **kwargs: Any,
) -> Run: ) -> Run:
"""Run when Retriever starts running.""" """Run when the Retriever starts running.
Args:
serialized: The serialized retriever.
query: The query.
run_id: The run ID.
parent_run_id: The parent run ID. Defaults to None.
tags: The tags for the run. Defaults to None.
metadata: The metadata for the run. Defaults to None.
name: The name of the run.
**kwargs: Additional arguments.
Returns:
The run.
"""
retrieval_run = self._create_retrieval_run( retrieval_run = self._create_retrieval_run(
serialized=serialized, serialized=serialized,
query=query, query=query,
@ -326,7 +475,16 @@ class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
run_id: UUID, run_id: UUID,
**kwargs: Any, **kwargs: Any,
) -> Run: ) -> Run:
"""Run when Retriever errors.""" """Run when Retriever errors.
Args:
error: The error.
run_id: The run ID.
**kwargs: Additional arguments.
Returns:
The run.
"""
retrieval_run = self._errored_retrieval_run( retrieval_run = self._errored_retrieval_run(
error=error, error=error,
run_id=run_id, run_id=run_id,
@ -339,7 +497,16 @@ class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
def on_retriever_end( def on_retriever_end(
self, documents: Sequence[Document], *, run_id: UUID, **kwargs: Any self, documents: Sequence[Document], *, run_id: UUID, **kwargs: Any
) -> Run: ) -> Run:
"""Run when Retriever ends running.""" """Run when the Retriever ends running.
Args:
documents: The documents.
run_id: The run ID.
**kwargs: Additional arguments.
Returns:
The run.
"""
retrieval_run = self._complete_retrieval_run( retrieval_run = self._complete_retrieval_run(
documents=documents, documents=documents,
run_id=run_id, run_id=run_id,

View File

@ -68,8 +68,8 @@ def tracing_v2_enabled(
client (LangSmithClient, optional): The client of the langsmith. client (LangSmithClient, optional): The client of the langsmith.
Defaults to None. Defaults to None.
Returns: Yields:
None LangChainTracer: The LangChain tracer.
Example: Example:
>>> with tracing_v2_enabled(): >>> with tracing_v2_enabled():
@ -100,7 +100,7 @@ def tracing_v2_enabled(
def collect_runs() -> Generator[RunCollectorCallbackHandler, None, None]: def collect_runs() -> Generator[RunCollectorCallbackHandler, None, None]:
"""Collect all run traces in context. """Collect all run traces in context.
Returns: Yields:
run_collector.RunCollectorCallbackHandler: The run collector callback handler. run_collector.RunCollectorCallbackHandler: The run collector callback handler.
Example: Example:

View File

@ -46,7 +46,8 @@ SCHEMA_FORMAT_TYPE = Literal["original", "streaming_events"]
class _TracerCore(ABC): class _TracerCore(ABC):
""" """
Abstract base class for tracers Abstract base class for tracers.
This class provides common methods, and reusable methods for tracers. This class provides common methods, and reusable methods for tracers.
""" """
@ -65,6 +66,7 @@ class _TracerCore(ABC):
Args: Args:
_schema_format: Primarily changes how the inputs and outputs are _schema_format: Primarily changes how the inputs and outputs are
handled. For internal use only. This API will change. handled. For internal use only. This API will change.
- 'original' is the format used by all current tracers. - 'original' is the format used by all current tracers.
This format is slightly inconsistent with respect to inputs This format is slightly inconsistent with respect to inputs
and outputs. and outputs.
@ -75,7 +77,7 @@ class _TracerCore(ABC):
- 'original+chat' is a format that is the same as 'original' - 'original+chat' is a format that is the same as 'original'
except it does NOT raise an attribute error on_chat_model_start except it does NOT raise an attribute error on_chat_model_start
kwargs: Additional keyword arguments that will be passed to kwargs: Additional keyword arguments that will be passed to
the super class. the superclass.
""" """
super().__init__(**kwargs) super().__init__(**kwargs)
self._schema_format = _schema_format # For internal use only API will change. self._schema_format = _schema_format # For internal use only API will change.
@ -207,7 +209,7 @@ class _TracerCore(ABC):
name: Optional[str] = None, name: Optional[str] = None,
**kwargs: Any, **kwargs: Any,
) -> Run: ) -> Run:
"""Create a llm run""" """Create a llm run."""
start_time = datetime.now(timezone.utc) start_time = datetime.now(timezone.utc)
if metadata: if metadata:
kwargs.update({"metadata": metadata}) kwargs.update({"metadata": metadata})
@ -234,7 +236,7 @@ class _TracerCore(ABC):
**kwargs: Any, **kwargs: Any,
) -> Run: ) -> Run:
""" """
Append token event to LLM run and return the run Append token event to LLM run and return the run.
""" """
llm_run = self._get_run(run_id, run_type={"llm", "chat_model"}) llm_run = self._get_run(run_id, run_type={"llm", "chat_model"})
event_kwargs: Dict[str, Any] = {"token": token} event_kwargs: Dict[str, Any] = {"token": token}
@ -314,7 +316,7 @@ class _TracerCore(ABC):
name: Optional[str] = None, name: Optional[str] = None,
**kwargs: Any, **kwargs: Any,
) -> Run: ) -> Run:
"""Create a chain Run""" """Create a chain Run."""
start_time = datetime.now(timezone.utc) start_time = datetime.now(timezone.utc)
if metadata: if metadata:
kwargs.update({"metadata": metadata}) kwargs.update({"metadata": metadata})

View File

@ -104,7 +104,7 @@ class EvaluatorCallbackHandler(BaseTracer):
def _evaluate_in_project(self, run: Run, evaluator: langsmith.RunEvaluator) -> None: def _evaluate_in_project(self, run: Run, evaluator: langsmith.RunEvaluator) -> None:
"""Evaluate the run in the project. """Evaluate the run in the project.
Parameters Args:
---------- ----------
run : Run run : Run
The run to be evaluated. The run to be evaluated.
@ -200,7 +200,7 @@ class EvaluatorCallbackHandler(BaseTracer):
def _persist_run(self, run: Run) -> None: def _persist_run(self, run: Run) -> None:
"""Run the evaluator on the run. """Run the evaluator on the run.
Parameters Args:
---------- ----------
run : Run run : Run
The run to be evaluated. The run to be evaluated.

View File

@ -52,7 +52,18 @@ logger = logging.getLogger(__name__)
class RunInfo(TypedDict): class RunInfo(TypedDict):
"""Information about a run.""" """Information about a run.
This is used to keep track of the metadata associated with a run.
Parameters:
name: The name of the run.
tags: The tags associated with the run.
metadata: The metadata associated with the run.
run_type: The type of the run.
inputs: The inputs to the run.
parent_run_id: The ID of the parent run.
"""
name: str name: str
tags: List[str] tags: List[str]
@ -150,7 +161,19 @@ class _AstreamEventsCallbackHandler(AsyncCallbackHandler, _StreamingCallbackHand
async def tap_output_aiter( async def tap_output_aiter(
self, run_id: UUID, output: AsyncIterator[T] self, run_id: UUID, output: AsyncIterator[T]
) -> AsyncIterator[T]: ) -> AsyncIterator[T]:
"""Tap the output aiter.""" """Tap the output aiter.
This method is used to tap the output of a Runnable that produces
an async iterator. It is used to generate stream events for the
output of the Runnable.
Args:
run_id: The ID of the run.
output: The output of the Runnable.
Yields:
T: The output of the Runnable.
"""
sentinel = object() sentinel = object()
# atomic check and set # atomic check and set
tap = self.is_tapped.setdefault(run_id, sentinel) tap = self.is_tapped.setdefault(run_id, sentinel)
@ -192,7 +215,15 @@ class _AstreamEventsCallbackHandler(AsyncCallbackHandler, _StreamingCallbackHand
yield chunk yield chunk
def tap_output_iter(self, run_id: UUID, output: Iterator[T]) -> Iterator[T]: def tap_output_iter(self, run_id: UUID, output: Iterator[T]) -> Iterator[T]:
"""Tap the output aiter.""" """Tap the output aiter.
Args:
run_id: The ID of the run.
output: The output of the Runnable.
Yields:
T: The output of the Runnable.
"""
sentinel = object() sentinel = object()
# atomic check and set # atomic check and set
tap = self.is_tapped.setdefault(run_id, sentinel) tap = self.is_tapped.setdefault(run_id, sentinel)

View File

@ -32,7 +32,12 @@ _EXECUTOR: Optional[ThreadPoolExecutor] = None
def log_error_once(method: str, exception: Exception) -> None: def log_error_once(method: str, exception: Exception) -> None:
"""Log an error once.""" """Log an error once.
Args:
method: The method that raised the exception.
exception: The exception that was raised.
"""
global _LOGGED global _LOGGED
if (method, type(exception)) in _LOGGED: if (method, type(exception)) in _LOGGED:
return return
@ -82,7 +87,15 @@ class LangChainTracer(BaseTracer):
tags: Optional[List[str]] = None, tags: Optional[List[str]] = None,
**kwargs: Any, **kwargs: Any,
) -> None: ) -> None:
"""Initialize the LangChain tracer.""" """Initialize the LangChain tracer.
Args:
example_id: The example ID.
project_name: The project name. Defaults to the tracer project.
client: The client. Defaults to the global client.
tags: The tags. Defaults to an empty list.
**kwargs: Additional keyword arguments.
"""
super().__init__(**kwargs) super().__init__(**kwargs)
self.example_id = ( self.example_id = (
UUID(example_id) if isinstance(example_id, str) else example_id UUID(example_id) if isinstance(example_id, str) else example_id
@ -104,7 +117,21 @@ class LangChainTracer(BaseTracer):
name: Optional[str] = None, name: Optional[str] = None,
**kwargs: Any, **kwargs: Any,
) -> Run: ) -> Run:
"""Start a trace for an LLM run.""" """Start a trace for an LLM run.
Args:
serialized: The serialized model.
messages: The messages.
run_id: The run ID.
tags: The tags. Defaults to None.
parent_run_id: The parent run ID. Defaults to None.
metadata: The metadata. Defaults to None.
name: The name. Defaults to None.
**kwargs: Additional keyword arguments.
Returns:
Run: The run.
"""
start_time = datetime.now(timezone.utc) start_time = datetime.now(timezone.utc)
if metadata: if metadata:
kwargs.update({"metadata": metadata}) kwargs.update({"metadata": metadata})
@ -130,7 +157,15 @@ class LangChainTracer(BaseTracer):
self.latest_run = run_ self.latest_run = run_
def get_run_url(self) -> str: def get_run_url(self) -> str:
"""Get the LangSmith root run URL""" """Get the LangSmith root run URL.
Returns:
str: The LangSmith root run URL.
Raises:
ValueError: If no traced run is found.
ValueError: If the run URL cannot be found.
"""
if not self.latest_run: if not self.latest_run:
raise ValueError("No traced run found.") raise ValueError("No traced run found.")
# If this is the first run in a project, the project may not yet be created. # If this is the first run in a project, the project may not yet be created.

View File

@ -195,6 +195,9 @@ class LogStreamCallbackHandler(BaseTracer, _StreamingCallbackHandler):
for internal usage. It will likely change in the future, or for internal usage. It will likely change in the future, or
be deprecated entirely in favor of a dedicated async tracer be deprecated entirely in favor of a dedicated async tracer
for streaming events. for streaming events.
Raises:
ValueError: If an invalid schema format is provided (internal use only).
""" """
if _schema_format not in {"original", "streaming_events"}: if _schema_format not in {"original", "streaming_events"}:
raise ValueError( raise ValueError(
@ -224,7 +227,15 @@ class LogStreamCallbackHandler(BaseTracer, _StreamingCallbackHandler):
return self.receive_stream.__aiter__() return self.receive_stream.__aiter__()
def send(self, *ops: Dict[str, Any]) -> bool: def send(self, *ops: Dict[str, Any]) -> bool:
"""Send a patch to the stream, return False if the stream is closed.""" """Send a patch to the stream, return False if the stream is closed.
Args:
*ops: The operations to send to the stream.
Returns:
bool: True if the patch was sent successfully, False if the stream
is closed.
"""
# We will likely want to wrap this in try / except at some point # We will likely want to wrap this in try / except at some point
# to handle exceptions that might arise at run time. # to handle exceptions that might arise at run time.
# For now we'll let the exception bubble up, and always return # For now we'll let the exception bubble up, and always return
@ -235,7 +246,15 @@ class LogStreamCallbackHandler(BaseTracer, _StreamingCallbackHandler):
async def tap_output_aiter( async def tap_output_aiter(
self, run_id: UUID, output: AsyncIterator[T] self, run_id: UUID, output: AsyncIterator[T]
) -> AsyncIterator[T]: ) -> AsyncIterator[T]:
"""Tap an output async iterator to stream its values to the log.""" """Tap an output async iterator to stream its values to the log.
Args:
run_id: The ID of the run.
output: The output async iterator.
Yields:
T: The output value.
"""
async for chunk in output: async for chunk in output:
# root run is handled in .astream_log() # root run is handled in .astream_log()
if run_id != self.root_id: if run_id != self.root_id:
@ -254,7 +273,15 @@ class LogStreamCallbackHandler(BaseTracer, _StreamingCallbackHandler):
yield chunk yield chunk
def tap_output_iter(self, run_id: UUID, output: Iterator[T]) -> Iterator[T]: def tap_output_iter(self, run_id: UUID, output: Iterator[T]) -> Iterator[T]:
"""Tap an output async iterator to stream its values to the log.""" """Tap an output async iterator to stream its values to the log.
Args:
run_id: The ID of the run.
output: The output iterator.
Yields:
T: The output value.
"""
for chunk in output: for chunk in output:
# root run is handled in .astream_log() # root run is handled in .astream_log()
if run_id != self.root_id: if run_id != self.root_id:
@ -273,6 +300,14 @@ class LogStreamCallbackHandler(BaseTracer, _StreamingCallbackHandler):
yield chunk yield chunk
def include_run(self, run: Run) -> bool: def include_run(self, run: Run) -> bool:
"""Check if a Run should be included in the log.
Args:
run: The Run to check.
Returns:
bool: True if the run should be included, False otherwise.
"""
if run.id == self.root_id: if run.id == self.root_id:
return False return False
@ -454,7 +489,7 @@ def _get_standardized_inputs(
Returns: Returns:
Valid inputs are only dict. By conventions, inputs always represented Valid inputs are only dict. By conventions, inputs always represented
invocation using named arguments. invocation using named arguments.
A None means that the input is not yet known! None means that the input is not yet known!
""" """
if schema_format == "original": if schema_format == "original":
raise NotImplementedError( raise NotImplementedError(

View File

@ -33,11 +33,27 @@ class _SendStream(Generic[T]):
self._done = done self._done = done
async def send(self, item: T) -> None: async def send(self, item: T) -> None:
"""Schedule the item to be written to the queue using the original loop.""" """Schedule the item to be written to the queue using the original loop.
This is a coroutine that can be awaited.
Args:
item: The item to write to the queue.
"""
return self.send_nowait(item) return self.send_nowait(item)
def send_nowait(self, item: T) -> None: def send_nowait(self, item: T) -> None:
"""Schedule the item to be written to the queue using the original loop.""" """Schedule the item to be written to the queue using the original loop.
This is a non-blocking call.
Args:
item: The item to write to the queue.
Raises:
RuntimeError: If the event loop is already closed when trying to write
to the queue.
"""
try: try:
self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, item) self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, item)
except RuntimeError: except RuntimeError:
@ -45,11 +61,18 @@ class _SendStream(Generic[T]):
raise # Raise the exception if the loop is not closed raise # Raise the exception if the loop is not closed
async def aclose(self) -> None: async def aclose(self) -> None:
"""Schedule the done object write the queue using the original loop.""" """Async schedule the done object write the queue using the original loop."""
return self.close() return self.close()
def close(self) -> None: def close(self) -> None:
"""Schedule the done object write the queue using the original loop.""" """Schedule the done object write the queue using the original loop.
This is a non-blocking call.
Raises:
RuntimeError: If the event loop is already closed when trying to write
to the queue.
"""
try: try:
self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, self._done) self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, self._done)
except RuntimeError: except RuntimeError:
@ -87,7 +110,7 @@ class _MemoryStream(Generic[T]):
This implementation is meant to be used with a single writer and a single reader. This implementation is meant to be used with a single writer and a single reader.
This is an internal implementation to LangChain please do not use it directly. This is an internal implementation to LangChain. Please do not use it directly.
""" """
def __init__(self, loop: AbstractEventLoop) -> None: def __init__(self, loop: AbstractEventLoop) -> None:
@ -103,11 +126,19 @@ class _MemoryStream(Generic[T]):
self._done = object() self._done = object()
def get_send_stream(self) -> _SendStream[T]: def get_send_stream(self) -> _SendStream[T]:
"""Get a writer for the channel.""" """Get a writer for the channel.
Returns:
_SendStream: The writer for the channel.
"""
return _SendStream[T]( return _SendStream[T](
reader_loop=self._loop, queue=self._queue, done=self._done reader_loop=self._loop, queue=self._queue, done=self._done
) )
def get_receive_stream(self) -> _ReceiveStream[T]: def get_receive_stream(self) -> _ReceiveStream[T]:
"""Get a reader for the channel.""" """Get a reader for the channel.
Returns:
_ReceiveStream: The reader for the channel.
"""
return _ReceiveStream[T](queue=self._queue, done=self._done) return _ReceiveStream[T](queue=self._queue, done=self._done)

View File

@ -16,7 +16,16 @@ AsyncListener = Union[
class RootListenersTracer(BaseTracer): class RootListenersTracer(BaseTracer):
"""Tracer that calls listeners on run start, end, and error.""" """Tracer that calls listeners on run start, end, and error.
Parameters:
log_missing_parent: Whether to log a warning if the parent is missing.
Default is False.
config: The runnable config.
on_start: The listener to call on run start.
on_end: The listener to call on run end.
on_error: The listener to call on run error.
"""
log_missing_parent = False log_missing_parent = False
@ -28,6 +37,14 @@ class RootListenersTracer(BaseTracer):
on_end: Optional[Listener], on_end: Optional[Listener],
on_error: Optional[Listener], on_error: Optional[Listener],
) -> None: ) -> None:
"""Initialize the tracer.
Args:
config: The runnable config.
on_start: The listener to call on run start.
on_end: The listener to call on run end.
on_error: The listener to call on run error
"""
super().__init__(_schema_format="original+chat") super().__init__(_schema_format="original+chat")
self.config = config self.config = config
@ -63,7 +80,16 @@ class RootListenersTracer(BaseTracer):
class AsyncRootListenersTracer(AsyncBaseTracer): class AsyncRootListenersTracer(AsyncBaseTracer):
"""Async Tracer that calls listeners on run start, end, and error.""" """Async Tracer that calls listeners on run start, end, and error.
Parameters:
log_missing_parent: Whether to log a warning if the parent is missing.
Default is False.
config: The runnable config.
on_start: The listener to call on run start.
on_end: The listener to call on run end.
on_error: The listener to call on run error.
"""
log_missing_parent = False log_missing_parent = False
@ -75,6 +101,14 @@ class AsyncRootListenersTracer(AsyncBaseTracer):
on_end: Optional[AsyncListener], on_end: Optional[AsyncListener],
on_error: Optional[AsyncListener], on_error: Optional[AsyncListener],
) -> None: ) -> None:
"""Initialize the tracer.
Args:
config: The runnable config.
on_start: The listener to call on run start.
on_end: The listener to call on run end.
on_error: The listener to call on run error
"""
super().__init__(_schema_format="original+chat") super().__init__(_schema_format="original+chat")
self.config = config self.config = config

View File

@ -8,13 +8,13 @@ from langchain_core.tracers.schemas import Run
class RunCollectorCallbackHandler(BaseTracer): class RunCollectorCallbackHandler(BaseTracer):
""" """Tracer that collects all nested runs in a list.
Tracer that collects all nested runs in a list.
This tracer is useful for inspection and evaluation purposes. This tracer is useful for inspection and evaluation purposes.
Parameters Parameters
---------- ----------
name : str, default="run-collector_callback_handler"
example_id : Optional[Union[UUID, str]], default=None example_id : Optional[Union[UUID, str]], default=None
The ID of the example being traced. It can be either a UUID or a string. The ID of the example being traced. It can be either a UUID or a string.
""" """
@ -31,6 +31,8 @@ class RunCollectorCallbackHandler(BaseTracer):
---------- ----------
example_id : Optional[Union[UUID, str]], default=None example_id : Optional[Union[UUID, str]], default=None
The ID of the example being traced. It can be either a UUID or a string. The ID of the example being traced. It can be either a UUID or a string.
**kwargs : Any
Additional keyword arguments
""" """
super().__init__(**kwargs) super().__init__(**kwargs)
self.example_id = ( self.example_id = (

View File

@ -112,7 +112,15 @@ class ToolRun(BaseRun):
class Run(BaseRunV2): class Run(BaseRunV2):
"""Run schema for the V2 API in the Tracer.""" """Run schema for the V2 API in the Tracer.
Parameters:
child_runs: The child runs.
tags: The tags. Default is an empty list.
events: The events. Default is an empty list.
trace_id: The trace ID. Default is None.
dotted_order: The dotted order.
"""
child_runs: List[Run] = Field(default_factory=list) child_runs: List[Run] = Field(default_factory=list)
tags: Optional[List[str]] = Field(default_factory=list) tags: Optional[List[str]] = Field(default_factory=list)

View File

@ -7,15 +7,14 @@ from langchain_core.utils.input import get_bolded_text, get_colored_text
def try_json_stringify(obj: Any, fallback: str) -> str: def try_json_stringify(obj: Any, fallback: str) -> str:
""" """Try to stringify an object to JSON.
Try to stringify an object to JSON.
Args: Args:
obj: Object to stringify. obj: Object to stringify.
fallback: Fallback string to return if the object cannot be stringified. fallback: Fallback string to return if the object cannot be stringified.
Returns: Returns:
A JSON string if the object can be stringified, otherwise the fallback string. A JSON string if the object can be stringified, otherwise the fallback string.
""" """
try: try:
return json.dumps(obj, indent=2, ensure_ascii=False) return json.dumps(obj, indent=2, ensure_ascii=False)
@ -45,6 +44,8 @@ class FunctionCallbackHandler(BaseTracer):
"""Tracer that calls a function with a single str parameter.""" """Tracer that calls a function with a single str parameter."""
name: str = "function_callback_handler" name: str = "function_callback_handler"
"""The name of the tracer. This is used to identify the tracer in the logs.
Default is "function_callback_handler"."""
def __init__(self, function: Callable[[str], None], **kwargs: Any) -> None: def __init__(self, function: Callable[[str], None], **kwargs: Any) -> None:
super().__init__(**kwargs) super().__init__(**kwargs)
@ -54,6 +55,14 @@ class FunctionCallbackHandler(BaseTracer):
pass pass
def get_parents(self, run: Run) -> List[Run]: def get_parents(self, run: Run) -> List[Run]:
"""Get the parents of a run.
Args:
run: The run to get the parents of.
Returns:
A list of parent runs.
"""
parents = [] parents = []
current_run = run current_run = run
while current_run.parent_run_id: while current_run.parent_run_id:
@ -66,6 +75,14 @@ class FunctionCallbackHandler(BaseTracer):
return parents return parents
def get_breadcrumbs(self, run: Run) -> str: def get_breadcrumbs(self, run: Run) -> str:
"""Get the breadcrumbs of a run.
Args:
run: The run to get the breadcrumbs of.
Returns:
A string with the breadcrumbs of the run.
"""
parents = self.get_parents(run)[::-1] parents = self.get_parents(run)[::-1]
string = " > ".join( string = " > ".join(
f"{parent.run_type}:{parent.name}" f"{parent.run_type}:{parent.name}"