From 13855ef0c39705c4655e915aacd44d65b34c0bce Mon Sep 17 00:00:00 2001
From: Ethan Yang
Date: Wed, 10 Jul 2024 05:02:00 +0800
Subject: [PATCH] [HuggingFace Pipeline] add streaming support (#23852)

---
 .../llms/huggingface_pipelines.ipynb          | 19 ++++
 docs/docs/integrations/llms/openvino.ipynb    | 23 ++-----
 .../llms/huggingface_pipeline.py              | 64 +++++++++++++++++-
 .../llms/huggingface_pipeline.py              | 66 ++++++++++++++++++-
 .../tests/integration_tests/test_llms.py      | 18 +++++
 5 files changed, 167 insertions(+), 23 deletions(-)
 create mode 100644 libs/partners/huggingface/tests/integration_tests/test_llms.py

diff --git a/docs/docs/integrations/llms/huggingface_pipelines.ipynb b/docs/docs/integrations/llms/huggingface_pipelines.ipynb
index be074047274..734870bd7f9 100644
--- a/docs/docs/integrations/llms/huggingface_pipelines.ipynb
+++ b/docs/docs/integrations/llms/huggingface_pipelines.ipynb
@@ -143,6 +143,25 @@
     "print(chain.invoke({\"question\": question}))"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "id": "5141dc4d",
+   "metadata": {},
+   "source": [
+    "Streaming response."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "f1819250-2db9-4143-b88a-12e92d4e2386",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "for chunk in chain.stream(question):\n",
+    "    print(chunk, end=\"\", flush=True)"
+   ]
+  },
   {
    "cell_type": "markdown",
    "id": "dbbc3a37",
diff --git a/docs/docs/integrations/llms/openvino.ipynb b/docs/docs/integrations/llms/openvino.ipynb
index 4ed3855a0f4..70333e6829c 100644
--- a/docs/docs/integrations/llms/openvino.ipynb
+++ b/docs/docs/integrations/llms/openvino.ipynb
@@ -245,7 +245,7 @@
    "source": [
     "### Streaming\n",
     "\n",
-    "To get streaming of LLM output, you can create a Huggingface `TextIteratorStreamer` for `_forward_params`."
+    "You can use the `stream` method to get streaming output from the LLM."
    ]
   },
   {
@@ -255,24 +255,11 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "from threading import Thread\n",
+    "generation_config = {\"skip_prompt\": True, \"pipeline_kwargs\": {\"max_new_tokens\": 100}}\n",
+    "chain = prompt | ov_llm.bind(**generation_config)\n",
     "\n",
-    "from transformers import TextIteratorStreamer\n",
-    "\n",
-    "streamer = TextIteratorStreamer(\n",
-    "    ov_llm.pipeline.tokenizer,\n",
-    "    timeout=30.0,\n",
-    "    skip_prompt=True,\n",
-    "    skip_special_tokens=True,\n",
-    ")\n",
-    "pipeline_kwargs = {\"pipeline_kwargs\": {\"streamer\": streamer, \"max_new_tokens\": 100}}\n",
-    "chain = prompt | ov_llm.bind(**pipeline_kwargs)\n",
-    "\n",
-    "t1 = Thread(target=chain.invoke, args=({\"question\": question},))\n",
-    "t1.start()\n",
-    "\n",
-    "for new_text in streamer:\n",
-    "    print(new_text, end=\"\", flush=True)"
+    "for chunk in chain.stream(question):\n",
+    "    print(chunk, end=\"\", flush=True)"
    ]
   },
   {
diff --git a/libs/community/langchain_community/llms/huggingface_pipeline.py b/libs/community/langchain_community/llms/huggingface_pipeline.py
index 2bb820ba524..1468ae37f57 100644
--- a/libs/community/langchain_community/llms/huggingface_pipeline.py
+++ b/libs/community/langchain_community/llms/huggingface_pipeline.py
@@ -2,12 +2,12 @@ from __future__ import annotations
 
 import importlib.util
 import logging
-from typing import Any, List, Mapping, Optional
+from typing import Any, Iterator, List, Mapping, Optional
 
 from langchain_core._api.deprecation import deprecated
 from langchain_core.callbacks import CallbackManagerForLLMRun
 from langchain_core.language_models.llms import BaseLLM
-from langchain_core.outputs import Generation, LLMResult
+from langchain_core.outputs import Generation, GenerationChunk, LLMResult
 from langchain_core.pydantic_v1 import Extra
 
 DEFAULT_MODEL_ID = "gpt2"
@@ -303,3 +303,63 @@ class HuggingFacePipeline(BaseLLM):
         return LLMResult(
             generations=[[Generation(text=text)] for text in text_generations]
         )
+
+    def _stream(
+        self,
+        prompt: str,
+        stop: Optional[List[str]] = None,
+        run_manager: Optional[CallbackManagerForLLMRun] = None,
+        **kwargs: Any,
+    ) -> Iterator[GenerationChunk]:
+        from threading import Thread
+
+        import torch
+        from transformers import (
+            StoppingCriteria,
+            StoppingCriteriaList,
+            TextIteratorStreamer,
+        )
+
+        pipeline_kwargs = kwargs.get("pipeline_kwargs", {})
+        skip_prompt = kwargs.get("skip_prompt", True)
+
+        if stop is not None:
+            stop = self.pipeline.tokenizer.convert_tokens_to_ids(stop)
+        stopping_ids_list = stop or []
+
+        class StopOnTokens(StoppingCriteria):
+            def __call__(
+                self,
+                input_ids: torch.LongTensor,
+                scores: torch.FloatTensor,
+                **kwargs: Any,
+            ) -> bool:
+                for stop_id in stopping_ids_list:
+                    if input_ids[0][-1] == stop_id:
+                        return True
+                return False
+
+        stopping_criteria = StoppingCriteriaList([StopOnTokens()])
+
+        inputs = self.pipeline.tokenizer(prompt, return_tensors="pt")
+        streamer = TextIteratorStreamer(
+            self.pipeline.tokenizer,
+            timeout=60.0,
+            skip_prompt=skip_prompt,
+            skip_special_tokens=True,
+        )
+        generation_kwargs = dict(
+            inputs,
+            streamer=streamer,
+            stopping_criteria=stopping_criteria,
+            **pipeline_kwargs,
+        )
+        t1 = Thread(target=self.pipeline.model.generate, kwargs=generation_kwargs)
+        t1.start()
+
+        for char in streamer:
+            chunk = GenerationChunk(text=char)
+            if run_manager:
+                run_manager.on_llm_new_token(chunk.text, chunk=chunk)
+
+            yield chunk
diff --git a/libs/partners/huggingface/langchain_huggingface/llms/huggingface_pipeline.py b/libs/partners/huggingface/langchain_huggingface/llms/huggingface_pipeline.py
index 62ad200d5e8..1ab4bafd754 100644
--- a/libs/partners/huggingface/langchain_huggingface/llms/huggingface_pipeline.py
+++ b/libs/partners/huggingface/langchain_huggingface/llms/huggingface_pipeline.py
@@ -2,11 +2,11 @@ from __future__ import annotations  # type: ignore[import-not-found]
 
 import importlib.util
 import logging
-from typing import Any, List, Mapping, Optional
+from typing import Any, Iterator, List, Mapping, Optional
 
 from langchain_core.callbacks import CallbackManagerForLLMRun
 from langchain_core.language_models.llms import BaseLLM
-from langchain_core.outputs import Generation, LLMResult
+from langchain_core.outputs import Generation, GenerationChunk, LLMResult
 from langchain_core.pydantic_v1 import Extra
 
 DEFAULT_MODEL_ID = "gpt2"
@@ -208,7 +208,7 @@ class HuggingFacePipeline(BaseLLM):
                     cuda_device_count,
                 )
         if device is not None and device_map is not None and backend == "openvino":
