From 092f302c0f84ecab9941f265ce526636c1ee6504 Mon Sep 17 00:00:00 2001 From: Daniyar Supiyev Date: Mon, 4 Dec 2023 04:57:07 +0600 Subject: [PATCH] langchain[patch]: Asynchronous human-in-the-loop callback (#14195) **Description:** Adding a possibility to use asynchronous callback handler in human-in-the-loop validation tool. Very useful, for example, if you want to implement a validation over Telegram bot. **Issue:** - **Dependencies:** - --------- Co-authored-by: Daniyar_Supiyev Co-authored-by: Bagatur --- libs/langchain/langchain/callbacks/human.py | 42 ++++++++++++++++++++- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/libs/langchain/langchain/callbacks/human.py b/libs/langchain/langchain/callbacks/human.py index 7fc670d9bad..d4b1db039e0 100644 --- a/libs/langchain/langchain/callbacks/human.py +++ b/libs/langchain/langchain/callbacks/human.py @@ -1,7 +1,7 @@ -from typing import Any, Callable, Dict, Optional +from typing import Any, Awaitable, Callable, Dict, Optional from uuid import UUID -from langchain.callbacks.base import BaseCallbackHandler +from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler def _default_approve(_input: str) -> bool: @@ -14,6 +14,16 @@ def _default_approve(_input: str) -> bool: return resp.lower() in ("yes", "y") +async def _adefault_approve(_input: str) -> bool: + msg = ( + "Do you approve of the following input? " + "Anything except 'Y'/'Yes' (case-insensitive) will be treated as a no." + ) + msg += "\n\n" + _input + "\n" + resp = input(msg) + return resp.lower() in ("yes", "y") + + def _default_true(_: Dict[str, Any]) -> bool: return True @@ -48,3 +58,31 @@ class HumanApprovalCallbackHandler(BaseCallbackHandler): raise HumanRejectedException( f"Inputs {input_str} to tool {serialized} were rejected." ) + + +class AsyncHumanApprovalCallbackHandler(AsyncCallbackHandler): + """Asynchronous callback for manually validating values.""" + + raise_error: bool = True + + def __init__( + self, + approve: Callable[[Any], Awaitable[bool]] = _adefault_approve, + should_check: Callable[[Dict[str, Any]], bool] = _default_true, + ): + self._approve = approve + self._should_check = should_check + + async def on_tool_start( + self, + serialized: Dict[str, Any], + input_str: str, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> Any: + if self._should_check(serialized) and not await self._approve(input_str): + raise HumanRejectedException( + f"Inputs {input_str} to tool {serialized} were rejected." + )