Merge branch 'master' into bagatur/core_update_ruff_mypy

This commit is contained in:
Erick Friis
2023-12-14 11:37:51 -08:00
43 changed files with 7813 additions and 4 deletions

View File

@@ -19,6 +19,7 @@ on:
- libs/experimental
- libs/community
- libs/partners/google-genai
- libs/partners/nvidia-aiplay
env:
PYTHON_VERSION: "3.10"

View File

@@ -41,7 +41,7 @@
"import os\n",
"\n",
"if \"GOOGLE_API_KEY\" not in os.environ:\n",
" os.environ[\"GOOGLE_API_KEY\"] = getpass(\"Provide your Google API Key\")"
" os.environ[\"GOOGLE_API_KEY\"] = getpass.getpass(\"Provide your Google API Key\")"
]
},
{
@@ -285,7 +285,7 @@
"source": [
"## Gemini Prompting FAQs\n",
"\n",
"As of the time this doc was written (2024/12/12), Gemini has some restrictions on the types and structure of prompts it accepts. Specifically:\n",
"As of the time this doc was written (2023/12/12), Gemini has some restrictions on the types and structure of prompts it accepts. Specifically:\n",
"\n",
"1. When providing multimodal (image) inputs, you are restricted to at most 1 message of \"human\" (user) type. You cannot pass multiple messages (though the single human message may have multiple content entries)\n",
"2. System messages are not accepted.\n",
@@ -295,6 +295,7 @@
},
{
"cell_type": "markdown",
"id": "92b5aca5",
"metadata": {},
"source": []
}
@@ -315,7 +316,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.2"
"version": "3.11.5"
}
},
"nbformat": 4,

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,39 @@
# NVIDIA AI Playground
> [NVIDIA AI Playground](https://www.nvidia.com/en-us/research/ai-playground/) gives users easy access to hosted endpoints for generative AI models like Llama-2, Mistral, etc. This example demonstrates how to use LangChain to interact with supported AI Playground models.
These models are provided via the `langchain-nvidia-aiplay` package.
## Installation
```bash
pip install -U langchain-nvidia-aiplay
```
## Setup and Authentication
- Create a free account at [NVIDIA GPU Cloud](https://catalog.ngc.nvidia.com/).
- Navigate to `Catalog > AI Foundation Models > (Model with API endpoint)`.
- Select `API` and generate the key `NVIDIA_API_KEY`.
```bash
export NVIDIA_API_KEY=nvapi-XXXXXXXXXXXXXXXXXXXXXXXXXX
```
```python
from langchain_nvidia_aiplay import ChatNVAIPlay
llm = ChatNVAIPlay(model="mixtral_8x7b")
result = llm.invoke("Write a ballad about LangChain.")
print(result.content)
```
## Using NVIDIA AI Playground Models
A selection of NVIDIA AI Playground models are supported directly in LangChain with familiar APIs.
The active models which are supported can be found [in NGC](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/ai-foundation/). In addition, a selection of models can be retrieved from `langchain.<llms/chat_models>.nv_aiplay` which pull in default model options based on their use cases.
**The following may be useful examples to help you get started:**
- **[`ChatNVAIPlay` Model](/docs/integrations/chat/nv_aiplay).**
- **[`NVAIPlayEmbedding` Model for RAG Workflows](/docs/integrations/text_embeddings/nv_aiplay).**

File diff suppressed because one or more lines are too long

View File

@@ -5,6 +5,7 @@ from typing import TYPE_CHECKING, Any, Optional
from langchain_core.load.dump import dumps
from langchain_core.load.load import loads
from langchain_core.runnables import Runnable
if TYPE_CHECKING:
from langchainhub import Client
@@ -78,4 +79,7 @@ def pull(
"""
client = _get_client(api_url=api_url, api_key=api_key)
resp: str = client.pull(owner_repo_commit)
return loads(resp)
loaded = loads(resp)
if isinstance(loaded, Runnable):
return loaded.with_config(metadata={"hub_owner_repo_commit": owner_repo_commit})
return loaded

View File

@@ -0,0 +1,61 @@
from typing import Any, List
from unittest.mock import MagicMock, Mock, patch
from langchain_core.load.dump import dumps
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables.base import RunnableBinding
from langchain_core.tracers.base import BaseTracer
from langchain_core.tracers.schemas import Run
from langchain import hub
class FakeTracer(BaseTracer):
"""Fake tracer that records LangChain execution."""
def __init__(self) -> None:
"""Initialize the tracer."""
super().__init__()
self.runs: List[Run] = []
def _persist_run(self, run: Run) -> None:
"""Persist a run."""
self.runs.append(run)
repo_dict = {
"wfh/my-prompt-1": ChatPromptTemplate.from_messages(
[("system", "a"), ("user", "1")]
),
"wfh/my-random-object": {"Hi": "there"},
}
def repo_lookup(owner_repo_commit: str, **kwargs: Any) -> str:
return dumps(repo_dict[owner_repo_commit])
@patch("langchain.hub._get_client")
def test_hub_pull_metadata(mock_get_client: Mock) -> None:
mock_client = MagicMock()
mock_client.pull = repo_lookup
mock_get_client.return_value = mock_client
res = hub.pull("wfh/my-prompt-1")
assert isinstance(res, RunnableBinding)
tracer = FakeTracer()
result = res.invoke({}, {"callbacks": [tracer]})
assert result.messages[0].content == "a"
assert result.messages[1].content == "1"
assert len(tracer.runs) == 1
run = tracer.runs[0]
assert run.extra is not None
assert run.extra["metadata"]["hub_owner_repo_commit"] == "wfh/my-prompt-1" # type: ignore
@patch("langchain.hub._get_client")
def test_hub_pull_random_object(mock_get_client: Mock) -> None:
mock_client = MagicMock()
mock_client.pull = repo_lookup
mock_get_client.return_value = mock_client
res = hub.pull("wfh/my-random-object")
assert res == {"Hi": "there"}

View File

@@ -0,0 +1 @@
__pycache__

View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2023 LangChain, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -0,0 +1,62 @@
.PHONY: all format lint test tests integration_tests help
# Default target executed when no arguments are given to make.
all: help
# Define a variable for the test file path.
TEST_FILE ?= tests/unit_tests/
test:
poetry run pytest $(TEST_FILE)
tests:
poetry run pytest $(TEST_FILE)
check_imports: $(shell find langchain_nvidia_aiplay -name '*.py')
poetry run python ./scripts/check_imports.py $^
integration_tests:
poetry run pytest tests/integration_tests
######################
# LINTING AND FORMATTING
######################
# Define a variable for Python and notebook files.
PYTHON_FILES=.
MYPY_CACHE=.mypy_cache
lint format: PYTHON_FILES=.
lint_diff format_diff: PYTHON_FILES=$(shell git diff --name-only --diff-filter=d master | grep -E '\.py$$|\.ipynb$$')
lint_package: PYTHON_FILES=langchain_nvidia_aiplay
lint_tests: PYTHON_FILES=tests
lint_tests: MYPY_CACHE=.mypy_cache_test
lint lint_diff lint_package lint_tests:
./scripts/check_pydantic.sh .
./scripts/lint_imports.sh
poetry run ruff .
[ "$(PYTHON_FILES)" = "" ] || poetry run ruff format $(PYTHON_FILES) --diff
[ "$(PYTHON_FILES)" = "" ] || poetry run mypy $(PYTHON_FILES)
format format_diff:
poetry run ruff format $(PYTHON_FILES)
poetry run ruff --select I --fix $(PYTHON_FILES)
spell_check:
poetry run codespell --toml pyproject.toml
spell_fix:
poetry run codespell --toml pyproject.toml -w
######################
# HELP
######################
help:
@echo '----'
@echo 'format - run code formatters'
@echo 'lint - run linters'
@echo 'test - run unit tests'
@echo 'tests - run unit tests'
@echo 'test TEST_FILE=<test_file> - run all tests in file'

View File

@@ -0,0 +1,358 @@
# langchain-nvidia-aiplay
The `langchain-nvidia-aiplay` package contains LangChain integrations for chat models and embeddings powered by the NVIDIA AI Playground.
>[NVIDIA AI Playground](https://www.nvidia.com/en-us/research/ai-playground/) gives users easy access to hosted endpoints for generative AI models like Llama-2, SteerLM, Mistral, etc. Using the API, you can query NVCR (NVIDIA Container Registry) function endpoints and get quick results from a DGX-hosted cloud compute environment. All models are source-accessible and can be deployed on your own compute cluster.
Below is an example on how to use some common chat model functionality.
## Installation
```python
%pip install -U --quiet langchain-nvidia-aiplay
```
## Setup
**To get started:**
1. Create a free account with the [NVIDIA GPU Cloud](https://catalog.ngc.nvidia.com/) service, which hosts AI solution catalogs, containers, models, etc.
2. Navigate to `Catalog > AI Foundation Models > (Model with API endpoint)`.
3. Select the `API` option and click `Generate Key`.
4. Save the generated key as `NVIDIA_API_KEY`. From there, you should have access to the endpoints.
```python
import getpass
import os
if not os.environ.get("NVIDIA_API_KEY", "").startswith("nvapi-"):
nvidia_api_key = getpass.getpass("Enter your NVIDIA AIPLAY API key: ")
assert nvidia_api_key.startswith("nvapi-"), f"{nvidia_api_key[:5]}... is not a valid key"
os.environ["NVIDIA_API_KEY"] = nvidia_api_key
```
```python
## Core LC Chat Interface
from langchain_nvidia_aiplay import ChatNVAIPlay
llm = ChatNVAIPlay(model="mixtral_8x7b")
result = llm.invoke("Write a ballad about LangChain.")
print(result.content)
```
## Stream, Batch, and Async
These models natively support streaming, and as is the case with all LangChain LLMs they expose a batch method to handle concurrent requests, as well as async methods for invoke, stream, and batch. Below are a few examples.
```python
print(llm.batch(["What's 2*3?", "What's 2*6?"]))
# Or via the async API
# await llm.abatch(["What's 2*3?", "What's 2*6?"])
```
```python
for chunk in llm.stream("How far can a seagull fly in one day?"):
# Show the token separations
print(chunk.content, end="|")
```
```python
async for chunk in llm.astream("How long does it take for monarch butterflies to migrate?"):
print(chunk.content, end="|")
```
## Supported models
Querying `available_models` will still give you all of the other models offered by your API credentials.
The `playground_` prefix is optional.
```python
list(llm.available_models)
# ['playground_llama2_13b',
# 'playground_llama2_code_13b',
# 'playground_clip',
# 'playground_fuyu_8b',
# 'playground_mistral_7b',
# 'playground_nvolveqa_40k',
# 'playground_yi_34b',
# 'playground_nemotron_steerlm_8b',
# 'playground_nv_llama2_rlhf_70b',
# 'playground_llama2_code_34b',
# 'playground_mixtral_8x7b',
# 'playground_neva_22b',
# 'playground_steerlm_llama_70b',
# 'playground_nemotron_qa_8b',
# 'playground_sdxl']
```
## Model types
All of these models above are supported and can be accessed via `ChatNVAIPlay`.
Some model types support unique prompting techniques and chat messages. We will review a few important ones below.
**To find out more about a specific model, please navigate to the API section of an AI Playground model [as linked here](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/ai-foundation/models/codellama-13b/api).**
### General Chat
Models such as `llama2_13b` and `mixtral_8x7b` are good all-around models that you can use for with any LangChain chat messages. Example below.
```python
from langchain_nvidia_aiplay import ChatNVAIPlay
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_messages(
[
("system", "You are a helpful AI assistant named Fred."),
("user", "{input}")
]
)
chain = (
prompt
| ChatNVAIPlay(model="llama2_13b")
| StrOutputParser()
)
for txt in chain.stream({"input": "What's your name?"}):
print(txt, end="")
```
### Code Generation
These models accept the same arguments and input structure as regular chat models, but they tend to perform better on code-genreation and structured code tasks. An example of this is `llama2_code_13b`.
```python
prompt = ChatPromptTemplate.from_messages(
[
("system", "You are an expert coding AI. Respond only in valid python; no narration whatsoever."),
("user", "{input}")
]
)
chain = (
prompt
| ChatNVAIPlay(model="llama2_code_13b")
| StrOutputParser()
)
for txt in chain.stream({"input": "How do I solve this fizz buzz problem?"}):
print(txt, end="")
```
## Steering LLMs
> [SteerLM-optimized models](https://developer.nvidia.com/blog/announcing-steerlm-a-simple-and-practical-technique-to-customize-llms-during-inference/) supports "dynamic steering" of model outputs at inference time.
This lets you "control" the complexity, verbosity, and creativity of the model via integer labels on a scale from 0 to 9. Under the hood, these are passed as a special type of assistant message to the model.
The "steer" models support this type of input, such as `steerlm_llama_70b`
```python
from langchain_nvidia_aiplay import ChatNVAIPlay
llm = ChatNVAIPlay(model="steerlm_llama_70b")
# Try making it uncreative and not verbose
complex_result = llm.invoke(
"What's a PB&J?",
labels={"creativity": 0, "complexity": 3, "verbosity": 0}
)
print("Un-creative\n")
print(complex_result.content)
# Try making it very creative and verbose
print("\n\nCreative\n")
creative_result = llm.invoke(
"What's a PB&J?",
labels={"creativity": 9, "complexity": 3, "verbosity": 9}
)
print(creative_result.content)
```
#### Use within LCEL
The labels are passed as invocation params. You can `bind` these to the LLM using the `bind` method on the LLM to include it within a declarative, functional chain. Below is an example.
```python
from langchain_nvidia_aiplay import ChatNVAIPlay
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_messages(
[
("system", "You are a helpful AI assistant named Fred."),
("user", "{input}")
]
)
chain = (
prompt
| ChatNVAIPlay(model="steerlm_llama_70b").bind(labels={"creativity": 9, "complexity": 0, "verbosity": 9})
| StrOutputParser()
)
for txt in chain.stream({"input": "Why is a PB&J?"}):
print(txt, end="")
```
## Multimodal
NVidia also supports multimodal inputs, meaning you can provide both images and text for the model to reason over.
These models also accept `labels`, similar to the Steering LLMs above. In addition to `creativity`, `complexity`, and `verbosity`, these models support a `quality` toggle.
An example model supporting multimodal inputs is `playground_neva_22b`.
These models accept LangChain's standard image formats. Below are examples.
```python
import requests
image_url = "https://picsum.photos/seed/kitten/300/200"
image_content = requests.get(image_url).content
```
Initialize the model like so:
```python
from langchain_nvidia_aiplay import ChatNVAIPlay
llm = ChatNVAIPlay(model="playground_neva_22b")
```
#### Passing an image as a URL
```python
from langchain_core.messages import HumanMessage
llm.invoke(
[
HumanMessage(content=[
{"type": "text", "text": "Describe this image:"},
{"type": "image_url", "image_url": {"url": image_url}},
])
])
```
```python
### You can specify the labels for steering here as well. You can try setting a low verbosity, for instance
from langchain_core.messages import HumanMessage
llm.invoke(
[
HumanMessage(content=[
{"type": "text", "text": "Describe this image:"},
{"type": "image_url", "image_url": {"url": image_url}},
])
],
labels={
"creativity": 0,
"quality": 9,
"complexity": 0,
"verbosity": 0
}
)
```
#### Passing an image as a base64 encoded string
```python
import base64
b64_string = base64.b64encode(image_content).decode('utf-8')
llm.invoke(
[
HumanMessage(content=[
{"type": "text", "text": "Describe this image:"},
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64_string}"}},
])
])
```
#### Directly within the string
The NVIDIA API uniquely accepts images as base64 images inlined within <img> HTML tags. While this isn't interoperable with other LLMs, you can directly prompt the model accordingly.
```python
base64_with_mime_type = f"data:image/png;base64,{b64_string}"
llm.invoke(
f'What\'s in this image?\n<img src="{base64_with_mime_type}" />'
)
```
## RAG: Context models
NVIDIA also has Q&A models that support a special "context" chat message containing retrieved context (such as documents within a RAG chain). This is useful to avoid prompt-injecting the model.
**Note:** Only "user" (human) and "context" chat messages are supported for these models, not system or AI messages useful in conversational flows.
The `_qa_` models like `nemotron_qa_8b` support this.
```python
from langchain_nvidia_aiplay import ChatNVAIPlay
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.messages import ChatMessage
prompt = ChatPromptTemplate.from_messages(
[
ChatMessage(role="context", content="Parrots and Cats have signed the peace accord."),
("user", "{input}")
]
)
llm = ChatNVAIPlay(model="nemotron_qa_8b")
chain = (
prompt
| llm
| StrOutputParser()
)
chain.invoke({"input": "What was signed?"})
```
## Embeddings
You can also connect to embeddings models through this package. Below is an example:
```
from langchain_nvidia_aiplay import NVAIPlayEmbeddings
embedder = NVAIPlayEmbeddings(model="nvolveqa_40k")
embedder.embed_query("What's the temperature today?")
embedder.embed_documents([
"The temperature is 42 degrees.",
"Class is dismissed at 9 PM."
])
```
By default the embedding model will use the "passage" type for documents and "query" type for queries, but you can fix this on the instance.
```python
query_embedder = NVAIPlayEmbeddings(model="nvolveqa_40k", model_type="query")
doc_embeddder = NVAIPlayEmbeddings(model="nvolveqa_40k", model_type="passage")
```

View File

@@ -0,0 +1,45 @@
"""
**LangChain NVIDIA AI Playground Integration**
This comprehensive module integrates NVIDIA's state-of-the-art AI Playground, featuring advanced models for conversational AI and semantic embeddings, into the LangChain framework. It provides robust classes for seamless interaction with NVIDIA's AI models, particularly tailored for enriching conversational experiences and enhancing semantic understanding in various applications.
**Features:**
1. **Chat Models (`ChatNVAIPlay`):** This class serves as the primary interface for interacting with NVIDIA AI Playground's chat models. Users can effortlessly utilize NVIDIA's advanced models like 'Mistral' to engage in rich, context-aware conversations, applicable across diverse domains from customer support to interactive storytelling.
2. **Semantic Embeddings (`NVAIPlayEmbeddings`):** The module offers capabilities to generate sophisticated embeddings using NVIDIA's AI models. These embeddings are instrumental for tasks like semantic analysis, text similarity assessments, and contextual understanding, significantly enhancing the depth of NLP applications.
**Installation:**
Install this module easily using pip:
```python
pip install langchain-nvidia-aiplay
```
## Utilizing Chat Models:
After setting up the environment, interact with NVIDIA AI Playground models:
```python
from langchain_nvidia_aiplay import ChatNVAIPlay
ai_chat_model = ChatNVAIPlay(model="llama2_13b")
response = ai_chat_model.invoke("Tell me about the LangChain integration.")
```
# Generating Semantic Embeddings:
Use NVIDIA's models for creating embeddings, useful in various NLP tasks:
```python
from langchain_nvidia_aiplay import NVAIPlayEmbeddings
embed_model = NVAIPlayEmbeddings(model="nvolveqa_40k")
embedding_output = embed_model.embed_query("Exploring AI capabilities.")
```
""" # noqa: E501
from langchain_nvidia_aiplay.chat_models import ChatNVAIPlay
from langchain_nvidia_aiplay.embeddings import NVAIPlayEmbeddings
__all__ = ["ChatNVAIPlay", "NVAIPlayEmbeddings"]

View File

@@ -0,0 +1,525 @@
from __future__ import annotations
import json
import logging
from typing import (
Any,
AsyncIterator,
Callable,
Dict,
Generator,
Iterator,
List,
Optional,
Sequence,
Tuple,
Union,
)
import aiohttp
import requests
from langchain_core.messages import BaseMessage
from langchain_core.pydantic_v1 import (
BaseModel,
Field,
PrivateAttr,
SecretStr,
root_validator,
)
from langchain_core.utils import get_from_dict_or_env
from requests.models import Response
logger = logging.getLogger(__name__)
class NVCRModel(BaseModel):
"""
Underlying Client for interacting with the AI Playground API.
Leveraged by the NVAIPlayBaseModel to provide a simple requests-oriented interface.
Direct abstraction over NGC-recommended streaming/non-streaming Python solutions.
NOTE: AI Playground does not currently support raw text continuation.
"""
## Core defaults. These probably should not be changed
fetch_url_format: str = Field("https://api.nvcf.nvidia.com/v2/nvcf/pexec/status/")
call_invoke_base: str = Field("https://api.nvcf.nvidia.com/v2/nvcf/pexec/functions")
get_session_fn: Callable = Field(requests.Session)
get_asession_fn: Callable = Field(aiohttp.ClientSession)
nvidia_api_key: SecretStr = Field(
...,
description="API key for NVIDIA AI Playground. Should start with `nvapi-`",
)
is_staging: bool = Field(False, description="Whether to use staging API")
## Generation arguments
max_tries: int = Field(5, ge=1)
headers_tmpl: dict = Field(
...,
description="Headers template for API calls."
" Should contain `call` and `stream` keys.",
)
_available_functions: Optional[List[dict]] = PrivateAttr(default=None)
_available_models: Optional[dict] = PrivateAttr(default=None)
@property
def headers(self) -> dict:
"""Return headers with API key injected"""
headers_ = self.headers_tmpl.copy()
for header in headers_.values():
if "{nvidia_api_key}" in header["Authorization"]:
header["Authorization"] = header["Authorization"].format(
nvidia_api_key=self.nvidia_api_key.get_secret_value(),
)
return headers_
@root_validator(pre=True)
def validate_model(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and update model arguments, including API key and formatting"""
values["nvidia_api_key"] = get_from_dict_or_env(
values,
"nvidia_api_key",
"NVIDIA_API_KEY",
)
if "nvapi-" not in values.get("nvidia_api_key", ""):
raise ValueError("Invalid NVAPI key detected. Should start with `nvapi-`")
is_staging = "nvapi-stg-" in values["nvidia_api_key"]
values["is_staging"] = is_staging
if "headers_tmpl" not in values:
values["headers_tmpl"] = {
"call": {
"Authorization": "Bearer {nvidia_api_key}",
"Accept": "application/json",
},
"stream": {
"Authorization": "Bearer {nvidia_api_key}",
"Accept": "text/event-stream",
"content-type": "application/json",
},
}
values["fetch_url_format"] = cls._stagify(
is_staging,
values.get(
"fetch_url_format", "https://api.nvcf.nvidia.com/v2/nvcf/pexec/status/"
),
)
values["call_invoke_base"] = cls._stagify(
is_staging,
values.get(
"call_invoke_base",
"https://api.nvcf.nvidia.com/v2/nvcf/pexec/functions",
),
)
return values
@property
def available_models(self) -> dict:
"""List the available models that can be invoked."""
if self._available_models is not None:
return self._available_models
live_fns = [v for v in self.available_functions if v.get("status") == "ACTIVE"]
self._available_models = {v["name"]: v["id"] for v in live_fns}
return self._available_models
@property
def available_functions(self) -> List[dict]:
"""List the available functions that can be invoked."""
if self._available_functions is not None:
return self._available_functions
invoke_url = self._stagify(
self.is_staging, "https://api.nvcf.nvidia.com/v2/nvcf/functions"
)
query_res = self.query(invoke_url)
if "functions" not in query_res:
raise ValueError(
f"Unexpected response when querying {invoke_url}\n{query_res}"
)
self._available_functions = query_res["functions"]
return self._available_functions
@classmethod
def _stagify(cls, is_staging: bool, path: str) -> str:
"""Helper method to switch between staging and production endpoints"""
if is_staging and "stg.api" not in path:
return path.replace("api.", "stg.api.")
if not is_staging and "stg.api" in path:
return path.replace("stg.api.", "api.")
return path
####################################################################################
## Core utilities for posting and getting from NVCR
def _post(self, invoke_url: str, payload: dict = {}) -> Tuple[Response, Any]:
"""Method for posting to the AI Playground API."""
call_inputs = {
"url": invoke_url,
"headers": self.headers["call"],
"json": payload,
"stream": False,
}
session = self.get_session_fn()
response = session.post(**call_inputs)
self._try_raise(response)
return response, session
def _get(self, invoke_url: str, payload: dict = {}) -> Tuple[Response, Any]:
"""Method for getting from the AI Playground API."""
last_inputs = {
"url": invoke_url,
"headers": self.headers["call"],
"json": payload,
"stream": False,
}
session = self.get_session_fn()
last_response = session.get(**last_inputs)
self._try_raise(last_response)
return last_response, session
def _wait(self, response: Response, session: Any) -> Response:
"""Wait for a response from API after an initial response is made."""
i = 1
while response.status_code == 202:
request_id = response.headers.get("NVCF-REQID", "")
response = session.get(
self.fetch_url_format + request_id,
headers=self.headers["call"],
)
if response.status_code == 202:
try:
body = response.json()
except ValueError:
body = str(response)
if i > self.max_tries:
raise ValueError(f"Failed to get response with {i} tries: {body}")
self._try_raise(response)
return response
def _try_raise(self, response: Response) -> None:
"""Try to raise an error from a response"""
try:
response.raise_for_status()
except requests.HTTPError as e:
try:
rd = response.json()
except json.JSONDecodeError:
rd = response.__dict__
rd = rd.get("_content", rd)
if isinstance(rd, bytes):
rd = rd.decode("utf-8")[5:] ## lop of data: prefix ??
try:
rd = json.loads(rd)
except Exception:
rd = {"detail": rd}
title = f"[{rd.get('status', '###')}] {rd.get('title', 'Unknown Error')}"
body = f"{rd.get('detail', rd.get('type', rd))}"
raise Exception(f"{title}\n{body}") from e
####################################################################################
## Simple query interface to show the set of model options
def query(self, invoke_url: str, payload: dict = {}) -> dict:
"""Simple method for an end-to-end get query. Returns result dictionary"""
response, session = self._get(invoke_url, payload)
response = self._wait(response, session)
output = self._process_response(response)[0]
return output
def _process_response(self, response: Union[str, Response]) -> List[dict]:
"""General-purpose response processing for single responses and streams"""
if hasattr(response, "json"): ## For single response (i.e. non-streaming)
try:
return [response.json()]
except json.JSONDecodeError:
response = str(response.__dict__)
if isinstance(response, str): ## For set of responses (i.e. streaming)
msg_list = []
for msg in response.split("\n\n"):
if "{" not in msg:
continue
msg_list += [json.loads(msg[msg.find("{") :])]
return msg_list
raise ValueError(f"Received ill-formed response: {response}")
def _get_invoke_url(
self, model_name: Optional[str] = None, invoke_url: Optional[str] = None
) -> str:
"""Helper method to get invoke URL from a model name, URL, or endpoint stub"""
if not invoke_url:
if not model_name:
raise ValueError("URL or model name must be specified to invoke")
if model_name in self.available_models:
invoke_url = self.available_models[model_name]
elif f"playground_{model_name}" in self.available_models:
invoke_url = self.available_models[f"playground_{model_name}"]
else:
available_models_str = "\n".join(
[f"{k} - {v}" for k, v in self.available_models.items()]
)
raise ValueError(
f"Unknown model name {model_name} specified."
"\nAvailable models are:\n"
f"{available_models_str}"
)
if not invoke_url:
# For mypy
raise ValueError("URL or model name must be specified to invoke")
# Why is this even needed?
if "http" not in invoke_url:
invoke_url = f"{self.call_invoke_base}/{invoke_url}"
return invoke_url
####################################################################################
## Generation interface to allow users to generate new values from endpoints
def get_req(
self,
model_name: Optional[str] = None,
payload: dict = {},
invoke_url: Optional[str] = None,
stop: Optional[Sequence[str]] = None,
) -> Response:
"""Post to the API."""
invoke_url = self._get_invoke_url(model_name, invoke_url)
if payload.get("stream", False) is True:
payload = {**payload, "stream": False}
response, session = self._post(invoke_url, payload)
return self._wait(response, session)
def get_req_generation(
self,
model_name: Optional[str] = None,
payload: dict = {},
invoke_url: Optional[str] = None,
stop: Optional[Sequence[str]] = None,
) -> dict:
"""Method for an end-to-end post query with NVCR post-processing."""
response = self.get_req(model_name, payload, invoke_url)
output, _ = self.postprocess(response, stop=stop)
return output
def postprocess(
self, response: Union[str, Response], stop: Optional[Sequence[str]] = None
) -> Tuple[dict, bool]:
"""Parses a response from the AI Playground API.
Strongly assumes that the API will return a single response.
"""
msg_list = self._process_response(response)
msg, is_stopped = self._aggregate_msgs(msg_list)
msg, is_stopped = self._early_stop_msg(msg, is_stopped, stop=stop)
return msg, is_stopped
def _aggregate_msgs(self, msg_list: Sequence[dict]) -> Tuple[dict, bool]:
"""Dig out relevant details of aggregated message"""
content_buffer: Dict[str, Any] = dict()
content_holder: Dict[Any, Any] = dict()
is_stopped = False
for msg in msg_list:
if "choices" in msg:
## Tease out ['choices'][0]...['delta'/'message']
msg = msg.get("choices", [{}])[0]
is_stopped = msg.get("finish_reason", "") == "stop"
msg = msg.get("delta", msg.get("message", {"content": ""}))
elif "data" in msg:
## Tease out ['data'][0]...['embedding']
msg = msg.get("data", [{}])[0]
content_holder = msg
for k, v in msg.items():
if k in ("content",) and k in content_buffer:
content_buffer[k] += v
else:
content_buffer[k] = v
if is_stopped:
break
content_holder = {**content_holder, **content_buffer}
return content_holder, is_stopped
def _early_stop_msg(
self, msg: dict, is_stopped: bool, stop: Optional[Sequence[str]] = None
) -> Tuple[dict, bool]:
"""Try to early-terminate streaming or generation by iterating over stop list"""
content = msg.get("content", "")
if content and stop:
for stop_str in stop:
if stop_str and stop_str in content:
msg["content"] = content[: content.find(stop_str) + 1]
is_stopped = True
return msg, is_stopped
####################################################################################
## Streaming interface to allow you to iterate through progressive generations
def get_req_stream(
self,
model: Optional[str] = None,
payload: dict = {},
invoke_url: Optional[str] = None,
stop: Optional[Sequence[str]] = None,
) -> Iterator:
invoke_url = self._get_invoke_url(model, invoke_url)
if payload.get("stream", True) is False:
payload = {**payload, "stream": True}
last_inputs = {
"url": invoke_url,
"headers": self.headers["stream"],
"json": payload,
"stream": True,
}
response = self.get_session_fn().post(**last_inputs)
self._try_raise(response)
call = self.copy()
def out_gen() -> Generator[dict, Any, Any]:
## Good for client, since it allows self.last_input
for line in response.iter_lines():
if line and line.strip() != b"data: [DONE]":
line = line.decode("utf-8")
msg, final_line = call.postprocess(line, stop=stop)
yield msg
if final_line:
break
self._try_raise(response)
return (r for r in out_gen())
####################################################################################
## Asynchronous streaming interface to allow multiple generations to happen at once.
async def get_req_astream(
self,
model: Optional[str] = None,
payload: dict = {},
invoke_url: Optional[str] = None,
stop: Optional[Sequence[str]] = None,
) -> AsyncIterator:
invoke_url = self._get_invoke_url(model, invoke_url)
if payload.get("stream", True) is False:
payload = {**payload, "stream": True}
last_inputs = {
"url": invoke_url,
"headers": self.headers["stream"],
"json": payload,
}
async with self.get_asession_fn() as session:
async with session.post(**last_inputs) as response:
self._try_raise(response)
async for line in response.content.iter_any():
if line and line.strip() != b"data: [DONE]":
line = line.decode("utf-8")
msg, final_line = self.postprocess(line, stop=stop)
yield msg
if final_line:
break
class _NVAIPlayClient(BaseModel):
"""
Higher-Level Client for interacting with AI Playground API with argument defaults.
Is subclassed by NVAIPlayLLM/ChatNVAIPlay to provide a simple LangChain interface.
"""
client: NVCRModel = Field(NVCRModel)
model: str = Field(..., description="Name of the model to invoke")
temperature: float = Field(0.2, le=1.0, gt=0.0)
top_p: float = Field(0.7, le=1.0, ge=0.0)
max_tokens: int = Field(1024, le=1024, ge=32)
####################################################################################
@root_validator(pre=True)
def validate_client(cls, values: Any) -> Any:
"""Validate and update client arguments, including API key and formatting"""
if not values.get("client"):
values["client"] = NVCRModel(**values)
return values
@classmethod
def is_lc_serializable(cls) -> bool:
return True
@property
def available_functions(self) -> List[dict]:
"""Map the available functions that can be invoked."""
return self.client.available_functions
@property
def available_models(self) -> dict:
"""Map the available models that can be invoked."""
return self.client.available_models
def get_model_details(self, model: Optional[str] = None) -> dict:
"""Get more meta-details about a model retrieved by a given name"""
if model is None:
model = self.model
model_key = self.client._get_invoke_url(model).split("/")[-1]
known_fns = self.client.available_functions
fn_spec = [f for f in known_fns if f.get("id") == model_key][0]
return fn_spec
def get_generation(
self,
inputs: Sequence[Dict],
labels: Optional[dict] = None,
stop: Optional[Sequence[str]] = None,
**kwargs: Any,
) -> dict:
"""Call to client generate method with call scope"""
payload = self.get_payload(inputs=inputs, stream=False, labels=labels, **kwargs)
out = self.client.get_req_generation(self.model, stop=stop, payload=payload)
return out
def get_stream(
self,
inputs: Sequence[Dict],
labels: Optional[dict] = None,
stop: Optional[Sequence[str]] = None,
**kwargs: Any,
) -> Iterator:
"""Call to client stream method with call scope"""
payload = self.get_payload(inputs=inputs, stream=True, labels=labels, **kwargs)
return self.client.get_req_stream(self.model, stop=stop, payload=payload)
def get_astream(
self,
inputs: Sequence[Dict],
labels: Optional[dict] = None,
stop: Optional[Sequence[str]] = None,
**kwargs: Any,
) -> AsyncIterator:
"""Call to client astream methods with call scope"""
payload = self.get_payload(inputs=inputs, stream=True, labels=labels, **kwargs)
return self.client.get_req_astream(self.model, stop=stop, payload=payload)
def get_payload(
self, inputs: Sequence[Dict], labels: Optional[dict] = None, **kwargs: Any
) -> dict:
"""Generates payload for the _NVAIPlayClient API to send to service."""
return {
**self.preprocess(inputs=inputs, labels=labels),
**kwargs,
}
def preprocess(self, inputs: Sequence[Dict], labels: Optional[dict] = None) -> dict:
"""Prepares a message or list of messages for the payload"""
messages = [self.prep_msg(m) for m in inputs]
if labels:
# (WFH) Labels are currently (?) always passed as an assistant
# suffix message, but this API seems less stable.
messages += [{"labels": labels, "role": "assistant"}]
return {"messages": messages}
def prep_msg(self, msg: Union[str, dict, BaseMessage]) -> dict:
"""Helper Method: Ensures a message is a dictionary with a role and content."""
if isinstance(msg, str):
# (WFH) this shouldn't ever be reached but leaving this here bcs
# it's a Chesterton's fence I'm unwilling to touch
return dict(role="user", content=msg)
if isinstance(msg, dict):
if msg.get("content", None) is None:
raise ValueError(f"Message {msg} has no content")
return msg
raise ValueError(f"Unknown message received: {msg} of type {type(msg)}")

View File

@@ -0,0 +1,207 @@
"""Chat Model Components Derived from ChatModel/NVAIPlay"""
from __future__ import annotations
import base64
import logging
import os
import urllib.parse
from typing import (
Any,
AsyncIterator,
Dict,
Iterator,
List,
Mapping,
Optional,
Sequence,
Union,
)
import requests
from langchain_core.callbacks.manager import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models.chat_models import SimpleChatModel
from langchain_core.messages import BaseMessage, ChatMessage, ChatMessageChunk
from langchain_core.outputs import ChatGenerationChunk
from langchain_nvidia_aiplay import _common as nv_aiplay
logger = logging.getLogger(__name__)
def _is_openai_parts_format(part: dict) -> bool:
return "type" in part
def _is_url(s: str) -> bool:
try:
result = urllib.parse.urlparse(s)
return all([result.scheme, result.netloc])
except Exception as e:
logger.debug(f"Unable to parse URL: {e}")
return False
def _is_b64(s: str) -> bool:
return s.startswith("data:image")
def _url_to_b64_string(image_source: str) -> str:
b64_template = "data:image/png;base64,{b64_string}"
try:
if _is_url(image_source):
response = requests.get(image_source)
response.raise_for_status()
encoded = base64.b64encode(response.content).decode("utf-8")
return b64_template.format(b64_string=encoded)
elif _is_b64(image_source):
return image_source
elif os.path.exists(image_source):
with open(image_source, "rb") as f:
encoded = base64.b64encode(f.read()).decode("utf-8")
return b64_template.format(b64_string=encoded)
else:
raise ValueError(
"The provided string is not a valid URL, base64, or file path."
)
except Exception as e:
raise ValueError(f"Unable to process the provided image source: {e}")
class ChatNVAIPlay(nv_aiplay._NVAIPlayClient, SimpleChatModel):
"""NVAIPlay chat model.
Example:
.. code-block:: python
from langchain_nvidia_aiplay import ChatNVAIPlay
model = ChatNVAIPlay(model="llama2_13b")
response = model.invoke("Hello")
"""
@property
def _llm_type(self) -> str:
"""Return type of NVIDIA AI Playground Interface."""
return "chat-nvidia-ai-playground"
def _call(
self,
messages: List[BaseMessage],
stop: Optional[Sequence[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
labels: Optional[dict] = None,
**kwargs: Any,
) -> str:
"""Invoke on a single list of chat messages."""
inputs = self.custom_preprocess(messages)
responses = self.get_generation(
inputs=inputs, stop=stop, labels=labels, **kwargs
)
outputs = self.custom_postprocess(responses)
return outputs
def _get_filled_chunk(
self, text: str, role: Optional[str] = "assistant"
) -> ChatGenerationChunk:
"""Fill the generation chunk."""
return ChatGenerationChunk(message=ChatMessageChunk(content=text, role=role))
def _stream(
self,
messages: List[BaseMessage],
stop: Optional[Sequence[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
labels: Optional[dict] = None,
**kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
"""Allows streaming to model!"""
inputs = self.custom_preprocess(messages)
for response in self.get_stream(
inputs=inputs, stop=stop, labels=labels, **kwargs
):
chunk = self._get_filled_chunk(self.custom_postprocess(response))
yield chunk
if run_manager:
run_manager.on_llm_new_token(chunk.text, chunk=chunk)
async def _astream(
self,
messages: List[BaseMessage],
stop: Optional[Sequence[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
labels: Optional[dict] = None,
**kwargs: Any,
) -> AsyncIterator[ChatGenerationChunk]:
inputs = self.custom_preprocess(messages)
async for response in self.get_astream(
inputs=inputs, stop=stop, labels=labels, **kwargs
):
chunk = self._get_filled_chunk(self.custom_postprocess(response))
yield chunk
if run_manager:
await run_manager.on_llm_new_token(chunk.text, chunk=chunk)
def custom_preprocess(
self, msg_list: Sequence[BaseMessage]
) -> List[Dict[str, str]]:
# The previous author had a lot of custom preprocessing here
# but I'm just going to assume it's a list
return [self.preprocess_msg(m) for m in msg_list]
def _process_content(self, content: Union[str, List[Union[dict, str]]]) -> str:
if isinstance(content, str):
return content
string_array: list = []
for part in content:
if isinstance(part, str):
string_array.append(part)
elif isinstance(part, Mapping):
# OpenAI Format
if _is_openai_parts_format(part):
if part["type"] == "text":
string_array.append(str(part["text"]))
elif part["type"] == "image_url":
img_url = part["image_url"]
if isinstance(img_url, dict):
if "url" not in img_url:
raise ValueError(
f"Unrecognized message image format: {img_url}"
)
img_url = img_url["url"]
b64_string = _url_to_b64_string(img_url)
string_array.append(f'<img src="{b64_string}" />')
else:
raise ValueError(
f"Unrecognized message part type: {part['type']}"
)
else:
raise ValueError(f"Unrecognized message part format: {part}")
return "".join(string_array)
def preprocess_msg(self, msg: BaseMessage) -> Dict[str, str]:
## (WFH): Previous author added a bunch of
# custom processing here, but I'm just going to support
# the LCEL api.
if isinstance(msg, BaseMessage):
role_convert = {"ai": "assistant", "human": "user"}
if isinstance(msg, ChatMessage):
role = msg.role
else:
role = msg.type
role = role_convert.get(role, role)
content = self._process_content(msg.content)
return {"role": role, "content": content}
raise ValueError(f"Invalid message: {repr(msg)} of type {type(msg)}")
def custom_postprocess(self, msg: dict) -> str:
if "content" in msg:
return msg["content"]
logger.warning(
f"Got ambiguous message in postprocessing; returning as-is: msg = {msg}"
)
return str(msg)

View File

@@ -0,0 +1,74 @@
"""Embeddings Components Derived from ChatModel/NVAIPlay"""
from typing import Any, List, Literal, Optional
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import BaseModel, Field, root_validator
import langchain_nvidia_aiplay._common as nvaiplay_common
class NVAIPlayEmbeddings(BaseModel, Embeddings):
"""NVIDIA's AI Playground NVOLVE Question-Answer Asymmetric Model."""
client: nvaiplay_common.NVCRModel = Field(nvaiplay_common.NVCRModel)
model: str = Field(
..., description="The embedding model to use. Example: nvolveqa_40k"
)
max_length: int = Field(2048, ge=1, le=2048)
max_batch_size: int = Field(default=50)
model_type: Optional[Literal["passage", "query"]] = Field(
"passage", description="The type of text to be embedded."
)
@root_validator(pre=True)
def _validate_client(cls, values: Any) -> Any:
if "client" not in values:
values["client"] = nvaiplay_common.NVCRModel()
return values
@property
def available_models(self) -> dict:
"""Map the available models that can be invoked."""
return self.client.available_models
def _embed(
self, texts: List[str], model_type: Literal["passage", "query"]
) -> List[List[float]]:
"""Embed a single text entry to either passage or query type"""
response = self.client.get_req(
model_name=self.model,
payload={
"input": texts,
"model": model_type,
"encoding_format": "float",
},
)
response.raise_for_status()
result = response.json()
data = result["data"]
if not isinstance(data, list):
raise ValueError(f"Expected a list of embeddings. Got: {data}")
embedding_list = [(res["embedding"], res["index"]) for res in data]
return [x[0] for x in sorted(embedding_list, key=lambda x: x[1])]
def embed_query(self, text: str) -> List[float]:
"""Input pathway for query embeddings."""
return self._embed([text], model_type=self.model_type or "query")[0]
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Input pathway for document embeddings."""
# From https://catalog.ngc.nvidia.com/orgs/nvidia/teams/ai-foundation/models/nvolve-40k/documentation
# The input must not exceed the 2048 max input characters and inputs above 512
# model tokens will be truncated. The input array must not exceed 50 input
# strings.
all_embeddings = []
for i in range(0, len(texts), self.max_batch_size):
batch = texts[i : i + self.max_batch_size]
truncated = [
text[: self.max_length] if len(text) > self.max_length else text
for text in batch
]
all_embeddings.extend(
self._embed(truncated, model_type=self.model_type or "passage")
)
return all_embeddings

1235
libs/partners/nvidia-aiplay/poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,92 @@
[tool.poetry]
name = "langchain-nvidia-aiplay"
version = "0.0.1"
description = "An integration package connecting NVidia AIPlay and LangChain"
authors = []
readme = "README.md"
repository = "https://github.com/langchain-ai/langchain/tree/master/libs/partners/nvidia-aiplay"
[tool.poetry.dependencies]
python = ">=3.8.1,<4.0"
langchain-core = "^0.1.0"
aiohttp = "^3.9.1"
[tool.poetry.group.test]
optional = true
[tool.poetry.group.test.dependencies]
pytest = "^7.3.0"
freezegun = "^1.2.2"
pytest-mock = "^3.10.0"
syrupy = "^4.0.2"
pytest-watcher = "^0.3.4"
pytest-asyncio = "^0.21.1"
langchain-core = {path = "../../core", develop = true}
[tool.poetry.group.codespell]
optional = true
[tool.poetry.group.codespell.dependencies]
codespell = "^2.2.0"
[tool.poetry.group.test_integration]
optional = true
[tool.poetry.group.test_integration.dependencies]
[tool.poetry.group.lint]
optional = true
[tool.poetry.group.lint.dependencies]
ruff = "^0.1.5"
[tool.poetry.group.typing.dependencies]
mypy = "^0.991"
langchain-core = {path = "../../core", develop = true}
types-requests = "^2.31.0.10"
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
langchain-core = {path = "../../core", develop = true}
[tool.ruff]
select = [
"E", # pycodestyle
"F", # pyflakes
"I", # isort
]
[tool.mypy]
disallow_untyped_defs = "True"
exclude = ["notebooks", "examples", "example_data", "langchain_core/pydantic"]
[tool.coverage.run]
omit = [
"tests/*",
]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
# --strict-markers will raise errors on unknown marks.
# https://docs.pytest.org/en/7.1.x/how-to/mark.html#raising-errors-on-unknown-marks
#
# https://docs.pytest.org/en/7.1.x/reference/reference.html
# --strict-config any warnings encountered while parsing the `pytest`
# section of the configuration file raise errors.
#
# https://github.com/tophat/syrupy
# --snapshot-warn-unused Prints a warning on unused snapshots rather than fail the test suite.
addopts = "--snapshot-warn-unused --strict-markers --strict-config --durations=5"
# Registering custom markers.
# https://docs.pytest.org/en/7.1.x/example/markers.html#registering-markers
markers = [
"requires: mark tests as requiring a specific library",
"asyncio: mark tests as requiring asyncio",
"compile: mark placeholder test used to compile integration tests without running them",
]
asyncio_mode = "auto"

View File

@@ -0,0 +1,17 @@
import sys
import traceback
from importlib.machinery import SourceFileLoader
if __name__ == "__main__":
files = sys.argv[1:]
has_failure = False
for file in files:
try:
SourceFileLoader("x", file).load_module()
except Exception:
has_faillure = True
print(file)
traceback.print_exc()
print()
sys.exit(1 if has_failure else 0)

View File

@@ -0,0 +1,27 @@
#!/bin/bash
#
# This script searches for lines starting with "import pydantic" or "from pydantic"
# in tracked files within a Git repository.
#
# Usage: ./scripts/check_pydantic.sh /path/to/repository
# Check if a path argument is provided
if [ $# -ne 1 ]; then
echo "Usage: $0 /path/to/repository"
exit 1
fi
repository_path="$1"
# Search for lines matching the pattern within the specified repository
result=$(git -C "$repository_path" grep -E '^import pydantic|^from pydantic')
# Check if any matching lines were found
if [ -n "$result" ]; then
echo "ERROR: The following lines need to be updated:"
echo "$result"
echo "Please replace the code with an import from langchain_core.pydantic_v1."
echo "For example, replace 'from pydantic import BaseModel'"
echo "with 'from langchain_core.pydantic_v1 import BaseModel'"
exit 1
fi

View File

@@ -0,0 +1,17 @@
#!/bin/bash
set -eu
# Initialize a variable to keep track of errors
errors=0
# make sure not importing from langchain or langchain_experimental
git --no-pager grep '^from langchain\.' . && errors=$((errors+1))
git --no-pager grep '^from langchain_experimental\.' . && errors=$((errors+1))
# Decide on an exit status based on the errors
if [ "$errors" -gt 0 ]; then
exit 1
else
exit 0
fi

View File

@@ -0,0 +1,96 @@
"""Test ChatNVAIPlay chat model."""
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langchain_nvidia_aiplay.chat_models import ChatNVAIPlay
def test_chat_aiplay() -> None:
"""Test ChatNVAIPlay wrapper."""
chat = ChatNVAIPlay(
model="llama2_13b",
temperature=0.7,
)
message = HumanMessage(content="Hello")
response = chat([message])
assert isinstance(response, BaseMessage)
assert isinstance(response.content, str)
def test_chat_aiplay_model() -> None:
"""Test GeneralChat wrapper handles model."""
chat = ChatNVAIPlay(model="mistral")
assert chat.model == "mistral"
def test_chat_aiplay_system_message() -> None:
"""Test GeneralChat wrapper with system message."""
chat = ChatNVAIPlay(model="llama2_13b", max_tokens=36)
system_message = SystemMessage(content="You are to chat with the user.")
human_message = HumanMessage(content="Hello")
response = chat([system_message, human_message])
assert isinstance(response, BaseMessage)
assert isinstance(response.content, str)
## TODO: Not sure if we want to support the n syntax. Trash or keep test
def test_aiplay_streaming() -> None:
"""Test streaming tokens from aiplay."""
llm = ChatNVAIPlay(model="llama2_13b", max_tokens=36)
for token in llm.stream("I'm Pickle Rick"):
assert isinstance(token.content, str)
async def test_aiplay_astream() -> None:
"""Test streaming tokens from aiplay."""
llm = ChatNVAIPlay(model="llama2_13b", max_tokens=35)
async for token in llm.astream("I'm Pickle Rick"):
assert isinstance(token.content, str)
async def test_aiplay_abatch() -> None:
"""Test streaming tokens from GeneralChat."""
llm = ChatNVAIPlay(model="llama2_13b", max_tokens=36)
result = await llm.abatch(["I'm Pickle Rick", "I'm not Pickle Rick"])
for token in result:
assert isinstance(token.content, str)
async def test_aiplay_abatch_tags() -> None:
"""Test batch tokens from GeneralChat."""
llm = ChatNVAIPlay(model="llama2_13b", max_tokens=55)
result = await llm.abatch(
["I'm Pickle Rick", "I'm not Pickle Rick"], config={"tags": ["foo"]}
)
for token in result:
assert isinstance(token.content, str)
def test_aiplay_batch() -> None:
"""Test batch tokens from GeneralChat."""
llm = ChatNVAIPlay(model="llama2_13b", max_tokens=60)
result = llm.batch(["I'm Pickle Rick", "I'm not Pickle Rick"])
for token in result:
assert isinstance(token.content, str)
async def test_aiplay_ainvoke() -> None:
"""Test invoke tokens from GeneralChat."""
llm = ChatNVAIPlay(model="llama2_13b", max_tokens=60)
result = await llm.ainvoke("I'm Pickle Rick", config={"tags": ["foo"]})
assert isinstance(result.content, str)
def test_aiplay_invoke() -> None:
"""Test invoke tokens from GeneralChat."""
llm = ChatNVAIPlay(model="llama2_13b", max_tokens=60)
result = llm.invoke("I'm Pickle Rick", config=dict(tags=["foo"]))
assert isinstance(result.content, str)

View File

@@ -0,0 +1,7 @@
import pytest
@pytest.mark.compile
def test_placeholder() -> None:
"""Used for compiling integration tests without running any real tests."""
pass

View File

@@ -0,0 +1,48 @@
"""Test NVIDIA AI Playground Embeddings.
Note: These tests are designed to validate the functionality of NVAIPlayEmbeddings.
"""
from langchain_nvidia_aiplay import NVAIPlayEmbeddings
def test_nvai_play_embedding_documents() -> None:
"""Test NVAIPlay embeddings for documents."""
documents = ["foo bar"]
embedding = NVAIPlayEmbeddings(model="nvolveqa_40k")
output = embedding.embed_documents(documents)
assert len(output) == 1
assert len(output[0]) == 1024 # Assuming embedding size is 2048
def test_nvai_play_embedding_documents_multiple() -> None:
"""Test NVAIPlay embeddings for multiple documents."""
documents = ["foo bar", "bar foo", "foo"]
embedding = NVAIPlayEmbeddings(model="nvolveqa_40k")
output = embedding.embed_documents(documents)
assert len(output) == 3
assert all(len(doc) == 1024 for doc in output)
def test_nvai_play_embedding_query() -> None:
"""Test NVAIPlay embeddings for a single query."""
query = "foo bar"
embedding = NVAIPlayEmbeddings(model="nvolveqa_40k")
output = embedding.embed_query(query)
assert len(output) == 1024
async def test_nvai_play_embedding_async_query() -> None:
"""Test NVAIPlay async embeddings for a single query."""
query = "foo bar"
embedding = NVAIPlayEmbeddings(model="nvolveqa_40k")
output = await embedding.aembed_query(query)
assert len(output) == 1024
async def test_nvai_play_embedding_async_documents() -> None:
"""Test NVAIPlay async embeddings for multiple documents."""
documents = ["foo bar", "bar foo", "foo"]
embedding = NVAIPlayEmbeddings(model="nvolveqa_40k")
output = await embedding.aembed_documents(documents)
assert len(output) == 3
assert all(len(doc) == 1024 for doc in output)

View File

@@ -0,0 +1,16 @@
"""Test chat model integration."""
from langchain_nvidia_aiplay.chat_models import ChatNVAIPlay
def test_integration_initialization() -> None:
"""Test chat model initialization."""
ChatNVAIPlay(
model="llama2_13b",
nvidia_api_key="nvapi-...",
temperature=0.5,
top_p=0.9,
max_tokens=50,
)
ChatNVAIPlay(model="mistral", nvidia_api_key="nvapi-...")

View File

@@ -0,0 +1,7 @@
from langchain_nvidia_aiplay import __all__
EXPECTED_ALL = ["ChatNVAIPlay", "NVAIPlayEmbeddings"]
def test_all_imports() -> None:
assert sorted(EXPECTED_ALL) == sorted(__all__)

View File

@@ -0,0 +1,3 @@
docs/img_*.jpg
chroma_db_proposals
multi_vector_retriever_metadata

View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2023 LangChain, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -0,0 +1,81 @@
# rag-chroma-dense-retrieval
This template demonstrates the multi-vector indexing strategy proposed by Chen, et. al.'s [Dense X Retrieval: What Retrieval Granularity Should We Use?](https://arxiv.org/abs/2312.06648). The prompt, which you can [try out on the hub](https://smith.langchain.com/hub/wfh/proposal-indexing), directs an LLM to generate de-contextualized "propositions" which can be vectorized to increase the retrieval accuracy. You can see the full definition in `proposal_chain.py`.
![Retriever Diagram](./_images/retriever_diagram.png)
## Storage
For this demo, we index a simple academic paper using the RecursiveUrlLoader, and store all retriever information locally (using chroma and a bytestore stored on the local filesystem). You can modify the storage layer in `storage.py`.
## Environment Setup
Set the `OPENAI_API_KEY` environment variable to access `gpt-3.5` and the OpenAI Embeddings classes.
## Indexing
Create the index by running the following:
```python
poetry install
poetry run python rag_chroma_dense_retrieval/ingest.py
```
## Usage
To use this package, you should first have the LangChain CLI installed:
```shell
pip install -U langchain-cli
```
To create a new LangChain project and install this as the only package, you can do:
```shell
langchain app new my-app --package rag-chroma-dense-retrieval
```
If you want to add this to an existing project, you can just run:
```shell
langchain app add rag-chroma-dense-retrieval
```
And add the following code to your `server.py` file:
```python
from rag_chroma_dense_retrieval import chain
add_routes(app, chain, path="/rag-chroma-dense-retrieval")
```
(Optional) Let's now configure LangSmith.
LangSmith will help us trace, monitor and debug LangChain applications.
LangSmith is currently in private beta, you can sign up [here](https://smith.langchain.com/).
If you don't have access, you can skip this section
```shell
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY=<your-api-key>
export LANGCHAIN_PROJECT=<your-project> # if not specified, defaults to "default"
```
If you are inside this directory, then you can spin up a LangServe instance directly by:
```shell
langchain serve
```
This will start the FastAPI app with a server is running locally at
[http://localhost:8000](http://localhost:8000)
We can see all templates at [http://127.0.0.1:8000/docs](http://127.0.0.1:8000/docs)
We can access the playground at [http://127.0.0.1:8000/rag-chroma-dense-retrieval/playground](http://127.0.0.1:8000/rag-chroma-dense-retrieval/playground)
We can access the template from code with:
```python
from langserve.client import RemoteRunnable
runnable = RemoteRunnable("http://localhost:8000/rag-chroma-dense-retrieval")
```

Binary file not shown.

After

Width:  |  Height:  |  Size: 375 KiB

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,35 @@
[tool.poetry]
name = "rag-chroma-dense-retrieval"
version = "0.1.0"
description = "Dense retrieval using vectorized propositions.s"
authors = [
"William Fu-Hinthorn <will@langchain.dev>",
]
readme = "README.md"
[tool.poetry.dependencies]
python = ">=3.8.1,<4.0"
langchain = ">=0.0.350"
openai = "<2"
tiktoken = ">=0.5.1"
chromadb = ">=0.4.14"
bs4 = "^0.0.1"
[tool.poetry.group.dev.dependencies]
langchain-cli = ">=0.0.15"
[tool.langserve]
export_module = "rag_chroma_multi_modal_multi_vector"
export_attr = "chain"
[tool.templates-hub]
use-case = "rag"
author = "LangChain"
integrations = ["OpenAI", "Chroma"]
tags = ["vectordbs"]
[build-system]
requires = [
"poetry-core",
]
build-backend = "poetry.core.masonry.api"

View File

@@ -0,0 +1,68 @@
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"id": "681a5d1e",
"metadata": {},
"source": [
"## Run Template\n",
"\n",
"In `server.py`, set -\n",
"```\n",
"from fastapi import FastAPI\n",
"from langserve import add_routes\n",
"from rag_chroma_dense_retrieval import chain\n",
"\n",
"app = FastAPI(\n",
" title=\"LangChain Server\",\n",
" version=\"1.0\",\n",
" description=\"Retriever and Generator for RAG Chroma Dense Retrieval\",\n",
")\n",
"\n",
"add_routes(app, chain, path=\"/rag-chroma-dense-retrieval\")\n",
"\n",
"if __name__ == \"__main__\":\n",
" import uvicorn\n",
"\n",
" uvicorn.run(app, host=\"localhost\", port=8000)\n",
"\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d774be2a",
"metadata": {},
"outputs": [],
"source": [
"from langserve.client import RemoteRunnable\n",
"\n",
"rag_app = RemoteRunnable(\"http://localhost:8001/rag-chroma-dense-retrieval\")\n",
"rag_app.invoke(\"How are transformers related to convolutional neural networks?\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.2"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@@ -0,0 +1,4 @@
from rag_chroma_dense_retrieval.chain import chain
from rag_chroma_dense_retrieval.proposal_chain import proposition_chain
__all__ = ["chain", "proposition_chain"]

View File

@@ -0,0 +1,67 @@
from langchain_community.chat_models import ChatOpenAI
from langchain_core.load import load
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables import RunnablePassthrough
from rag_chroma_dense_retrieval.constants import DOCSTORE_ID_KEY
from rag_chroma_dense_retrieval.storage import get_multi_vector_retriever
def format_docs(docs: list) -> str:
loaded_docs = [load(doc) for doc in docs]
return "\n".join(
[
f"<Document id={i}>\n{doc.page_content}\n</Document>"
for i, doc in enumerate(loaded_docs)
]
)
def rag_chain(retriever):
"""
The RAG chain
:param retriever: A function that retrieves the necessary context for the model.
:return: A chain of functions representing the multi-modal RAG process.
"""
model = ChatOpenAI(temperature=0, model="gpt-4-1106-preview", max_tokens=1024)
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are an AI assistant. Answer based on the retrieved documents:"
"\n<Documents>\n{context}\n</Documents>",
),
("user", "{question}?"),
]
)
# Define the RAG pipeline
chain = (
{
"context": retriever | format_docs,
"question": RunnablePassthrough(),
}
| prompt
| model
| StrOutputParser()
)
return chain
# Create the multi-vector retriever
retriever = get_multi_vector_retriever(DOCSTORE_ID_KEY)
# Create RAG chain
chain = rag_chain(retriever)
# Add typing for input
class Question(BaseModel):
__root__: str
chain = chain.with_types(input_type=Question)

View File

@@ -0,0 +1 @@
DOCSTORE_ID_KEY = "doc_id"

View File

@@ -0,0 +1,87 @@
import logging
import uuid
from typing import Sequence
from langchain_core.documents import Document
from langchain_core.runnables import Runnable
from rag_chroma_dense_retrieval.constants import DOCSTORE_ID_KEY
from rag_chroma_dense_retrieval.proposal_chain import proposition_chain
from rag_chroma_dense_retrieval.storage import get_multi_vector_retriever
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def add_documents(
retriever,
propositions: Sequence[Sequence[str]],
docs: Sequence[Document],
id_key: str = DOCSTORE_ID_KEY,
):
doc_ids = [
str(uuid.uuid5(uuid.NAMESPACE_DNS, doc.metadata["source"])) for doc in docs
]
prop_docs = [
Document(page_content=prop, metadata={id_key: doc_ids[i]})
for i, props in enumerate(propositions)
for prop in props
if prop
]
retriever.vectorstore.add_documents(prop_docs)
retriever.docstore.mset(list(zip(doc_ids, docs)))
def create_index(
docs: Sequence[Document],
indexer: Runnable,
docstore_id_key: str = DOCSTORE_ID_KEY,
):
"""
Create retriever that indexes docs and their propositions
:param docs: Documents to index
:param indexer: Runnable creates additional propositions per doc
:param docstore_id_key: Key to use to store the docstore id
:return: Retriever
"""
logger.info("Creating multi-vector retriever")
retriever = get_multi_vector_retriever(docstore_id_key)
propositions = indexer.batch([{"input": doc.page_content} for doc in docs])
add_documents(
retriever,
propositions,
docs,
id_key=docstore_id_key,
)
return retriever
if __name__ == "__main__":
# For our example, we'll load docs from the web
from langchain.text_splitter import RecursiveCharacterTextSplitter # noqa
from langchain_community.document_loaders.recursive_url_loader import (
RecursiveUrlLoader,
) # noqa
# The attention is all you need paper
# Could add more parsing here, as it's very raw.
loader = RecursiveUrlLoader("https://ar5iv.labs.arxiv.org/html/1706.03762")
data = loader.load()
logger.info(f"Loaded {len(data)} documents")
# Split
text_splitter = RecursiveCharacterTextSplitter(chunk_size=8000, chunk_overlap=0)
all_splits = text_splitter.split_documents(data)
logger.info(f"Split into {len(all_splits)} documents")
# Create retriever
retriever_multi_vector_img = create_index(
all_splits,
proposition_chain,
DOCSTORE_ID_KEY,
)

View File

@@ -0,0 +1,107 @@
import logging
from langchain.output_parsers.openai_tools import JsonOutputToolsParser
from langchain_community.chat_models import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Modified from the paper to be more robust to benign prompt injection
# https://arxiv.org/abs/2312.06648
# @misc{chen2023dense,
# title={Dense X Retrieval: What Retrieval Granularity Should We Use?},
# author={Tong Chen and Hongwei Wang and Sihao Chen and Wenhao Yu and Kaixin Ma
# and Xinran Zhao and Hongming Zhang and Dong Yu},
# year={2023},
# eprint={2312.06648},
# archivePrefix={arXiv},
# primaryClass={cs.CL}
# }
PROMPT = ChatPromptTemplate.from_messages(
[
(
"system",
"""Decompose the "Content" into clear and simple propositions, ensuring they are interpretable out of
context.
1. Split compound sentence into simple sentences. Maintain the original phrasing from the input
whenever possible.
2. For any named entity that is accompanied by additional descriptive information, separate this
information into its own distinct proposition.
3. Decontextualize the proposition by adding necessary modifier to nouns or entire sentences
and replacing pronouns (e.g., "it", "he", "she", "they", "this", "that") with the full name of the
entities they refer to.
4. Present the results as a list of strings, formatted in JSON.
Example:
Input: Title: ¯Eostre. Section: Theories and interpretations, Connection to Easter Hares. Content:
The earliest evidence for the Easter Hare (Osterhase) was recorded in south-west Germany in
1678 by the professor of medicine Georg Franck von Franckenau, but it remained unknown in
other parts of Germany until the 18th century. Scholar Richard Sermon writes that "hares were
frequently seen in gardens in spring, and thus may have served as a convenient explanation for the
origin of the colored eggs hidden there for children. Alternatively, there is a European tradition
that hares laid eggs, since a hares scratch or form and a lapwings nest look very similar, and
both occur on grassland and are first seen in the spring. In the nineteenth century the influence
of Easter cards, toys, and books was to make the Easter Hare/Rabbit popular throughout Europe.
German immigrants then exported the custom to Britain and America where it evolved into the
Easter Bunny."
Output: [ "The earliest evidence for the Easter Hare was recorded in south-west Germany in
1678 by Georg Franck von Franckenau.", "Georg Franck von Franckenau was a professor of
medicine.", "The evidence for the Easter Hare remained unknown in other parts of Germany until
the 18th century.", "Richard Sermon was a scholar.", "Richard Sermon writes a hypothesis about
the possible explanation for the connection between hares and the tradition during Easter", "Hares
were frequently seen in gardens in spring.", "Hares may have served as a convenient explanation
for the origin of the colored eggs hidden in gardens for children.", "There is a European tradition
that hares laid eggs.", "A hares scratch or form and a lapwings nest look very similar.", "Both
hares and lapwings nests occur on grassland and are first seen in the spring.", "In the nineteenth
century the influence of Easter cards, toys, and books was to make the Easter Hare/Rabbit popular
throughout Europe.", "German immigrants exported the custom of the Easter Hare/Rabbit to
Britain and America.", "The custom of the Easter Hare/Rabbit evolved into the Easter Bunny in
Britain and America."]""", # noqa
),
("user", "Decompose the following:\n{input}"),
]
)
def get_propositions(tool_calls: list) -> list:
if not tool_calls:
raise ValueError("No tool calls found")
return tool_calls[0]["args"]["propositions"]
def empty_proposals(x):
# Model couldn't generate proposals
return []
proposition_chain = (
PROMPT
| ChatOpenAI(model="gpt-3.5-turbo-16k").bind(
tools=[
{
"type": "function",
"function": {
"name": "decompose_content",
"description": "Return the decomposed propositions",
"parameters": {
"type": "object",
"properties": {
"propositions": {
"type": "array",
"items": {"type": "string"},
}
},
"required": ["propositions"],
},
},
}
],
tool_choice={"type": "function", "function": {"name": "decompose_content"}},
)
| JsonOutputToolsParser()
| get_propositions
).with_fallbacks([RunnableLambda(empty_proposals)])

View File

@@ -0,0 +1,38 @@
import logging
from pathlib import Path
from langchain.embeddings import OpenAIEmbeddings
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import LocalFileStore
from langchain_community.vectorstores import Chroma
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_multi_vector_retriever(docstore_id_key: str):
"""Create the composed retriever object."""
vectorstore = get_vectorstore()
store = get_docstore()
return MultiVectorRetriever(
vectorstore=vectorstore,
byte_store=store,
id_key=docstore_id_key,
)
def get_vectorstore(collection_name: str = "proposals"):
"""Get the vectorstore used for this example."""
return Chroma(
collection_name=collection_name,
persist_directory=str(Path(__file__).parent.parent / "chroma_db_proposals"),
embedding_function=OpenAIEmbeddings(),
)
def get_docstore():
"""Get the metadata store used for this example."""
return LocalFileStore(
str(Path(__file__).parent.parent / "multi_vector_retriever_metadata")
)