diff --git a/docs/docs/integrations/llms/titan_takeoff.ipynb b/docs/docs/integrations/llms/titan_takeoff.ipynb index 5611210c3bf..ff714a477d4 100644 --- a/docs/docs/integrations/llms/titan_takeoff.ipynb +++ b/docs/docs/integrations/llms/titan_takeoff.ipynb @@ -1,75 +1,15 @@ { "cells": [ { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Titan Takeoff\n", "\n", - ">`TitanML` helps businesses build and deploy better, smaller, cheaper, and faster NLP models through our training, compression, and inference optimization platform. \n", + "`TitanML` helps businesses build and deploy better, smaller, cheaper, and faster NLP models through our training, compression, and inference optimization platform.\n", "\n", - ">Our inference server, [Titan Takeoff](https://docs.titanml.co/docs/titan-takeoff/getting-started) enables deployment of LLMs locally on your hardware in a single command. Most generative model architectures are supported, such as Falcon, Llama 2, GPT2, T5 and many more." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Installation\n", - "\n", - "To get started with Iris Takeoff, all you need is to have docker and python installed on your local system. If you wish to use the server with gpu support, then you will need to install docker with cuda support.\n", - "\n", - "For Mac and Windows users, make sure you have the docker daemon running! You can check this by running docker ps in your terminal. To start the daemon, open the docker desktop app.\n", - "\n", - "Run the following command to install the Iris CLI that will enable you to run the takeoff server:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "vscode": { - "languageId": "shellscript" - } - }, - "outputs": [], - "source": [ - "%pip install --upgrade --quiet titan-iris" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Choose a Model\n", - "Takeoff supports many of the most powerful generative text models, such as Falcon, MPT, and Llama. See the [supported models](https://docs.titanml.co/docs/titan-takeoff/supported-models) for more information. For information about using your own models, see the [custom models](https://docs.titanml.co/docs/titan-takeoff/Advanced/custom-models).\n", - "\n", - "Going forward in this demo we will be using the falcon 7B instruct model. This is a good open-source model that is trained to follow instructions, and is small enough to easily inference even on CPUs.\n", - "\n", - "## Taking off\n", - "Models are referred to by their model id on HuggingFace. Takeoff uses port 8000 by default, but can be configured to use another port. There is also support to use a Nvidia GPU by specifying cuda for the device flag.\n", - "\n", - "To start the takeoff server, run:\n", - "\n", - "```shell\n", - "iris takeoff --model tiiuae/falcon-7b-instruct --device cpu\n", - "iris takeoff --model tiiuae/falcon-7b-instruct --device cuda # Nvidia GPU required\n", - "iris takeoff --model tiiuae/falcon-7b-instruct --device cpu --port 5000 # run on port 5000 (default: 8000)\n", - "```" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "You will then be directed to a login page, where you will need to create an account to proceed.\n", - "After logging in, run the command onscreen to check whether the server is ready. When it is ready, you can start using the Takeoff integration.\n", - "\n", - "To shutdown the server, run the following command. You will be presented with options on which Takeoff server to shut down, in case you have multiple running servers.\n", - "\n", - "```shell\n", - "iris takeoff --shutdown # shutdown the server\n", - "```" + "Our inference server, [Titan Takeoff](https://docs.titanml.co/docs/intro) enables deployment of LLMs locally on your hardware in a single command. Most generative model architectures are supported, such as Falcon, Llama 2, GPT2, T5 and many more. If you experience trouble with a specific model, please let us know at hello@titanml.co." ] }, { @@ -77,8 +17,8 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Inferencing your model\n", - "To access your LLM, use the TitanTakeoff LLM wrapper:" + "## Example usage\n", + "Here are some helpful examples to get started using Titan Takeoff Server. You need to make sure Takeoff Server has been started in the background before running these commands. For more information see [docs page for launching Takeoff](https://docs.titanml.co/docs/Docs/launching/)." ] }, { @@ -87,50 +27,23 @@ "metadata": {}, "outputs": [], "source": [ - "from langchain_community.llms import TitanTakeoff\n", + "import time\n", "\n", - "llm = TitanTakeoff(\n", - " base_url=\"http://localhost:8000\", generate_max_length=128, temperature=1.0\n", - ")\n", - "\n", - "prompt = \"What is the largest planet in the solar system?\"\n", - "\n", - "llm(prompt)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "No parameters are needed by default, but a baseURL that points to your desired URL where Takeoff is running can be specified and [generation parameters](https://docs.titanml.co/docs/titan-takeoff/Advanced/generation-parameters) can be supplied.\n", - "\n", - "### Streaming\n", - "Streaming is also supported via the streaming flag:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ "from langchain.callbacks.manager import CallbackManager\n", "from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler\n", + "from langchain.prompts import PromptTemplate\n", "\n", - "llm = TitanTakeoff(\n", - " callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]), streaming=True\n", - ")\n", - "\n", - "prompt = \"What is the capital of France?\"\n", - "\n", - "llm(prompt)" + "# Note importing TitanTakeoffPro instead of TitanTakeoff will work as well both use same object under the hood\n", + "from langchain_community.llms import TitanTakeoff" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### Integration with LLMChain" + "### Example 1\n", + "\n", + "Basic use assuming Takeoff is running on your machine using its default ports (ie localhost:3000).\n" ] }, { @@ -139,19 +52,144 @@ "metadata": {}, "outputs": [], "source": [ - "from langchain.chains import LLMChain\n", - "from langchain_core.prompts import PromptTemplate\n", - "\n", "llm = TitanTakeoff()\n", + "output = llm.invoke(\"What is the weather in London in August?\")\n", + "print(output)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Example 2\n", "\n", - "template = \"What is the capital of {country}\"\n", + "Specifying a port and other generation parameters" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "llm = TitanTakeoff(port=3000)\n", + "# A comprehensive list of parameters can be found at https://docs.titanml.co/docs/next/apis/Takeoff%20inference_REST_API/generate#request\n", + "output = llm.invoke(\n", + " \"What is the largest rainforest in the world?\",\n", + " consumer_group=\"primary\",\n", + " min_new_tokens=128,\n", + " max_new_tokens=512,\n", + " no_repeat_ngram_size=2,\n", + " sampling_topk=1,\n", + " sampling_topp=1.0,\n", + " sampling_temperature=1.0,\n", + " repetition_penalty=1.0,\n", + " regex_string=\"\",\n", + " json_schema=None,\n", + ")\n", + "print(output)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Example 3\n", "\n", - "prompt = PromptTemplate.from_template(template)\n", + "Using generate for multiple inputs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "llm = TitanTakeoff()\n", + "rich_output = llm.generate([\"What is Deep Learning?\", \"What is Machine Learning?\"])\n", + "print(rich_output.generations)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Example 4\n", "\n", - "llm_chain = LLMChain(llm=llm, prompt=prompt)\n", + "Streaming output" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "llm = TitanTakeoff(\n", + " streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()])\n", + ")\n", + "prompt = \"What is the capital of France?\"\n", + "output = llm.invoke(prompt)\n", + "print(output)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Example 5\n", "\n", - "generated = llm_chain.run(country=\"Belgium\")\n", - "print(generated)" + "Using LCEL" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "llm = TitanTakeoff()\n", + "prompt = PromptTemplate.from_template(\"Tell me about {topic}\")\n", + "chain = prompt | llm\n", + "output = chain.invoke({\"topic\": \"the universe\"})\n", + "print(output)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Example 6\n", + "\n", + "Starting readers using TitanTakeoff Python Wrapper. If you haven't created any readers with first launching Takeoff, or you want to add another you can do so when you initialize the TitanTakeoff object. Just pass a list of model configs you want to start as the `models` parameter." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Model config for the llama model, where you can specify the following parameters:\n", + "# model_name (str): The name of the model to use\n", + "# device: (str): The device to use for inference, cuda or cpu\n", + "# consumer_group (str): The consumer group to place the reader into\n", + "# tensor_parallel (Optional[int]): The number of gpus you would like your model to be split across\n", + "# max_seq_length (int): The maximum sequence length to use for inference, defaults to 512\n", + "# max_batch_size (int_: The max batch size for continuous batching of requests\n", + "llama_model = {\n", + " \"model_name\": \"TheBloke/Llama-2-7b-Chat-AWQ\",\n", + " \"device\": \"cuda\",\n", + " \"consumer_group\": \"llama\",\n", + "}\n", + "llm = TitanTakeoff(models=[llama_model])\n", + "\n", + "# The model needs time to spin up, length of time need will depend on the size of model and your network connection speed\n", + "time.sleep(60)\n", + "\n", + "prompt = \"What is the capital of France?\"\n", + "output = llm.invoke(prompt, consumer_group=\"llama\")\n", + "print(output)" ] } ], diff --git a/docs/docs/integrations/llms/titan_takeoff_pro.ipynb b/docs/docs/integrations/llms/titan_takeoff_pro.ipynb deleted file mode 100644 index b728556eed2..00000000000 --- a/docs/docs/integrations/llms/titan_takeoff_pro.ipynb +++ /dev/null @@ -1,102 +0,0 @@ -{ - "cells": [ - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Titan Takeoff Pro\n", - "\n", - "`TitanML` helps businesses build and deploy better, smaller, cheaper, and faster NLP models through our training, compression, and inference optimization platform.\n", - "\n", - ">Note: These docs are for the Pro version of Titan Takeoff. For the community version, see the page for Titan Takeoff.\n", - "\n", - "Our inference server, [Titan Takeoff (Pro Version)](https://docs.titanml.co/docs/titan-takeoff/pro-features/feature-comparison) enables deployment of LLMs locally on your hardware in a single command. Most generative model architectures are supported, such as Falcon, Llama 2, GPT2, T5 and many more." - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Example usage\n", - "Here are some helpful examples to get started using the Pro version of Titan Takeoff Server.\n", - "No parameters are needed by default, but a baseURL that points to your desired URL where Takeoff is running can be specified and generation parameters can be supplied." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from langchain.callbacks.manager import CallbackManager\n", - "from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler\n", - "from langchain_community.llms import TitanTakeoffPro\n", - "from langchain_core.prompts import PromptTemplate\n", - "\n", - "# Example 1: Basic use\n", - "llm = TitanTakeoffPro()\n", - "output = llm(\"What is the weather in London in August?\")\n", - "print(output)\n", - "\n", - "\n", - "# Example 2: Specifying a port and other generation parameters\n", - "llm = TitanTakeoffPro(\n", - " base_url=\"http://localhost:3000\",\n", - " min_new_tokens=128,\n", - " max_new_tokens=512,\n", - " no_repeat_ngram_size=2,\n", - " sampling_topk=1,\n", - " sampling_topp=1.0,\n", - " sampling_temperature=1.0,\n", - " repetition_penalty=1.0,\n", - " regex_string=\"\",\n", - ")\n", - "output = llm(\"What is the largest rainforest in the world?\")\n", - "print(output)\n", - "\n", - "\n", - "# Example 3: Using generate for multiple inputs\n", - "llm = TitanTakeoffPro()\n", - "rich_output = llm.generate([\"What is Deep Learning?\", \"What is Machine Learning?\"])\n", - "print(rich_output.generations)\n", - "\n", - "\n", - "# Example 4: Streaming output\n", - "llm = TitanTakeoffPro(\n", - " streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()])\n", - ")\n", - "prompt = \"What is the capital of France?\"\n", - "llm(prompt)\n", - "\n", - "# Example 5: Using LCEL\n", - "llm = TitanTakeoffPro()\n", - "prompt = PromptTemplate.from_template(\"Tell me about {topic}\")\n", - "chain = prompt | llm\n", - "chain.invoke({\"topic\": \"the universe\"})" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.12" - } - }, - "nbformat": 4, - "nbformat_minor": 4 -} diff --git a/docs/docs/integrations/text_embedding/titan_takeoff.ipynb b/docs/docs/integrations/text_embedding/titan_takeoff.ipynb new file mode 100644 index 00000000000..cc5ad9268ac --- /dev/null +++ b/docs/docs/integrations/text_embedding/titan_takeoff.ipynb @@ -0,0 +1,112 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Titan Takeoff\n", + "\n", + "`TitanML` helps businesses build and deploy better, smaller, cheaper, and faster NLP models through our training, compression, and inference optimization platform.\n", + "\n", + "Our inference server, [Titan Takeoff](https://docs.titanml.co/docs/intro) enables deployment of LLMs locally on your hardware in a single command. Most embedding models are supported out of the box, if you experience trouble with a specific model, please let us know at hello@titanml.co." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Example usage\n", + "Here are some helpful examples to get started using Titan Takeoff Server. You need to make sure Takeoff Server has been started in the background before running these commands. For more information see [docs page for launching Takeoff](https://docs.titanml.co/docs/Docs/launching/)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import time\n", + "\n", + "from langchain_community.embeddings import TitanTakeoffEmbed" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Example 1\n", + "Basic use assuming Takeoff is running on your machine using its default ports (ie localhost:3000)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "embed = TitanTakeoffEmbed()\n", + "output = embed.embed_query(\n", + " \"What is the weather in London in August?\", consumer_group=\"embed\"\n", + ")\n", + "print(output)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Example 2 \n", + "Starting readers using TitanTakeoffEmbed Python Wrapper. If you haven't created any readers with first launching Takeoff, or you want to add another you can do so when you initialize the TitanTakeoffEmbed object. Just pass a list of models you want to start as the `models` parameter.\n", + "\n", + "You can use `embed.query_documents` to embed multiple documents at once. The expected input is a list of strings, rather than just a string expected for the `embed_query` method." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Model config for the embedding model, where you can specify the following parameters:\n", + "# model_name (str): The name of the model to use\n", + "# device: (str): The device to use for inference, cuda or cpu\n", + "# consumer_group (str): The consumer group to place the reader into\n", + "embedding_model = {\n", + " \"model_name\": \"BAAI/bge-large-en-v1.5\",\n", + " \"device\": \"cpu\",\n", + " \"consumer_group\": \"embed\",\n", + "}\n", + "embed = TitanTakeoffEmbed(models=[embedding_model])\n", + "\n", + "# The model needs time to spin up, length of time need will depend on the size of model and your network connection speed\n", + "time.sleep(60)\n", + "\n", + "prompt = \"What is the capital of France?\"\n", + "# We specified \"embed\" consumer group so need to send request to the same consumer group so it hits our embedding model and not others\n", + "output = embed.embed_query(prompt, consumer_group=\"embed\")\n", + "print(output)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "langchain", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.12" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/docs/vercel.json b/docs/vercel.json index 97f4dbe505e..700a45fe9f4 100644 --- a/docs/vercel.json +++ b/docs/vercel.json @@ -1,6 +1,10 @@ { "trailingSlash": true, "redirects": [ + { + "source": "/docs/integrations/llms/titan_takeoff_pro", + "destination": "/docs/integrations/llms/titan_takeoff" + }, { "source": "/docs/integrations/providers/optimum_intel(/?)", "destination": "/docs/integrations/providers/intel/" diff --git a/libs/community/langchain_community/embeddings/__init__.py b/libs/community/langchain_community/embeddings/__init__.py index 3359dae9157..bbbbf0f72c7 100644 --- a/libs/community/langchain_community/embeddings/__init__.py +++ b/libs/community/langchain_community/embeddings/__init__.py @@ -357,6 +357,7 @@ _module_lookup = { "VolcanoEmbeddings": "langchain_community.embeddings.volcengine", "VoyageEmbeddings": "langchain_community.embeddings.voyageai", "XinferenceEmbeddings": "langchain_community.embeddings.xinference", + "TitanTakeoffEmbed": "langchain_community.embeddings.titan_takeoff", "PremAIEmbeddings": "langchain_community.embeddings.premai", "YandexGPTEmbeddings": "langchain_community.embeddings.yandex", } diff --git a/libs/community/langchain_community/embeddings/titan_takeoff.py b/libs/community/langchain_community/embeddings/titan_takeoff.py new file mode 100644 index 00000000000..dc09e22936d --- /dev/null +++ b/libs/community/langchain_community/embeddings/titan_takeoff.py @@ -0,0 +1,207 @@ +from enum import Enum +from typing import Any, List, Optional, Set, Union + +from langchain_core.embeddings import Embeddings +from langchain_core.pydantic_v1 import BaseModel + + +class TakeoffEmbeddingException(Exception): + """Exceptions experienced with interfacing with Takeoff Embedding Wrapper""" + + +class MissingConsumerGroup(TakeoffEmbeddingException): + """Exception raised when no consumer group is provided on initialization of + TitanTakeoffEmbed or in embed request""" + + +class Device(str, Enum): + """The device to use for inference, cuda or cpu""" + + cuda = "cuda" + cpu = "cpu" + + +class ReaderConfig(BaseModel): + class Config: + protected_namespaces = () + + model_name: str + """The name of the model to use""" + + device: Device = Device.cuda + """The device to use for inference, cuda or cpu""" + + consumer_group: str = "primary" + """The consumer group to place the reader into""" + + +class TitanTakeoffEmbed(Embeddings): + """Titan Takeoff Embed is a wrapper to interface with Takeoff Inference API + for embedding models + + You can use this wrapper to send embedding requests and to deploy embedding + readers with Takeoff. + + Examples: + This is an example how to deploy an embedding model and send requests. + + .. code-block:: python + # Import the TitanTakeoffEmbed class from community package + import time + from langchain_community.embeddings import TitanTakeoffEmbed + + # Specify the embedding reader you'd like to deploy + reader_1 = { + "model_name": "avsolatorio/GIST-large-Embedding-v0", + "device": "cpu", + "consumer_group": "embed" + } + + # For every reader you pass into models arg Takeoff will spin up a reader + # according to the specs you provide. If you don't specify the arg no models + # are spun up and it assumes you have already done this separately. + embed = TitanTakeoffEmbed(models=[reader_1]) + + # Wait for the reader to be deployed, time needed depends on the model size + # and your internet speed + time.sleep(60) + + # Returns the embedded query, ie a List[float], sent to `embed` consumer + # group where we just spun up the embedding reader + print(embed.embed_query( + "Where can I see football?", consumer_group="embed" + )) + + # Returns a List of embeddings, ie a List[List[float]], sent to `embed` + # consumer group where we just spun up the embedding reader + print(embed.embed_document( + ["Document1", "Document2"], + consumer_group="embed" + )) + """ + + base_url: str = "http://localhost" + """The base URL of the Titan Takeoff (Pro) server. Default = "http://localhost".""" + + port: int = 3000 + """The port of the Titan Takeoff (Pro) server. Default = 3000.""" + + mgmt_port: int = 3001 + """The management port of the Titan Takeoff (Pro) server. Default = 3001.""" + + client: Any = None + """Takeoff Client Python SDK used to interact with Takeoff API""" + + embed_consumer_groups: Set[str] = set() + """The consumer groups in Takeoff which contain embedding models""" + + def __init__( + self, + base_url: str = "http://localhost", + port: int = 3000, + mgmt_port: int = 3001, + models: List[ReaderConfig] = [], + ): + """Initialize the Titan Takeoff embedding wrapper. + + Args: + base_url (str, optional): The base url where Takeoff Inference Server is + listening. Defaults to "http://localhost". + port (int, optional): What port is Takeoff Inference API listening on. + Defaults to 3000. + mgmt_port (int, optional): What port is Takeoff Management API listening on. + Defaults to 3001. + models (List[ReaderConfig], optional): Any readers you'd like to spin up on. + Defaults to []. + + Raises: + ImportError: If you haven't installed takeoff-client, you will get an + ImportError. To remedy run `pip install 'takeoff-client==0.4.0'` + """ + self.base_url = base_url + self.port = port + self.mgmt_port = mgmt_port + try: + from takeoff_client import TakeoffClient + except ImportError: + raise ImportError( + "takeoff-client is required for TitanTakeoff. " + "Please install it with `pip install 'takeoff-client==0.4.0'`." + ) + self.client = TakeoffClient( + self.base_url, port=self.port, mgmt_port=self.mgmt_port + ) + for model in models: + self.client.create_reader(model) + if isinstance(model, dict): + self.embed_consumer_groups.add(model.get("consumer_group")) + else: + self.embed_consumer_groups.add(model.consumer_group) + super(TitanTakeoffEmbed, self).__init__() + + def _embed( + self, input: Union[List[str], str], consumer_group: Optional[str] + ) -> dict: + """Embed text. + + Args: + input (List[str]): prompt/document or list of prompts/documents to embed + consumer_group (Optional[str]): what consumer group to send the embedding + request to. If not specified and there is only one + consumer group specified during initialization, it will be used. If there + are multiple consumer groups specified during initialization, you must + specify which one to use. + + Raises: + MissingConsumerGroup: The consumer group can not be inferred from the + initialization and must be specified with request. + + Returns: + Dict[str, Any]: Result of query, {"result": List[List[float]]} or + {"result": List[float]} + """ + if not consumer_group: + if len(self.embed_consumer_groups) == 1: + consumer_group = list(self.embed_consumer_groups)[0] + elif len(self.embed_consumer_groups) > 1: + raise MissingConsumerGroup( + "TakeoffEmbedding was initialized with multiple embedding reader" + "groups, you must specify which one to use." + ) + else: + raise MissingConsumerGroup( + "You must specify what consumer group you want to send embedding" + "response to as TitanTakeoffEmbed was not initialized with an " + "embedding reader." + ) + return self.client.embed(input, consumer_group) + + def embed_documents( + self, texts: List[str], consumer_group: Optional[str] = None + ) -> List[List[float]]: + """Embed documents. + + Args: + texts (List[str]): List of prompts/documents to embed + consumer_group (Optional[str], optional): Consumer group to send request + to containing embedding model. Defaults to None. + + Returns: + List[List[float]]: List of embeddings + """ + return self._embed(texts, consumer_group)["result"] + + def embed_query( + self, text: str, consumer_group: Optional[str] = None + ) -> List[float]: + """Embed query. + + Args: + text (str): Prompt/document to embed + consumer_group (Optional[str], optional): Consumer group to send request + to containing embedding model. Defaults to None. + + Returns: + List[float]: Embedding + """ + return self._embed(text, consumer_group)["result"] diff --git a/libs/community/langchain_community/llms/__init__.py b/libs/community/langchain_community/llms/__init__.py index df18912e3a5..5b6adb5c49f 100644 --- a/libs/community/langchain_community/llms/__init__.py +++ b/libs/community/langchain_community/llms/__init__.py @@ -549,9 +549,9 @@ def _import_titan_takeoff() -> Type[BaseLLM]: def _import_titan_takeoff_pro() -> Type[BaseLLM]: - from langchain_community.llms.titan_takeoff_pro import TitanTakeoffPro + from langchain_community.llms.titan_takeoff import TitanTakeoff - return TitanTakeoffPro + return TitanTakeoff def _import_together() -> Type[BaseLLM]: diff --git a/libs/community/langchain_community/llms/titan_takeoff.py b/libs/community/langchain_community/llms/titan_takeoff.py index b2007ccdea2..e5b38e77dd1 100644 --- a/libs/community/langchain_community/llms/titan_takeoff.py +++ b/libs/community/langchain_community/llms/titan_takeoff.py @@ -1,61 +1,155 @@ -from typing import Any, Iterator, List, Mapping, Optional +from enum import Enum +from typing import Any, Iterator, List, Optional -import requests from langchain_core.callbacks import CallbackManagerForLLMRun from langchain_core.language_models.llms import LLM from langchain_core.outputs import GenerationChunk -from requests.exceptions import ConnectionError +from langchain_core.pydantic_v1 import BaseModel from langchain_community.llms.utils import enforce_stop_tokens +class Device(str, Enum): + """The device to use for inference, cuda or cpu""" + + cuda = "cuda" + cpu = "cpu" + + +class ReaderConfig(BaseModel): + class Config: + protected_namespaces = () + + model_name: str + """The name of the model to use""" + + device: Device = Device.cuda + """The device to use for inference, cuda or cpu""" + + consumer_group: str = "primary" + """The consumer group to place the reader into""" + + tensor_parallel: Optional[int] = None + """The number of gpus you would like your model to be split across""" + + max_seq_length: int = 512 + """The maximum sequence length to use for inference, defaults to 512""" + + max_batch_size: int = 4 + """The max batch size for continuous batching of requests""" + + class TitanTakeoff(LLM): - """Titan Takeoff API LLMs.""" + """Titan Takeoff API LLMs. - base_url: str = "http://localhost:8000" - """Specifies the baseURL to use for the Titan Takeoff API. - Default = http://localhost:8000. + Titan Takeoff is a wrapper to interface with Takeoff Inference API for + generative text to text language models. + + You can use this wrapper to send requests to a generative language model + and to deploy readers with Takeoff. + + Examples: + This is an example how to deploy a generative language model and send + requests. + + .. code-block:: python + # Import the TitanTakeoff class from community package + import time + from langchain_community.llms import TitanTakeoff + + # Specify the embedding reader you'd like to deploy + reader_1 = { + "model_name": "TheBloke/Llama-2-7b-Chat-AWQ", + "device": "cuda", + "tensor_parallel": 1, + "consumer_group": "llama" + } + + # For every reader you pass into models arg Takeoff will spin + # up a reader according to the specs you provide. If you don't + # specify the arg no models are spun up and it assumes you have + # already done this separately. + llm = TitanTakeoff(models=[reader_1]) + + # Wait for the reader to be deployed, time needed depends on the + # model size and your internet speed + time.sleep(60) + + # Returns the query, ie a List[float], sent to `llama` consumer group + # where we just spun up the Llama 7B model + print(embed.invoke( + "Where can I see football?", consumer_group="llama" + )) + + # You can also send generation parameters to the model, any of the + # following can be passed in as kwargs: + # https://docs.titanml.co/docs/next/apis/Takeoff%20inference_REST_API/generate#request + # for instance: + print(embed.invoke( + "Where can I see football?", consumer_group="llama", max_new_tokens=100 + )) """ - generate_max_length: int = 128 - """Maximum generation length. Default = 128.""" + base_url: str = "http://localhost" + """The base URL of the Titan Takeoff (Pro) server. Default = "http://localhost".""" - sampling_topk: int = 1 - """Sample predictions from the top K most probable candidates. Default = 1.""" + port: int = 3000 + """The port of the Titan Takeoff (Pro) server. Default = 3000.""" - sampling_topp: float = 1.0 - """Sample from predictions whose cumulative probability exceeds this value. - Default = 1.0. - """ - - sampling_temperature: float = 1.0 - """Sample with randomness. Bigger temperatures are associated with - more randomness and 'creativity'. Default = 1.0. - """ - - repetition_penalty: float = 1.0 - """Penalise the generation of tokens that have been generated before. - Set to > 1 to penalize. Default = 1 (no penalty). - """ - - no_repeat_ngram_size: int = 0 - """Prevent repetitions of ngrams of this size. Default = 0 (turned off).""" + mgmt_port: int = 3001 + """The management port of the Titan Takeoff (Pro) server. Default = 3001.""" streaming: bool = False """Whether to stream the output. Default = False.""" - @property - def _default_params(self) -> Mapping[str, Any]: - """Get the default parameters for calling Titan Takeoff Server.""" - params = { - "generate_max_length": self.generate_max_length, - "sampling_topk": self.sampling_topk, - "sampling_topp": self.sampling_topp, - "sampling_temperature": self.sampling_temperature, - "repetition_penalty": self.repetition_penalty, - "no_repeat_ngram_size": self.no_repeat_ngram_size, - } - return params + client: Any = None + """Takeoff Client Python SDK used to interact with Takeoff API""" + + def __init__( + self, + base_url: str = "http://localhost", + port: int = 3000, + mgmt_port: int = 3001, + streaming: bool = False, + models: List[ReaderConfig] = [], + ): + """Initialize the Titan Takeoff language wrapper. + + Args: + base_url (str, optional): The base URL where the Takeoff + Inference Server is listening. Defaults to `http://localhost`. + port (int, optional): What port is Takeoff Inference API + listening on. Defaults to 3000. + mgmt_port (int, optional): What port is Takeoff Management API + listening on. Defaults to 3001. + streaming (bool, optional): Whether you want to by default use the + generate_stream endpoint over generate to stream responses. + Defaults to False. In reality, this is not significantly different + as the streamed response is buffered and returned similar to the + non-streamed response, but the run manager is applied per token + generated. + models (List[ReaderConfig], optional): Any readers you'd like to + spin up on. Defaults to []. + + Raises: + ImportError: If you haven't installed takeoff-client, you will + get an ImportError. To remedy run `pip install 'takeoff-client==0.4.0'` + """ + super().__init__( + base_url=base_url, port=port, mgmt_port=mgmt_port, streaming=streaming + ) + try: + from takeoff_client import TakeoffClient + except ImportError: + raise ImportError( + "takeoff-client is required for TitanTakeoff. " + "Please install it with `pip install 'takeoff-client>=0.4.0'`." + ) + self.client = TakeoffClient( + self.base_url, port=self.port, mgmt_port=self.mgmt_port + ) + for model in models: + self.client.create_reader(model) @property def _llm_type(self) -> str: @@ -69,11 +163,12 @@ class TitanTakeoff(LLM): run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, ) -> str: - """Call out to Titan Takeoff generate endpoint. + """Call out to Titan Takeoff (Pro) generate endpoint. Args: prompt: The prompt to pass into the model. stop: Optional list of stop words to use when generating. + run_manager: Optional callback manager to use when streaming. Returns: The string generated by the model. @@ -81,41 +176,31 @@ class TitanTakeoff(LLM): Example: .. code-block:: python + model = TitanTakeoff() + prompt = "What is the capital of the United Kingdom?" - response = model(prompt) + + # Use of model(prompt), ie `__call__` was deprecated in LangChain 0.1.7, + # use model.invoke(prompt) instead. + response = model.invoke(prompt) """ - try: - if self.streaming: - text_output = "" - for chunk in self._stream( - prompt=prompt, - stop=stop, - run_manager=run_manager, - ): - text_output += chunk.text - return text_output + if self.streaming: + text_output = "" + for chunk in self._stream( + prompt=prompt, + stop=stop, + run_manager=run_manager, + ): + text_output += chunk.text + return text_output - url = f"{self.base_url}/generate" - params = {"text": prompt, **self._default_params} + response = self.client.generate(prompt, **kwargs) + text = response["text"] - response = requests.post(url, json=params) - response.raise_for_status() - response.encoding = "utf-8" - text = "" - - if "message" in response.json(): - text = response.json()["message"] - else: - raise ValueError("Something went wrong.") - if stop is not None: - text = enforce_stop_tokens(text, stop) - return text - except ConnectionError: - raise ConnectionError( - "Could not connect to Titan Takeoff server. \ - Please make sure that the server is running." - ) + if stop is not None: + text = enforce_stop_tokens(text, stop) + return text def _stream( self, @@ -124,14 +209,12 @@ class TitanTakeoff(LLM): run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, ) -> Iterator[GenerationChunk]: - """Call out to Titan Takeoff stream endpoint. + """Call out to Titan Takeoff (Pro) stream endpoint. Args: prompt: The prompt to pass into the model. stop: Optional list of stop words to use when generating. - - Returns: - The string generated by the model. + run_manager: Optional callback manager to use when streaming. Yields: A dictionary like object containing a string token. @@ -139,23 +222,40 @@ class TitanTakeoff(LLM): Example: .. code-block:: python + model = TitanTakeoff() + prompt = "What is the capital of the United Kingdom?" - response = model(prompt) + response = model.stream(prompt) + + # OR + + model = TitanTakeoff(streaming=True) + + response = model.invoke(prompt) """ - url = f"{self.base_url}/generate_stream" - params = {"text": prompt, **self._default_params} + response = self.client.generate_stream(prompt, **kwargs) + buffer = "" + for text in response: + buffer += text.data + if "data:" in buffer: + # Remove the first instance of "data:" from the buffer. + if buffer.startswith("data:"): + buffer = "" + if len(buffer.split("data:", 1)) == 2: + content, _ = buffer.split("data:", 1) + buffer = content.rstrip("\n") + # Trim the buffer to only have content after the "data:" part. + if buffer: # Ensure that there's content to process. + chunk = GenerationChunk(text=buffer) + buffer = "" # Reset buffer for the next set of data. + yield chunk + if run_manager: + run_manager.on_llm_new_token(token=chunk.text) - response = requests.post(url, json=params, stream=True) - response.encoding = "utf-8" - for text in response.iter_content(chunk_size=1, decode_unicode=True): - if text: - chunk = GenerationChunk(text=text) - if run_manager: - run_manager.on_llm_new_token(token=chunk.text) - yield chunk - - @property - def _identifying_params(self) -> Mapping[str, Any]: - """Get the identifying parameters.""" - return {"base_url": self.base_url, **{}, **self._default_params} + # Yield any remaining content in the buffer. + if buffer: + chunk = GenerationChunk(text=buffer.replace("", "")) + yield chunk + if run_manager: + run_manager.on_llm_new_token(token=chunk.text) diff --git a/libs/community/langchain_community/llms/titan_takeoff_pro.py b/libs/community/langchain_community/llms/titan_takeoff_pro.py deleted file mode 100644 index 8040afab454..00000000000 --- a/libs/community/langchain_community/llms/titan_takeoff_pro.py +++ /dev/null @@ -1,217 +0,0 @@ -from typing import Any, Iterator, List, Mapping, Optional - -import requests -from langchain_core.callbacks import CallbackManagerForLLMRun -from langchain_core.language_models.llms import LLM -from langchain_core.outputs import GenerationChunk -from requests.exceptions import ConnectionError - -from langchain_community.llms.utils import enforce_stop_tokens - - -class TitanTakeoffPro(LLM): - """Titan Takeoff Pro is a language model that can be used to generate text.""" - - base_url: Optional[str] = "http://localhost:3000" - """Specifies the baseURL to use for the Titan Takeoff Pro API. - Default = http://localhost:3000. - """ - - max_new_tokens: Optional[int] = None - """Maximum tokens generated.""" - - min_new_tokens: Optional[int] = None - """Minimum tokens generated.""" - - sampling_topk: Optional[int] = None - """Sample predictions from the top K most probable candidates.""" - - sampling_topp: Optional[float] = None - """Sample from predictions whose cumulative probability exceeds this value. - """ - - sampling_temperature: Optional[float] = None - """Sample with randomness. Bigger temperatures are associated with - more randomness and 'creativity'. - """ - - repetition_penalty: Optional[float] = None - """Penalise the generation of tokens that have been generated before. - Set to > 1 to penalize. - """ - - regex_string: Optional[str] = None - """A regex string for constrained generation.""" - - no_repeat_ngram_size: Optional[int] = None - """Prevent repetitions of ngrams of this size. Default = 0 (turned off).""" - - streaming: bool = False - """Whether to stream the output. Default = False.""" - - @property - def _default_params(self) -> Mapping[str, Any]: - """Get the default parameters for calling Titan Takeoff Server (Pro).""" - return { - **( - {"regex_string": self.regex_string} - if self.regex_string is not None - else {} - ), - **( - {"sampling_temperature": self.sampling_temperature} - if self.sampling_temperature is not None - else {} - ), - **( - {"sampling_topp": self.sampling_topp} - if self.sampling_topp is not None - else {} - ), - **( - {"repetition_penalty": self.repetition_penalty} - if self.repetition_penalty is not None - else {} - ), - **( - {"max_new_tokens": self.max_new_tokens} - if self.max_new_tokens is not None - else {} - ), - **( - {"min_new_tokens": self.min_new_tokens} - if self.min_new_tokens is not None - else {} - ), - **( - {"sampling_topk": self.sampling_topk} - if self.sampling_topk is not None - else {} - ), - **( - {"no_repeat_ngram_size": self.no_repeat_ngram_size} - if self.no_repeat_ngram_size is not None - else {} - ), - } - - @property - def _llm_type(self) -> str: - """Return type of llm.""" - return "titan_takeoff_pro" - - def _call( - self, - prompt: str, - stop: Optional[List[str]] = None, - run_manager: Optional[CallbackManagerForLLMRun] = None, - **kwargs: Any, - ) -> str: - """Call out to Titan Takeoff (Pro) generate endpoint. - - Args: - prompt: The prompt to pass into the model. - stop: Optional list of stop words to use when generating. - - Returns: - The string generated by the model. - - Example: - .. code-block:: python - - prompt = "What is the capital of the United Kingdom?" - response = model(prompt) - - """ - try: - if self.streaming: - text_output = "" - for chunk in self._stream( - prompt=prompt, - stop=stop, - run_manager=run_manager, - ): - text_output += chunk.text - return text_output - url = f"{self.base_url}/generate" - params = {"text": prompt, **self._default_params} - - response = requests.post(url, json=params) - response.raise_for_status() - response.encoding = "utf-8" - - text = "" - if "text" in response.json(): - text = response.json()["text"] - text = text.replace("", "") - else: - raise ValueError("Something went wrong.") - if stop is not None: - text = enforce_stop_tokens(text, stop) - return text - except ConnectionError: - raise ConnectionError( - "Could not connect to Titan Takeoff (Pro) server. \ - Please make sure that the server is running." - ) - - def _stream( - self, - prompt: str, - stop: Optional[List[str]] = None, - run_manager: Optional[CallbackManagerForLLMRun] = None, - **kwargs: Any, - ) -> Iterator[GenerationChunk]: - """Call out to Titan Takeoff (Pro) stream endpoint. - - Args: - prompt: The prompt to pass into the model. - stop: Optional list of stop words to use when generating. - - Returns: - The string generated by the model. - - Yields: - A dictionary like object containing a string token. - - Example: - .. code-block:: python - - prompt = "What is the capital of the United Kingdom?" - response = model(prompt) - - """ - url = f"{self.base_url}/generate_stream" - params = {"text": prompt, **self._default_params} - - response = requests.post(url, json=params, stream=True) - response.encoding = "utf-8" - buffer = "" - for text in response.iter_content(chunk_size=1, decode_unicode=True): - buffer += text - if "data:" in buffer: - # Remove the first instance of "data:" from the buffer. - if buffer.startswith("data:"): - buffer = "" - if len(buffer.split("data:", 1)) == 2: - content, _ = buffer.split("data:", 1) - buffer = content.rstrip("\n") - # Trim the buffer to only have content after the "data:" part. - if buffer: # Ensure that there's content to process. - chunk = GenerationChunk(text=buffer) - buffer = "" # Reset buffer for the next set of data. - yield chunk - if run_manager: - run_manager.on_llm_new_token(token=chunk.text) - - # Yield any remaining content in the buffer. - if buffer: - chunk = GenerationChunk(text=buffer.replace("", "")) - if run_manager: - run_manager.on_llm_new_token(token=chunk.text) - yield chunk - - @property - def _identifying_params(self) -> Mapping[str, Any]: - """Get the identifying parameters.""" - return {"base_url": self.base_url, **{}, **self._default_params} diff --git a/libs/community/tests/integration_tests/embeddings/test_titan_takeoff.py b/libs/community/tests/integration_tests/embeddings/test_titan_takeoff.py new file mode 100644 index 00000000000..884f1a120ab --- /dev/null +++ b/libs/community/tests/integration_tests/embeddings/test_titan_takeoff.py @@ -0,0 +1,178 @@ +"""Test Titan Takeoff Embedding wrapper.""" + + +import json +from typing import Any + +import pytest + +from langchain_community.embeddings import TitanTakeoffEmbed +from langchain_community.embeddings.titan_takeoff import MissingConsumerGroup + + +@pytest.mark.requires("pytest_httpx") +@pytest.mark.requires("takeoff_client") +def test_titan_takeoff_call(httpx_mock: Any) -> None: + """Test valid call to Titan Takeoff.""" + port = 2345 + + httpx_mock.add_response( + method="POST", + url=f"http://localhost:{port}/embed", + json={"result": [0.46635, 0.234, -0.8521]}, + ) + + embedding = TitanTakeoffEmbed(port=port) + + output_1 = embedding.embed_documents("What is 2 + 2?", "primary") + output_2 = embedding.embed_query("What is 2 + 2?", "primary") + + assert isinstance(output_1, list) + assert isinstance(output_2, list) + + assert len(httpx_mock.get_requests()) == 2 + for n in range(2): + assert httpx_mock.get_requests()[n].url == f"http://localhost:{port}/embed" + assert ( + json.loads(httpx_mock.get_requests()[n].content)["text"] == "What is 2 + 2?" + ) + + +@pytest.mark.requires("pytest_httpx") +@pytest.mark.requires("takeoff_client") +def test_no_consumer_group_fails(httpx_mock: Any) -> None: + """Test that not specifying a consumer group fails.""" + port = 2345 + + httpx_mock.add_response( + method="POST", + url=f"http://localhost:{port}/embed", + json={"result": [0.46635, 0.234, -0.8521]}, + ) + + embedding = TitanTakeoffEmbed(port=port) + + with pytest.raises(MissingConsumerGroup): + embedding.embed_documents("What is 2 + 2?") + with pytest.raises(MissingConsumerGroup): + embedding.embed_query("What is 2 + 2?") + + # Check specifying a consumer group works + embedding.embed_documents("What is 2 + 2?", "primary") + embedding.embed_query("What is 2 + 2?", "primary") + + +@pytest.mark.requires("pytest_httpx") +@pytest.mark.requires("takeoff_client") +def test_takeoff_initialization(httpx_mock: Any) -> None: + """Test valid call to Titan Takeoff.""" + mgnt_port = 36452 + inf_port = 46253 + mgnt_url = f"http://localhost:{mgnt_port}/reader" + embed_url = f"http://localhost:{inf_port}/embed" + reader_1 = { + "model_name": "test", + "device": "cpu", + "consumer_group": "embed", + } + reader_2 = reader_1.copy() + reader_2["model_name"] = "test2" + reader_2["device"] = "cuda" + + httpx_mock.add_response( + method="POST", url=mgnt_url, json={"key": "value"}, status_code=201 + ) + httpx_mock.add_response( + method="POST", + url=embed_url, + json={"result": [0.34, 0.43, -0.934532]}, + status_code=200, + ) + + llm = TitanTakeoffEmbed( + port=inf_port, mgmt_port=mgnt_port, models=[reader_1, reader_2] + ) + # Shouldn't need to specify consumer group as there is only one specified during + # initialization + output_1 = llm.embed_documents("What is 2 + 2?") + output_2 = llm.embed_query("What is 2 + 2?") + + assert isinstance(output_1, list) + assert isinstance(output_2, list) + # Ensure the management api was called to create the reader + assert len(httpx_mock.get_requests()) == 4 + for key, value in reader_1.items(): + assert json.loads(httpx_mock.get_requests()[0].content)[key] == value + assert httpx_mock.get_requests()[0].url == mgnt_url + # Also second call should be made to spin uo reader 2 + for key, value in reader_2.items(): + assert json.loads(httpx_mock.get_requests()[1].content)[key] == value + assert httpx_mock.get_requests()[1].url == mgnt_url + # Ensure the third call is to generate endpoint to inference + for n in range(2, 4): + assert httpx_mock.get_requests()[n].url == embed_url + assert ( + json.loads(httpx_mock.get_requests()[n].content)["text"] == "What is 2 + 2?" + ) + + +@pytest.mark.requires("pytest_httpx") +@pytest.mark.requires("takeoff_client") +def test_takeoff_initialization_with_more_than_one_consumer_group( + httpx_mock: Any, +) -> None: + """Test valid call to Titan Takeoff.""" + mgnt_port = 36452 + inf_port = 46253 + mgnt_url = f"http://localhost:{mgnt_port}/reader" + embed_url = f"http://localhost:{inf_port}/embed" + reader_1 = { + "model_name": "test", + "device": "cpu", + "consumer_group": "embed", + } + reader_2 = reader_1.copy() + reader_2["model_name"] = "test2" + reader_2["device"] = "cuda" + reader_2["consumer_group"] = "embed2" + + httpx_mock.add_response( + method="POST", url=mgnt_url, json={"key": "value"}, status_code=201 + ) + httpx_mock.add_response( + method="POST", + url=embed_url, + json={"result": [0.34, 0.43, -0.934532]}, + status_code=200, + ) + + llm = TitanTakeoffEmbed( + port=inf_port, mgmt_port=mgnt_port, models=[reader_1, reader_2] + ) + # There was more than one consumer group specified during initialization so we + # need to specify which one to use + with pytest.raises(MissingConsumerGroup): + llm.embed_documents("What is 2 + 2?") + with pytest.raises(MissingConsumerGroup): + llm.embed_query("What is 2 + 2?") + + output_1 = llm.embed_documents("What is 2 + 2?", "embed") + output_2 = llm.embed_query("What is 2 + 2?", "embed2") + + assert isinstance(output_1, list) + assert isinstance(output_2, list) + # Ensure the management api was called to create the reader + assert len(httpx_mock.get_requests()) == 4 + for key, value in reader_1.items(): + assert json.loads(httpx_mock.get_requests()[0].content)[key] == value + assert httpx_mock.get_requests()[0].url == mgnt_url + # Also second call should be made to spin uo reader 2 + for key, value in reader_2.items(): + assert json.loads(httpx_mock.get_requests()[1].content)[key] == value + assert httpx_mock.get_requests()[1].url == mgnt_url + # Ensure the third call is to generate endpoint to inference + for n in range(2, 4): + assert httpx_mock.get_requests()[n].url == embed_url + assert ( + json.loads(httpx_mock.get_requests()[n].content)["text"] == "What is 2 + 2?" + ) diff --git a/libs/community/tests/integration_tests/llms/test_titan_takeoff.py b/libs/community/tests/integration_tests/llms/test_titan_takeoff.py index 0b7ba94d51f..a573bb55e51 100644 --- a/libs/community/tests/integration_tests/llms/test_titan_takeoff.py +++ b/libs/community/tests/integration_tests/llms/test_titan_takeoff.py @@ -1,18 +1,141 @@ """Test Titan Takeoff wrapper.""" +import json +from typing import Any, Union + +import pytest + +from langchain_community.llms import TitanTakeoff, TitanTakeoffPro -import responses - -from langchain_community.llms.titan_takeoff import TitanTakeoff - - -@responses.activate -def test_titan_takeoff_call() -> None: +@pytest.mark.requires("takeoff_client") +@pytest.mark.requires("pytest_httpx") +@pytest.mark.parametrize("streaming", [True, False]) +@pytest.mark.parametrize("takeoff_object", [TitanTakeoff, TitanTakeoffPro]) +def test_titan_takeoff_call( + httpx_mock: Any, + streaming: bool, + takeoff_object: Union[TitanTakeoff, TitanTakeoffPro], +) -> None: """Test valid call to Titan Takeoff.""" - url = "http://localhost:8000/generate" - responses.add(responses.POST, url, json={"message": "2 + 2 is 4"}, status=200) + from pytest_httpx import IteratorStream - # response = requests.post(url) - llm = TitanTakeoff() + port = 2345 + url = ( + f"http://localhost:{port}/generate_stream" + if streaming + else f"http://localhost:{port}/generate" + ) + + if streaming: + httpx_mock.add_response( + method="POST", + url=url, + stream=IteratorStream([b"data: ask someone else\n\n"]), + ) + else: + httpx_mock.add_response( + method="POST", + url=url, + json={"text": "ask someone else"}, + ) + + llm = takeoff_object(port=port, streaming=streaming) + number_of_calls = 0 + for function_call in [llm, llm.invoke]: + number_of_calls += 1 + output = function_call("What is 2 + 2?") + assert isinstance(output, str) + assert len(httpx_mock.get_requests()) == number_of_calls + assert httpx_mock.get_requests()[0].url == url + assert ( + json.loads(httpx_mock.get_requests()[0].content)["text"] == "What is 2 + 2?" + ) + + if streaming: + output = llm._stream("What is 2 + 2?") + for chunk in output: + assert isinstance(chunk.text, str) + assert len(httpx_mock.get_requests()) == number_of_calls + 1 + assert httpx_mock.get_requests()[0].url == url + assert ( + json.loads(httpx_mock.get_requests()[0].content)["text"] == "What is 2 + 2?" + ) + + +@pytest.mark.requires("pytest_httpx") +@pytest.mark.requires("takeoff_client") +@pytest.mark.parametrize("streaming", [True, False]) +@pytest.mark.parametrize("takeoff_object", [TitanTakeoff, TitanTakeoffPro]) +def test_titan_takeoff_bad_call( + httpx_mock: Any, + streaming: bool, + takeoff_object: Union[TitanTakeoff, TitanTakeoffPro], +) -> None: + """Test valid call to Titan Takeoff.""" + from takeoff_client import TakeoffException + + url = ( + "http://localhost:3000/generate" + if not streaming + else "http://localhost:3000/generate_stream" + ) + httpx_mock.add_response( + method="POST", url=url, json={"text": "bad things"}, status_code=400 + ) + + llm = takeoff_object(streaming=streaming) + with pytest.raises(TakeoffException): + llm("What is 2 + 2?") + assert len(httpx_mock.get_requests()) == 1 + assert httpx_mock.get_requests()[0].url == url + assert json.loads(httpx_mock.get_requests()[0].content)["text"] == "What is 2 + 2?" + + +@pytest.mark.requires("pytest_httpx") +@pytest.mark.requires("takeoff_client") +@pytest.mark.parametrize("takeoff_object", [TitanTakeoff, TitanTakeoffPro]) +def test_titan_takeoff_model_initialisation( + httpx_mock: Any, + takeoff_object: Union[TitanTakeoff, TitanTakeoffPro], +) -> None: + """Test valid call to Titan Takeoff.""" + mgnt_port = 36452 + inf_port = 46253 + mgnt_url = f"http://localhost:{mgnt_port}/reader" + gen_url = f"http://localhost:{inf_port}/generate" + reader_1 = { + "model_name": "test", + "device": "cpu", + "consumer_group": "primary", + "max_sequence_length": 512, + "max_batch_size": 4, + "tensor_parallel": 3, + } + reader_2 = reader_1.copy() + reader_2["model_name"] = "test2" + + httpx_mock.add_response( + method="POST", url=mgnt_url, json={"key": "value"}, status_code=201 + ) + httpx_mock.add_response( + method="POST", url=gen_url, json={"text": "value"}, status_code=200 + ) + + llm = takeoff_object( + port=inf_port, mgmt_port=mgnt_port, models=[reader_1, reader_2] + ) output = llm("What is 2 + 2?") + assert isinstance(output, str) + # Ensure the management api was called to create the reader + assert len(httpx_mock.get_requests()) == 3 + for key, value in reader_1.items(): + assert json.loads(httpx_mock.get_requests()[0].content)[key] == value + assert httpx_mock.get_requests()[0].url == mgnt_url + # Also second call should be made to spin uo reader 2 + for key, value in reader_2.items(): + assert json.loads(httpx_mock.get_requests()[1].content)[key] == value + assert httpx_mock.get_requests()[1].url == mgnt_url + # Ensure the third call is to generate endpoint to inference + assert httpx_mock.get_requests()[2].url == gen_url + assert json.loads(httpx_mock.get_requests()[2].content)["text"] == "What is 2 + 2?" diff --git a/libs/community/tests/integration_tests/llms/test_titan_takeoff_pro.py b/libs/community/tests/integration_tests/llms/test_titan_takeoff_pro.py deleted file mode 100644 index 757bd9d48e6..00000000000 --- a/libs/community/tests/integration_tests/llms/test_titan_takeoff_pro.py +++ /dev/null @@ -1,18 +0,0 @@ -"""Test Titan Takeoff wrapper.""" - - -import responses - -from langchain_community.llms.titan_takeoff_pro import TitanTakeoffPro - - -@responses.activate -def test_titan_takeoff_pro_call() -> None: - """Test valid call to Titan Takeoff.""" - url = "http://localhost:3000/generate" - responses.add(responses.POST, url, json={"message": "2 + 2 is 4"}, status=200) - - # response = requests.post(url) - llm = TitanTakeoffPro() - output = llm("What is 2 + 2?") - assert isinstance(output, str) diff --git a/libs/community/tests/unit_tests/embeddings/test_imports.py b/libs/community/tests/unit_tests/embeddings/test_imports.py index abbfd6123f8..25a823afe9c 100644 --- a/libs/community/tests/unit_tests/embeddings/test_imports.py +++ b/libs/community/tests/unit_tests/embeddings/test_imports.py @@ -66,6 +66,7 @@ EXPECTED_ALL = [ "QuantizedBiEncoderEmbeddings", "NeMoEmbeddings", "SparkLLMTextEmbeddings", + "TitanTakeoffEmbed", "QuantizedBgeEmbeddings", "PremAIEmbeddings", "YandexGPTEmbeddings", diff --git a/libs/langchain/langchain/llms/__init__.py b/libs/langchain/langchain/llms/__init__.py index 87268883687..a1ef02b2392 100644 --- a/libs/langchain/langchain/llms/__init__.py +++ b/libs/langchain/langchain/llms/__init__.py @@ -469,9 +469,9 @@ def _import_titan_takeoff() -> Any: def _import_titan_takeoff_pro() -> Any: - from langchain_community.llms.titan_takeoff_pro import TitanTakeoffPro + from langchain_community.llms.titan_takeoff import TitanTakeoff - return TitanTakeoffPro + return TitanTakeoff def _import_together() -> Any: diff --git a/libs/langchain/langchain/llms/titan_takeoff_pro.py b/libs/langchain/langchain/llms/titan_takeoff_pro.py index 12f73d32bc3..0d323c197e0 100644 --- a/libs/langchain/langchain/llms/titan_takeoff_pro.py +++ b/libs/langchain/langchain/llms/titan_takeoff_pro.py @@ -1,3 +1,3 @@ -from langchain_community.llms.titan_takeoff_pro import TitanTakeoffPro +from langchain_community.llms.titan_takeoff import TitanTakeoff as TitanTakeoffPro __all__ = ["TitanTakeoffPro"]