From c244e1a50b72b80d9e83c38034dee11495c28aab Mon Sep 17 00:00:00 2001
From: Shuai Liu
Date: Sat, 16 Mar 2024 07:27:53 +0800
Subject: [PATCH] community[patch]: Fixed bug in merging `generation_info` during chunk concatenation in Tongyi and ChatTongyi (#19014)

- **Description:** In #16218, during `GenerationChunk` and `ChatGenerationChunk`
  concatenation, the merging of `generation_info` changed from simple key/value
  replacement to the util method
  [`merge_dicts`](https://github.com/langchain-ai/langchain/blob/master/libs/core/langchain_core/utils/_merge.py):

  ![image](https://github.com/langchain-ai/langchain/assets/2098020/10f315bf-7fe0-43a7-a0ce-6a3834b99a15)

  `merge_dicts` cannot merge values of `int` (and some other types) and raises a
  [`TypeError`](https://github.com/langchain-ai/langchain/blob/master/libs/core/langchain_core/utils/_merge.py#L55).
  This PR fixes the issue in the **Tongyi and ChatTongyi models** by adopting the
  `generation_info` of the last chunk and discarding the `generation_info` of the
  intermediate chunks, so that `stream` and `astream` work correctly.
- **Issue:**
  - Related issues or PRs about Tongyi & ChatTongyi: #16605, #17105
  - Other models or cases: #18441, #17376
- **Dependencies:** No new dependencies
---
 .../langchain_community/chat_models/tongyi.py | 50 ++++++++----
 .../langchain_community/llms/tongyi.py        | 81 +++++++++++++++----
 2 files changed, 103 insertions(+), 28 deletions(-)

diff --git a/libs/community/langchain_community/chat_models/tongyi.py b/libs/community/langchain_community/chat_models/tongyi.py
index 1eb881d0fb9..36883058188 100644
--- a/libs/community/langchain_community/chat_models/tongyi.py
+++ b/libs/community/langchain_community/chat_models/tongyi.py
@@ -49,7 +49,11 @@ from tenacity import (
     wait_exponential,
 )
 
-from langchain_community.llms.tongyi import check_response
+from langchain_community.llms.tongyi import (
+    agenerate_with_last_element_mark,
+    check_response,
+    generate_with_last_element_mark,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -338,9 +342,13 @@ class ChatTongyi(BaseChatModel):
         params: Dict[str, Any] = self._invocation_params(
             messages=messages, stop=stop, stream=True, **kwargs
         )
-        for stream_resp in self.stream_completion_with_retry(**params):
+        for stream_resp, is_last_chunk in generate_with_last_element_mark(
+            self.stream_completion_with_retry(**params)
+        ):
             chunk = ChatGenerationChunk(
-                **self._chat_generation_from_qwen_resp(stream_resp, is_chunk=True)
+                **self._chat_generation_from_qwen_resp(
+                    stream_resp, is_chunk=True, is_last_chunk=is_last_chunk
+                )
             )
             if run_manager:
                 run_manager.on_llm_new_token(chunk.text, chunk=chunk)
@@ -356,9 +364,13 @@ class ChatTongyi(BaseChatModel):
         params: Dict[str, Any] = self._invocation_params(
             messages=messages, stop=stop, stream=True, **kwargs
         )
-        async for stream_resp in self.astream_completion_with_retry(**params):
+        async for stream_resp, is_last_chunk in agenerate_with_last_element_mark(
+            self.astream_completion_with_retry(**params)
+        ):
             chunk = ChatGenerationChunk(
-                **self._chat_generation_from_qwen_resp(stream_resp, is_chunk=True)
+                **self._chat_generation_from_qwen_resp(
+                    stream_resp, is_chunk=True, is_last_chunk=is_last_chunk
+                )
             )
             if run_manager:
                 await run_manager.on_llm_new_token(chunk.text, chunk=chunk)
@@ -398,18 +410,28 @@ class ChatTongyi(BaseChatModel):
 
     @staticmethod
     def _chat_generation_from_qwen_resp(
-        resp: Any, is_chunk: bool = False
+        resp: Any, is_chunk: bool = False, is_last_chunk: bool = True
     ) -> Dict[str, Any]:
+        # According to the response from dashscope,
+        # each chunk's `generation_info` overwrites the previous one.
+        # Besides, the `merge_dicts` method,
+        # which is used to concatenate `generation_info` in `GenerationChunk`,
+        # does not support merging of int type values.
+        # Therefore, we adopt the `generation_info` of the last chunk
+        # and discard the `generation_info` of the intermediate chunks.
         choice = resp["output"]["choices"][0]
         message = convert_dict_to_message(choice["message"], is_chunk=is_chunk)
-        return dict(
-            message=message,
-            generation_info=dict(
-                finish_reason=choice["finish_reason"],
-                request_id=resp["request_id"],
-                token_usage=dict(resp["usage"]),
-            ),
-        )
+        if is_last_chunk:
+            return dict(
+                message=message,
+                generation_info=dict(
+                    finish_reason=choice["finish_reason"],
+                    request_id=resp["request_id"],
+                    token_usage=dict(resp["usage"]),
+                ),
+            )
+        else:
+            return dict(message=message)
 
     @staticmethod
     def _chunk_to_generation(chunk: ChatGenerationChunk) -> ChatGeneration:

diff --git a/libs/community/langchain_community/llms/tongyi.py b/libs/community/langchain_community/llms/tongyi.py
index 3734e2f3a69..6254609ece7 100644
--- a/libs/community/langchain_community/llms/tongyi.py
+++ b/libs/community/langchain_community/llms/tongyi.py
@@ -5,13 +5,17 @@ import functools
 import logging
 from typing import (
     Any,
+    AsyncIterable,
     AsyncIterator,
     Callable,
     Dict,
+    Iterable,
     Iterator,
     List,
     Mapping,
     Optional,
+    Tuple,
+    TypeVar,
 )
 
 from langchain_core.callbacks import (
@@ -32,6 +36,7 @@ from tenacity import (
 )
 
 logger = logging.getLogger(__name__)
+T = TypeVar("T")
 
 
 def _create_retry_decorator(llm: Tongyi) -> Callable[[Any], Any]:
@@ -122,6 +127,36 @@ async def astream_generate_with_retry(llm: Tongyi, **kwargs: Any) -> Any:
         yield chunk
 
 
+def generate_with_last_element_mark(iterable: Iterable[T]) -> Iterator[Tuple[T, bool]]:
+    """Generate elements from an iterable,
+    and a boolean indicating if it is the last element."""
+    iterator = iter(iterable)
+    try:
+        item = next(iterator)
+    except StopIteration:
+        return
+    for next_item in iterator:
+        yield item, False
+        item = next_item
+    yield item, True
+
+
+async def agenerate_with_last_element_mark(
+    iterable: AsyncIterable[T],
+) -> AsyncIterator[Tuple[T, bool]]:
+    """Generate elements from an async iterable,
+    and a boolean indicating if it is the last element."""
+    iterator = iterable.__aiter__()
+    try:
+        item = await iterator.__anext__()
+    except StopAsyncIteration:
+        return
+    async for next_item in iterator:
+        yield item, False
+        item = next_item
+    yield item, True
+
+
 class Tongyi(BaseLLM):
     """Tongyi Qwen large language models.
 
@@ -283,8 +318,12 @@ class Tongyi(BaseLLM):
         params: Dict[str, Any] = self._invocation_params(
             stop=stop, stream=True, **kwargs
         )
-        for stream_resp in stream_generate_with_retry(self, prompt=prompt, **params):
-            chunk = GenerationChunk(**self._generation_from_qwen_resp(stream_resp))
+        for stream_resp, is_last_chunk in generate_with_last_element_mark(
+            stream_generate_with_retry(self, prompt=prompt, **params)
+        ):
+            chunk = GenerationChunk(
+                **self._generation_from_qwen_resp(stream_resp, is_last_chunk)
+            )
             if run_manager:
                 run_manager.on_llm_new_token(
                     chunk.text,
@@ -303,10 +342,12 @@ class Tongyi(BaseLLM):
         params: Dict[str, Any] = self._invocation_params(
             stop=stop, stream=True, **kwargs
         )
-        async for stream_resp in astream_generate_with_retry(
-            self, prompt=prompt, **params
+        async for stream_resp, is_last_chunk in agenerate_with_last_element_mark(
+            astream_generate_with_retry(self, prompt=prompt, **params)
         ):
-            chunk = GenerationChunk(**self._generation_from_qwen_resp(stream_resp))
+            chunk = GenerationChunk(
+                **self._generation_from_qwen_resp(stream_resp, is_last_chunk)
+            )
             if run_manager:
                 await run_manager.on_llm_new_token(
                     chunk.text,
@@ -327,15 +368,27 @@ class Tongyi(BaseLLM):
         return params
 
     @staticmethod
-    def _generation_from_qwen_resp(resp: Any) -> Dict[str, Any]:
-        return dict(
-            text=resp["output"]["text"],
-            generation_info=dict(
-                finish_reason=resp["output"]["finish_reason"],
-                request_id=resp["request_id"],
-                token_usage=dict(resp["usage"]),
-            ),
-        )
+    def _generation_from_qwen_resp(
+        resp: Any, is_last_chunk: bool = True
+    ) -> Dict[str, Any]:
+        # According to the response from dashscope,
+        # each chunk's `generation_info` overwrites the previous one.
+        # Besides, the `merge_dicts` method,
+        # which is used to concatenate `generation_info` in `GenerationChunk`,
+        # does not support merging of int type values.
+        # Therefore, we adopt the `generation_info` of the last chunk
+        # and discard the `generation_info` of the intermediate chunks.
+        if is_last_chunk:
+            return dict(
+                text=resp["output"]["text"],
+                generation_info=dict(
+                    finish_reason=resp["output"]["finish_reason"],
+                    request_id=resp["request_id"],
+                    token_usage=dict(resp["usage"]),
+                ),
+            )
+        else:
+            return dict(text=resp["output"]["text"])
 
     @staticmethod
     def _chunk_to_generation(chunk: GenerationChunk) -> Generation:
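
For reviewers, a minimal sketch of the behaviour this patch relies on (illustration only, not part of the diff; it assumes `langchain-core` is installed, and the request id and token counts below are made-up values). With the fix, only the final chunk carries `generation_info`, so concatenating chunks never asks `merge_dicts` to combine two conflicting `int` values:

```python
from langchain_core.outputs import GenerationChunk

# Intermediate chunks carry only text; the final chunk carries the metadata.
chunks = [
    GenerationChunk(text="Hello"),
    GenerationChunk(text=", world"),
    GenerationChunk(
        text="!",
        generation_info={
            "finish_reason": "stop",
            "request_id": "req-123",             # made-up value
            "token_usage": {"total_tokens": 3},  # made-up value
        },
    ),
]

merged = chunks[0]
for chunk in chunks[1:]:
    merged += chunk  # GenerationChunk.__add__ merges generation_info

print(merged.text)             # Hello, world!
print(merged.generation_info)  # comes from the final chunk only
```

Before this change, every streamed chunk carried its own integer token counts, so the `+=` above had to merge two `int` values and failed with the `TypeError` described in the PR description. Per the comment added in the patch, each chunk's `generation_info` from dashscope supersedes the previous one, so keeping only the last chunk's metadata loses no information.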