[Integration] NVIDIA AI Playground (#14648)

Description: Added NVIDIA AI Playground Initial support for a selection of models (Llama models, Mistral, etc.)

Dependencies: These models do depend on the AI Playground services in NVIDIA NGC. API keys with a significant amount of trial compute are available (10K queries as of the time of writing).

H/t to @VKudlay
This commit is contained in:
William FH
2023-12-13 19:46:37 -08:00
committed by GitHub
parent 1e21a3f7ed
commit 451c5d1d8c
25 changed files with 4371 additions and 0 deletions

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,39 @@
# NVIDIA AI Playground
> [NVIDIA AI Playground](https://www.nvidia.com/en-us/research/ai-playground/) gives users easy access to hosted endpoints for generative AI models like Llama-2, Mistral, etc. This example demonstrates how to use LangChain to interact with supported AI Playground models.
These models are provided via the `langchain-nvidia-aiplay` package.
## Installation
```bash
pip install -U langchain-nvidia-aiplay
```
## Setup and Authentication
- Create a free account at [NVIDIA GPU Cloud](https://catalog.ngc.nvidia.com/).
- Navigate to `Catalog > AI Foundation Models > (Model with API endpoint)`.
- Select `API` and generate the key `NVIDIA_API_KEY`.
```bash
export NVIDIA_API_KEY=nvapi-XXXXXXXXXXXXXXXXXXXXXXXXXX
```
```python
from langchain_nvidia_aiplay import ChatNVAIPlay
llm = ChatNVAIPlay(model="mixtral_8x7b")
result = llm.invoke("Write a ballad about LangChain.")
print(result.content)
```
## Using NVIDIA AI Playground Models
A selection of NVIDIA AI Playground models are supported directly in LangChain with familiar APIs.
The active models which are supported can be found [in NGC](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/ai-foundation/). In addition, a selection of models can be retrieved from `langchain.<llms/chat_models>.nv_aiplay` which pull in default model options based on their use cases.
**The following may be useful examples to help you get started:**
- **[`ChatNVAIPlay` Model](/docs/integrations/chat/nv_aiplay).**
- **[`NVAIPlayEmbedding` Model for RAG Workflows](/docs/integrations/text_embeddings/nv_aiplay).**

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1 @@
__pycache__

View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2023 LangChain, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -0,0 +1,62 @@
.PHONY: all format lint test tests integration_tests help
# Default target executed when no arguments are given to make.
all: help
# Define a variable for the test file path.
TEST_FILE ?= tests/unit_tests/
test:
poetry run pytest $(TEST_FILE)
tests:
poetry run pytest $(TEST_FILE)
check_imports: $(shell find langchain_nvidia_aiplay -name '*.py')
poetry run python ./scripts/check_imports.py $^
integration_tests:
poetry run pytest tests/integration_tests
######################
# LINTING AND FORMATTING
######################
# Define a variable for Python and notebook files.
PYTHON_FILES=.
MYPY_CACHE=.mypy_cache
lint format: PYTHON_FILES=.
lint_diff format_diff: PYTHON_FILES=$(shell git diff --name-only --diff-filter=d master | grep -E '\.py$$|\.ipynb$$')
lint_package: PYTHON_FILES=langchain_nvidia_aiplay
lint_tests: PYTHON_FILES=tests
lint_tests: MYPY_CACHE=.mypy_cache_test
lint lint_diff lint_package lint_tests:
./scripts/check_pydantic.sh .
./scripts/lint_imports.sh
poetry run ruff .
[ "$(PYTHON_FILES)" = "" ] || poetry run ruff format $(PYTHON_FILES) --diff
[ "$(PYTHON_FILES)" = "" ] || poetry run mypy $(PYTHON_FILES)
format format_diff:
poetry run ruff format $(PYTHON_FILES)
poetry run ruff --select I --fix $(PYTHON_FILES)
spell_check:
poetry run codespell --toml pyproject.toml
spell_fix:
poetry run codespell --toml pyproject.toml -w
######################
# HELP
######################
help:
@echo '----'
@echo 'format - run code formatters'
@echo 'lint - run linters'
@echo 'test - run unit tests'
@echo 'tests - run unit tests'
@echo 'test TEST_FILE=<test_file> - run all tests in file'

View File

@@ -0,0 +1,358 @@
# langchain-nvidia-aiplay
The `langchain-nvidia-aiplay` package contains LangChain integrations for chat models and embeddings powered by the NVIDIA AI Playground.
>[NVIDIA AI Playground](https://www.nvidia.com/en-us/research/ai-playground/) gives users easy access to hosted endpoints for generative AI models like Llama-2, SteerLM, Mistral, etc. Using the API, you can query NVCR (NVIDIA Container Registry) function endpoints and get quick results from a DGX-hosted cloud compute environment. All models are source-accessible and can be deployed on your own compute cluster.
Below is an example on how to use some common chat model functionality.
## Installation
```python
%pip install -U --quiet langchain-nvidia-aiplay
```
## Setup
**To get started:**
1. Create a free account with the [NVIDIA GPU Cloud](https://catalog.ngc.nvidia.com/) service, which hosts AI solution catalogs, containers, models, etc.
2. Navigate to `Catalog > AI Foundation Models > (Model with API endpoint)`.
3. Select the `API` option and click `Generate Key`.
4. Save the generated key as `NVIDIA_API_KEY`. From there, you should have access to the endpoints.
```python
import getpass
import os
if not os.environ.get("NVIDIA_API_KEY", "").startswith("nvapi-"):
nvidia_api_key = getpass.getpass("Enter your NVIDIA AIPLAY API key: ")
assert nvidia_api_key.startswith("nvapi-"), f"{nvidia_api_key[:5]}... is not a valid key"
os.environ["NVIDIA_API_KEY"] = nvidia_api_key
```
```python
## Core LC Chat Interface
from langchain_nvidia_aiplay import ChatNVAIPlay
llm = ChatNVAIPlay(model="mixtral_8x7b")
result = llm.invoke("Write a ballad about LangChain.")
print(result.content)
```
## Stream, Batch, and Async
These models natively support streaming, and as is the case with all LangChain LLMs they expose a batch method to handle concurrent requests, as well as async methods for invoke, stream, and batch. Below are a few examples.
```python
print(llm.batch(["What's 2*3?", "What's 2*6?"]))
# Or via the async API
# await llm.abatch(["What's 2*3?", "What's 2*6?"])
```
```python
for chunk in llm.stream("How far can a seagull fly in one day?"):
# Show the token separations
print(chunk.content, end="|")
```
```python
async for chunk in llm.astream("How long does it take for monarch butterflies to migrate?"):
print(chunk.content, end="|")
```
## Supported models
Querying `available_models` will still give you all of the other models offered by your API credentials.
The `playground_` prefix is optional.
```python
list(llm.available_models)
# ['playground_llama2_13b',
# 'playground_llama2_code_13b',
# 'playground_clip',
# 'playground_fuyu_8b',
# 'playground_mistral_7b',
# 'playground_nvolveqa_40k',
# 'playground_yi_34b',
# 'playground_nemotron_steerlm_8b',
# 'playground_nv_llama2_rlhf_70b',
# 'playground_llama2_code_34b',
# 'playground_mixtral_8x7b',
# 'playground_neva_22b',
# 'playground_steerlm_llama_70b',
# 'playground_nemotron_qa_8b',
# 'playground_sdxl']
```
## Model types
All of these models above are supported and can be accessed via `ChatNVAIPlay`.
Some model types support unique prompting techniques and chat messages. We will review a few important ones below.
**To find out more about a specific model, please navigate to the API section of an AI Playground model [as linked here](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/ai-foundation/models/codellama-13b/api).**
### General Chat
Models such as `llama2_13b` and `mixtral_8x7b` are good all-around models that you can use for with any LangChain chat messages. Example below.
```python
from langchain_nvidia_aiplay import ChatNVAIPlay
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_messages(
[
("system", "You are a helpful AI assistant named Fred."),
("user", "{input}")
]
)
chain = (
prompt
| ChatNVAIPlay(model="llama2_13b")
| StrOutputParser()
)
for txt in chain.stream({"input": "What's your name?"}):
print(txt, end="")
```
### Code Generation
These models accept the same arguments and input structure as regular chat models, but they tend to perform better on code-genreation and structured code tasks. An example of this is `llama2_code_13b`.
```python
prompt = ChatPromptTemplate.from_messages(
[
("system", "You are an expert coding AI. Respond only in valid python; no narration whatsoever."),
("user", "{input}")
]
)
chain = (
prompt
| ChatNVAIPlay(model="llama2_code_13b")
| StrOutputParser()
)
for txt in chain.stream({"input": "How do I solve this fizz buzz problem?"}):
print(txt, end="")
```
## Steering LLMs
> [SteerLM-optimized models](https://developer.nvidia.com/blog/announcing-steerlm-a-simple-and-practical-technique-to-customize-llms-during-inference/) supports "dynamic steering" of model outputs at inference time.
This lets you "control" the complexity, verbosity, and creativity of the model via integer labels on a scale from 0 to 9. Under the hood, these are passed as a special type of assistant message to the model.
The "steer" models support this type of input, such as `steerlm_llama_70b`
```python
from langchain_nvidia_aiplay import ChatNVAIPlay
llm = ChatNVAIPlay(model="steerlm_llama_70b")
# Try making it uncreative and not verbose
complex_result = llm.invoke(
"What's a PB&J?",
labels={"creativity": 0, "complexity": 3, "verbosity": 0}
)
print("Un-creative\n")
print(complex_result.content)
# Try making it very creative and verbose
print("\n\nCreative\n")
creative_result = llm.invoke(
"What's a PB&J?",
labels={"creativity": 9, "complexity": 3, "verbosity": 9}
)
print(creative_result.content)
```
#### Use within LCEL
The labels are passed as invocation params. You can `bind` these to the LLM using the `bind` method on the LLM to include it within a declarative, functional chain. Below is an example.
```python
from langchain_nvidia_aiplay import ChatNVAIPlay
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_messages(
[
("system", "You are a helpful AI assistant named Fred."),
("user", "{input}")
]
)
chain = (
prompt
| ChatNVAIPlay(model="steerlm_llama_70b").bind(labels={"creativity": 9, "complexity": 0, "verbosity": 9})
| StrOutputParser()
)
for txt in chain.stream({"input": "Why is a PB&J?"}):
print(txt, end="")
```
## Multimodal
NVidia also supports multimodal inputs, meaning you can provide both images and text for the model to reason over.
These models also accept `labels`, similar to the Steering LLMs above. In addition to `creativity`, `complexity`, and `verbosity`, these models support a `quality` toggle.
An example model supporting multimodal inputs is `playground_neva_22b`.
These models accept LangChain's standard image formats. Below are examples.
```python
import requests
image_url = "https://picsum.photos/seed/kitten/300/200"
image_content = requests.get(image_url).content
```
Initialize the model like so:
```python
from langchain_nvidia_aiplay import ChatNVAIPlay
llm = ChatNVAIPlay(model="playground_neva_22b")
```
#### Passing an image as a URL
```python
from langchain_core.messages import HumanMessage
llm.invoke(
[
HumanMessage(content=[
{"type": "text", "text": "Describe this image:"},
{"type": "image_url", "image_url": {"url": image_url}},
])
])
```
```python
### You can specify the labels for steering here as well. You can try setting a low verbosity, for instance
from langchain_core.messages import HumanMessage
llm.invoke(
[
HumanMessage(content=[
{"type": "text", "text": "Describe this image:"},
{"type": "image_url", "image_url": {"url": image_url}},
])
],
labels={
"creativity": 0,
"quality": 9,
"complexity": 0,
"verbosity": 0
}
)
```
#### Passing an image as a base64 encoded string
```python
import base64
b64_string = base64.b64encode(image_content).decode('utf-8')
llm.invoke(
[
HumanMessage(content=[
{"type": "text", "text": "Describe this image:"},
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64_string}"}},
])
])
```
#### Directly within the string
The NVIDIA API uniquely accepts images as base64 images inlined within <img> HTML tags. While this isn't interoperable with other LLMs, you can directly prompt the model accordingly.
```python
base64_with_mime_type = f"data:image/png;base64,{b64_string}"
llm.invoke(
f'What\'s in this image?\n<img src="{base64_with_mime_type}" />'
)
```
## RAG: Context models
NVIDIA also has Q&A models that support a special "context" chat message containing retrieved context (such as documents within a RAG chain). This is useful to avoid prompt-injecting the model.
**Note:** Only "user" (human) and "context" chat messages are supported for these models, not system or AI messages useful in conversational flows.
The `_qa_` models like `nemotron_qa_8b` support this.
```python
from langchain_nvidia_aiplay import ChatNVAIPlay
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.messages import ChatMessage
prompt = ChatPromptTemplate.from_messages(
[
ChatMessage(role="context", content="Parrots and Cats have signed the peace accord."),
("user", "{input}")
]
)
llm = ChatNVAIPlay(model="nemotron_qa_8b")
chain = (
prompt
| llm
| StrOutputParser()
)
chain.invoke({"input": "What was signed?"})
```
## Embeddings
You can also connect to embeddings models through this package. Below is an example:
```
from langchain_nvidia_aiplay import NVAIPlayEmbeddings
embedder = NVAIPlayEmbeddings(model="nvolveqa_40k")
embedder.embed_query("What's the temperature today?")
embedder.embed_documents([
"The temperature is 42 degrees.",
"Class is dismissed at 9 PM."
])
```
By default the embedding model will use the "passage" type for documents and "query" type for queries, but you can fix this on the instance.
```python
query_embedder = NVAIPlayEmbeddings(model="nvolveqa_40k", model_type="query")
doc_embeddder = NVAIPlayEmbeddings(model="nvolveqa_40k", model_type="passage")
```

View File

@@ -0,0 +1,45 @@
"""
**LangChain NVIDIA AI Playground Integration**
This comprehensive module integrates NVIDIA's state-of-the-art AI Playground, featuring advanced models for conversational AI and semantic embeddings, into the LangChain framework. It provides robust classes for seamless interaction with NVIDIA's AI models, particularly tailored for enriching conversational experiences and enhancing semantic understanding in various applications.
**Features:**
1. **Chat Models (`ChatNVAIPlay`):** This class serves as the primary interface for interacting with NVIDIA AI Playground's chat models. Users can effortlessly utilize NVIDIA's advanced models like 'Mistral' to engage in rich, context-aware conversations, applicable across diverse domains from customer support to interactive storytelling.
2. **Semantic Embeddings (`NVAIPlayEmbeddings`):** The module offers capabilities to generate sophisticated embeddings using NVIDIA's AI models. These embeddings are instrumental for tasks like semantic analysis, text similarity assessments, and contextual understanding, significantly enhancing the depth of NLP applications.
**Installation:**
Install this module easily using pip:
```python
pip install langchain-nvidia-aiplay
```
## Utilizing Chat Models:
After setting up the environment, interact with NVIDIA AI Playground models:
```python
from langchain_nvidia_aiplay import ChatNVAIPlay
ai_chat_model = ChatNVAIPlay(model="llama2_13b")
response = ai_chat_model.invoke("Tell me about the LangChain integration.")
```
# Generating Semantic Embeddings:
Use NVIDIA's models for creating embeddings, useful in various NLP tasks:
```python
from langchain_nvidia_aiplay import NVAIPlayEmbeddings
embed_model = NVAIPlayEmbeddings(model="nvolveqa_40k")
embedding_output = embed_model.embed_query("Exploring AI capabilities.")
```
""" # noqa: E501
from langchain_nvidia_aiplay.chat_models import ChatNVAIPlay
from langchain_nvidia_aiplay.embeddings import NVAIPlayEmbeddings
__all__ = ["ChatNVAIPlay", "NVAIPlayEmbeddings"]

View File

@@ -0,0 +1,525 @@
from __future__ import annotations
import json
import logging
from typing import (
Any,
AsyncIterator,
Callable,
Dict,
Generator,
Iterator,
List,
Optional,
Sequence,
Tuple,
Union,
)
import aiohttp
import requests
from langchain_core.messages import BaseMessage
from langchain_core.pydantic_v1 import (
BaseModel,
Field,
PrivateAttr,
SecretStr,
root_validator,
)
from langchain_core.utils import get_from_dict_or_env
from requests.models import Response
logger = logging.getLogger(__name__)
class NVCRModel(BaseModel):
"""
Underlying Client for interacting with the AI Playground API.
Leveraged by the NVAIPlayBaseModel to provide a simple requests-oriented interface.
Direct abstraction over NGC-recommended streaming/non-streaming Python solutions.
NOTE: AI Playground does not currently support raw text continuation.
"""
## Core defaults. These probably should not be changed
fetch_url_format: str = Field("https://api.nvcf.nvidia.com/v2/nvcf/pexec/status/")
call_invoke_base: str = Field("https://api.nvcf.nvidia.com/v2/nvcf/pexec/functions")
get_session_fn: Callable = Field(requests.Session)
get_asession_fn: Callable = Field(aiohttp.ClientSession)
nvidia_api_key: SecretStr = Field(
...,
description="API key for NVIDIA AI Playground. Should start with `nvapi-`",
)
is_staging: bool = Field(False, description="Whether to use staging API")
## Generation arguments
max_tries: int = Field(5, ge=1)
headers_tmpl: dict = Field(
...,
description="Headers template for API calls."
" Should contain `call` and `stream` keys.",
)
_available_functions: Optional[List[dict]] = PrivateAttr(default=None)
_available_models: Optional[dict] = PrivateAttr(default=None)
@property
def headers(self) -> dict:
"""Return headers with API key injected"""
headers_ = self.headers_tmpl.copy()
for header in headers_.values():
if "{nvidia_api_key}" in header["Authorization"]:
header["Authorization"] = header["Authorization"].format(
nvidia_api_key=self.nvidia_api_key.get_secret_value(),
)
return headers_
@root_validator(pre=True)
def validate_model(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and update model arguments, including API key and formatting"""
values["nvidia_api_key"] = get_from_dict_or_env(
values,
"nvidia_api_key",
"NVIDIA_API_KEY",
)
if "nvapi-" not in values.get("nvidia_api_key", ""):
raise ValueError("Invalid NVAPI key detected. Should start with `nvapi-`")
is_staging = "nvapi-stg-" in values["nvidia_api_key"]
values["is_staging"] = is_staging
if "headers_tmpl" not in values:
values["headers_tmpl"] = {
"call": {
"Authorization": "Bearer {nvidia_api_key}",
"Accept": "application/json",
},
"stream": {
"Authorization": "Bearer {nvidia_api_key}",
"Accept": "text/event-stream",
"content-type": "application/json",
},
}
values["fetch_url_format"] = cls._stagify(
is_staging,
values.get(
"fetch_url_format", "https://api.nvcf.nvidia.com/v2/nvcf/pexec/status/"
),
)
values["call_invoke_base"] = cls._stagify(
is_staging,
values.get(
"call_invoke_base",
"https://api.nvcf.nvidia.com/v2/nvcf/pexec/functions",
),
)
return values
@property
def available_models(self) -> dict:
"""List the available models that can be invoked."""
if self._available_models is not None:
return self._available_models
live_fns = [v for v in self.available_functions if v.get("status") == "ACTIVE"]
self._available_models = {v["name"]: v["id"] for v in live_fns}
return self._available_models
@property
def available_functions(self) -> List[dict]:
"""List the available functions that can be invoked."""
if self._available_functions is not None:
return self._available_functions
invoke_url = self._stagify(
self.is_staging, "https://api.nvcf.nvidia.com/v2/nvcf/functions"
)
query_res = self.query(invoke_url)
if "functions" not in query_res:
raise ValueError(
f"Unexpected response when querying {invoke_url}\n{query_res}"
)
self._available_functions = query_res["functions"]
return self._available_functions
@classmethod
def _stagify(cls, is_staging: bool, path: str) -> str:
"""Helper method to switch between staging and production endpoints"""
if is_staging and "stg.api" not in path:
return path.replace("api.", "stg.api.")
if not is_staging and "stg.api" in path:
return path.replace("stg.api.", "api.")
return path
####################################################################################
## Core utilities for posting and getting from NVCR
def _post(self, invoke_url: str, payload: dict = {}) -> Tuple[Response, Any]:
"""Method for posting to the AI Playground API."""
call_inputs = {
"url": invoke_url,
"headers": self.headers["call"],
"json": payload,
"stream": False,
}
session = self.get_session_fn()
response = session.post(**call_inputs)
self._try_raise(response)
return response, session
def _get(self, invoke_url: str, payload: dict = {}) -> Tuple[Response, Any]:
"""Method for getting from the AI Playground API."""
last_inputs = {
"url": invoke_url,
"headers": self.headers["call"],
"json": payload,
"stream": False,
}
session = self.get_session_fn()
last_response = session.get(**last_inputs)
self._try_raise(last_response)
return last_response, session
def _wait(self, response: Response, session: Any) -> Response:
"""Wait for a response from API after an initial response is made."""
i = 1
while response.status_code == 202:
request_id = response.headers.get("NVCF-REQID", "")
response = session.get(
self.fetch_url_format + request_id,
headers=self.headers["call"],
)
if response.status_code == 202:
try:
body = response.json()
except ValueError:
body = str(response)
if i > self.max_tries:
raise ValueError(f"Failed to get response with {i} tries: {body}")
self._try_raise(response)
return response
def _try_raise(self, response: Response) -> None:
"""Try to raise an error from a response"""
try:
response.raise_for_status()
except requests.HTTPError as e:
try:
rd = response.json()
except json.JSONDecodeError:
rd = response.__dict__
rd = rd.get("_content", rd)
if isinstance(rd, bytes):
rd = rd.decode("utf-8")[5:] ## lop of data: prefix ??
try:
rd = json.loads(rd)
except Exception:
rd = {"detail": rd}
title = f"[{rd.get('status', '###')}] {rd.get('title', 'Unknown Error')}"
body = f"{rd.get('detail', rd.get('type', rd))}"
raise Exception(f"{title}\n{body}") from e
####################################################################################
## Simple query interface to show the set of model options
def query(self, invoke_url: str, payload: dict = {}) -> dict:
"""Simple method for an end-to-end get query. Returns result dictionary"""
response, session = self._get(invoke_url, payload)
response = self._wait(response, session)
output = self._process_response(response)[0]
return output
def _process_response(self, response: Union[str, Response]) -> List[dict]:
"""General-purpose response processing for single responses and streams"""
if hasattr(response, "json"): ## For single response (i.e. non-streaming)
try:
return [response.json()]
except json.JSONDecodeError:
response = str(response.__dict__)
if isinstance(response, str): ## For set of responses (i.e. streaming)
msg_list = []
for msg in response.split("\n\n"):
if "{" not in msg:
continue
msg_list += [json.loads(msg[msg.find("{") :])]
return msg_list
raise ValueError(f"Received ill-formed response: {response}")
def _get_invoke_url(
self, model_name: Optional[str] = None, invoke_url: Optional[str] = None
) -> str:
"""Helper method to get invoke URL from a model name, URL, or endpoint stub"""
if not invoke_url:
if not model_name:
raise ValueError("URL or model name must be specified to invoke")
if model_name in self.available_models:
invoke_url = self.available_models[model_name]
elif f"playground_{model_name}" in self.available_models:
invoke_url = self.available_models[f"playground_{model_name}"]
else:
available_models_str = "\n".join(
[f"{k} - {v}" for k, v in self.available_models.items()]
)
raise ValueError(
f"Unknown model name {model_name} specified."
"\nAvailable models are:\n"
f"{available_models_str}"
)
if not invoke_url:
# For mypy
raise ValueError("URL or model name must be specified to invoke")
# Why is this even needed?
if "http" not in invoke_url:
invoke_url = f"{self.call_invoke_base}/{invoke_url}"
return invoke_url
####################################################################################
## Generation interface to allow users to generate new values from endpoints
def get_req(
self,
model_name: Optional[str] = None,
payload: dict = {},
invoke_url: Optional[str] = None,
stop: Optional[Sequence[str]] = None,
) -> Response:
"""Post to the API."""
invoke_url = self._get_invoke_url(model_name, invoke_url)
if payload.get("stream", False) is True:
payload = {**payload, "stream": False}
response, session = self._post(invoke_url, payload)
return self._wait(response, session)
def get_req_generation(
self,
model_name: Optional[str] = None,
payload: dict = {},
invoke_url: Optional[str] = None,
stop: Optional[Sequence[str]] = None,
) -> dict:
"""Method for an end-to-end post query with NVCR post-processing."""
response = self.get_req(model_name, payload, invoke_url)
output, _ = self.postprocess(response, stop=stop)
return output
def postprocess(
self, response: Union[str, Response], stop: Optional[Sequence[str]] = None
) -> Tuple[dict, bool]:
"""Parses a response from the AI Playground API.
Strongly assumes that the API will return a single response.
"""
msg_list = self._process_response(response)
msg, is_stopped = self._aggregate_msgs(msg_list)
msg, is_stopped = self._early_stop_msg(msg, is_stopped, stop=stop)
return msg, is_stopped
def _aggregate_msgs(self, msg_list: Sequence[dict]) -> Tuple[dict, bool]:
"""Dig out relevant details of aggregated message"""
content_buffer: Dict[str, Any] = dict()
content_holder: Dict[Any, Any] = dict()
is_stopped = False
for msg in msg_list:
if "choices" in msg:
## Tease out ['choices'][0]...['delta'/'message']
msg = msg.get("choices", [{}])[0]
is_stopped = msg.get("finish_reason", "") == "stop"
msg = msg.get("delta", msg.get("message", {"content": ""}))
elif "data" in msg:
## Tease out ['data'][0]...['embedding']
msg = msg.get("data", [{}])[0]
content_holder = msg
for k, v in msg.items():
if k in ("content",) and k in content_buffer:
content_buffer[k] += v
else:
content_buffer[k] = v
if is_stopped:
break
content_holder = {**content_holder, **content_buffer}
return content_holder, is_stopped
def _early_stop_msg(
self, msg: dict, is_stopped: bool, stop: Optional[Sequence[str]] = None
) -> Tuple[dict, bool]:
"""Try to early-terminate streaming or generation by iterating over stop list"""
content = msg.get("content", "")
if content and stop:
for stop_str in stop:
if stop_str and stop_str in content:
msg["content"] = content[: content.find(stop_str) + 1]
is_stopped = True
return msg, is_stopped
####################################################################################
## Streaming interface to allow you to iterate through progressive generations
def get_req_stream(
self,
model: Optional[str] = None,
payload: dict = {},
invoke_url: Optional[str] = None,
stop: Optional[Sequence[str]] = None,
) -> Iterator:
invoke_url = self._get_invoke_url(model, invoke_url)
if payload.get("stream", True) is False:
payload = {**payload, "stream": True}
last_inputs = {
"url": invoke_url,
"headers": self.headers["stream"],
"json": payload,
"stream": True,
}
response = self.get_session_fn().post(**last_inputs)
self._try_raise(response)
call = self.copy()
def out_gen() -> Generator[dict, Any, Any]:
## Good for client, since it allows self.last_input
for line in response.iter_lines():
if line and line.strip() != b"data: [DONE]":
line = line.decode("utf-8")
msg, final_line = call.postprocess(line, stop=stop)
yield msg
if final_line:
break
self._try_raise(response)
return (r for r in out_gen())
####################################################################################
## Asynchronous streaming interface to allow multiple generations to happen at once.
async def get_req_astream(
self,
model: Optional[str] = None,
payload: dict = {},
invoke_url: Optional[str] = None,
stop: Optional[Sequence[str]] = None,
) -> AsyncIterator:
invoke_url = self._get_invoke_url(model, invoke_url)
if payload.get("stream", True) is False:
payload = {**payload, "stream": True}
last_inputs = {
"url": invoke_url,
"headers": self.headers["stream"],
"json": payload,
}
async with self.get_asession_fn() as session:
async with session.post(**last_inputs) as response:
self._try_raise(response)
async for line in response.content.iter_any():
if line and line.strip() != b"data: [DONE]":
line = line.decode("utf-8")
msg, final_line = self.postprocess(line, stop=stop)
yield msg
if final_line:
break
class _NVAIPlayClient(BaseModel):
"""
Higher-Level Client for interacting with AI Playground API with argument defaults.
Is subclassed by NVAIPlayLLM/ChatNVAIPlay to provide a simple LangChain interface.
"""
client: NVCRModel = Field(NVCRModel)
model: str = Field(..., description="Name of the model to invoke")
temperature: float = Field(0.2, le=1.0, gt=0.0)
top_p: float = Field(0.7, le=1.0, ge=0.0)
max_tokens: int = Field(1024, le=1024, ge=32)
####################################################################################
@root_validator(pre=True)
def validate_client(cls, values: Any) -> Any:
"""Validate and update client arguments, including API key and formatting"""
if not values.get("client"):
values["client"] = NVCRModel(**values)
return values
@classmethod
def is_lc_serializable(cls) -> bool:
return True
@property
def available_functions(self) -> List[dict]:
"""Map the available functions that can be invoked."""
return self.client.available_functions
@property
def available_models(self) -> dict:
"""Map the available models that can be invoked."""
return self.client.available_models
def get_model_details(self, model: Optional[str] = None) -> dict:
"""Get more meta-details about a model retrieved by a given name"""
if model is None:
model = self.model
model_key = self.client._get_invoke_url(model).split("/")[-1]
known_fns = self.client.available_functions
fn_spec = [f for f in known_fns if f.get("id") == model_key][0]
return fn_spec
def get_generation(
self,
inputs: Sequence[Dict],
labels: Optional[dict] = None,
stop: Optional[Sequence[str]] = None,
**kwargs: Any,
) -> dict:
"""Call to client generate method with call scope"""
payload = self.get_payload(inputs=inputs, stream=False, labels=labels, **kwargs)
out = self.client.get_req_generation(self.model, stop=stop, payload=payload)
return out
def get_stream(
self,
inputs: Sequence[Dict],
labels: Optional[dict] = None,
stop: Optional[Sequence[str]] = None,
**kwargs: Any,
) -> Iterator:
"""Call to client stream method with call scope"""
payload = self.get_payload(inputs=inputs, stream=True, labels=labels, **kwargs)
return self.client.get_req_stream(self.model, stop=stop, payload=payload)
def get_astream(
self,
inputs: Sequence[Dict],
labels: Optional[dict] = None,
stop: Optional[Sequence[str]] = None,
**kwargs: Any,
) -> AsyncIterator:
"""Call to client astream methods with call scope"""
payload = self.get_payload(inputs=inputs, stream=True, labels=labels, **kwargs)
return self.client.get_req_astream(self.model, stop=stop, payload=payload)
def get_payload(
self, inputs: Sequence[Dict], labels: Optional[dict] = None, **kwargs: Any
) -> dict:
"""Generates payload for the _NVAIPlayClient API to send to service."""
return {
**self.preprocess(inputs=inputs, labels=labels),
**kwargs,
}
def preprocess(self, inputs: Sequence[Dict], labels: Optional[dict] = None) -> dict:
"""Prepares a message or list of messages for the payload"""
messages = [self.prep_msg(m) for m in inputs]
if labels:
# (WFH) Labels are currently (?) always passed as an assistant
# suffix message, but this API seems less stable.
messages += [{"labels": labels, "role": "assistant"}]
return {"messages": messages}
def prep_msg(self, msg: Union[str, dict, BaseMessage]) -> dict:
"""Helper Method: Ensures a message is a dictionary with a role and content."""
if isinstance(msg, str):
# (WFH) this shouldn't ever be reached but leaving this here bcs
# it's a Chesterton's fence I'm unwilling to touch
return dict(role="user", content=msg)
if isinstance(msg, dict):
if msg.get("content", None) is None:
raise ValueError(f"Message {msg} has no content")
return msg
raise ValueError(f"Unknown message received: {msg} of type {type(msg)}")

View File

@@ -0,0 +1,207 @@
"""Chat Model Components Derived from ChatModel/NVAIPlay"""
from __future__ import annotations
import base64
import logging
import os
import urllib.parse
from typing import (
Any,
AsyncIterator,
Dict,
Iterator,
List,
Mapping,
Optional,
Sequence,
Union,
)
import requests
from langchain_core.callbacks.manager import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models.chat_models import SimpleChatModel
from langchain_core.messages import BaseMessage, ChatMessage, ChatMessageChunk
from langchain_core.outputs import ChatGenerationChunk
from langchain_nvidia_aiplay import _common as nv_aiplay
logger = logging.getLogger(__name__)
def _is_openai_parts_format(part: dict) -> bool:
return "type" in part
def _is_url(s: str) -> bool:
try:
result = urllib.parse.urlparse(s)
return all([result.scheme, result.netloc])
except Exception as e:
logger.debug(f"Unable to parse URL: {e}")
return False
def _is_b64(s: str) -> bool:
return s.startswith("data:image")
def _url_to_b64_string(image_source: str) -> str:
b64_template = "data:image/png;base64,{b64_string}"
try:
if _is_url(image_source):
response = requests.get(image_source)
response.raise_for_status()
encoded = base64.b64encode(response.content).decode("utf-8")
return b64_template.format(b64_string=encoded)
elif _is_b64(image_source):
return image_source
elif os.path.exists(image_source):
with open(image_source, "rb") as f:
encoded = base64.b64encode(f.read()).decode("utf-8")
return b64_template.format(b64_string=encoded)
else:
raise ValueError(
"The provided string is not a valid URL, base64, or file path."
)
except Exception as e:
raise ValueError(f"Unable to process the provided image source: {e}")
class ChatNVAIPlay(nv_aiplay._NVAIPlayClient, SimpleChatModel):
"""NVAIPlay chat model.
Example:
.. code-block:: python
from langchain_nvidia_aiplay import ChatNVAIPlay
model = ChatNVAIPlay(model="llama2_13b")
response = model.invoke("Hello")
"""
@property
def _llm_type(self) -> str:
"""Return type of NVIDIA AI Playground Interface."""
return "chat-nvidia-ai-playground"
def _call(
self,
messages: List[BaseMessage],
stop: Optional[Sequence[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
labels: Optional[dict] = None,
**kwargs: Any,
) -> str:
"""Invoke on a single list of chat messages."""
inputs = self.custom_preprocess(messages)
responses = self.get_generation(
inputs=inputs, stop=stop, labels=labels, **kwargs
)
outputs = self.custom_postprocess(responses)
return outputs
def _get_filled_chunk(
self, text: str, role: Optional[str] = "assistant"
) -> ChatGenerationChunk:
"""Fill the generation chunk."""
return ChatGenerationChunk(message=ChatMessageChunk(content=text, role=role))
def _stream(
self,
messages: List[BaseMessage],
stop: Optional[Sequence[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
labels: Optional[dict] = None,
**kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
"""Allows streaming to model!"""
inputs = self.custom_preprocess(messages)
for response in self.get_stream(
inputs=inputs, stop=stop, labels=labels, **kwargs
):
chunk = self._get_filled_chunk(self.custom_postprocess(response))
yield chunk
if run_manager:
run_manager.on_llm_new_token(chunk.text, chunk=chunk)
async def _astream(
self,
messages: List[BaseMessage],
stop: Optional[Sequence[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
labels: Optional[dict] = None,
**kwargs: Any,
) -> AsyncIterator[ChatGenerationChunk]:
inputs = self.custom_preprocess(messages)
async for response in self.get_astream(
inputs=inputs, stop=stop, labels=labels, **kwargs
):
chunk = self._get_filled_chunk(self.custom_postprocess(response))
yield chunk
if run_manager:
await run_manager.on_llm_new_token(chunk.text, chunk=chunk)
def custom_preprocess(
self, msg_list: Sequence[BaseMessage]
) -> List[Dict[str, str]]:
# The previous author had a lot of custom preprocessing here
# but I'm just going to assume it's a list
return [self.preprocess_msg(m) for m in msg_list]
def _process_content(self, content: Union[str, List[Union[dict, str]]]) -> str:
if isinstance(content, str):
return content
string_array: list = []
for part in content:
if isinstance(part, str):
string_array.append(part)
elif isinstance(part, Mapping):
# OpenAI Format
if _is_openai_parts_format(part):
if part["type"] == "text":
string_array.append(str(part["text"]))
elif part["type"] == "image_url":
img_url = part["image_url"]
if isinstance(img_url, dict):
if "url" not in img_url:
raise ValueError(
f"Unrecognized message image format: {img_url}"
)
img_url = img_url["url"]
b64_string = _url_to_b64_string(img_url)
string_array.append(f'<img src="{b64_string}" />')
else:
raise ValueError(
f"Unrecognized message part type: {part['type']}"
)
else:
raise ValueError(f"Unrecognized message part format: {part}")
return "".join(string_array)
def preprocess_msg(self, msg: BaseMessage) -> Dict[str, str]:
## (WFH): Previous author added a bunch of
# custom processing here, but I'm just going to support
# the LCEL api.
if isinstance(msg, BaseMessage):
role_convert = {"ai": "assistant", "human": "user"}
if isinstance(msg, ChatMessage):
role = msg.role
else:
role = msg.type
role = role_convert.get(role, role)
content = self._process_content(msg.content)
return {"role": role, "content": content}
raise ValueError(f"Invalid message: {repr(msg)} of type {type(msg)}")
def custom_postprocess(self, msg: dict) -> str:
if "content" in msg:
return msg["content"]
logger.warning(
f"Got ambiguous message in postprocessing; returning as-is: msg = {msg}"
)
return str(msg)

View File

@@ -0,0 +1,74 @@
"""Embeddings Components Derived from ChatModel/NVAIPlay"""
from typing import Any, List, Literal, Optional
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import BaseModel, Field, root_validator
import langchain_nvidia_aiplay._common as nvaiplay_common
class NVAIPlayEmbeddings(BaseModel, Embeddings):
"""NVIDIA's AI Playground NVOLVE Question-Answer Asymmetric Model."""
client: nvaiplay_common.NVCRModel = Field(nvaiplay_common.NVCRModel)
model: str = Field(
..., description="The embedding model to use. Example: nvolveqa_40k"
)
max_length: int = Field(2048, ge=1, le=2048)
max_batch_size: int = Field(default=50)
model_type: Optional[Literal["passage", "query"]] = Field(
"passage", description="The type of text to be embedded."
)
@root_validator(pre=True)
def _validate_client(cls, values: Any) -> Any:
if "client" not in values:
values["client"] = nvaiplay_common.NVCRModel()
return values
@property
def available_models(self) -> dict:
"""Map the available models that can be invoked."""
return self.client.available_models
def _embed(
self, texts: List[str], model_type: Literal["passage", "query"]
) -> List[List[float]]:
"""Embed a single text entry to either passage or query type"""
response = self.client.get_req(
model_name=self.model,
payload={
"input": texts,
"model": model_type,
"encoding_format": "float",
},
)
response.raise_for_status()
result = response.json()
data = result["data"]
if not isinstance(data, list):
raise ValueError(f"Expected a list of embeddings. Got: {data}")
embedding_list = [(res["embedding"], res["index"]) for res in data]
return [x[0] for x in sorted(embedding_list, key=lambda x: x[1])]
def embed_query(self, text: str) -> List[float]:
"""Input pathway for query embeddings."""
return self._embed([text], model_type=self.model_type or "query")[0]
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Input pathway for document embeddings."""
# From https://catalog.ngc.nvidia.com/orgs/nvidia/teams/ai-foundation/models/nvolve-40k/documentation
# The input must not exceed the 2048 max input characters and inputs above 512
# model tokens will be truncated. The input array must not exceed 50 input
# strings.
all_embeddings = []
for i in range(0, len(texts), self.max_batch_size):
batch = texts[i : i + self.max_batch_size]
truncated = [
text[: self.max_length] if len(text) > self.max_length else text
for text in batch
]
all_embeddings.extend(
self._embed(truncated, model_type=self.model_type or "passage")
)
return all_embeddings

1235
libs/partners/nvidia-aiplay/poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,92 @@
[tool.poetry]
name = "langchain-nvidia-aiplay"
version = "0.0.1"
description = "An integration package connecting NVidia AIPlay and LangChain"
authors = []
readme = "README.md"
repository = "https://github.com/langchain-ai/langchain/tree/master/libs/partners/nvidia-aiplay"
[tool.poetry.dependencies]
python = ">=3.8.1,<4.0"
langchain-core = "^0.1.0"
aiohttp = "^3.9.1"
[tool.poetry.group.test]
optional = true
[tool.poetry.group.test.dependencies]
pytest = "^7.3.0"
freezegun = "^1.2.2"
pytest-mock = "^3.10.0"
syrupy = "^4.0.2"
pytest-watcher = "^0.3.4"
pytest-asyncio = "^0.21.1"
langchain-core = {path = "../../core", develop = true}
[tool.poetry.group.codespell]
optional = true
[tool.poetry.group.codespell.dependencies]
codespell = "^2.2.0"
[tool.poetry.group.test_integration]
optional = true
[tool.poetry.group.test_integration.dependencies]
[tool.poetry.group.lint]
optional = true
[tool.poetry.group.lint.dependencies]
ruff = "^0.1.5"
[tool.poetry.group.typing.dependencies]
mypy = "^0.991"
langchain-core = {path = "../../core", develop = true}
types-requests = "^2.31.0.10"
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
langchain-core = {path = "../../core", develop = true}
[tool.ruff]
select = [
"E", # pycodestyle
"F", # pyflakes
"I", # isort
]
[tool.mypy]
disallow_untyped_defs = "True"
exclude = ["notebooks", "examples", "example_data", "langchain_core/pydantic"]
[tool.coverage.run]
omit = [
"tests/*",
]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
# --strict-markers will raise errors on unknown marks.
# https://docs.pytest.org/en/7.1.x/how-to/mark.html#raising-errors-on-unknown-marks
#
# https://docs.pytest.org/en/7.1.x/reference/reference.html
# --strict-config any warnings encountered while parsing the `pytest`
# section of the configuration file raise errors.
#
# https://github.com/tophat/syrupy
# --snapshot-warn-unused Prints a warning on unused snapshots rather than fail the test suite.
addopts = "--snapshot-warn-unused --strict-markers --strict-config --durations=5"
# Registering custom markers.
# https://docs.pytest.org/en/7.1.x/example/markers.html#registering-markers
markers = [
"requires: mark tests as requiring a specific library",
"asyncio: mark tests as requiring asyncio",
"compile: mark placeholder test used to compile integration tests without running them",
]
asyncio_mode = "auto"

View File

@@ -0,0 +1,17 @@
import sys
import traceback
from importlib.machinery import SourceFileLoader
if __name__ == "__main__":
files = sys.argv[1:]
has_failure = False
for file in files:
try:
SourceFileLoader("x", file).load_module()
except Exception:
has_faillure = True
print(file)
traceback.print_exc()
print()
sys.exit(1 if has_failure else 0)

View File

@@ -0,0 +1,27 @@
#!/bin/bash
#
# This script searches for lines starting with "import pydantic" or "from pydantic"
# in tracked files within a Git repository.
#
# Usage: ./scripts/check_pydantic.sh /path/to/repository
# Check if a path argument is provided
if [ $# -ne 1 ]; then
echo "Usage: $0 /path/to/repository"
exit 1
fi
repository_path="$1"
# Search for lines matching the pattern within the specified repository
result=$(git -C "$repository_path" grep -E '^import pydantic|^from pydantic')
# Check if any matching lines were found
if [ -n "$result" ]; then
echo "ERROR: The following lines need to be updated:"
echo "$result"
echo "Please replace the code with an import from langchain_core.pydantic_v1."
echo "For example, replace 'from pydantic import BaseModel'"
echo "with 'from langchain_core.pydantic_v1 import BaseModel'"
exit 1
fi

View File

@@ -0,0 +1,17 @@
#!/bin/bash
set -eu
# Initialize a variable to keep track of errors
errors=0
# make sure not importing from langchain or langchain_experimental
git --no-pager grep '^from langchain\.' . && errors=$((errors+1))
git --no-pager grep '^from langchain_experimental\.' . && errors=$((errors+1))
# Decide on an exit status based on the errors
if [ "$errors" -gt 0 ]; then
exit 1
else
exit 0
fi

View File

@@ -0,0 +1,96 @@
"""Test ChatNVAIPlay chat model."""
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langchain_nvidia_aiplay.chat_models import ChatNVAIPlay
def test_chat_aiplay() -> None:
"""Test ChatNVAIPlay wrapper."""
chat = ChatNVAIPlay(
model="llama2_13b",
temperature=0.7,
)
message = HumanMessage(content="Hello")
response = chat([message])
assert isinstance(response, BaseMessage)
assert isinstance(response.content, str)
def test_chat_aiplay_model() -> None:
"""Test GeneralChat wrapper handles model."""
chat = ChatNVAIPlay(model="mistral")
assert chat.model == "mistral"
def test_chat_aiplay_system_message() -> None:
"""Test GeneralChat wrapper with system message."""
chat = ChatNVAIPlay(model="llama2_13b", max_tokens=36)
system_message = SystemMessage(content="You are to chat with the user.")
human_message = HumanMessage(content="Hello")
response = chat([system_message, human_message])
assert isinstance(response, BaseMessage)
assert isinstance(response.content, str)
## TODO: Not sure if we want to support the n syntax. Trash or keep test
def test_aiplay_streaming() -> None:
"""Test streaming tokens from aiplay."""
llm = ChatNVAIPlay(model="llama2_13b", max_tokens=36)
for token in llm.stream("I'm Pickle Rick"):
assert isinstance(token.content, str)
async def test_aiplay_astream() -> None:
"""Test streaming tokens from aiplay."""
llm = ChatNVAIPlay(model="llama2_13b", max_tokens=35)
async for token in llm.astream("I'm Pickle Rick"):
assert isinstance(token.content, str)
async def test_aiplay_abatch() -> None:
"""Test streaming tokens from GeneralChat."""
llm = ChatNVAIPlay(model="llama2_13b", max_tokens=36)
result = await llm.abatch(["I'm Pickle Rick", "I'm not Pickle Rick"])
for token in result:
assert isinstance(token.content, str)
async def test_aiplay_abatch_tags() -> None:
"""Test batch tokens from GeneralChat."""
llm = ChatNVAIPlay(model="llama2_13b", max_tokens=55)
result = await llm.abatch(
["I'm Pickle Rick", "I'm not Pickle Rick"], config={"tags": ["foo"]}
)
for token in result:
assert isinstance(token.content, str)
def test_aiplay_batch() -> None:
"""Test batch tokens from GeneralChat."""
llm = ChatNVAIPlay(model="llama2_13b", max_tokens=60)
result = llm.batch(["I'm Pickle Rick", "I'm not Pickle Rick"])
for token in result:
assert isinstance(token.content, str)
async def test_aiplay_ainvoke() -> None:
"""Test invoke tokens from GeneralChat."""
llm = ChatNVAIPlay(model="llama2_13b", max_tokens=60)
result = await llm.ainvoke("I'm Pickle Rick", config={"tags": ["foo"]})
assert isinstance(result.content, str)
def test_aiplay_invoke() -> None:
"""Test invoke tokens from GeneralChat."""
llm = ChatNVAIPlay(model="llama2_13b", max_tokens=60)
result = llm.invoke("I'm Pickle Rick", config=dict(tags=["foo"]))
assert isinstance(result.content, str)

View File

@@ -0,0 +1,7 @@
import pytest
@pytest.mark.compile
def test_placeholder() -> None:
"""Used for compiling integration tests without running any real tests."""
pass

View File

@@ -0,0 +1,48 @@
"""Test NVIDIA AI Playground Embeddings.
Note: These tests are designed to validate the functionality of NVAIPlayEmbeddings.
"""
from langchain_nvidia_aiplay import NVAIPlayEmbeddings
def test_nvai_play_embedding_documents() -> None:
"""Test NVAIPlay embeddings for documents."""
documents = ["foo bar"]
embedding = NVAIPlayEmbeddings(model="nvolveqa_40k")
output = embedding.embed_documents(documents)
assert len(output) == 1
assert len(output[0]) == 1024 # Assuming embedding size is 2048
def test_nvai_play_embedding_documents_multiple() -> None:
"""Test NVAIPlay embeddings for multiple documents."""
documents = ["foo bar", "bar foo", "foo"]
embedding = NVAIPlayEmbeddings(model="nvolveqa_40k")
output = embedding.embed_documents(documents)
assert len(output) == 3
assert all(len(doc) == 1024 for doc in output)
def test_nvai_play_embedding_query() -> None:
"""Test NVAIPlay embeddings for a single query."""
query = "foo bar"
embedding = NVAIPlayEmbeddings(model="nvolveqa_40k")
output = embedding.embed_query(query)
assert len(output) == 1024
async def test_nvai_play_embedding_async_query() -> None:
"""Test NVAIPlay async embeddings for a single query."""
query = "foo bar"
embedding = NVAIPlayEmbeddings(model="nvolveqa_40k")
output = await embedding.aembed_query(query)
assert len(output) == 1024
async def test_nvai_play_embedding_async_documents() -> None:
"""Test NVAIPlay async embeddings for multiple documents."""
documents = ["foo bar", "bar foo", "foo"]
embedding = NVAIPlayEmbeddings(model="nvolveqa_40k")
output = await embedding.aembed_documents(documents)
assert len(output) == 3
assert all(len(doc) == 1024 for doc in output)

View File

@@ -0,0 +1,16 @@
"""Test chat model integration."""
from langchain_nvidia_aiplay.chat_models import ChatNVAIPlay
def test_integration_initialization() -> None:
"""Test chat model initialization."""
ChatNVAIPlay(
model="llama2_13b",
nvidia_api_key="nvapi-...",
temperature=0.5,
top_p=0.9,
max_tokens=50,
)
ChatNVAIPlay(model="mistral", nvidia_api_key="nvapi-...")

View File

@@ -0,0 +1,7 @@
from langchain_nvidia_aiplay import __all__
EXPECTED_ALL = ["ChatNVAIPlay", "NVAIPlayEmbeddings"]
def test_all_imports() -> None:
assert sorted(EXPECTED_ALL) == sorted(__all__)