diff --git a/libs/langchain/langchain/callbacks/tracers/evaluation.py b/libs/langchain/langchain/callbacks/tracers/evaluation.py
index 877836b0a5c..5b178c84e6e 100644
--- a/libs/langchain/langchain/callbacks/tracers/evaluation.py
+++ b/libs/langchain/langchain/callbacks/tracers/evaluation.py
@@ -3,10 +3,11 @@ from __future__ import annotations
 
 import logging
 from concurrent.futures import Future, ThreadPoolExecutor, wait
-from typing import Any, List, Optional, Sequence, Set, Union
+from typing import Any, Dict, List, Optional, Sequence, Set, Union
 from uuid import UUID
 
-from langsmith import Client, RunEvaluator
+import langsmith
+from langsmith import schemas as langsmith_schemas
 
 from langchain.callbacks.manager import tracing_v2_enabled
 from langchain.callbacks.tracers.base import BaseTracer
@@ -62,13 +63,13 @@ class EvaluatorCallbackHandler(BaseTracer):
         The LangSmith project name to be organize eval chain runs under.
     """
 
-    name: str = "evaluator_callback_handler"
+    name = "evaluator_callback_handler"
 
     def __init__(
         self,
-        evaluators: Sequence[RunEvaluator],
+        evaluators: Sequence[langsmith.RunEvaluator],
         max_workers: Optional[int] = None,
-        client: Optional[Client] = None,
+        client: Optional[langsmith.Client] = None,
         example_id: Optional[Union[UUID, str]] = None,
         skip_unfinished: bool = True,
         project_name: Optional[str] = "evaluators",
@@ -86,10 +87,11 @@ class EvaluatorCallbackHandler(BaseTracer):
         self.futures: Set[Future] = set()
         self.skip_unfinished = skip_unfinished
         self.project_name = project_name
+        self.logged_feedback: Dict[str, List[langsmith_schemas.Feedback]] = {}
         global _TRACERS
         _TRACERS.append(self)
 
-    def _evaluate_in_project(self, run: Run, evaluator: RunEvaluator) -> None:
+    def _evaluate_in_project(self, run: Run, evaluator: langsmith.RunEvaluator) -> None:
         """Evaluate the run in the project.
 
         Parameters
@@ -102,11 +104,11 @@ class EvaluatorCallbackHandler(BaseTracer):
         """
         try:
             if self.project_name is None:
-                self.client.evaluate_run(run, evaluator)
+                feedback = self.client.evaluate_run(run, evaluator)
             with tracing_v2_enabled(
                 project_name=self.project_name, tags=["eval"], client=self.client
             ):
-                self.client.evaluate_run(run, evaluator)
+                feedback = self.client.evaluate_run(run, evaluator)
         except Exception as e:
             logger.error(
                 f"Error evaluating run {run.id} with "
@@ -114,6 +116,8 @@ class EvaluatorCallbackHandler(BaseTracer):
                 exc_info=True,
             )
             raise e
+        example_id = str(run.reference_example_id)
+        self.logged_feedback.setdefault(example_id, []).append(feedback)
 
     def _persist_run(self, run: Run) -> None:
         """Run the evaluator on the run.
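With this change the handler also keeps every Feedback object in memory on `logged_feedback`, keyed by `str(run.reference_example_id)`. A minimal sketch of reading that mapping back once the evaluator futures finish; `summarize_feedback` is a hypothetical helper for illustration, not something added by this diff:

    from langchain.callbacks.tracers.evaluation import EvaluatorCallbackHandler

    def summarize_feedback(handler: EvaluatorCallbackHandler) -> dict:
        """Hypothetical helper: collapse Feedback objects into {example_id: {key: score}}."""
        handler.wait_for_futures()  # block until all queued evaluations have completed
        return {
            example_id: {f.key: f.score for f in feedback_list}
            for example_id, feedback_list in handler.logged_feedback.items()
        }
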
diff --git a/libs/langchain/langchain/smith/evaluation/runner_utils.py b/libs/langchain/langchain/smith/evaluation/runner_utils.py
index 9e06fcd65f4..438bc791400 100644
--- a/libs/langchain/langchain/smith/evaluation/runner_utils.py
+++ b/libs/langchain/langchain/smith/evaluation/runner_utils.py
@@ -11,6 +11,7 @@ import uuid
 import warnings
 from enum import Enum
 from typing import (
+    TYPE_CHECKING,
     Any,
     Callable,
     Coroutine,
@@ -44,6 +45,9 @@ from langchain.schema.runnable import Runnable, RunnableConfig, RunnableLambda
 from langchain.smith.evaluation.config import EvalConfig, RunEvalConfig
 from langchain.smith.evaluation.string_run_evaluator import StringRunEvaluatorChain
 
+if TYPE_CHECKING:
+    import pandas as pd
+
 logger = logging.getLogger(__name__)
 
 MODEL_OR_CHAIN_FACTORY = Union[
@@ -63,6 +67,31 @@ class InputFormatError(Exception):
 ## Shared Utilities
 
 
+class TestResult(dict):
+    """A dictionary of the results of a single test run."""
+
+    def to_dataframe(self) -> pd.DataFrame:
+        """Convert the results to a dataframe."""
+        try:
+            import pandas as pd
+        except ImportError as e:
+            raise ImportError(
+                "Pandas is required to convert the results to a dataframe."
+                " to install pandas, run `pip install pandas`."
+            ) from e
+
+        indices = []
+        records = []
+        for example_id, result in self["results"].items():
+            feedback = result["feedback"]
+            records.append(
+                {**{f.key: f.score for f in feedback}, "output": result["output"]}
+            )
+            indices.append(example_id)
+
+        return pd.DataFrame(records, index=indices)
+
+
 def _get_eval_project_url(api_url: str, project_id: str) -> str:
     """Get the project url from the api url."""
     parsed = urlparse(api_url)
@@ -667,7 +696,7 @@ async def _arun_llm_or_chain(
     tags: Optional[List[str]] = None,
     callbacks: Optional[List[BaseCallbackHandler]] = None,
     input_mapper: Optional[Callable[[Dict], Any]] = None,
-) -> Union[List[dict], List[str], List[LLMResult], List[ChatResult]]:
+) -> Union[dict, str, LLMResult, ChatResult]:
     """Asynchronously run the Chain or language model.
 
     Args:
@@ -689,10 +718,10 @@ async def _arun_llm_or_chain(
         tracer.example_id = example.id
     else:
         previous_example_ids = None
-    outputs = []
     chain_or_llm = (
         "LLM" if isinstance(llm_or_chain_factory, BaseLanguageModel) else "Chain"
     )
+    result = None
     try:
         if isinstance(llm_or_chain_factory, BaseLanguageModel):
             output: Any = await _arun_llm(
@@ -711,15 +740,15 @@ async def _arun_llm_or_chain(
                 callbacks=callbacks,
                 input_mapper=input_mapper,
             )
-        outputs.append(output)
+        result = output
     except Exception as e:
         logger.warning(f"{chain_or_llm} failed for example {example.id}. \nError: {e}")
Error: {e}") - outputs.append({"Error": str(e)}) + result = {"Error": str(e)} if callbacks and previous_example_ids: for example_id, tracer in zip(previous_example_ids, callbacks): if hasattr(tracer, "example_id"): tracer.example_id = example_id - return outputs + return result async def _gather_with_concurrency( @@ -856,7 +885,7 @@ async def _arun_on_examples( wrapped_model, examples, evaluation, data_type ) examples = _validate_example_inputs(examples, wrapped_model, input_mapper) - results: Dict[str, List[Any]] = {} + results: Dict[str, dict] = {} async def process_example( example: Example, callbacks: List[BaseCallbackHandler], job_state: dict @@ -869,7 +898,7 @@ async def _arun_on_examples( callbacks=callbacks, input_mapper=input_mapper, ) - results[str(example.id)] = result + results[str(example.id)] = {"output": result} job_state["num_processed"] += 1 if verbose: print( @@ -890,8 +919,14 @@ async def _arun_on_examples( ), *(functools.partial(process_example, e) for e in examples), ) + all_feedback = {} for handler in evaluation_handlers: handler.wait_for_futures() + all_feedback.update(handler.logged_feedback) + # join the results and feedback on the example id + for example_id, output_dict in results.items(): + feedback = all_feedback.get(example_id, []) + output_dict["feedback"] = feedback return results @@ -978,7 +1013,7 @@ def _run_llm_or_chain( tags: Optional[List[str]] = None, callbacks: Optional[List[BaseCallbackHandler]] = None, input_mapper: Optional[Callable[[Dict], Any]] = None, -) -> Union[List[dict], List[str], List[LLMResult], List[ChatResult]]: +) -> Union[dict, str, LLMResult, ChatResult]: """ Run the Chain or language model synchronously. @@ -1001,10 +1036,10 @@ def _run_llm_or_chain( tracer.example_id = example.id else: previous_example_ids = None - outputs = [] chain_or_llm = ( "LLM" if isinstance(llm_or_chain_factory, BaseLanguageModel) else "Chain" ) + result = None try: if isinstance(llm_or_chain_factory, BaseLanguageModel): output: Any = _run_llm( @@ -1023,18 +1058,18 @@ def _run_llm_or_chain( tags=tags, input_mapper=input_mapper, ) - outputs.append(output) + result = output except Exception as e: logger.warning( f"{chain_or_llm} failed for example {example.id} with inputs:" f" {example.inputs}.\nError: {e}", ) - outputs.append({"Error": str(e)}) + result = {"Error": str(e)} if callbacks and previous_example_ids: for example_id, tracer in zip(previous_example_ids, callbacks): if hasattr(tracer, "example_id"): tracer.example_id = example_id - return outputs + return result def _run_on_examples( @@ -1075,7 +1110,7 @@ def _run_on_examples( Returns: A dictionary mapping example ids to the model outputs. 
""" - results: Dict[str, Any] = {} + results: Dict[str, dict] = {} wrapped_model = _wrap_in_chain_factory(llm_or_chain_factory) project_name = _get_project_name(project_name, wrapped_model) tracer = LangChainTracer( @@ -1085,11 +1120,11 @@ def _run_on_examples( wrapped_model, examples, evaluation, data_type ) examples = _validate_example_inputs(examples, wrapped_model, input_mapper) - evalution_handler = EvaluatorCallbackHandler( + evaluation_handler = EvaluatorCallbackHandler( evaluators=run_evaluators or [], client=client, ) - callbacks: List[BaseCallbackHandler] = [tracer, evalution_handler] + callbacks: List[BaseCallbackHandler] = [tracer, evaluation_handler] for i, example in enumerate(examples): result = _run_llm_or_chain( example, @@ -1100,9 +1135,14 @@ def _run_on_examples( ) if verbose: print(f"{i+1} processed", flush=True, end="\r") - results[str(example.id)] = result + results[str(example.id)] = {"output": result} tracer.wait_for_futures() - evalution_handler.wait_for_futures() + evaluation_handler.wait_for_futures() + all_feedback = evaluation_handler.logged_feedback + # join the results and feedback on the example id + for example_id, output_dict in results.items(): + feedback = all_feedback.get(example_id, []) + output_dict["feedback"] = feedback return results @@ -1276,10 +1316,10 @@ async def arun_on_dataset( input_mapper=input_mapper, data_type=dataset.data_type, ) - return { - "project_name": project_name, - "results": results, - } + return TestResult( + project_name=project_name, + results=results, + ) def _handle_coroutine(coro: Coroutine) -> Any: @@ -1461,7 +1501,7 @@ def run_on_dataset( data_type=dataset.data_type, ) results = _handle_coroutine(coro) - return { - "project_name": project_name, - "results": results, - } + return TestResult( + project_name=project_name, + results=results, + ) diff --git a/libs/langchain/tests/unit_tests/smith/evaluation/test_runner_utils.py b/libs/langchain/tests/unit_tests/smith/evaluation/test_runner_utils.py index 5c34f9032fa..914958031dc 100644 --- a/libs/langchain/tests/unit_tests/smith/evaluation/test_runner_utils.py +++ b/libs/langchain/tests/unit_tests/smith/evaluation/test_runner_utils.py @@ -182,14 +182,12 @@ def test_run_llm_or_chain_with_input_mapper() -> None: return {"the right input": inputs["the wrong input"]} result = _run_llm_or_chain(example, lambda: mock_chain, input_mapper=input_mapper) - assert len(result) == 1 - assert result[0] == {"output": "2", "the right input": "1"} + assert result == {"output": "2", "the right input": "1"} bad_result = _run_llm_or_chain( example, lambda: mock_chain, ) - assert len(bad_result) == 1 - assert "Error" in bad_result[0] + assert "Error" in bad_result # Try with LLM def llm_input_mapper(inputs: dict) -> str: @@ -197,9 +195,7 @@ def test_run_llm_or_chain_with_input_mapper() -> None: return "the right input" mock_llm = FakeLLM(queries={"the right input": "somenumber"}) - result = _run_llm_or_chain(example, mock_llm, input_mapper=llm_input_mapper) - assert len(result) == 1 - llm_result = result[0] + llm_result = _run_llm_or_chain(example, mock_llm, input_mapper=llm_input_mapper) assert isinstance(llm_result, str) assert llm_result == "somenumber" @@ -300,8 +296,8 @@ async def test_arun_on_dataset(monkeypatch: pytest.MonkeyPatch) -> None: tags: Optional[List[str]] = None, callbacks: Optional[Any] = None, **kwargs: Any, - ) -> List[Dict[str, Any]]: - return [{"result": f"Result for example {example.id}"}] + ) -> Dict[str, Any]: + return {"result": f"Result for example {example.id}"} def 
         proj = mock.MagicMock()
@@ -328,9 +324,10 @@ async def test_arun_on_dataset(monkeypatch: pytest.MonkeyPatch) -> None:
     )
 
     expected = {
-        uuid_: [
-            {"result": f"Result for example {uuid.UUID(uuid_)}"} for _ in range(1)
-        ]
+        uuid_: {
+            "output": {"result": f"Result for example {uuid.UUID(uuid_)}"},
+            "feedback": [],
+        }
         for uuid_ in uuids
     }
     assert results["results"] == expected
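
With both entry points returning a TestResult, each per-example record carries the run output under "output" and the evaluator feedback under "feedback", and the whole object can be pivoted into a dataframe. A minimal usage sketch under assumed conditions (a configured LangSmith client and API key, pandas installed); the dataset name, model, and evaluator choice are placeholders, not values from this diff:

    from langsmith import Client

    from langchain.chat_models import ChatOpenAI
    from langchain.smith import RunEvalConfig, run_on_dataset

    client = Client()

    results = run_on_dataset(
        client=client,
        dataset_name="my-eval-dataset",  # placeholder dataset name
        llm_or_chain_factory=ChatOpenAI(temperature=0),
        evaluation=RunEvalConfig(evaluators=["qa"]),
    )

    # Each per-example entry now holds the raw output plus the logged feedback.
    for example_id, entry in results["results"].items():
        scores = {f.key: f.score for f in entry["feedback"]}
        print(example_id, scores, entry["output"])

    # TestResult is a dict subclass, so the old dict-style access still works;
    # to_dataframe() additionally pivots feedback keys into columns (needs pandas).
    df = results.to_dataframe()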