huggingface: init package (#21097)

First PR for the `langchain_huggingface` partner package

- Moved some of the Hugging Face-related classes from `community` to the
new `langchain_huggingface` partner package (a short import sketch follows below)
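
A minimal sketch of the new public surface (class names match the package `__all__` in this PR; the `repo_id` value is only an illustrative placeholder):

from langchain_huggingface import (
    ChatHuggingFace,
    HuggingFaceEmbeddings,
    HuggingFaceEndpoint,
    HuggingFaceEndpointEmbeddings,
    HuggingFacePipeline,
)

# e.g. an endpoint-backed LLM that previously lived in langchain_community
llm = HuggingFaceEndpoint(
    repo_id="HuggingFaceH4/zephyr-7b-beta",  # placeholder model repo
    max_new_tokens=128,
)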

Still needed:
- Documentation
- Tests
- Support for the new `apply_chat_template` in `ChatHuggingFace`
- Confirm the choice of class to support for embeddings with the sentence-transformers team

cc: @efriis

---------

Co-authored-by: Cyril Kondratenko <kkn1993@gmail.com>
Co-authored-by: Erick Friis <erick@langchain.dev>
Author: Jofthomas
Date: 2024-05-13 22:53:15 +02:00
Committed by: GitHub
Parent: 9fce03e7db
Commit: afd85b60fc
33 changed files with 5299 additions and 133 deletions


@@ -0,0 +1,17 @@
from langchain_huggingface.chat_models import ChatHuggingFace
from langchain_huggingface.embeddings import (
    HuggingFaceEmbeddings,
    HuggingFaceEndpointEmbeddings,
)
from langchain_huggingface.llms import (
    HuggingFaceEndpoint,
    HuggingFacePipeline,
)

__all__ = [
    "ChatHuggingFace",
    "HuggingFaceEndpointEmbeddings",
    "HuggingFaceEmbeddings",
    "HuggingFaceEndpoint",
    "HuggingFacePipeline",
]


@@ -0,0 +1,15 @@
from langchain_huggingface.chat_models.huggingface import (
    TGI_MESSAGE,
    TGI_RESPONSE,
    ChatHuggingFace,
    _convert_message_to_chat_message,
    _convert_TGI_message_to_LC_message,
)

__all__ = [
    "ChatHuggingFace",
    "_convert_message_to_chat_message",
    "_convert_TGI_message_to_LC_message",
    "TGI_MESSAGE",
    "TGI_RESPONSE",
]


@@ -0,0 +1,350 @@
"""Hugging Face Chat Wrapper."""
from dataclasses import dataclass
from typing import (
Any,
Callable,
Dict,
List,
Literal,
Optional,
Sequence,
Type,
Union,
cast,
)
from langchain_community.llms.huggingface_hub import HuggingFaceHub
from langchain_community.llms.huggingface_text_gen_inference import (
HuggingFaceTextGenInference,
)
from langchain_core.callbacks.manager import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models import LanguageModelInput
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import (
AIMessage,
BaseMessage,
ChatMessage,
HumanMessage,
SystemMessage,
ToolMessage,
)
from langchain_core.outputs import ChatGeneration, ChatResult, LLMResult
from langchain_core.pydantic_v1 import BaseModel, root_validator
from langchain_core.runnables import Runnable
from langchain_core.tools import BaseTool
from langchain_core.utils.function_calling import convert_to_openai_tool
from langchain_huggingface.llms.huggingface_endpoint import HuggingFaceEndpoint
DEFAULT_SYSTEM_PROMPT = """You are a helpful, respectful, and honest assistant."""
@dataclass
class TGI_RESPONSE:
choices: List[Any]
usage: Dict
@dataclass
class TGI_MESSAGE:
role: str
content: str
tool_calls: List[Dict]
def _convert_message_to_chat_message(
message: BaseMessage,
) -> Dict:
if isinstance(message, ChatMessage):
return dict(role=message.role, content=message.content)
elif isinstance(message, HumanMessage):
return dict(role="user", content=message.content)
elif isinstance(message, AIMessage):
if "tool_calls" in message.additional_kwargs:
tool_calls = [
{
"function": {
"name": tc["function"]["name"],
"arguments": tc["function"]["arguments"],
}
}
for tc in message.additional_kwargs["tool_calls"]
]
else:
tool_calls = None
return {
"role": "assistant",
"content": message.content,
"tool_calls": tool_calls,
}
elif isinstance(message, SystemMessage):
return dict(role="system", content=message.content)
elif isinstance(message, ToolMessage):
return {
"role": "tool",
"content": message.content,
"name": message.name,
}
else:
raise ValueError(f"Got unknown type {message}")
def _convert_TGI_message_to_LC_message(
_message: TGI_MESSAGE,
) -> BaseMessage:
role = _message.role
assert role == "assistant", f"Expected role to be 'assistant', got {role}"
content = cast(str, _message.content)
if content is None:
content = ""
additional_kwargs: Dict = {}
if tool_calls := _message.tool_calls:
if "arguments" in tool_calls[0]["function"]:
functions_string = str(tool_calls[0]["function"].pop("arguments"))
corrected_functions = functions_string.replace("'", '"')
tool_calls[0]["function"]["arguments"] = corrected_functions
additional_kwargs["tool_calls"] = tool_calls
return AIMessage(content=content, additional_kwargs=additional_kwargs)
class ChatHuggingFace(BaseChatModel):
"""
Wrapper for using Hugging Face LLMs as ChatModels.
Works with `HuggingFaceTextGenInference`, `HuggingFaceEndpoint`,
and `HuggingFaceHub` LLMs.
Upon instantiating this class, the model_id is resolved from the url
provided to the LLM, and the appropriate tokenizer is loaded from
the HuggingFace Hub.
Adapted from: https://python.langchain.com/docs/integrations/chat/llama2_chat
"""
llm: Any
"""LLM, must be of type HuggingFaceTextGenInference, HuggingFaceEndpoint, or
HuggingFaceHub."""
system_message: SystemMessage = SystemMessage(content=DEFAULT_SYSTEM_PROMPT)
tokenizer: Any = None
model_id: Optional[str] = None
def __init__(self, **kwargs: Any):
super().__init__(**kwargs)
from transformers import AutoTokenizer # type: ignore[import]
self._resolve_model_id()
self.tokenizer = (
AutoTokenizer.from_pretrained(self.model_id)
if self.tokenizer is None
else self.tokenizer
)
@root_validator()
def validate_llm(cls, values: dict) -> dict:
if not isinstance(
values["llm"],
(HuggingFaceHub, HuggingFaceTextGenInference, HuggingFaceEndpoint),
):
raise TypeError(
"Expected llm to be one of HuggingFaceTextGenInference, "
f"HuggingFaceEndpoint, HuggingFaceHub, received {type(values['llm'])}"
)
return values
def _create_chat_result(self, response: TGI_RESPONSE) -> ChatResult:
generations = []
finish_reason = response.choices[0].finish_reason
gen = ChatGeneration(
message=_convert_TGI_message_to_LC_message(response.choices[0].message),
generation_info={"finish_reason": finish_reason},
)
generations.append(gen)
token_usage = response.usage
model_object = self.llm.inference_server_url
llm_output = {"token_usage": token_usage, "model": model_object}
return ChatResult(generations=generations, llm_output=llm_output)
def _generate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
if isinstance(self.llm, HuggingFaceTextGenInference):
message_dicts = self._create_message_dicts(messages, stop)
answer = self.llm.client.chat(messages=message_dicts, **kwargs)
return self._create_chat_result(answer)
elif isinstance(self.llm, HuggingFaceEndpoint):
message_dicts = self._create_message_dicts(messages, stop)
answer = self.llm.client.chat_completion(messages=message_dicts, **kwargs)
return self._create_chat_result(answer)
else:
llm_input = self._to_chat_prompt(messages)
llm_result = self.llm._generate(
prompts=[llm_input], stop=stop, run_manager=run_manager, **kwargs
)
return self._to_chat_result(llm_result)
async def _agenerate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
if isinstance(self.llm, HuggingFaceTextGenInference):
message_dicts = self._create_message_dicts(messages, stop)
answer = await self.llm.async_client.chat(messages=message_dicts, **kwargs)
return self._create_chat_result(answer)
else:
llm_input = self._to_chat_prompt(messages)
llm_result = await self.llm._agenerate(
prompts=[llm_input], stop=stop, run_manager=run_manager, **kwargs
)
return self._to_chat_result(llm_result)
def _to_chat_prompt(
self,
messages: List[BaseMessage],
) -> str:
"""Convert a list of messages into a prompt format expected by wrapped LLM."""
if not messages:
raise ValueError("At least one HumanMessage must be provided!")
if not isinstance(messages[-1], HumanMessage):
raise ValueError("Last message must be a HumanMessage!")
messages_dicts = [self._to_chatml_format(m) for m in messages]
return self.tokenizer.apply_chat_template(
messages_dicts, tokenize=False, add_generation_prompt=True
)
def _to_chatml_format(self, message: BaseMessage) -> dict:
"""Convert LangChain message to ChatML format."""
if isinstance(message, SystemMessage):
role = "system"
elif isinstance(message, AIMessage):
role = "assistant"
elif isinstance(message, HumanMessage):
role = "user"
else:
raise ValueError(f"Unknown message type: {type(message)}")
return {"role": role, "content": message.content}
@staticmethod
def _to_chat_result(llm_result: LLMResult) -> ChatResult:
chat_generations = []
for g in llm_result.generations[0]:
chat_generation = ChatGeneration(
message=AIMessage(content=g.text), generation_info=g.generation_info
)
chat_generations.append(chat_generation)
return ChatResult(
generations=chat_generations, llm_output=llm_result.llm_output
)
def _resolve_model_id(self) -> None:
"""Resolve the model_id from the LLM's inference_server_url"""
from huggingface_hub import list_inference_endpoints # type: ignore[import]
available_endpoints = list_inference_endpoints("*")
if isinstance(self.llm, HuggingFaceHub) or (
hasattr(self.llm, "repo_id") and self.llm.repo_id
):
self.model_id = self.llm.repo_id
return
elif isinstance(self.llm, HuggingFaceTextGenInference):
endpoint_url: Optional[str] = self.llm.inference_server_url
else:
endpoint_url = self.llm.endpoint_url
for endpoint in available_endpoints:
if endpoint.url == endpoint_url:
self.model_id = endpoint.repository
if not self.model_id:
raise ValueError(
"Failed to resolve model_id:"
f"Could not find model id for inference server: {endpoint_url}"
"Make sure that your Hugging Face token has access to the endpoint."
)
def bind_tools(
self,
tools: Sequence[Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool]],
*,
tool_choice: Optional[Union[dict, str, Literal["auto", "none"], bool]] = None,
**kwargs: Any,
) -> Runnable[LanguageModelInput, BaseMessage]:
"""Bind tool-like objects to this chat model.
Assumes model is compatible with OpenAI tool-calling API.
Args:
tools: A list of tool definitions to bind to this chat model.
Can be a dictionary, pydantic model, callable, or BaseTool. Pydantic
models, callables, and BaseTools will be automatically converted to
their schema dictionary representation.
tool_choice: Which tool to require the model to call.
Must be the name of the single provided function or
"auto" to automatically determine which function to call
(if any), or a dict of the form:
{"type": "function", "function": {"name": <<tool_name>>}}.
**kwargs: Any additional parameters to pass to the
:class:`~langchain.runnable.Runnable` constructor.
"""
formatted_tools = [convert_to_openai_tool(tool) for tool in tools]
if tool_choice is not None and tool_choice:
if len(formatted_tools) != 1:
raise ValueError(
"When specifying `tool_choice`, you must provide exactly one "
f"tool. Received {len(formatted_tools)} tools."
)
if isinstance(tool_choice, str):
if tool_choice not in ("auto", "none"):
tool_choice = {
"type": "function",
"function": {"name": tool_choice},
}
elif isinstance(tool_choice, bool):
tool_choice = formatted_tools[0]
elif isinstance(tool_choice, dict):
if (
formatted_tools[0]["function"]["name"]
!= tool_choice["function"]["name"]
):
raise ValueError(
f"Tool choice {tool_choice} was specified, but the only "
f"provided tool was {formatted_tools[0]['function']['name']}."
)
else:
raise ValueError(
f"Unrecognized tool_choice type. Expected str, bool or dict. "
f"Received: {tool_choice}"
)
kwargs["tool_choice"] = tool_choice
return super().bind(tools=formatted_tools, **kwargs)
def _create_message_dicts(
self, messages: List[BaseMessage], stop: Optional[List[str]]
) -> List[Dict[Any, Any]]:
message_dicts = [_convert_message_to_chat_message(m) for m in messages]
return message_dicts
@property
def _llm_type(self) -> str:
return "huggingface-chat-wrapper"


@@ -0,0 +1,9 @@
from langchain_huggingface.embeddings.huggingface import HuggingFaceEmbeddings
from langchain_huggingface.embeddings.huggingface_endpoint import (
    HuggingFaceEndpointEmbeddings,
)

__all__ = [
    "HuggingFaceEmbeddings",
    "HuggingFaceEndpointEmbeddings",
]


@@ -0,0 +1,102 @@
from typing import Any, Dict, List, Optional
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import BaseModel, Extra, Field
DEFAULT_MODEL_NAME = "sentence-transformers/all-mpnet-base-v2"
class HuggingFaceEmbeddings(BaseModel, Embeddings):
"""HuggingFace sentence_transformers embedding models.
To use, you should have the ``sentence_transformers`` python package installed.
Example:
.. code-block:: python
from langchain_huggingface import HuggingFaceEmbeddings
model_name = "sentence-transformers/all-mpnet-base-v2"
model_kwargs = {'device': 'cpu'}
encode_kwargs = {'normalize_embeddings': False}
hf = HuggingFaceEmbeddings(
model_name=model_name,
model_kwargs=model_kwargs,
encode_kwargs=encode_kwargs
)
"""
client: Any #: :meta private:
model_name: str = DEFAULT_MODEL_NAME
"""Model name to use."""
cache_folder: Optional[str] = None
"""Path to store models.
Can be also set by SENTENCE_TRANSFORMERS_HOME environment variable."""
model_kwargs: Dict[str, Any] = Field(default_factory=dict)
"""Keyword arguments to pass to the Sentence Transformer model, such as `device`,
`prompts`, `default_prompt_name`, `revision`, `trust_remote_code`, or `token`.
See also the Sentence Transformer documentation: https://sbert.net/docs/package_reference/SentenceTransformer.html#sentence_transformers.SentenceTransformer"""
encode_kwargs: Dict[str, Any] = Field(default_factory=dict)
"""Keyword arguments to pass when calling the `encode` method of the Sentence
Transformer model, such as `prompt_name`, `prompt`, `batch_size`, `precision`,
`normalize_embeddings`, and more.
See also the Sentence Transformer documentation: https://sbert.net/docs/package_reference/SentenceTransformer.html#sentence_transformers.SentenceTransformer.encode"""
multi_process: bool = False
"""Run encode() on multiple GPUs."""
show_progress: bool = False
"""Whether to show a progress bar."""
def __init__(self, **kwargs: Any):
"""Initialize the sentence_transformer."""
super().__init__(**kwargs)
try:
import sentence_transformers # type: ignore[import]
except ImportError as exc:
raise ImportError(
"Could not import sentence_transformers python package. "
"Please install it with `pip install sentence-transformers`."
) from exc
self.client = sentence_transformers.SentenceTransformer(
self.model_name, cache_folder=self.cache_folder, **self.model_kwargs
)
class Config:
"""Configuration for this pydantic object."""
extra = Extra.forbid
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Compute doc embeddings using a HuggingFace transformer model.
Args:
texts: The list of texts to embed.
Returns:
List of embeddings, one for each text.
"""
import sentence_transformers # type: ignore[import]
texts = list(map(lambda x: x.replace("\n", " "), texts))
if self.multi_process:
pool = self.client.start_multi_process_pool()
embeddings = self.client.encode_multi_process(texts, pool)
sentence_transformers.SentenceTransformer.stop_multi_process_pool(pool)
else:
embeddings = self.client.encode(
texts, show_progress_bar=self.show_progress, **self.encode_kwargs
)
return embeddings.tolist()
def embed_query(self, text: str) -> List[float]:
"""Compute query embeddings using a HuggingFace transformer model.
Args:
text: The text to embed.
Returns:
Embeddings for the text.
"""
return self.embed_documents([text])[0]
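
A short sketch of calling the two public methods (mirrors the docstring above; the model is the class default and requires `pip install sentence-transformers` locally):

from langchain_huggingface import HuggingFaceEmbeddings

embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-mpnet-base-v2",
    model_kwargs={"device": "cpu"},
    encode_kwargs={"normalize_embeddings": False},
)

doc_vectors = embeddings.embed_documents(["Hello world", "LangChain + Hugging Face"])
query_vector = embeddings.embed_query("Hello world")
print(len(doc_vectors), len(query_vector))  # number of documents, embedding dimension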


@@ -0,0 +1,151 @@
import json
import os
from typing import Any, Dict, List, Optional
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator
DEFAULT_MODEL = "sentence-transformers/all-mpnet-base-v2"
VALID_TASKS = ("feature-extraction",)
class HuggingFaceEndpointEmbeddings(BaseModel, Embeddings):
"""HuggingFaceHub embedding models.
To use, you should have the ``huggingface_hub`` python package installed, and the
environment variable ``HUGGINGFACEHUB_API_TOKEN`` set with your API token, or pass
it as a named parameter to the constructor.
Example:
.. code-block:: python
from langchain_huggingface import HuggingFaceEndpointEmbeddings
model = "sentence-transformers/all-mpnet-base-v2"
hf = HuggingFaceEndpointEmbeddings(
model=model,
task="feature-extraction",
huggingfacehub_api_token="my-api-key",
)
"""
client: Any #: :meta private:
async_client: Any #: :meta private:
model: Optional[str] = None
"""Model name to use."""
repo_id: Optional[str] = None
"""Huggingfacehub repository id, for backward compatibility."""
task: Optional[str] = "feature-extraction"
"""Task to call the model with."""
model_kwargs: Optional[dict] = None
"""Keyword arguments to pass to the model."""
huggingfacehub_api_token: Optional[str] = None
class Config:
"""Configuration for this pydantic object."""
extra = Extra.forbid
@root_validator()
def validate_environment(cls, values: Dict) -> Dict:
"""Validate that api key and python package exists in environment."""
huggingfacehub_api_token = values["huggingfacehub_api_token"] or os.getenv(
"HUGGINGFACEHUB_API_TOKEN"
)
try:
from huggingface_hub import ( # type: ignore[import]
AsyncInferenceClient,
InferenceClient,
)
if values["model"]:
values["repo_id"] = values["model"]
elif values["repo_id"]:
values["model"] = values["repo_id"]
else:
values["model"] = DEFAULT_MODEL
values["repo_id"] = DEFAULT_MODEL
client = InferenceClient(
model=values["model"],
token=huggingfacehub_api_token,
)
async_client = AsyncInferenceClient(
model=values["model"],
token=huggingfacehub_api_token,
)
if values["task"] not in VALID_TASKS:
raise ValueError(
f"Got invalid task {values['task']}, "
f"currently only {VALID_TASKS} are supported"
)
values["client"] = client
values["async_client"] = async_client
except ImportError:
raise ImportError(
"Could not import huggingface_hub python package. "
"Please install it with `pip install huggingface_hub`."
)
return values
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Call out to HuggingFaceHub's embedding endpoint for embedding search docs.
Args:
texts: The list of texts to embed.
Returns:
List of embeddings, one for each text.
"""
# replace newlines, which can negatively affect performance.
texts = [text.replace("\n", " ") for text in texts]
_model_kwargs = self.model_kwargs or {}
responses = self.client.post(
json={"inputs": texts, "parameters": _model_kwargs}, task=self.task
)
return json.loads(responses.decode())
async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
"""Async Call to HuggingFaceHub's embedding endpoint for embedding search docs.
Args:
texts: The list of texts to embed.
Returns:
List of embeddings, one for each text.
"""
# replace newlines, which can negatively affect performance.
texts = [text.replace("\n", " ") for text in texts]
_model_kwargs = self.model_kwargs or {}
responses = await self.async_client.post(
json={"inputs": texts, "parameters": _model_kwargs}, task=self.task
)
return json.loads(responses.decode())
def embed_query(self, text: str) -> List[float]:
"""Call out to HuggingFaceHub's embedding endpoint for embedding query text.
Args:
text: The text to embed.
Returns:
Embeddings for the text.
"""
response = self.embed_documents([text])[0]
return response
async def aembed_query(self, text: str) -> List[float]:
"""Async Call to HuggingFaceHub's embedding endpoint for embedding query text.
Args:
text: The text to embed.
Returns:
Embeddings for the text.
"""
response = (await self.aembed_documents([text]))[0]
return response
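
And a sketch of the sync and async call paths above (assumes network access to the Inference API and a `HUGGINGFACEHUB_API_TOKEN` in the environment):

import asyncio

from langchain_huggingface import HuggingFaceEndpointEmbeddings

hf = HuggingFaceEndpointEmbeddings(
    model="sentence-transformers/all-mpnet-base-v2",
    task="feature-extraction",
)

doc_vectors = hf.embed_documents(["first document", "second document"])
query_vector = asyncio.run(hf.aembed_query("a search query"))
print(len(doc_vectors), len(query_vector))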


@@ -0,0 +1,7 @@
from langchain_huggingface.llms.huggingface_endpoint import HuggingFaceEndpoint
from langchain_huggingface.llms.huggingface_pipeline import HuggingFacePipeline

__all__ = [
    "HuggingFaceEndpoint",
    "HuggingFacePipeline",
]


@@ -0,0 +1,372 @@
import json
import logging
from typing import Any, AsyncIterator, Dict, Iterator, List, Mapping, Optional
from langchain_core.callbacks import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models.llms import LLM
from langchain_core.outputs import GenerationChunk
from langchain_core.pydantic_v1 import Extra, Field, root_validator
from langchain_core.utils import get_from_dict_or_env, get_pydantic_field_names
logger = logging.getLogger(__name__)
VALID_TASKS = (
"text2text-generation",
"text-generation",
"summarization",
"conversational",
)
class HuggingFaceEndpoint(LLM):
"""
HuggingFace Endpoint.
To use this class, you should have installed the ``huggingface_hub`` package, and
the environment variable ``HUGGINGFACEHUB_API_TOKEN`` set with your API token,
or given as a named parameter to the constructor.
Example:
.. code-block:: python
# Basic Example (no streaming)
llm = HuggingFaceEndpoint(
endpoint_url="http://localhost:8010/",
max_new_tokens=512,
top_k=10,
top_p=0.95,
typical_p=0.95,
temperature=0.01,
repetition_penalty=1.03,
huggingfacehub_api_token="my-api-key"
)
print(llm.invoke("What is Deep Learning?"))
# Streaming response example
from langchain_core.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
callbacks = [StreamingStdOutCallbackHandler()]
llm = HuggingFaceEndpoint(
endpoint_url="http://localhost:8010/",
max_new_tokens=512,
top_k=10,
top_p=0.95,
typical_p=0.95,
temperature=0.01,
repetition_penalty=1.03,
callbacks=callbacks,
streaming=True,
huggingfacehub_api_token="my-api-key"
)
print(llm.invoke("What is Deep Learning?"))
""" # noqa: E501
endpoint_url: Optional[str] = None
"""Endpoint URL to use."""
repo_id: Optional[str] = None
"""Repo to use."""
huggingfacehub_api_token: Optional[str] = None
max_new_tokens: int = 512
"""Maximum number of generated tokens"""
top_k: Optional[int] = None
"""The number of highest probability vocabulary tokens to keep for
top-k-filtering."""
top_p: Optional[float] = 0.95
"""If set to < 1, only the smallest set of most probable tokens with probabilities
that add up to `top_p` or higher are kept for generation."""
typical_p: Optional[float] = 0.95
"""Typical Decoding mass. See [Typical Decoding for Natural Language
Generation](https://arxiv.org/abs/2202.00666) for more information."""
temperature: Optional[float] = 0.8
"""The value used to module the logits distribution."""
repetition_penalty: Optional[float] = None
"""The parameter for repetition penalty. 1.0 means no penalty.
See [this paper](https://arxiv.org/pdf/1909.05858.pdf) for more details."""
return_full_text: bool = False
"""Whether to prepend the prompt to the generated text"""
truncate: Optional[int] = None
"""Truncate inputs tokens to the given size"""
stop_sequences: List[str] = Field(default_factory=list)
"""Stop generating tokens if a member of `stop_sequences` is generated"""
seed: Optional[int] = None
"""Random sampling seed"""
inference_server_url: str = ""
"""text-generation-inference instance base url"""
timeout: int = 120
"""Timeout in seconds"""
streaming: bool = False
"""Whether to generate a stream of tokens asynchronously"""
do_sample: bool = False
"""Activate logits sampling"""
watermark: bool = False
"""Watermarking with [A Watermark for Large Language Models]
(https://arxiv.org/abs/2301.10226)"""
server_kwargs: Dict[str, Any] = Field(default_factory=dict)
"""Holds any text-generation-inference server parameters not explicitly specified"""
model_kwargs: Dict[str, Any] = Field(default_factory=dict)
"""Holds any model parameters valid for `call` not explicitly specified"""
model: str
client: Any
async_client: Any
task: Optional[str] = None
"""Task to call the model with.
Should be a task that returns `generated_text` or `summary_text`."""
class Config:
"""Configuration for this pydantic object."""
extra = Extra.forbid
@root_validator(pre=True)
def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Build extra kwargs from additional params that were passed in."""
all_required_field_names = get_pydantic_field_names(cls)
extra = values.get("model_kwargs", {})
for field_name in list(values):
if field_name in extra:
raise ValueError(f"Found {field_name} supplied twice.")
if field_name not in all_required_field_names:
logger.warning(
f"""WARNING! {field_name} is not default parameter.
{field_name} was transferred to model_kwargs.
Please make sure that {field_name} is what you intended."""
)
extra[field_name] = values.pop(field_name)
invalid_model_kwargs = all_required_field_names.intersection(extra.keys())
if invalid_model_kwargs:
raise ValueError(
f"Parameters {invalid_model_kwargs} should be specified explicitly. "
f"Instead they were passed in as part of `model_kwargs` parameter."
)
values["model_kwargs"] = extra
if "endpoint_url" not in values and "repo_id" not in values:
raise ValueError(
"Please specify an `endpoint_url` or `repo_id` for the model."
)
if "endpoint_url" in values and "repo_id" in values:
raise ValueError(
"Please specify either an `endpoint_url` OR a `repo_id`, not both."
)
values["model"] = values.get("endpoint_url") or values.get("repo_id")
return values
@root_validator()
def validate_environment(cls, values: Dict) -> Dict:
"""Validate that package is installed and that the API token is valid."""
try:
from huggingface_hub import login # type: ignore[import]
except ImportError:
raise ImportError(
"Could not import huggingface_hub python package. "
"Please install it with `pip install huggingface_hub`."
)
try:
huggingfacehub_api_token = get_from_dict_or_env(
values, "huggingfacehub_api_token", "HUGGINGFACEHUB_API_TOKEN"
)
login(token=huggingfacehub_api_token)
except Exception as e:
raise ValueError(
"Could not authenticate with huggingface_hub. "
"Please check your API token."
) from e
from huggingface_hub import AsyncInferenceClient, InferenceClient
values["client"] = InferenceClient(
model=values["model"],
timeout=values["timeout"],
token=huggingfacehub_api_token,
**values["server_kwargs"],
)
values["async_client"] = AsyncInferenceClient(
model=values["model"],
timeout=values["timeout"],
token=huggingfacehub_api_token,
**values["server_kwargs"],
)
return values
@property
def _default_params(self) -> Dict[str, Any]:
"""Get the default parameters for calling text generation inference API."""
return {
"max_new_tokens": self.max_new_tokens,
"top_k": self.top_k,
"top_p": self.top_p,
"typical_p": self.typical_p,
"temperature": self.temperature,
"repetition_penalty": self.repetition_penalty,
"return_full_text": self.return_full_text,
"truncate": self.truncate,
"stop_sequences": self.stop_sequences,
"seed": self.seed,
"do_sample": self.do_sample,
"watermark": self.watermark,
**self.model_kwargs,
}
@property
def _identifying_params(self) -> Mapping[str, Any]:
"""Get the identifying parameters."""
_model_kwargs = self.model_kwargs or {}
return {
**{"endpoint_url": self.endpoint_url, "task": self.task},
**{"model_kwargs": _model_kwargs},
}
@property
def _llm_type(self) -> str:
"""Return type of llm."""
return "huggingface_endpoint"
def _invocation_params(
self, runtime_stop: Optional[List[str]], **kwargs: Any
) -> Dict[str, Any]:
params = {**self._default_params, **kwargs}
params["stop_sequences"] = params["stop_sequences"] + (runtime_stop or [])
return params
def _call(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
"""Call out to HuggingFace Hub's inference endpoint."""
invocation_params = self._invocation_params(stop, **kwargs)
if self.streaming:
completion = ""
for chunk in self._stream(prompt, stop, run_manager, **invocation_params):
completion += chunk.text
return completion
else:
invocation_params["stop"] = invocation_params[
"stop_sequences"
] # porting 'stop_sequences' into the 'stop' argument
response = self.client.post(
json={"inputs": prompt, "parameters": invocation_params},
stream=False,
task=self.task,
)
response_text = json.loads(response.decode())[0]["generated_text"]
# Maybe the generation has stopped at one of the stop sequences:
# then we remove this stop sequence from the end of the generated text
for stop_seq in invocation_params["stop_sequences"]:
if response_text[-len(stop_seq) :] == stop_seq:
response_text = response_text[: -len(stop_seq)]
return response_text
async def _acall(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
invocation_params = self._invocation_params(stop, **kwargs)
if self.streaming:
completion = ""
async for chunk in self._astream(
prompt, stop, run_manager, **invocation_params
):
completion += chunk.text
return completion
else:
invocation_params["stop"] = invocation_params["stop_sequences"]
response = await self.async_client.post(
json={"inputs": prompt, "parameters": invocation_params},
stream=False,
task=self.task,
)
response_text = json.loads(response.decode())[0]["generated_text"]
# Maybe the generation has stopped at one of the stop sequences:
# then remove this stop sequence from the end of the generated text
for stop_seq in invocation_params["stop_sequences"]:
if response_text[-len(stop_seq) :] == stop_seq:
response_text = response_text[: -len(stop_seq)]
return response_text
def _stream(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> Iterator[GenerationChunk]:
invocation_params = self._invocation_params(stop, **kwargs)
for response in self.client.text_generation(
prompt, **invocation_params, stream=True
):
# identify stop sequence in generated text, if any
stop_seq_found: Optional[str] = None
for stop_seq in invocation_params["stop_sequences"]:
if stop_seq in response:
stop_seq_found = stop_seq
# identify text to yield
text: Optional[str] = None
if stop_seq_found:
text = response[: response.index(stop_seq_found)]
else:
text = response
# yield text, if any
if text:
chunk = GenerationChunk(text=text)
if run_manager:
run_manager.on_llm_new_token(chunk.text)
yield chunk
# break if stop sequence found
if stop_seq_found:
break
async def _astream(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> AsyncIterator[GenerationChunk]:
invocation_params = self._invocation_params(stop, **kwargs)
async for response in await self.async_client.text_generation(
prompt, **invocation_params, stream=True
):
# identify stop sequence in generated text, if any
stop_seq_found: Optional[str] = None
for stop_seq in invocation_params["stop_sequences"]:
if stop_seq in response:
stop_seq_found = stop_seq
# identify text to yield
text: Optional[str] = None
if stop_seq_found:
text = response[: response.index(stop_seq_found)]
else:
text = response
# yield text, if any
if text:
chunk = GenerationChunk(text=text)
if run_manager:
await run_manager.on_llm_new_token(chunk.text)
yield chunk
# break if stop sequence found
if stop_seq_found:
break
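
A small streaming sketch built on the `_stream`/`_astream` methods above (the endpoint URL is a placeholder for a local text-generation-inference server, and `validate_environment` still expects a `HUGGINGFACEHUB_API_TOKEN`):

from langchain_huggingface import HuggingFaceEndpoint

llm = HuggingFaceEndpoint(
    endpoint_url="http://localhost:8010/",  # placeholder TGI server
    max_new_tokens=256,
    temperature=0.01,
)

# Token-by-token streaming through the standard Runnable interface.
for chunk in llm.stream("What is Deep Learning?"):
    print(chunk, end="", flush=True)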


@@ -0,0 +1,299 @@
from __future__ import annotations
import importlib.util
import logging
from typing import Any, List, Mapping, Optional
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.llms import BaseLLM
from langchain_core.outputs import Generation, LLMResult
from langchain_core.pydantic_v1 import Extra
DEFAULT_MODEL_ID = "gpt2"
DEFAULT_TASK = "text-generation"
VALID_TASKS = (
"text2text-generation",
"text-generation",
"summarization",
"translation",
)
DEFAULT_BATCH_SIZE = 4
logger = logging.getLogger(__name__)
class HuggingFacePipeline(BaseLLM):
"""HuggingFace Pipeline API.
To use, you should have the ``transformers`` python package installed.
Only supports `text-generation`, `text2text-generation`, `summarization` and
`translation` for now.
Example using from_model_id:
.. code-block:: python
from langchain_huggingface import HuggingFacePipeline
hf = HuggingFacePipeline.from_model_id(
model_id="gpt2",
task="text-generation",
pipeline_kwargs={"max_new_tokens": 10},
)
Example passing pipeline in directly:
.. code-block:: python
from langchain_huggingface import HuggingFacePipeline
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
model_id = "gpt2"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(model_id)
pipe = pipeline(
"text-generation", model=model, tokenizer=tokenizer, max_new_tokens=10
)
hf = HuggingFacePipeline(pipeline=pipe)
"""
pipeline: Any #: :meta private:
model_id: str = DEFAULT_MODEL_ID
"""Model name to use."""
model_kwargs: Optional[dict] = None
"""Keyword arguments passed to the model."""
pipeline_kwargs: Optional[dict] = None
"""Keyword arguments passed to the pipeline."""
batch_size: int = DEFAULT_BATCH_SIZE
"""Batch size to use when passing multiple documents to generate."""
class Config:
"""Configuration for this pydantic object."""
extra = Extra.forbid
@classmethod
def from_model_id(
cls,
model_id: str,
task: str,
backend: str = "default",
device: Optional[int] = -1,
device_map: Optional[str] = None,
model_kwargs: Optional[dict] = None,
pipeline_kwargs: Optional[dict] = None,
batch_size: int = DEFAULT_BATCH_SIZE,
**kwargs: Any,
) -> HuggingFacePipeline:
"""Construct the pipeline object from model_id and task."""
try:
from transformers import ( # type: ignore[import]
AutoModelForCausalLM,
AutoModelForSeq2SeqLM,
AutoTokenizer,
)
from transformers import pipeline as hf_pipeline # type: ignore[import]
except ImportError:
raise ValueError(
"Could not import transformers python package. "
"Please install it with `pip install transformers`."
)
_model_kwargs = model_kwargs or {}
tokenizer = AutoTokenizer.from_pretrained(model_id, **_model_kwargs)
try:
if task == "text-generation":
if backend == "openvino":
try:
from optimum.intel.openvino import ( # type: ignore[import]
OVModelForCausalLM,
)
except ImportError:
raise ValueError(
"Could not import optimum-intel python package. "
"Please install it with: "
"pip install 'optimum[openvino,nncf]' "
)
try:
# use local model
model = OVModelForCausalLM.from_pretrained(
model_id, **_model_kwargs
)
except Exception:
# use remote model
model = OVModelForCausalLM.from_pretrained(
model_id, export=True, **_model_kwargs
)
else:
model = AutoModelForCausalLM.from_pretrained(
model_id, **_model_kwargs
)
elif task in ("text2text-generation", "summarization", "translation"):
if backend == "openvino":
try:
from optimum.intel.openvino import OVModelForSeq2SeqLM
except ImportError:
raise ValueError(
"Could not import optimum-intel python package. "
"Please install it with: "
"pip install 'optimum[openvino,nncf]' "
)
try:
# use local model
model = OVModelForSeq2SeqLM.from_pretrained(
model_id, **_model_kwargs
)
except Exception:
# use remote model
model = OVModelForSeq2SeqLM.from_pretrained(
model_id, export=True, **_model_kwargs
)
else:
model = AutoModelForSeq2SeqLM.from_pretrained(
model_id, **_model_kwargs
)
else:
raise ValueError(
f"Got invalid task {task}, "
f"currently only {VALID_TASKS} are supported"
)
except ImportError as e:
raise ValueError(
f"Could not load the {task} model due to missing dependencies."
) from e
if tokenizer.pad_token is None:
tokenizer.pad_token_id = model.config.eos_token_id
if (
(
getattr(model, "is_loaded_in_4bit", False)
or getattr(model, "is_loaded_in_8bit", False)
)
and device is not None
and backend == "default"
):
logger.warning(
f"Setting the `device` argument to None from {device} to avoid "
"the error caused by attempting to move the model that was already "
"loaded on the GPU using the Accelerate module to the same or "
"another device."
)
device = None
if (
device is not None
and importlib.util.find_spec("torch") is not None
and backend == "default"
):
import torch
cuda_device_count = torch.cuda.device_count()
if device < -1 or (device >= cuda_device_count):
raise ValueError(
f"Got device=={device}, "
f"device is required to be within [-1, {cuda_device_count})"
)
if device_map is not None and device < 0:
device = None
if device is not None and device < 0 and cuda_device_count > 0:
logger.warning(
"Device has %d GPUs available. "
"Provide device={deviceId} to `from_model_id` to use available"
"GPUs for execution. deviceId is -1 (default) for CPU and "
"can be a positive integer associated with CUDA device id.",
cuda_device_count,
)
if device is not None and device_map is not None and backend == "openvino":
logger.warning("Please set device for OpenVINO through: " "'model_kwargs'")
if "trust_remote_code" in _model_kwargs:
_model_kwargs = {
k: v for k, v in _model_kwargs.items() if k != "trust_remote_code"
}
_pipeline_kwargs = pipeline_kwargs or {}
pipeline = hf_pipeline(
task=task,
model=model,
tokenizer=tokenizer,
device=device,
device_map=device_map,
batch_size=batch_size,
model_kwargs=_model_kwargs,
**_pipeline_kwargs,
)
if pipeline.task not in VALID_TASKS:
raise ValueError(
f"Got invalid task {pipeline.task}, "
f"currently only {VALID_TASKS} are supported"
)
return cls(
pipeline=pipeline,
model_id=model_id,
model_kwargs=_model_kwargs,
pipeline_kwargs=_pipeline_kwargs,
batch_size=batch_size,
**kwargs,
)
@property
def _identifying_params(self) -> Mapping[str, Any]:
"""Get the identifying parameters."""
return {
"model_id": self.model_id,
"model_kwargs": self.model_kwargs,
"pipeline_kwargs": self.pipeline_kwargs,
}
@property
def _llm_type(self) -> str:
return "huggingface_pipeline"
def _generate(
self,
prompts: List[str],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> LLMResult:
# List to hold all results
text_generations: List[str] = []
pipeline_kwargs = kwargs.get("pipeline_kwargs", {})
for i in range(0, len(prompts), self.batch_size):
batch_prompts = prompts[i : i + self.batch_size]
# Process batch of prompts
responses = self.pipeline(
batch_prompts,
**pipeline_kwargs,
)
# Process each response in the batch
for j, response in enumerate(responses):
if isinstance(response, list):
# if model returns multiple generations, pick the top one
response = response[0]
if self.pipeline.task == "text-generation":
text = response["generated_text"]
elif self.pipeline.task == "text2text-generation":
text = response["generated_text"]
elif self.pipeline.task == "summarization":
text = response["summary_text"]
elif self.pipeline.task in "translation":
text = response["translation_text"]
else:
raise ValueError(
f"Got invalid task {self.pipeline.task}, "
f"currently only {VALID_TASKS} are supported"
)
# Append the processed text to results
text_generations.append(text)
return LLMResult(
generations=[[Generation(text=text)] for text in text_generations]
)
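
Finally, a sketch of the batching path in `_generate` (uses the small defaults from the docstring; requires `transformers` installed locally):

from langchain_huggingface import HuggingFacePipeline

hf = HuggingFacePipeline.from_model_id(
    model_id="gpt2",
    task="text-generation",
    pipeline_kwargs={"max_new_tokens": 10},
    batch_size=2,  # prompts are fed to the pipeline in chunks of this size
)

prompts = ["Hello, my name is", "The capital of France is", "1 + 1 ="]
result = hf.generate(prompts)
for generations in result.generations:
    print(generations[0].text)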