community[minor]: Unify Titan Takeoff Integrations and Add Embedding Support (#18775)

**Community: Unify Titan Takeoff Integrations and Add Embedding Support**

 **Description:** 
Titan Takeoff is no longer accurately reflected by either of the integrations
in the community folder. The two integrations (TitanTakeoffPro and
TitanTakeoff) were causing confusion for clients, so we have moved the code
into one place and created an alias for backwards compatibility. We also added
the takeoff-client Python package to do the bulk of the request handling,
because that package is actively updated with new versions of Takeoff. This
makes the integration far more robust, so it will not degrade as badly over
time.
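
A minimal sketch of the backwards-compatibility alias (both names now resolve to the same unified class):

```python
from langchain_community.llms import TitanTakeoff, TitanTakeoffPro

# TitanTakeoffPro is kept purely as an alias for backwards compatibility,
# so existing imports keep working and both names refer to the same class.
assert TitanTakeoffPro is TitanTakeoff
```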

**Issue:**
Fixes bugs in the old Titan integrations and unifies the code, with added unit
test coverage to avoid future problems.
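
A minimal sketch in the style of the unit tests added here, which mock the Takeoff HTTP endpoints with pytest-httpx:

```python
import json
from typing import Any

import pytest

from langchain_community.llms import TitanTakeoff


@pytest.mark.requires("pytest_httpx")
@pytest.mark.requires("takeoff_client")
def test_titan_takeoff_generate(httpx_mock: Any) -> None:
    # Mock the Takeoff generate endpoint on the default port (3000)
    httpx_mock.add_response(
        method="POST",
        url="http://localhost:3000/generate",
        json={"text": "2 + 2 is 4"},
    )
    llm = TitanTakeoff()
    output = llm.invoke("What is 2 + 2?")
    assert isinstance(output, str)
    # The prompt is sent to Takeoff under the "text" field of the request body
    assert json.loads(httpx_mock.get_requests()[0].content)["text"] == "What is 2 + 2?"
```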

**Dependencies:**
Added the optional dependency takeoff-client. All imports, including the Titan
Takeoff classes, still work without the dependency, but initialisation will
fail if takeoff-client has not been pip installed.

**Twitter:**
@MeryemArik9

Thanks all :)

---------

Co-authored-by: Bagatur <baskaryan@gmail.com>
Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com>
15 changed files with 978 additions and 551 deletions

View File

@ -1,75 +1,15 @@
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Titan Takeoff\n",
"\n",
">`TitanML` helps businesses build and deploy better, smaller, cheaper, and faster NLP models through our training, compression, and inference optimization platform. \n",
"`TitanML` helps businesses build and deploy better, smaller, cheaper, and faster NLP models through our training, compression, and inference optimization platform.\n",
"\n",
">Our inference server, [Titan Takeoff](https://docs.titanml.co/docs/titan-takeoff/getting-started) enables deployment of LLMs locally on your hardware in a single command. Most generative model architectures are supported, such as Falcon, Llama 2, GPT2, T5 and many more."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Installation\n",
"\n",
"To get started with Iris Takeoff, all you need is to have docker and python installed on your local system. If you wish to use the server with gpu support, then you will need to install docker with cuda support.\n",
"\n",
"For Mac and Windows users, make sure you have the docker daemon running! You can check this by running docker ps in your terminal. To start the daemon, open the docker desktop app.\n",
"\n",
"Run the following command to install the Iris CLI that will enable you to run the takeoff server:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"vscode": {
"languageId": "shellscript"
}
},
"outputs": [],
"source": [
"%pip install --upgrade --quiet titan-iris"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Choose a Model\n",
"Takeoff supports many of the most powerful generative text models, such as Falcon, MPT, and Llama. See the [supported models](https://docs.titanml.co/docs/titan-takeoff/supported-models) for more information. For information about using your own models, see the [custom models](https://docs.titanml.co/docs/titan-takeoff/Advanced/custom-models).\n",
"\n",
"Going forward in this demo we will be using the falcon 7B instruct model. This is a good open-source model that is trained to follow instructions, and is small enough to easily inference even on CPUs.\n",
"\n",
"## Taking off\n",
"Models are referred to by their model id on HuggingFace. Takeoff uses port 8000 by default, but can be configured to use another port. There is also support to use a Nvidia GPU by specifying cuda for the device flag.\n",
"\n",
"To start the takeoff server, run:\n",
"\n",
"```shell\n",
"iris takeoff --model tiiuae/falcon-7b-instruct --device cpu\n",
"iris takeoff --model tiiuae/falcon-7b-instruct --device cuda # Nvidia GPU required\n",
"iris takeoff --model tiiuae/falcon-7b-instruct --device cpu --port 5000 # run on port 5000 (default: 8000)\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You will then be directed to a login page, where you will need to create an account to proceed.\n",
"After logging in, run the command onscreen to check whether the server is ready. When it is ready, you can start using the Takeoff integration.\n",
"\n",
"To shutdown the server, run the following command. You will be presented with options on which Takeoff server to shut down, in case you have multiple running servers.\n",
"\n",
"```shell\n",
"iris takeoff --shutdown # shutdown the server\n",
"```"
"Our inference server, [Titan Takeoff](https://docs.titanml.co/docs/intro) enables deployment of LLMs locally on your hardware in a single command. Most generative model architectures are supported, such as Falcon, Llama 2, GPT2, T5 and many more. If you experience trouble with a specific model, please let us know at hello@titanml.co."
]
},
{
@ -77,8 +17,8 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## Inferencing your model\n",
"To access your LLM, use the TitanTakeoff LLM wrapper:"
"## Example usage\n",
"Here are some helpful examples to get started using Titan Takeoff Server. You need to make sure Takeoff Server has been started in the background before running these commands. For more information see [docs page for launching Takeoff](https://docs.titanml.co/docs/Docs/launching/)."
]
},
{
@ -87,50 +27,23 @@
"metadata": {},
"outputs": [],
"source": [
"from langchain_community.llms import TitanTakeoff\n",
"import time\n",
"\n",
"llm = TitanTakeoff(\n",
" base_url=\"http://localhost:8000\", generate_max_length=128, temperature=1.0\n",
")\n",
"\n",
"prompt = \"What is the largest planet in the solar system?\"\n",
"\n",
"llm(prompt)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"No parameters are needed by default, but a baseURL that points to your desired URL where Takeoff is running can be specified and [generation parameters](https://docs.titanml.co/docs/titan-takeoff/Advanced/generation-parameters) can be supplied.\n",
"\n",
"### Streaming\n",
"Streaming is also supported via the streaming flag:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.callbacks.manager import CallbackManager\n",
"from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler\n",
"from langchain.prompts import PromptTemplate\n",
"\n",
"llm = TitanTakeoff(\n",
" callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]), streaming=True\n",
")\n",
"\n",
"prompt = \"What is the capital of France?\"\n",
"\n",
"llm(prompt)"
"# Note importing TitanTakeoffPro instead of TitanTakeoff will work as well both use same object under the hood\n",
"from langchain_community.llms import TitanTakeoff"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Integration with LLMChain"
"### Example 1\n",
"\n",
"Basic use assuming Takeoff is running on your machine using its default ports (ie localhost:3000).\n"
]
},
{
@ -139,19 +52,144 @@
"metadata": {},
"outputs": [],
"source": [
"from langchain.chains import LLMChain\n",
"from langchain_core.prompts import PromptTemplate\n",
"\n",
"llm = TitanTakeoff()\n",
"output = llm.invoke(\"What is the weather in London in August?\")\n",
"print(output)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Example 2\n",
"\n",
"template = \"What is the capital of {country}\"\n",
"Specifying a port and other generation parameters"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"llm = TitanTakeoff(port=3000)\n",
"# A comprehensive list of parameters can be found at https://docs.titanml.co/docs/next/apis/Takeoff%20inference_REST_API/generate#request\n",
"output = llm.invoke(\n",
" \"What is the largest rainforest in the world?\",\n",
" consumer_group=\"primary\",\n",
" min_new_tokens=128,\n",
" max_new_tokens=512,\n",
" no_repeat_ngram_size=2,\n",
" sampling_topk=1,\n",
" sampling_topp=1.0,\n",
" sampling_temperature=1.0,\n",
" repetition_penalty=1.0,\n",
" regex_string=\"\",\n",
" json_schema=None,\n",
")\n",
"print(output)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Example 3\n",
"\n",
"prompt = PromptTemplate.from_template(template)\n",
"Using generate for multiple inputs"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"llm = TitanTakeoff()\n",
"rich_output = llm.generate([\"What is Deep Learning?\", \"What is Machine Learning?\"])\n",
"print(rich_output.generations)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Example 4\n",
"\n",
"llm_chain = LLMChain(llm=llm, prompt=prompt)\n",
"Streaming output"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"llm = TitanTakeoff(\n",
" streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()])\n",
")\n",
"prompt = \"What is the capital of France?\"\n",
"output = llm.invoke(prompt)\n",
"print(output)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Example 5\n",
"\n",
"generated = llm_chain.run(country=\"Belgium\")\n",
"print(generated)"
"Using LCEL"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"llm = TitanTakeoff()\n",
"prompt = PromptTemplate.from_template(\"Tell me about {topic}\")\n",
"chain = prompt | llm\n",
"output = chain.invoke({\"topic\": \"the universe\"})\n",
"print(output)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Example 6\n",
"\n",
"Starting readers using TitanTakeoff Python Wrapper. If you haven't created any readers with first launching Takeoff, or you want to add another you can do so when you initialize the TitanTakeoff object. Just pass a list of model configs you want to start as the `models` parameter."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Model config for the llama model, where you can specify the following parameters:\n",
"# model_name (str): The name of the model to use\n",
"# device: (str): The device to use for inference, cuda or cpu\n",
"# consumer_group (str): The consumer group to place the reader into\n",
"# tensor_parallel (Optional[int]): The number of gpus you would like your model to be split across\n",
"# max_seq_length (int): The maximum sequence length to use for inference, defaults to 512\n",
"# max_batch_size (int_: The max batch size for continuous batching of requests\n",
"llama_model = {\n",
" \"model_name\": \"TheBloke/Llama-2-7b-Chat-AWQ\",\n",
" \"device\": \"cuda\",\n",
" \"consumer_group\": \"llama\",\n",
"}\n",
"llm = TitanTakeoff(models=[llama_model])\n",
"\n",
"# The model needs time to spin up, length of time need will depend on the size of model and your network connection speed\n",
"time.sleep(60)\n",
"\n",
"prompt = \"What is the capital of France?\"\n",
"output = llm.invoke(prompt, consumer_group=\"llama\")\n",
"print(output)"
]
}
],

View File

@ -1,102 +0,0 @@
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Titan Takeoff Pro\n",
"\n",
"`TitanML` helps businesses build and deploy better, smaller, cheaper, and faster NLP models through our training, compression, and inference optimization platform.\n",
"\n",
">Note: These docs are for the Pro version of Titan Takeoff. For the community version, see the page for Titan Takeoff.\n",
"\n",
"Our inference server, [Titan Takeoff (Pro Version)](https://docs.titanml.co/docs/titan-takeoff/pro-features/feature-comparison) enables deployment of LLMs locally on your hardware in a single command. Most generative model architectures are supported, such as Falcon, Llama 2, GPT2, T5 and many more."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example usage\n",
"Here are some helpful examples to get started using the Pro version of Titan Takeoff Server.\n",
"No parameters are needed by default, but a baseURL that points to your desired URL where Takeoff is running can be specified and generation parameters can be supplied."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain.callbacks.manager import CallbackManager\n",
"from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler\n",
"from langchain_community.llms import TitanTakeoffPro\n",
"from langchain_core.prompts import PromptTemplate\n",
"\n",
"# Example 1: Basic use\n",
"llm = TitanTakeoffPro()\n",
"output = llm(\"What is the weather in London in August?\")\n",
"print(output)\n",
"\n",
"\n",
"# Example 2: Specifying a port and other generation parameters\n",
"llm = TitanTakeoffPro(\n",
" base_url=\"http://localhost:3000\",\n",
" min_new_tokens=128,\n",
" max_new_tokens=512,\n",
" no_repeat_ngram_size=2,\n",
" sampling_topk=1,\n",
" sampling_topp=1.0,\n",
" sampling_temperature=1.0,\n",
" repetition_penalty=1.0,\n",
" regex_string=\"\",\n",
")\n",
"output = llm(\"What is the largest rainforest in the world?\")\n",
"print(output)\n",
"\n",
"\n",
"# Example 3: Using generate for multiple inputs\n",
"llm = TitanTakeoffPro()\n",
"rich_output = llm.generate([\"What is Deep Learning?\", \"What is Machine Learning?\"])\n",
"print(rich_output.generations)\n",
"\n",
"\n",
"# Example 4: Streaming output\n",
"llm = TitanTakeoffPro(\n",
" streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()])\n",
")\n",
"prompt = \"What is the capital of France?\"\n",
"llm(prompt)\n",
"\n",
"# Example 5: Using LCEL\n",
"llm = TitanTakeoffPro()\n",
"prompt = PromptTemplate.from_template(\"Tell me about {topic}\")\n",
"chain = prompt | llm\n",
"chain.invoke({\"topic\": \"the universe\"})"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 4
}

View File

@ -0,0 +1,112 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Titan Takeoff\n",
"\n",
"`TitanML` helps businesses build and deploy better, smaller, cheaper, and faster NLP models through our training, compression, and inference optimization platform.\n",
"\n",
"Our inference server, [Titan Takeoff](https://docs.titanml.co/docs/intro) enables deployment of LLMs locally on your hardware in a single command. Most embedding models are supported out of the box, if you experience trouble with a specific model, please let us know at hello@titanml.co."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example usage\n",
"Here are some helpful examples to get started using Titan Takeoff Server. You need to make sure Takeoff Server has been started in the background before running these commands. For more information see [docs page for launching Takeoff](https://docs.titanml.co/docs/Docs/launching/)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"\n",
"from langchain_community.embeddings import TitanTakeoffEmbed"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Example 1\n",
"Basic use assuming Takeoff is running on your machine using its default ports (ie localhost:3000)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"embed = TitanTakeoffEmbed()\n",
"output = embed.embed_query(\n",
" \"What is the weather in London in August?\", consumer_group=\"embed\"\n",
")\n",
"print(output)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Example 2 \n",
"Starting readers using TitanTakeoffEmbed Python Wrapper. If you haven't created any readers with first launching Takeoff, or you want to add another you can do so when you initialize the TitanTakeoffEmbed object. Just pass a list of models you want to start as the `models` parameter.\n",
"\n",
"You can use `embed.query_documents` to embed multiple documents at once. The expected input is a list of strings, rather than just a string expected for the `embed_query` method."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Model config for the embedding model, where you can specify the following parameters:\n",
"# model_name (str): The name of the model to use\n",
"# device: (str): The device to use for inference, cuda or cpu\n",
"# consumer_group (str): The consumer group to place the reader into\n",
"embedding_model = {\n",
" \"model_name\": \"BAAI/bge-large-en-v1.5\",\n",
" \"device\": \"cpu\",\n",
" \"consumer_group\": \"embed\",\n",
"}\n",
"embed = TitanTakeoffEmbed(models=[embedding_model])\n",
"\n",
"# The model needs time to spin up, length of time need will depend on the size of model and your network connection speed\n",
"time.sleep(60)\n",
"\n",
"prompt = \"What is the capital of France?\"\n",
"# We specified \"embed\" consumer group so need to send request to the same consumer group so it hits our embedding model and not others\n",
"output = embed.embed_query(prompt, consumer_group=\"embed\")\n",
"print(output)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "langchain",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
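
A standalone sketch (referenced from the notebook above) showing `embed_documents` alongside `embed_query`, assuming a Takeoff embedding reader is already running in the `embed` consumer group on the default ports:

```python
from langchain_community.embeddings import TitanTakeoffEmbed

embed = TitanTakeoffEmbed()

# embed_query takes a single string and returns a single embedding (List[float])
query_vector = embed.embed_query(
    "What is the weather in London in August?", consumer_group="embed"
)

# embed_documents takes a list of strings and returns one embedding per document
doc_vectors = embed.embed_documents(
    ["Takeoff deploys LLMs locally.", "Takeoff also serves embedding models."],
    consumer_group="embed",
)
print(len(query_vector), len(doc_vectors))
```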

View File

@ -1,6 +1,10 @@
{
"trailingSlash": true,
"redirects": [
{
"source": "/docs/integrations/llms/titan_takeoff_pro",
"destination": "/docs/integrations/llms/titan_takeoff"
},
{
"source": "/docs/integrations/providers/optimum_intel(/?)",
"destination": "/docs/integrations/providers/intel/"

View File

@ -357,6 +357,7 @@ _module_lookup = {
"VolcanoEmbeddings": "langchain_community.embeddings.volcengine",
"VoyageEmbeddings": "langchain_community.embeddings.voyageai",
"XinferenceEmbeddings": "langchain_community.embeddings.xinference",
"TitanTakeoffEmbed": "langchain_community.embeddings.titan_takeoff",
"PremAIEmbeddings": "langchain_community.embeddings.premai",
"YandexGPTEmbeddings": "langchain_community.embeddings.yandex",
}

View File

@ -0,0 +1,207 @@
from enum import Enum
from typing import Any, List, Optional, Set, Union
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import BaseModel
class TakeoffEmbeddingException(Exception):
"""Exceptions experienced with interfacing with Takeoff Embedding Wrapper"""
class MissingConsumerGroup(TakeoffEmbeddingException):
"""Exception raised when no consumer group is provided on initialization of
TitanTakeoffEmbed or in embed request"""
class Device(str, Enum):
"""The device to use for inference, cuda or cpu"""
cuda = "cuda"
cpu = "cpu"
class ReaderConfig(BaseModel):
class Config:
protected_namespaces = ()
model_name: str
"""The name of the model to use"""
device: Device = Device.cuda
"""The device to use for inference, cuda or cpu"""
consumer_group: str = "primary"
"""The consumer group to place the reader into"""
class TitanTakeoffEmbed(Embeddings):
"""Titan Takeoff Embed is a wrapper to interface with Takeoff Inference API
for embedding models
You can use this wrapper to send embedding requests and to deploy embedding
readers with Takeoff.
Examples:
This is an example how to deploy an embedding model and send requests.
.. code-block:: python
# Import the TitanTakeoffEmbed class from community package
import time
from langchain_community.embeddings import TitanTakeoffEmbed
# Specify the embedding reader you'd like to deploy
reader_1 = {
"model_name": "avsolatorio/GIST-large-Embedding-v0",
"device": "cpu",
"consumer_group": "embed"
}
# For every reader you pass into models arg Takeoff will spin up a reader
# according to the specs you provide. If you don't specify the arg no models
# are spun up and it assumes you have already done this separately.
embed = TitanTakeoffEmbed(models=[reader_1])
# Wait for the reader to be deployed, time needed depends on the model size
# and your internet speed
time.sleep(60)
# Returns the embedded query, ie a List[float], sent to `embed` consumer
# group where we just spun up the embedding reader
print(embed.embed_query(
"Where can I see football?", consumer_group="embed"
))
# Returns a List of embeddings, ie a List[List[float]], sent to `embed`
# consumer group where we just spun up the embedding reader
print(embed.embed_documents(
["Document1", "Document2"],
consumer_group="embed"
))
"""
base_url: str = "http://localhost"
"""The base URL of the Titan Takeoff (Pro) server. Default = "http://localhost"."""
port: int = 3000
"""The port of the Titan Takeoff (Pro) server. Default = 3000."""
mgmt_port: int = 3001
"""The management port of the Titan Takeoff (Pro) server. Default = 3001."""
client: Any = None
"""Takeoff Client Python SDK used to interact with Takeoff API"""
embed_consumer_groups: Set[str] = set()
"""The consumer groups in Takeoff which contain embedding models"""
def __init__(
self,
base_url: str = "http://localhost",
port: int = 3000,
mgmt_port: int = 3001,
models: List[ReaderConfig] = [],
):
"""Initialize the Titan Takeoff embedding wrapper.
Args:
base_url (str, optional): The base url where Takeoff Inference Server is
listening. Defaults to "http://localhost".
port (int, optional): What port is Takeoff Inference API listening on.
Defaults to 3000.
mgmt_port (int, optional): What port is Takeoff Management API listening on.
Defaults to 3001.
models (List[ReaderConfig], optional): Any readers you'd like to spin up.
Defaults to [].
Raises:
ImportError: If you haven't installed takeoff-client, you will get an
ImportError. To remedy run `pip install 'takeoff-client==0.4.0'`
"""
self.base_url = base_url
self.port = port
self.mgmt_port = mgmt_port
try:
from takeoff_client import TakeoffClient
except ImportError:
raise ImportError(
"takeoff-client is required for TitanTakeoff. "
"Please install it with `pip install 'takeoff-client==0.4.0'`."
)
self.client = TakeoffClient(
self.base_url, port=self.port, mgmt_port=self.mgmt_port
)
for model in models:
self.client.create_reader(model)
if isinstance(model, dict):
self.embed_consumer_groups.add(model.get("consumer_group"))
else:
self.embed_consumer_groups.add(model.consumer_group)
super(TitanTakeoffEmbed, self).__init__()
def _embed(
self, input: Union[List[str], str], consumer_group: Optional[str]
) -> dict:
"""Embed text.
Args:
input (List[str]): prompt/document or list of prompts/documents to embed
consumer_group (Optional[str]): what consumer group to send the embedding
request to. If not specified and there is only one
consumer group specified during initialization, it will be used. If there
are multiple consumer groups specified during initialization, you must
specify which one to use.
Raises:
MissingConsumerGroup: The consumer group can not be inferred from the
initialization and must be specified with request.
Returns:
Dict[str, Any]: Result of query, {"result": List[List[float]]} or
{"result": List[float]}
"""
if not consumer_group:
if len(self.embed_consumer_groups) == 1:
consumer_group = list(self.embed_consumer_groups)[0]
elif len(self.embed_consumer_groups) > 1:
raise MissingConsumerGroup(
"TakeoffEmbedding was initialized with multiple embedding reader"
"groups, you must specify which one to use."
)
else:
raise MissingConsumerGroup(
"You must specify what consumer group you want to send embedding"
"response to as TitanTakeoffEmbed was not initialized with an "
"embedding reader."
)
return self.client.embed(input, consumer_group)
def embed_documents(
self, texts: List[str], consumer_group: Optional[str] = None
) -> List[List[float]]:
"""Embed documents.
Args:
texts (List[str]): List of prompts/documents to embed
consumer_group (Optional[str], optional): Consumer group to send request
to containing embedding model. Defaults to None.
Returns:
List[List[float]]: List of embeddings
"""
return self._embed(texts, consumer_group)["result"]
def embed_query(
self, text: str, consumer_group: Optional[str] = None
) -> List[float]:
"""Embed query.
Args:
text (str): Prompt/document to embed
consumer_group (Optional[str], optional): Consumer group to send request
to containing embedding model. Defaults to None.
Returns:
List[float]: Embedding
"""
return self._embed(text, consumer_group)["result"]

View File

@ -549,9 +549,9 @@ def _import_titan_takeoff() -> Type[BaseLLM]:
def _import_titan_takeoff_pro() -> Type[BaseLLM]:
from langchain_community.llms.titan_takeoff_pro import TitanTakeoffPro
from langchain_community.llms.titan_takeoff import TitanTakeoff
return TitanTakeoffPro
return TitanTakeoff
def _import_together() -> Type[BaseLLM]:

View File

@ -1,61 +1,155 @@
from typing import Any, Iterator, List, Mapping, Optional
from enum import Enum
from typing import Any, Iterator, List, Optional
import requests
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from langchain_core.outputs import GenerationChunk
from requests.exceptions import ConnectionError
from langchain_core.pydantic_v1 import BaseModel
from langchain_community.llms.utils import enforce_stop_tokens
class Device(str, Enum):
"""The device to use for inference, cuda or cpu"""
cuda = "cuda"
cpu = "cpu"
class ReaderConfig(BaseModel):
class Config:
protected_namespaces = ()
model_name: str
"""The name of the model to use"""
device: Device = Device.cuda
"""The device to use for inference, cuda or cpu"""
consumer_group: str = "primary"
"""The consumer group to place the reader into"""
tensor_parallel: Optional[int] = None
"""The number of gpus you would like your model to be split across"""
max_seq_length: int = 512
"""The maximum sequence length to use for inference, defaults to 512"""
max_batch_size: int = 4
"""The max batch size for continuous batching of requests"""
class TitanTakeoff(LLM):
"""Titan Takeoff API LLMs."""
"""Titan Takeoff API LLMs.
base_url: str = "http://localhost:8000"
"""Specifies the baseURL to use for the Titan Takeoff API.
Default = http://localhost:8000.
Titan Takeoff is a wrapper to interface with Takeoff Inference API for
generative text to text language models.
You can use this wrapper to send requests to a generative language model
and to deploy readers with Takeoff.
Examples:
This is an example how to deploy a generative language model and send
requests.
.. code-block:: python
# Import the TitanTakeoff class from community package
import time
from langchain_community.llms import TitanTakeoff
# Specify the reader you'd like to deploy
reader_1 = {
"model_name": "TheBloke/Llama-2-7b-Chat-AWQ",
"device": "cuda",
"tensor_parallel": 1,
"consumer_group": "llama"
}
# For every reader you pass into models arg Takeoff will spin
# up a reader according to the specs you provide. If you don't
# specify the arg no models are spun up and it assumes you have
# already done this separately.
llm = TitanTakeoff(models=[reader_1])
# Wait for the reader to be deployed, time needed depends on the
# model size and your internet speed
time.sleep(60)
# Returns the response to the prompt, sent to the `llama` consumer group
# where we just spun up the Llama 7B model
print(llm.invoke(
"Where can I see football?", consumer_group="llama"
))
# You can also send generation parameters to the model, any of the
# following can be passed in as kwargs:
# https://docs.titanml.co/docs/next/apis/Takeoff%20inference_REST_API/generate#request
# for instance:
print(llm.invoke(
"Where can I see football?", consumer_group="llama", max_new_tokens=100
))
"""
generate_max_length: int = 128
"""Maximum generation length. Default = 128."""
base_url: str = "http://localhost"
"""The base URL of the Titan Takeoff (Pro) server. Default = "http://localhost"."""
sampling_topk: int = 1
"""Sample predictions from the top K most probable candidates. Default = 1."""
port: int = 3000
"""The port of the Titan Takeoff (Pro) server. Default = 3000."""
sampling_topp: float = 1.0
"""Sample from predictions whose cumulative probability exceeds this value.
Default = 1.0.
"""
sampling_temperature: float = 1.0
"""Sample with randomness. Bigger temperatures are associated with
more randomness and 'creativity'. Default = 1.0.
"""
repetition_penalty: float = 1.0
"""Penalise the generation of tokens that have been generated before.
Set to > 1 to penalize. Default = 1 (no penalty).
"""
no_repeat_ngram_size: int = 0
"""Prevent repetitions of ngrams of this size. Default = 0 (turned off)."""
mgmt_port: int = 3001
"""The management port of the Titan Takeoff (Pro) server. Default = 3001."""
streaming: bool = False
"""Whether to stream the output. Default = False."""
@property
def _default_params(self) -> Mapping[str, Any]:
"""Get the default parameters for calling Titan Takeoff Server."""
params = {
"generate_max_length": self.generate_max_length,
"sampling_topk": self.sampling_topk,
"sampling_topp": self.sampling_topp,
"sampling_temperature": self.sampling_temperature,
"repetition_penalty": self.repetition_penalty,
"no_repeat_ngram_size": self.no_repeat_ngram_size,
}
return params
client: Any = None
"""Takeoff Client Python SDK used to interact with Takeoff API"""
def __init__(
self,
base_url: str = "http://localhost",
port: int = 3000,
mgmt_port: int = 3001,
streaming: bool = False,
models: List[ReaderConfig] = [],
):
"""Initialize the Titan Takeoff language wrapper.
Args:
base_url (str, optional): The base URL where the Takeoff
Inference Server is listening. Defaults to `http://localhost`.
port (int, optional): What port is Takeoff Inference API
listening on. Defaults to 3000.
mgmt_port (int, optional): What port is Takeoff Management API
listening on. Defaults to 3001.
streaming (bool, optional): Whether to use the generate_stream endpoint
by default (instead of generate) to stream responses.
Defaults to False. In practice this is not significantly different,
as the streamed response is buffered and returned like the
non-streamed response, but the run manager is applied per
generated token.
models (List[ReaderConfig], optional): Any readers you'd like to
spin up. Defaults to [].
Raises:
ImportError: If you haven't installed takeoff-client, you will
get an ImportError. To remedy run `pip install 'takeoff-client==0.4.0'`
"""
super().__init__(
base_url=base_url, port=port, mgmt_port=mgmt_port, streaming=streaming
)
try:
from takeoff_client import TakeoffClient
except ImportError:
raise ImportError(
"takeoff-client is required for TitanTakeoff. "
"Please install it with `pip install 'takeoff-client>=0.4.0'`."
)
self.client = TakeoffClient(
self.base_url, port=self.port, mgmt_port=self.mgmt_port
)
for model in models:
self.client.create_reader(model)
@property
def _llm_type(self) -> str:
@ -69,11 +163,12 @@ class TitanTakeoff(LLM):
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
"""Call out to Titan Takeoff generate endpoint.
"""Call out to Titan Takeoff (Pro) generate endpoint.
Args:
prompt: The prompt to pass into the model.
stop: Optional list of stop words to use when generating.
run_manager: Optional callback manager to use when streaming.
Returns:
The string generated by the model.
@ -81,41 +176,31 @@ class TitanTakeoff(LLM):
Example:
.. code-block:: python
model = TitanTakeoff()
prompt = "What is the capital of the United Kingdom?"
response = model(prompt)
# Use of model(prompt), ie `__call__` was deprecated in LangChain 0.1.7,
# use model.invoke(prompt) instead.
response = model.invoke(prompt)
"""
try:
if self.streaming:
text_output = ""
for chunk in self._stream(
prompt=prompt,
stop=stop,
run_manager=run_manager,
):
text_output += chunk.text
return text_output
if self.streaming:
text_output = ""
for chunk in self._stream(
prompt=prompt,
stop=stop,
run_manager=run_manager,
):
text_output += chunk.text
return text_output
url = f"{self.base_url}/generate"
params = {"text": prompt, **self._default_params}
response = self.client.generate(prompt, **kwargs)
text = response["text"]
response = requests.post(url, json=params)
response.raise_for_status()
response.encoding = "utf-8"
text = ""
if "message" in response.json():
text = response.json()["message"]
else:
raise ValueError("Something went wrong.")
if stop is not None:
text = enforce_stop_tokens(text, stop)
return text
except ConnectionError:
raise ConnectionError(
"Could not connect to Titan Takeoff server. \
Please make sure that the server is running."
)
if stop is not None:
text = enforce_stop_tokens(text, stop)
return text
def _stream(
self,
@ -124,14 +209,12 @@ class TitanTakeoff(LLM):
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> Iterator[GenerationChunk]:
"""Call out to Titan Takeoff stream endpoint.
"""Call out to Titan Takeoff (Pro) stream endpoint.
Args:
prompt: The prompt to pass into the model.
stop: Optional list of stop words to use when generating.
Returns:
The string generated by the model.
run_manager: Optional callback manager to use when streaming.
Yields:
A dictionary like object containing a string token.
@ -139,23 +222,40 @@ class TitanTakeoff(LLM):
Example:
.. code-block:: python
model = TitanTakeoff()
prompt = "What is the capital of the United Kingdom?"
response = model(prompt)
response = model.stream(prompt)
# OR
model = TitanTakeoff(streaming=True)
response = model.invoke(prompt)
"""
url = f"{self.base_url}/generate_stream"
params = {"text": prompt, **self._default_params}
response = self.client.generate_stream(prompt, **kwargs)
buffer = ""
for text in response:
buffer += text.data
if "data:" in buffer:
# Remove the first instance of "data:" from the buffer.
if buffer.startswith("data:"):
buffer = ""
if len(buffer.split("data:", 1)) == 2:
content, _ = buffer.split("data:", 1)
buffer = content.rstrip("\n")
# Trim the buffer to only have content after the "data:" part.
if buffer: # Ensure that there's content to process.
chunk = GenerationChunk(text=buffer)
buffer = "" # Reset buffer for the next set of data.
yield chunk
if run_manager:
run_manager.on_llm_new_token(token=chunk.text)
response = requests.post(url, json=params, stream=True)
response.encoding = "utf-8"
for text in response.iter_content(chunk_size=1, decode_unicode=True):
if text:
chunk = GenerationChunk(text=text)
if run_manager:
run_manager.on_llm_new_token(token=chunk.text)
yield chunk
@property
def _identifying_params(self) -> Mapping[str, Any]:
"""Get the identifying parameters."""
return {"base_url": self.base_url, **{}, **self._default_params}
# Yield any remaining content in the buffer.
if buffer:
chunk = GenerationChunk(text=buffer.replace("</s>", ""))
yield chunk
if run_manager:
run_manager.on_llm_new_token(token=chunk.text)

View File

@ -1,217 +0,0 @@
from typing import Any, Iterator, List, Mapping, Optional
import requests
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from langchain_core.outputs import GenerationChunk
from requests.exceptions import ConnectionError
from langchain_community.llms.utils import enforce_stop_tokens
class TitanTakeoffPro(LLM):
"""Titan Takeoff Pro is a language model that can be used to generate text."""
base_url: Optional[str] = "http://localhost:3000"
"""Specifies the baseURL to use for the Titan Takeoff Pro API.
Default = http://localhost:3000.
"""
max_new_tokens: Optional[int] = None
"""Maximum tokens generated."""
min_new_tokens: Optional[int] = None
"""Minimum tokens generated."""
sampling_topk: Optional[int] = None
"""Sample predictions from the top K most probable candidates."""
sampling_topp: Optional[float] = None
"""Sample from predictions whose cumulative probability exceeds this value.
"""
sampling_temperature: Optional[float] = None
"""Sample with randomness. Bigger temperatures are associated with
more randomness and 'creativity'.
"""
repetition_penalty: Optional[float] = None
"""Penalise the generation of tokens that have been generated before.
Set to > 1 to penalize.
"""
regex_string: Optional[str] = None
"""A regex string for constrained generation."""
no_repeat_ngram_size: Optional[int] = None
"""Prevent repetitions of ngrams of this size. Default = 0 (turned off)."""
streaming: bool = False
"""Whether to stream the output. Default = False."""
@property
def _default_params(self) -> Mapping[str, Any]:
"""Get the default parameters for calling Titan Takeoff Server (Pro)."""
return {
**(
{"regex_string": self.regex_string}
if self.regex_string is not None
else {}
),
**(
{"sampling_temperature": self.sampling_temperature}
if self.sampling_temperature is not None
else {}
),
**(
{"sampling_topp": self.sampling_topp}
if self.sampling_topp is not None
else {}
),
**(
{"repetition_penalty": self.repetition_penalty}
if self.repetition_penalty is not None
else {}
),
**(
{"max_new_tokens": self.max_new_tokens}
if self.max_new_tokens is not None
else {}
),
**(
{"min_new_tokens": self.min_new_tokens}
if self.min_new_tokens is not None
else {}
),
**(
{"sampling_topk": self.sampling_topk}
if self.sampling_topk is not None
else {}
),
**(
{"no_repeat_ngram_size": self.no_repeat_ngram_size}
if self.no_repeat_ngram_size is not None
else {}
),
}
@property
def _llm_type(self) -> str:
"""Return type of llm."""
return "titan_takeoff_pro"
def _call(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
"""Call out to Titan Takeoff (Pro) generate endpoint.
Args:
prompt: The prompt to pass into the model.
stop: Optional list of stop words to use when generating.
Returns:
The string generated by the model.
Example:
.. code-block:: python
prompt = "What is the capital of the United Kingdom?"
response = model(prompt)
"""
try:
if self.streaming:
text_output = ""
for chunk in self._stream(
prompt=prompt,
stop=stop,
run_manager=run_manager,
):
text_output += chunk.text
return text_output
url = f"{self.base_url}/generate"
params = {"text": prompt, **self._default_params}
response = requests.post(url, json=params)
response.raise_for_status()
response.encoding = "utf-8"
text = ""
if "text" in response.json():
text = response.json()["text"]
text = text.replace("</s>", "")
else:
raise ValueError("Something went wrong.")
if stop is not None:
text = enforce_stop_tokens(text, stop)
return text
except ConnectionError:
raise ConnectionError(
"Could not connect to Titan Takeoff (Pro) server. \
Please make sure that the server is running."
)
def _stream(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> Iterator[GenerationChunk]:
"""Call out to Titan Takeoff (Pro) stream endpoint.
Args:
prompt: The prompt to pass into the model.
stop: Optional list of stop words to use when generating.
Returns:
The string generated by the model.
Yields:
A dictionary like object containing a string token.
Example:
.. code-block:: python
prompt = "What is the capital of the United Kingdom?"
response = model(prompt)
"""
url = f"{self.base_url}/generate_stream"
params = {"text": prompt, **self._default_params}
response = requests.post(url, json=params, stream=True)
response.encoding = "utf-8"
buffer = ""
for text in response.iter_content(chunk_size=1, decode_unicode=True):
buffer += text
if "data:" in buffer:
# Remove the first instance of "data:" from the buffer.
if buffer.startswith("data:"):
buffer = ""
if len(buffer.split("data:", 1)) == 2:
content, _ = buffer.split("data:", 1)
buffer = content.rstrip("\n")
# Trim the buffer to only have content after the "data:" part.
if buffer: # Ensure that there's content to process.
chunk = GenerationChunk(text=buffer)
buffer = "" # Reset buffer for the next set of data.
yield chunk
if run_manager:
run_manager.on_llm_new_token(token=chunk.text)
# Yield any remaining content in the buffer.
if buffer:
chunk = GenerationChunk(text=buffer.replace("</s>", ""))
if run_manager:
run_manager.on_llm_new_token(token=chunk.text)
yield chunk
@property
def _identifying_params(self) -> Mapping[str, Any]:
"""Get the identifying parameters."""
return {"base_url": self.base_url, **{}, **self._default_params}

View File

@ -0,0 +1,178 @@
"""Test Titan Takeoff Embedding wrapper."""
import json
from typing import Any
import pytest
from langchain_community.embeddings import TitanTakeoffEmbed
from langchain_community.embeddings.titan_takeoff import MissingConsumerGroup
@pytest.mark.requires("pytest_httpx")
@pytest.mark.requires("takeoff_client")
def test_titan_takeoff_call(httpx_mock: Any) -> None:
"""Test valid call to Titan Takeoff."""
port = 2345
httpx_mock.add_response(
method="POST",
url=f"http://localhost:{port}/embed",
json={"result": [0.46635, 0.234, -0.8521]},
)
embedding = TitanTakeoffEmbed(port=port)
output_1 = embedding.embed_documents("What is 2 + 2?", "primary")
output_2 = embedding.embed_query("What is 2 + 2?", "primary")
assert isinstance(output_1, list)
assert isinstance(output_2, list)
assert len(httpx_mock.get_requests()) == 2
for n in range(2):
assert httpx_mock.get_requests()[n].url == f"http://localhost:{port}/embed"
assert (
json.loads(httpx_mock.get_requests()[n].content)["text"] == "What is 2 + 2?"
)
@pytest.mark.requires("pytest_httpx")
@pytest.mark.requires("takeoff_client")
def test_no_consumer_group_fails(httpx_mock: Any) -> None:
"""Test that not specifying a consumer group fails."""
port = 2345
httpx_mock.add_response(
method="POST",
url=f"http://localhost:{port}/embed",
json={"result": [0.46635, 0.234, -0.8521]},
)
embedding = TitanTakeoffEmbed(port=port)
with pytest.raises(MissingConsumerGroup):
embedding.embed_documents("What is 2 + 2?")
with pytest.raises(MissingConsumerGroup):
embedding.embed_query("What is 2 + 2?")
# Check specifying a consumer group works
embedding.embed_documents("What is 2 + 2?", "primary")
embedding.embed_query("What is 2 + 2?", "primary")
@pytest.mark.requires("pytest_httpx")
@pytest.mark.requires("takeoff_client")
def test_takeoff_initialization(httpx_mock: Any) -> None:
"""Test valid call to Titan Takeoff."""
mgnt_port = 36452
inf_port = 46253
mgnt_url = f"http://localhost:{mgnt_port}/reader"
embed_url = f"http://localhost:{inf_port}/embed"
reader_1 = {
"model_name": "test",
"device": "cpu",
"consumer_group": "embed",
}
reader_2 = reader_1.copy()
reader_2["model_name"] = "test2"
reader_2["device"] = "cuda"
httpx_mock.add_response(
method="POST", url=mgnt_url, json={"key": "value"}, status_code=201
)
httpx_mock.add_response(
method="POST",
url=embed_url,
json={"result": [0.34, 0.43, -0.934532]},
status_code=200,
)
llm = TitanTakeoffEmbed(
port=inf_port, mgmt_port=mgnt_port, models=[reader_1, reader_2]
)
# Shouldn't need to specify consumer group as there is only one specified during
# initialization
output_1 = llm.embed_documents("What is 2 + 2?")
output_2 = llm.embed_query("What is 2 + 2?")
assert isinstance(output_1, list)
assert isinstance(output_2, list)
# Ensure the management api was called to create the reader
assert len(httpx_mock.get_requests()) == 4
for key, value in reader_1.items():
assert json.loads(httpx_mock.get_requests()[0].content)[key] == value
assert httpx_mock.get_requests()[0].url == mgnt_url
# A second call should also be made to spin up reader 2
for key, value in reader_2.items():
assert json.loads(httpx_mock.get_requests()[1].content)[key] == value
assert httpx_mock.get_requests()[1].url == mgnt_url
# Ensure the remaining calls are to the embed endpoint for inference
for n in range(2, 4):
assert httpx_mock.get_requests()[n].url == embed_url
assert (
json.loads(httpx_mock.get_requests()[n].content)["text"] == "What is 2 + 2?"
)
@pytest.mark.requires("pytest_httpx")
@pytest.mark.requires("takeoff_client")
def test_takeoff_initialization_with_more_than_one_consumer_group(
httpx_mock: Any,
) -> None:
"""Test valid call to Titan Takeoff."""
mgnt_port = 36452
inf_port = 46253
mgnt_url = f"http://localhost:{mgnt_port}/reader"
embed_url = f"http://localhost:{inf_port}/embed"
reader_1 = {
"model_name": "test",
"device": "cpu",
"consumer_group": "embed",
}
reader_2 = reader_1.copy()
reader_2["model_name"] = "test2"
reader_2["device"] = "cuda"
reader_2["consumer_group"] = "embed2"
httpx_mock.add_response(
method="POST", url=mgnt_url, json={"key": "value"}, status_code=201
)
httpx_mock.add_response(
method="POST",
url=embed_url,
json={"result": [0.34, 0.43, -0.934532]},
status_code=200,
)
llm = TitanTakeoffEmbed(
port=inf_port, mgmt_port=mgnt_port, models=[reader_1, reader_2]
)
# There was more than one consumer group specified during initialization so we
# need to specify which one to use
with pytest.raises(MissingConsumerGroup):
llm.embed_documents("What is 2 + 2?")
with pytest.raises(MissingConsumerGroup):
llm.embed_query("What is 2 + 2?")
output_1 = llm.embed_documents("What is 2 + 2?", "embed")
output_2 = llm.embed_query("What is 2 + 2?", "embed2")
assert isinstance(output_1, list)
assert isinstance(output_2, list)
# Ensure the management api was called to create the reader
assert len(httpx_mock.get_requests()) == 4
for key, value in reader_1.items():
assert json.loads(httpx_mock.get_requests()[0].content)[key] == value
assert httpx_mock.get_requests()[0].url == mgnt_url
# A second call should also be made to spin up reader 2
for key, value in reader_2.items():
assert json.loads(httpx_mock.get_requests()[1].content)[key] == value
assert httpx_mock.get_requests()[1].url == mgnt_url
# Ensure the remaining calls are to the embed endpoint for inference
for n in range(2, 4):
assert httpx_mock.get_requests()[n].url == embed_url
assert (
json.loads(httpx_mock.get_requests()[n].content)["text"] == "What is 2 + 2?"
)

View File

@ -1,18 +1,141 @@
"""Test Titan Takeoff wrapper."""
import json
from typing import Any, Union
import pytest
from langchain_community.llms import TitanTakeoff, TitanTakeoffPro
import responses
from langchain_community.llms.titan_takeoff import TitanTakeoff
@responses.activate
def test_titan_takeoff_call() -> None:
@pytest.mark.requires("takeoff_client")
@pytest.mark.requires("pytest_httpx")
@pytest.mark.parametrize("streaming", [True, False])
@pytest.mark.parametrize("takeoff_object", [TitanTakeoff, TitanTakeoffPro])
def test_titan_takeoff_call(
httpx_mock: Any,
streaming: bool,
takeoff_object: Union[TitanTakeoff, TitanTakeoffPro],
) -> None:
"""Test valid call to Titan Takeoff."""
url = "http://localhost:8000/generate"
responses.add(responses.POST, url, json={"message": "2 + 2 is 4"}, status=200)
from pytest_httpx import IteratorStream
# response = requests.post(url)
llm = TitanTakeoff()
port = 2345
url = (
f"http://localhost:{port}/generate_stream"
if streaming
else f"http://localhost:{port}/generate"
)
if streaming:
httpx_mock.add_response(
method="POST",
url=url,
stream=IteratorStream([b"data: ask someone else\n\n"]),
)
else:
httpx_mock.add_response(
method="POST",
url=url,
json={"text": "ask someone else"},
)
llm = takeoff_object(port=port, streaming=streaming)
number_of_calls = 0
for function_call in [llm, llm.invoke]:
number_of_calls += 1
output = function_call("What is 2 + 2?")
assert isinstance(output, str)
assert len(httpx_mock.get_requests()) == number_of_calls
assert httpx_mock.get_requests()[0].url == url
assert (
json.loads(httpx_mock.get_requests()[0].content)["text"] == "What is 2 + 2?"
)
if streaming:
output = llm._stream("What is 2 + 2?")
for chunk in output:
assert isinstance(chunk.text, str)
assert len(httpx_mock.get_requests()) == number_of_calls + 1
assert httpx_mock.get_requests()[0].url == url
assert (
json.loads(httpx_mock.get_requests()[0].content)["text"] == "What is 2 + 2?"
)
@pytest.mark.requires("pytest_httpx")
@pytest.mark.requires("takeoff_client")
@pytest.mark.parametrize("streaming", [True, False])
@pytest.mark.parametrize("takeoff_object", [TitanTakeoff, TitanTakeoffPro])
def test_titan_takeoff_bad_call(
httpx_mock: Any,
streaming: bool,
takeoff_object: Union[TitanTakeoff, TitanTakeoffPro],
) -> None:
"""Test valid call to Titan Takeoff."""
from takeoff_client import TakeoffException
url = (
"http://localhost:3000/generate"
if not streaming
else "http://localhost:3000/generate_stream"
)
httpx_mock.add_response(
method="POST", url=url, json={"text": "bad things"}, status_code=400
)
llm = takeoff_object(streaming=streaming)
with pytest.raises(TakeoffException):
llm("What is 2 + 2?")
assert len(httpx_mock.get_requests()) == 1
assert httpx_mock.get_requests()[0].url == url
assert json.loads(httpx_mock.get_requests()[0].content)["text"] == "What is 2 + 2?"
@pytest.mark.requires("pytest_httpx")
@pytest.mark.requires("takeoff_client")
@pytest.mark.parametrize("takeoff_object", [TitanTakeoff, TitanTakeoffPro])
def test_titan_takeoff_model_initialisation(
httpx_mock: Any,
takeoff_object: Union[TitanTakeoff, TitanTakeoffPro],
) -> None:
"""Test valid call to Titan Takeoff."""
mgnt_port = 36452
inf_port = 46253
mgnt_url = f"http://localhost:{mgnt_port}/reader"
gen_url = f"http://localhost:{inf_port}/generate"
reader_1 = {
"model_name": "test",
"device": "cpu",
"consumer_group": "primary",
"max_sequence_length": 512,
"max_batch_size": 4,
"tensor_parallel": 3,
}
reader_2 = reader_1.copy()
reader_2["model_name"] = "test2"
httpx_mock.add_response(
method="POST", url=mgnt_url, json={"key": "value"}, status_code=201
)
httpx_mock.add_response(
method="POST", url=gen_url, json={"text": "value"}, status_code=200
)
llm = takeoff_object(
port=inf_port, mgmt_port=mgnt_port, models=[reader_1, reader_2]
)
output = llm("What is 2 + 2?")
assert isinstance(output, str)
# Ensure the management api was called to create the reader
assert len(httpx_mock.get_requests()) == 3
for key, value in reader_1.items():
assert json.loads(httpx_mock.get_requests()[0].content)[key] == value
assert httpx_mock.get_requests()[0].url == mgnt_url
# A second call should also be made to spin up reader 2
for key, value in reader_2.items():
assert json.loads(httpx_mock.get_requests()[1].content)[key] == value
assert httpx_mock.get_requests()[1].url == mgnt_url
# Ensure the third call is to the generate endpoint for inference
assert httpx_mock.get_requests()[2].url == gen_url
assert json.loads(httpx_mock.get_requests()[2].content)["text"] == "What is 2 + 2?"

View File

@ -1,18 +0,0 @@
"""Test Titan Takeoff wrapper."""
import responses
from langchain_community.llms.titan_takeoff_pro import TitanTakeoffPro
@responses.activate
def test_titan_takeoff_pro_call() -> None:
"""Test valid call to Titan Takeoff."""
url = "http://localhost:3000/generate"
responses.add(responses.POST, url, json={"message": "2 + 2 is 4"}, status=200)
# response = requests.post(url)
llm = TitanTakeoffPro()
output = llm("What is 2 + 2?")
assert isinstance(output, str)

View File

@ -66,6 +66,7 @@ EXPECTED_ALL = [
"QuantizedBiEncoderEmbeddings",
"NeMoEmbeddings",
"SparkLLMTextEmbeddings",
"TitanTakeoffEmbed",
"QuantizedBgeEmbeddings",
"PremAIEmbeddings",
"YandexGPTEmbeddings",

View File

@ -469,9 +469,9 @@ def _import_titan_takeoff() -> Any:
def _import_titan_takeoff_pro() -> Any:
from langchain_community.llms.titan_takeoff_pro import TitanTakeoffPro
from langchain_community.llms.titan_takeoff import TitanTakeoff
return TitanTakeoffPro
return TitanTakeoff
def _import_together() -> Any:

View File

@ -1,3 +1,3 @@
from langchain_community.llms.titan_takeoff_pro import TitanTakeoffPro
from langchain_community.llms.titan_takeoff import TitanTakeoff as TitanTakeoffPro
__all__ = ["TitanTakeoffPro"]