mirror of
https://github.com/hwchase17/langchain.git
synced 2025-06-21 06:14:37 +00:00
[HuggingFace Pipeline] add streaming support (#23852)
This commit is contained in:
parent
34a02efcf9
commit
13855ef0c3
@ -143,6 +143,25 @@
|
|||||||
"print(chain.invoke({\"question\": question}))"
|
"print(chain.invoke({\"question\": question}))"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"id": "5141dc4d",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"Streaming repsonse."
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"id": "f1819250-2db9-4143-b88a-12e92d4e2386",
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"for chunk in chain.stream(question):\n",
|
||||||
|
" print(chunk, end=\"\", flush=True)"
|
||||||
|
]
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"cell_type": "markdown",
|
"cell_type": "markdown",
|
||||||
"id": "dbbc3a37",
|
"id": "dbbc3a37",
|
||||||
|
@ -245,7 +245,7 @@
|
|||||||
"source": [
|
"source": [
|
||||||
"### Streaming\n",
|
"### Streaming\n",
|
||||||
"\n",
|
"\n",
|
||||||
"To get streaming of LLM output, you can create a Huggingface `TextIteratorStreamer` for `_forward_params`."
|
"You can use `stream` method to get a streaming of LLM output, "
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -255,24 +255,11 @@
|
|||||||
"metadata": {},
|
"metadata": {},
|
||||||
"outputs": [],
|
"outputs": [],
|
||||||
"source": [
|
"source": [
|
||||||
"from threading import Thread\n",
|
"generation_config = {\"skip_prompt\": True, \"pipeline_kwargs\": {\"max_new_tokens\": 100}}\n",
|
||||||
|
"chain = prompt | ov_llm.bind(**generation_config)\n",
|
||||||
"\n",
|
"\n",
|
||||||
"from transformers import TextIteratorStreamer\n",
|
"for chunk in chain.stream(question):\n",
|
||||||
"\n",
|
" print(chunk, end=\"\", flush=True)"
|
||||||
"streamer = TextIteratorStreamer(\n",
|
|
||||||
" ov_llm.pipeline.tokenizer,\n",
|
|
||||||
" timeout=30.0,\n",
|
|
||||||
" skip_prompt=True,\n",
|
|
||||||
" skip_special_tokens=True,\n",
|
|
||||||
")\n",
|
|
||||||
"pipeline_kwargs = {\"pipeline_kwargs\": {\"streamer\": streamer, \"max_new_tokens\": 100}}\n",
|
|
||||||
"chain = prompt | ov_llm.bind(**pipeline_kwargs)\n",
|
|
||||||
"\n",
|
|
||||||
"t1 = Thread(target=chain.invoke, args=({\"question\": question},))\n",
|
|
||||||
"t1.start()\n",
|
|
||||||
"\n",
|
|
||||||
"for new_text in streamer:\n",
|
|
||||||
" print(new_text, end=\"\", flush=True)"
|
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -2,12 +2,12 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import importlib.util
|
import importlib.util
|
||||||
import logging
|
import logging
|
||||||
from typing import Any, List, Mapping, Optional
|
from typing import Any, Iterator, List, Mapping, Optional
|
||||||
|
|
||||||
from langchain_core._api.deprecation import deprecated
|
from langchain_core._api.deprecation import deprecated
|
||||||
from langchain_core.callbacks import CallbackManagerForLLMRun
|
from langchain_core.callbacks import CallbackManagerForLLMRun
|
||||||
from langchain_core.language_models.llms import BaseLLM
|
from langchain_core.language_models.llms import BaseLLM
|
||||||
from langchain_core.outputs import Generation, LLMResult
|
from langchain_core.outputs import Generation, GenerationChunk, LLMResult
|
||||||
from langchain_core.pydantic_v1 import Extra
|
from langchain_core.pydantic_v1 import Extra
|
||||||
|
|
||||||
DEFAULT_MODEL_ID = "gpt2"
|
DEFAULT_MODEL_ID = "gpt2"
|
||||||
@ -303,3 +303,63 @@ class HuggingFacePipeline(BaseLLM):
|
|||||||
return LLMResult(
|
return LLMResult(
|
||||||
generations=[[Generation(text=text)] for text in text_generations]
|
generations=[[Generation(text=text)] for text in text_generations]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _stream(
|
||||||
|
self,
|
||||||
|
prompt: str,
|
||||||
|
stop: Optional[List[str]] = None,
|
||||||
|
run_manager: Optional[CallbackManagerForLLMRun] = None,
|
||||||
|
**kwargs: Any,
|
||||||
|
) -> Iterator[GenerationChunk]:
|
||||||
|
from threading import Thread
|
||||||
|
|
||||||
|
import torch
|
||||||
|
from transformers import (
|
||||||
|
StoppingCriteria,
|
||||||
|
StoppingCriteriaList,
|
||||||
|
TextIteratorStreamer,
|
||||||
|
)
|
||||||
|
|
||||||
|
pipeline_kwargs = kwargs.get("pipeline_kwargs", {})
|
||||||
|
skip_prompt = kwargs.get("skip_prompt", True)
|
||||||
|
|
||||||
|
if stop is not None:
|
||||||
|
stop = self.pipeline.tokenizer.convert_tokens_to_ids(stop)
|
||||||
|
stopping_ids_list = stop or []
|
||||||
|
|
||||||
|
class StopOnTokens(StoppingCriteria):
|
||||||
|
def __call__(
|
||||||
|
self,
|
||||||
|
input_ids: torch.LongTensor,
|
||||||
|
scores: torch.FloatTensor,
|
||||||
|
**kwargs: Any,
|
||||||
|
) -> bool:
|
||||||
|
for stop_id in stopping_ids_list:
|
||||||
|
if input_ids[0][-1] == stop_id:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
stopping_criteria = StoppingCriteriaList([StopOnTokens()])
|
||||||
|
|
||||||
|
inputs = self.pipeline.tokenizer(prompt, return_tensors="pt")
|
||||||
|
streamer = TextIteratorStreamer(
|
||||||
|
self.pipeline.tokenizer,
|
||||||
|
timeout=60.0,
|
||||||
|
skip_prompt=skip_prompt,
|
||||||
|
skip_special_tokens=True,
|
||||||
|
)
|
||||||
|
generation_kwargs = dict(
|
||||||
|
inputs,
|
||||||
|
streamer=streamer,
|
||||||
|
stopping_criteria=stopping_criteria,
|
||||||
|
**pipeline_kwargs,
|
||||||
|
)
|
||||||
|
t1 = Thread(target=self.pipeline.model.generate, kwargs=generation_kwargs)
|
||||||
|
t1.start()
|
||||||
|
|
||||||
|
for char in streamer:
|
||||||
|
chunk = GenerationChunk(text=char)
|
||||||
|
if run_manager:
|
||||||
|
run_manager.on_llm_new_token(chunk.text, chunk=chunk)
|
||||||
|
|
||||||
|
yield chunk
|
||||||
|
@ -2,11 +2,11 @@ from __future__ import annotations # type: ignore[import-not-found]
|
|||||||
|
|
||||||
import importlib.util
|
import importlib.util
|
||||||
import logging
|
import logging
|
||||||
from typing import Any, List, Mapping, Optional
|
from typing import Any, Iterator, List, Mapping, Optional
|
||||||
|
|
||||||
from langchain_core.callbacks import CallbackManagerForLLMRun
|
from langchain_core.callbacks import CallbackManagerForLLMRun
|
||||||
from langchain_core.language_models.llms import BaseLLM
|
from langchain_core.language_models.llms import BaseLLM
|
||||||
from langchain_core.outputs import Generation, LLMResult
|
from langchain_core.outputs import Generation, GenerationChunk, LLMResult
|
||||||
from langchain_core.pydantic_v1 import Extra
|
from langchain_core.pydantic_v1 import Extra
|
||||||
|
|
||||||
DEFAULT_MODEL_ID = "gpt2"
|
DEFAULT_MODEL_ID = "gpt2"
|
||||||
@ -208,7 +208,7 @@ class HuggingFacePipeline(BaseLLM):
|
|||||||
cuda_device_count,
|
cuda_device_count,
|
||||||
)
|
)
|
||||||
if device is not None and device_map is not None and backend == "openvino":
|
if device is not None and device_map is not None and backend == "openvino":
|
||||||
logger.warning("Please set device for OpenVINO through: " "'model_kwargs'")
|
logger.warning("Please set device for OpenVINO through: `model_kwargs`")
|
||||||
if "trust_remote_code" in _model_kwargs:
|
if "trust_remote_code" in _model_kwargs:
|
||||||
_model_kwargs = {
|
_model_kwargs = {
|
||||||
k: v for k, v in _model_kwargs.items() if k != "trust_remote_code"
|
k: v for k, v in _model_kwargs.items() if k != "trust_remote_code"
|
||||||
@ -299,3 +299,63 @@ class HuggingFacePipeline(BaseLLM):
|
|||||||
return LLMResult(
|
return LLMResult(
|
||||||
generations=[[Generation(text=text)] for text in text_generations]
|
generations=[[Generation(text=text)] for text in text_generations]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _stream(
|
||||||
|
self,
|
||||||
|
prompt: str,
|
||||||
|
stop: Optional[List[str]] = None,
|
||||||
|
run_manager: Optional[CallbackManagerForLLMRun] = None,
|
||||||
|
**kwargs: Any,
|
||||||
|
) -> Iterator[GenerationChunk]:
|
||||||
|
from threading import Thread
|
||||||
|
|
||||||
|
import torch
|
||||||
|
from transformers import (
|
||||||
|
StoppingCriteria,
|
||||||
|
StoppingCriteriaList,
|
||||||
|
TextIteratorStreamer,
|
||||||
|
)
|
||||||
|
|
||||||
|
pipeline_kwargs = kwargs.get("pipeline_kwargs", {})
|
||||||
|
skip_prompt = kwargs.get("skip_prompt", True)
|
||||||
|
|
||||||
|
if stop is not None:
|
||||||
|
stop = self.pipeline.tokenizer.convert_tokens_to_ids(stop)
|
||||||
|
stopping_ids_list = stop or []
|
||||||
|
|
||||||
|
class StopOnTokens(StoppingCriteria):
|
||||||
|
def __call__(
|
||||||
|
self,
|
||||||
|
input_ids: torch.LongTensor,
|
||||||
|
scores: torch.FloatTensor,
|
||||||
|
**kwargs: Any,
|
||||||
|
) -> bool:
|
||||||
|
for stop_id in stopping_ids_list:
|
||||||
|
if input_ids[0][-1] == stop_id:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
stopping_criteria = StoppingCriteriaList([StopOnTokens()])
|
||||||
|
|
||||||
|
inputs = self.pipeline.tokenizer(prompt, return_tensors="pt")
|
||||||
|
streamer = TextIteratorStreamer(
|
||||||
|
self.pipeline.tokenizer,
|
||||||
|
timeout=60.0,
|
||||||
|
skip_prompt=skip_prompt,
|
||||||
|
skip_special_tokens=True,
|
||||||
|
)
|
||||||
|
generation_kwargs = dict(
|
||||||
|
inputs,
|
||||||
|
streamer=streamer,
|
||||||
|
stopping_criteria=stopping_criteria,
|
||||||
|
**pipeline_kwargs,
|
||||||
|
)
|
||||||
|
t1 = Thread(target=self.pipeline.model.generate, kwargs=generation_kwargs)
|
||||||
|
t1.start()
|
||||||
|
|
||||||
|
for char in streamer:
|
||||||
|
chunk = GenerationChunk(text=char)
|
||||||
|
if run_manager:
|
||||||
|
run_manager.on_llm_new_token(chunk.text, chunk=chunk)
|
||||||
|
|
||||||
|
yield chunk
|
||||||
|
@ -0,0 +1,18 @@
|
|||||||
|
from typing import Generator
|
||||||
|
|
||||||
|
from langchain_huggingface.llms import HuggingFacePipeline
|
||||||
|
|
||||||
|
|
||||||
|
def test_huggingface_pipeline_streaming() -> None:
|
||||||
|
"""Test streaming tokens from huggingface_pipeline."""
|
||||||
|
llm = HuggingFacePipeline.from_model_id(
|
||||||
|
model_id="gpt2", task="text-generation", pipeline_kwargs={"max_new_tokens": 10}
|
||||||
|
)
|
||||||
|
generator = llm.stream("Q: How do you say 'hello' in German? A:'", stop=["."])
|
||||||
|
stream_results_string = ""
|
||||||
|
assert isinstance(generator, Generator)
|
||||||
|
|
||||||
|
for chunk in generator:
|
||||||
|
assert isinstance(chunk, str)
|
||||||
|
stream_results_string = chunk
|
||||||
|
assert len(stream_results_string.strip()) > 1
|
Loading…
Reference in New Issue
Block a user