Return feedback (#9629)

Return the feedback values in an eval run result

Also added a helper method to display the results as a dataframe, though it may be
overkill.
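
For reference, a rough usage sketch (not part of this commit) of how the new return shape can be consumed: `run_on_dataset` now returns a `TestResult` whose `"results"` entries hold both the model `"output"` and the `"feedback"` logged for each example id, and `to_dataframe()` flattens that into one row per example. The dataset name, model, and `"qa"` evaluator below are illustrative placeholders.

from langsmith import Client

from langchain.chat_models import ChatOpenAI
from langchain.smith import RunEvalConfig, run_on_dataset

client = Client()
test_result = run_on_dataset(
    client=client,
    dataset_name="my-eval-dataset",  # placeholder dataset name
    llm_or_chain_factory=ChatOpenAI(temperature=0),  # placeholder model
    evaluation=RunEvalConfig(evaluators=["qa"]),  # placeholder evaluator config
)

# Each example id now maps to the model output *and* the feedback logged
# by the run evaluators for that example.
for example_id, entry in test_result["results"].items():
    scores = {f.key: f.score for f in entry["feedback"]}
    print(example_id, entry["output"], scores)

# Optional helper added in this commit: one row per example, one column per
# feedback key plus the raw output (requires pandas).
df = test_result.to_dataframe()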
William FH 2023-08-28 09:15:05 -07:00 committed by GitHub
parent 5e2d0cf54e
commit cb642ef658
3 changed files with 86 additions and 45 deletions

View File

@@ -3,10 +3,11 @@ from __future__ import annotations
 import logging
 from concurrent.futures import Future, ThreadPoolExecutor, wait
-from typing import Any, List, Optional, Sequence, Set, Union
+from typing import Any, Dict, List, Optional, Sequence, Set, Union
 from uuid import UUID
-from langsmith import Client, RunEvaluator
+import langsmith
+from langsmith import schemas as langsmith_schemas
 from langchain.callbacks.manager import tracing_v2_enabled
 from langchain.callbacks.tracers.base import BaseTracer
@@ -62,13 +63,13 @@ class EvaluatorCallbackHandler(BaseTracer):
         The LangSmith project name to be organize eval chain runs under.
     """
-    name: str = "evaluator_callback_handler"
+    name = "evaluator_callback_handler"
     def __init__(
         self,
-        evaluators: Sequence[RunEvaluator],
+        evaluators: Sequence[langsmith.RunEvaluator],
         max_workers: Optional[int] = None,
-        client: Optional[Client] = None,
+        client: Optional[langsmith.Client] = None,
         example_id: Optional[Union[UUID, str]] = None,
         skip_unfinished: bool = True,
         project_name: Optional[str] = "evaluators",
@@ -86,10 +87,11 @@ class EvaluatorCallbackHandler(BaseTracer):
         self.futures: Set[Future] = set()
         self.skip_unfinished = skip_unfinished
         self.project_name = project_name
+        self.logged_feedback: Dict[str, List[langsmith_schemas.Feedback]] = {}
         global _TRACERS
         _TRACERS.append(self)
-    def _evaluate_in_project(self, run: Run, evaluator: RunEvaluator) -> None:
+    def _evaluate_in_project(self, run: Run, evaluator: langsmith.RunEvaluator) -> None:
         """Evaluate the run in the project.
         Parameters
@@ -102,11 +104,11 @@ class EvaluatorCallbackHandler(BaseTracer):
         """
         try:
             if self.project_name is None:
-                self.client.evaluate_run(run, evaluator)
+                feedback = self.client.evaluate_run(run, evaluator)
             with tracing_v2_enabled(
                 project_name=self.project_name, tags=["eval"], client=self.client
             ):
-                self.client.evaluate_run(run, evaluator)
+                feedback = self.client.evaluate_run(run, evaluator)
         except Exception as e:
             logger.error(
                 f"Error evaluating run {run.id} with "
@@ -114,6 +116,8 @@ class EvaluatorCallbackHandler(BaseTracer):
                 exc_info=True,
             )
             raise e
+        example_id = str(run.reference_example_id)
+        self.logged_feedback.setdefault(example_id, []).append(feedback)
     def _persist_run(self, run: Run) -> None:
         """Run the evaluator on the run.

View File

@@ -11,6 +11,7 @@ import uuid
 import warnings
 from enum import Enum
 from typing import (
+    TYPE_CHECKING,
     Any,
     Callable,
     Coroutine,
@@ -44,6 +45,9 @@ from langchain.schema.runnable import Runnable, RunnableConfig, RunnableLambda
 from langchain.smith.evaluation.config import EvalConfig, RunEvalConfig
 from langchain.smith.evaluation.string_run_evaluator import StringRunEvaluatorChain
+if TYPE_CHECKING:
+    import pandas as pd
 logger = logging.getLogger(__name__)
 MODEL_OR_CHAIN_FACTORY = Union[
@@ -63,6 +67,31 @@ class InputFormatError(Exception):
 ## Shared Utilities
+class TestResult(dict):
+    """A dictionary of the results of a single test run."""
+
+    def to_dataframe(self) -> pd.DataFrame:
+        """Convert the results to a dataframe."""
+        try:
+            import pandas as pd
+        except ImportError as e:
+            raise ImportError(
+                "Pandas is required to convert the results to a dataframe."
+                " to install pandas, run `pip install pandas`."
+            ) from e
+        indices = []
+        records = []
+        for example_id, result in self["results"].items():
+            feedback = result["feedback"]
+            records.append(
+                {**{f.key: f.score for f in feedback}, "output": result["output"]}
+            )
+            indices.append(example_id)
+        return pd.DataFrame(records, index=indices)
 def _get_eval_project_url(api_url: str, project_id: str) -> str:
     """Get the project url from the api url."""
     parsed = urlparse(api_url)
@@ -667,7 +696,7 @@ async def _arun_llm_or_chain(
     tags: Optional[List[str]] = None,
     callbacks: Optional[List[BaseCallbackHandler]] = None,
     input_mapper: Optional[Callable[[Dict], Any]] = None,
-) -> Union[List[dict], List[str], List[LLMResult], List[ChatResult]]:
+) -> Union[dict, str, LLMResult, ChatResult]:
     """Asynchronously run the Chain or language model.
     Args:
@@ -689,10 +718,10 @@ async def _arun_llm_or_chain(
                 tracer.example_id = example.id
     else:
         previous_example_ids = None
-    outputs = []
     chain_or_llm = (
         "LLM" if isinstance(llm_or_chain_factory, BaseLanguageModel) else "Chain"
     )
+    result = None
     try:
         if isinstance(llm_or_chain_factory, BaseLanguageModel):
             output: Any = await _arun_llm(
@@ -711,15 +740,15 @@ async def _arun_llm_or_chain(
                 callbacks=callbacks,
                 input_mapper=input_mapper,
             )
-        outputs.append(output)
+        result = output
     except Exception as e:
         logger.warning(f"{chain_or_llm} failed for example {example.id}. Error: {e}")
-        outputs.append({"Error": str(e)})
+        result = {"Error": str(e)}
     if callbacks and previous_example_ids:
         for example_id, tracer in zip(previous_example_ids, callbacks):
             if hasattr(tracer, "example_id"):
                 tracer.example_id = example_id
-    return outputs
+    return result
 async def _gather_with_concurrency(
@@ -856,7 +885,7 @@ async def _arun_on_examples(
         wrapped_model, examples, evaluation, data_type
     )
     examples = _validate_example_inputs(examples, wrapped_model, input_mapper)
-    results: Dict[str, List[Any]] = {}
+    results: Dict[str, dict] = {}
     async def process_example(
         example: Example, callbacks: List[BaseCallbackHandler], job_state: dict
@@ -869,7 +898,7 @@ async def _arun_on_examples(
             callbacks=callbacks,
             input_mapper=input_mapper,
         )
-        results[str(example.id)] = result
+        results[str(example.id)] = {"output": result}
        job_state["num_processed"] += 1
        if verbose:
            print(
@@ -890,8 +919,14 @@ async def _arun_on_examples(
         ),
         *(functools.partial(process_example, e) for e in examples),
     )
+    all_feedback = {}
     for handler in evaluation_handlers:
         handler.wait_for_futures()
+        all_feedback.update(handler.logged_feedback)
+    # join the results and feedback on the example id
+    for example_id, output_dict in results.items():
+        feedback = all_feedback.get(example_id, [])
+        output_dict["feedback"] = feedback
     return results
@@ -978,7 +1013,7 @@ def _run_llm_or_chain(
     tags: Optional[List[str]] = None,
     callbacks: Optional[List[BaseCallbackHandler]] = None,
     input_mapper: Optional[Callable[[Dict], Any]] = None,
-) -> Union[List[dict], List[str], List[LLMResult], List[ChatResult]]:
+) -> Union[dict, str, LLMResult, ChatResult]:
     """
     Run the Chain or language model synchronously.
@@ -1001,10 +1036,10 @@ def _run_llm_or_chain(
                 tracer.example_id = example.id
     else:
         previous_example_ids = None
-    outputs = []
     chain_or_llm = (
         "LLM" if isinstance(llm_or_chain_factory, BaseLanguageModel) else "Chain"
     )
+    result = None
     try:
         if isinstance(llm_or_chain_factory, BaseLanguageModel):
             output: Any = _run_llm(
@@ -1023,18 +1058,18 @@ def _run_llm_or_chain(
                 tags=tags,
                 input_mapper=input_mapper,
             )
-        outputs.append(output)
+        result = output
     except Exception as e:
         logger.warning(
             f"{chain_or_llm} failed for example {example.id} with inputs:"
             f" {example.inputs}.\nError: {e}",
         )
-        outputs.append({"Error": str(e)})
+        result = {"Error": str(e)}
     if callbacks and previous_example_ids:
         for example_id, tracer in zip(previous_example_ids, callbacks):
             if hasattr(tracer, "example_id"):
                 tracer.example_id = example_id
-    return outputs
+    return result
 def _run_on_examples(
@@ -1075,7 +1110,7 @@ def _run_on_examples(
     Returns:
         A dictionary mapping example ids to the model outputs.
     """
-    results: Dict[str, Any] = {}
+    results: Dict[str, dict] = {}
     wrapped_model = _wrap_in_chain_factory(llm_or_chain_factory)
     project_name = _get_project_name(project_name, wrapped_model)
     tracer = LangChainTracer(
@@ -1085,11 +1120,11 @@ def _run_on_examples(
         wrapped_model, examples, evaluation, data_type
     )
     examples = _validate_example_inputs(examples, wrapped_model, input_mapper)
-    evalution_handler = EvaluatorCallbackHandler(
+    evaluation_handler = EvaluatorCallbackHandler(
         evaluators=run_evaluators or [],
         client=client,
     )
-    callbacks: List[BaseCallbackHandler] = [tracer, evalution_handler]
+    callbacks: List[BaseCallbackHandler] = [tracer, evaluation_handler]
     for i, example in enumerate(examples):
         result = _run_llm_or_chain(
             example,
@@ -1100,9 +1135,14 @@ def _run_on_examples(
         )
         if verbose:
             print(f"{i+1} processed", flush=True, end="\r")
-        results[str(example.id)] = result
+        results[str(example.id)] = {"output": result}
     tracer.wait_for_futures()
-    evalution_handler.wait_for_futures()
+    evaluation_handler.wait_for_futures()
+    all_feedback = evaluation_handler.logged_feedback
+    # join the results and feedback on the example id
+    for example_id, output_dict in results.items():
+        feedback = all_feedback.get(example_id, [])
+        output_dict["feedback"] = feedback
     return results
@@ -1276,10 +1316,10 @@ async def arun_on_dataset(
         input_mapper=input_mapper,
         data_type=dataset.data_type,
     )
-    return {
-        "project_name": project_name,
-        "results": results,
-    }
+    return TestResult(
+        project_name=project_name,
+        results=results,
+    )
 def _handle_coroutine(coro: Coroutine) -> Any:
@@ -1461,7 +1501,7 @@ def run_on_dataset(
         data_type=dataset.data_type,
     )
     results = _handle_coroutine(coro)
-    return {
-        "project_name": project_name,
-        "results": results,
-    }
+    return TestResult(
+        project_name=project_name,
+        results=results,
+    )

View File

@@ -182,14 +182,12 @@ def test_run_llm_or_chain_with_input_mapper() -> None:
         return {"the right input": inputs["the wrong input"]}
     result = _run_llm_or_chain(example, lambda: mock_chain, input_mapper=input_mapper)
-    assert len(result) == 1
-    assert result[0] == {"output": "2", "the right input": "1"}
+    assert result == {"output": "2", "the right input": "1"}
     bad_result = _run_llm_or_chain(
         example,
         lambda: mock_chain,
     )
-    assert len(bad_result) == 1
-    assert "Error" in bad_result[0]
+    assert "Error" in bad_result
     # Try with LLM
     def llm_input_mapper(inputs: dict) -> str:
@@ -197,9 +195,7 @@ def test_run_llm_or_chain_with_input_mapper() -> None:
         return "the right input"
     mock_llm = FakeLLM(queries={"the right input": "somenumber"})
-    result = _run_llm_or_chain(example, mock_llm, input_mapper=llm_input_mapper)
-    assert len(result) == 1
-    llm_result = result[0]
+    llm_result = _run_llm_or_chain(example, mock_llm, input_mapper=llm_input_mapper)
     assert isinstance(llm_result, str)
     assert llm_result == "somenumber"
@@ -300,8 +296,8 @@ async def test_arun_on_dataset(monkeypatch: pytest.MonkeyPatch) -> None:
         tags: Optional[List[str]] = None,
         callbacks: Optional[Any] = None,
         **kwargs: Any,
-    ) -> List[Dict[str, Any]]:
-        return [{"result": f"Result for example {example.id}"}]
+    ) -> Dict[str, Any]:
+        return {"result": f"Result for example {example.id}"}
     def mock_create_project(*args: Any, **kwargs: Any) -> Any:
         proj = mock.MagicMock()
@@ -328,9 +324,10 @@ async def test_arun_on_dataset(monkeypatch: pytest.MonkeyPatch) -> None:
     )
     expected = {
-        uuid_: [
-            {"result": f"Result for example {uuid.UUID(uuid_)}"} for _ in range(1)
-        ]
+        uuid_: {
+            "output": {"result": f"Result for example {uuid.UUID(uuid_)}"},
+            "feedback": [],
+        }
         for uuid_ in uuids
     }
     assert results["results"] == expected