Compare commits

...

1 Commits

Author SHA1 Message Date
William Fu-Hinthorn
d3594f3b55 Limit max concurrency with evaluators 2023-07-10 11:46:13 -07:00
2 changed files with 13 additions and 8 deletions

View File

@@ -68,9 +68,12 @@ class EvaluatorCallbackHandler(BaseTracer):
)
self.client = client or LangChainPlusClient()
self.evaluators = evaluators
self.executor = ThreadPoolExecutor(
max_workers=max(max_workers or len(evaluators), 1)
)
if max_workers == 0:
self.executor = None
else:
self.executor = ThreadPoolExecutor(
max_workers=max(max_workers or len(evaluators), 1)
)
self.futures: Set[Future] = set()
self.skip_unfinished = skip_unfinished
self.project_name = project_name
@@ -97,7 +100,6 @@ class EvaluatorCallbackHandler(BaseTracer):
f"{evaluator.__class__.__name__}: {e}",
exc_info=True,
)
raise e
def _persist_run(self, run: Run) -> None:
"""Run the evaluator on the run.
@@ -114,9 +116,12 @@ class EvaluatorCallbackHandler(BaseTracer):
run_ = run.copy()
run_.reference_example_id = self.example_id
for evaluator in self.evaluators:
self.futures.add(
self.executor.submit(self._evaluate_in_project, run_, evaluator)
)
if self.executor is None:
self._evaluate_in_project(run_, evaluator)
else:
self.futures.add(
self.executor.submit(self._evaluate_in_project, run_, evaluator)
)
def wait_for_futures(self) -> None:
"""Wait for all futures to complete."""

View File

@@ -332,7 +332,7 @@ async def _callbacks_initializer(
client=client,
evaluators=run_evaluators,
# We already have concurrency, don't want to overload the machine
max_workers=1,
max_workers=0,
project_name=evaluator_project_name,
)
callbacks.append(callback)