-            logger.warning("Please set device for OpenVINO through: " "'model_kwargs'")
+            logger.warning("Please set device for OpenVINO through: `model_kwargs`")
         if "trust_remote_code" in _model_kwargs:
             _model_kwargs = {
                 k: v for k, v in _model_kwargs.items() if k != "trust_remote_code"
             }
@@ -299,3 +299,63 @@ class HuggingFacePipeline(BaseLLM):
         return LLMResult(
             generations=[[Generation(text=text)] for text in text_generations]
         )
+
+    def _stream(
+        self,
+        prompt: str,
+        stop: Optional[List[str]] = None,
+        run_manager: Optional[CallbackManagerForLLMRun] = None,
+        **kwargs: Any,
+    ) -> Iterator[GenerationChunk]:
+        from threading import Thread
+
+        import torch
+        from transformers import (
+            StoppingCriteria,
+            StoppingCriteriaList,
+            TextIteratorStreamer,
+        )
+
+        pipeline_kwargs = kwargs.get("pipeline_kwargs", {})
+        skip_prompt = kwargs.get("skip_prompt", True)
+
+        if stop is not None:
+            stop = self.pipeline.tokenizer.convert_tokens_to_ids(stop)
+        stopping_ids_list = stop or []
+
+        class StopOnTokens(StoppingCriteria):
+            def __call__(
+                self,
+                input_ids: torch.LongTensor,
+                scores: torch.FloatTensor,
+                **kwargs: Any,
+            ) -> bool:
+                for stop_id in stopping_ids_list:
+                    if input_ids[0][-1] == stop_id:
+                        return True
+                return False
+
+        stopping_criteria = StoppingCriteriaList([StopOnTokens()])
+
+        inputs = self.pipeline.tokenizer(prompt, return_tensors="pt")
+        streamer = TextIteratorStreamer(
+            self.pipeline.tokenizer,
+            timeout=60.0,
+            skip_prompt=skip_prompt,
+            skip_special_tokens=True,
+        )
+        generation_kwargs = dict(
+            inputs,
+            streamer=streamer,
+            stopping_criteria=stopping_criteria,
+            **pipeline_kwargs,
+        )
+        t1 = Thread(target=self.pipeline.model.generate, kwargs=generation_kwargs)
+        t1.start()
+
+        for char in streamer:
+            chunk = GenerationChunk(text=char)
+            if run_manager:
+                run_manager.on_llm_new_token(chunk.text, chunk=chunk)
+
+            yield chunk
diff --git a/libs/partners/huggingface/tests/integration_tests/test_llms.py b/libs/partners/huggingface/tests/integration_tests/test_llms.py
new file mode 100644
index 00000000000..e251c5bdb67
--- /dev/null
+++ b/libs/partners/huggingface/tests/integration_tests/test_llms.py
@@ -0,0 +1,18 @@
+from typing import Generator
+
+from langchain_huggingface.llms import HuggingFacePipeline
+
+
+def test_huggingface_pipeline_streaming() -> None:
+    """Test streaming tokens from huggingface_pipeline."""
+    llm = HuggingFacePipeline.from_model_id(
10} + ) + generator = llm.stream("Q: How do you say 'hello' in German? A:'", stop=["."]) + stream_results_string = "" + assert isinstance(generator, Generator) + + for chunk in generator: + assert isinstance(chunk, str) + stream_results_string = chunk + assert len(stream_results_string.strip()) > 1