community[minor]: Adding Azure Cosmos Mongo vCore Vector DB Cache (#16856)

Description:

This pull request introduces several enhancements for Azure Cosmos
Vector DB, primarily focused on improving caching and search
capabilities using Azure Cosmos MongoDB vCore Vector DB. Here's a
summary of the changes:

- **AzureCosmosDBSemanticCache**: Added a new cache implementation
called AzureCosmosDBSemanticCache, which utilizes Azure Cosmos MongoDB
vCore Vector DB for efficient caching of semantic data. Added
comprehensive test cases for AzureCosmosDBSemanticCache to ensure its
correctness and robustness. These tests cover various scenarios and edge
cases to validate the cache's behavior.
- **HNSW Vector Search**: Added HNSW vector search functionality in the
CosmosDB Vector Search module. This enhancement enables more efficient
and accurate vector searches by utilizing the HNSW (Hierarchical
Navigable Small World) algorithm. Added corresponding test cases to
validate the HNSW vector search functionality in both
AzureCosmosDBSemanticCache and AzureCosmosDBVectorSearch. These tests
ensure the correctness and performance of the HNSW search algorithm.
- **LLM Caching Notebook**: The notebook now includes a comprehensive
example showcasing the usage of the AzureCosmosDBSemanticCache. This
example highlights how the cache can be employed to efficiently store
and retrieve semantic data. Additionally, the example provides default
values for all parameters used within the AzureCosmosDBSemanticCache,
ensuring clarity and ease of understanding for users who are new to the
cache implementation.
 
@hwchase17, @baskaryan, @eyurtsev
This commit is contained in:
Aayush Kataria
2024-03-03 14:04:15 -08:00
committed by GitHub
parent db47b5deee
commit 7c2f3f6f95
6 changed files with 1507 additions and 126 deletions

View File

@@ -29,6 +29,7 @@ import uuid
import warnings
from abc import ABC
from datetime import timedelta
from enum import Enum
from functools import lru_cache, wraps
from typing import (
TYPE_CHECKING,
@@ -51,6 +52,11 @@ from sqlalchemy.engine import Row
from sqlalchemy.engine.base import Engine
from sqlalchemy.orm import Session
from langchain_community.vectorstores.azure_cosmos_db import (
CosmosDBSimilarityType,
CosmosDBVectorSearchType,
)
try:
from sqlalchemy.orm import declarative_base
except ImportError:
@@ -68,6 +74,7 @@ from langchain_community.utilities.astradb import (
SetupMode,
_AstraDBCollectionEnvironment,
)
from langchain_community.vectorstores import AzureCosmosDBVectorSearch
from langchain_community.vectorstores.redis import Redis as RedisVectorstore
logger = logging.getLogger(__file__)
@@ -1837,3 +1844,194 @@ class AstraDBSemanticCache(BaseCache):
async def aclear(self, **kwargs: Any) -> None:
    """Asynchronously clear the entire semantic cache.

    Ensures the Astra DB environment is initialized, then empties the
    backing collection. Extra ``kwargs`` are accepted for interface
    compatibility but are not used here.
    """
    await self.astra_env.aensure_db_setup()
    await self.async_collection.clear()
class AzureCosmosDBSemanticCache(BaseCache):
    """Cache that uses Cosmos DB Mongo vCore vector-store backend.

    One vector-store collection is shared by all LLMs; each distinct
    ``llm_string`` gets its own vector index (named ``cache:<hash>``) so that
    lookups never mix generations produced by different models/params.
    """

    DEFAULT_DATABASE_NAME = "CosmosMongoVCoreCacheDB"
    DEFAULT_COLLECTION_NAME = "CosmosMongoVCoreCacheColl"

    def __init__(
        self,
        cosmosdb_connection_string: str,
        database_name: str,
        collection_name: str,
        embedding: Embeddings,
        *,
        cosmosdb_client: Optional[Any] = None,
        num_lists: int = 100,
        similarity: CosmosDBSimilarityType = CosmosDBSimilarityType.COS,
        kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF,
        dimensions: int = 1536,
        m: int = 16,
        ef_construction: int = 64,
        ef_search: int = 40,
        score_threshold: Optional[float] = None,
    ):
        """
        Args:
            cosmosdb_connection_string: Cosmos DB Mongo vCore connection string
            database_name: Database name for the CosmosDBMongoVCoreSemanticCache
                (falls back to DEFAULT_DATABASE_NAME when falsy)
            collection_name: Collection name for the
                CosmosDBMongoVCoreSemanticCache
                (falls back to DEFAULT_COLLECTION_NAME when falsy)
            embedding (Embeddings): Embedding provider for semantic encoding
                and search.
            cosmosdb_client: Optional pre-built Cosmos DB Mongo vCore client;
                when provided, it is used instead of opening a new connection
                from the connection string.
            num_lists: This integer is the number of clusters that the
                inverted file (IVF) index uses to group the vector data.
                We recommend that numLists is set to documentCount/1000
                for up to 1 million documents and to sqrt(documentCount)
                for more than 1 million documents.
                Using a numLists value of 1 is akin to performing
                brute-force search, which has limited performance.
            similarity: Similarity metric to use with the IVF index.
                Possible options are:
                    - CosmosDBSimilarityType.COS (cosine distance),
                    - CosmosDBSimilarityType.L2 (Euclidean distance), and
                    - CosmosDBSimilarityType.IP (inner product).
            kind: Type of vector index to create.
                Possible options are:
                    - vector-ivf
                    - vector-hnsw: available as a preview feature only,
                      to enable visit
                      https://learn.microsoft.com/en-us/azure/azure-resource-manager/management/preview-features
            dimensions: Number of dimensions for vector similarity.
                The maximum number of supported dimensions is 2000.
            m: The max number of connections per layer (16 by default, minimum
                value is 2, maximum value is 100). Higher m is suitable for
                datasets with high dimensionality and/or high accuracy
                requirements.
            ef_construction: the size of the dynamic candidate list for
                constructing the graph (64 by default, minimum value is 4,
                maximum value is 1000). Higher ef_construction will result in
                better index quality and higher accuracy, but it will also
                increase the time required to build the index.
                ef_construction has to be at least 2 * m.
            ef_search: The size of the dynamic candidate list for search
                (40 by default). A higher value provides better recall at
                the cost of speed.
            score_threshold: Maximum score used to filter the vector search
                documents.

        Raises:
            ValueError: if ``similarity``/``kind`` are not members of their
                respective enums, or if the connection string is empty.
        """
        self._validate_enum_value(similarity, CosmosDBSimilarityType)
        self._validate_enum_value(kind, CosmosDBVectorSearchType)

        if not cosmosdb_connection_string:
            # BUG FIX: message previously read "can be empty", inverting the
            # actual constraint, and carried a stray leading space.
            raise ValueError("CosmosDB connection string cannot be empty.")

        self.cosmosdb_connection_string = cosmosdb_connection_string
        self.cosmosdb_client = cosmosdb_client
        self.embedding = embedding
        self.database_name = database_name or self.DEFAULT_DATABASE_NAME
        self.collection_name = collection_name or self.DEFAULT_COLLECTION_NAME
        self.num_lists = num_lists
        self.dimensions = dimensions
        self.similarity = similarity
        self.kind = kind
        self.m = m
        self.ef_construction = ef_construction
        self.ef_search = ef_search
        self.score_threshold = score_threshold
        # One vectorstore client per llm_string index, created lazily.
        self._cache_dict: Dict[str, AzureCosmosDBVectorSearch] = {}

    def _index_name(self, llm_string: str) -> str:
        """Return the per-LLM index name, derived from a hash of llm_string."""
        hashed_index = _hash(llm_string)
        return f"cache:{hashed_index}"

    def _get_llm_cache(self, llm_string: str) -> AzureCosmosDBVectorSearch:
        """Return (creating if needed) the vectorstore for this llm_string.

        The vectorstore and its vector index are created on first use and
        memoized in ``self._cache_dict``.
        """
        index_name = self._index_name(llm_string)
        namespace = self.database_name + "." + self.collection_name

        # return vectorstore client for the specific llm string
        if index_name in self._cache_dict:
            return self._cache_dict[index_name]

        # create new vectorstore client for the specific llm string
        if self.cosmosdb_client:
            collection = self.cosmosdb_client[self.database_name][self.collection_name]
            self._cache_dict[index_name] = AzureCosmosDBVectorSearch(
                collection=collection,
                embedding=self.embedding,
                index_name=index_name,
            )
        else:
            self._cache_dict[
                index_name
            ] = AzureCosmosDBVectorSearch.from_connection_string(
                connection_string=self.cosmosdb_connection_string,
                namespace=namespace,
                embedding=self.embedding,
                index_name=index_name,
            )

        # create index for the vectorstore
        vectorstore = self._cache_dict[index_name]
        if not vectorstore.index_exists():
            vectorstore.create_index(
                self.num_lists,
                self.dimensions,
                self.similarity,
                self.kind,
                self.m,
                self.ef_construction,
            )
        return vectorstore

    def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
        """Look up based on prompt and llm_string.

        Returns the cached generations for the nearest stored prompt (k=1),
        or None when nothing matches within ``score_threshold``.
        """
        llm_cache = self._get_llm_cache(llm_string)
        generations: List = []
        results = llm_cache.similarity_search(
            query=prompt,
            k=1,
            kind=self.kind,
            ef_search=self.ef_search,
            score_threshold=self.score_threshold,
        )
        if results:
            for document in results:
                try:
                    generations.extend(loads(document.metadata["return_val"]))
                except Exception:
                    logger.warning(
                        "Retrieving a cache value that could not be deserialized "
                        "properly. This is likely due to the cache being in an "
                        "older format. Please recreate your cache to avoid this "
                        "error."
                    )
                    # In a previous life we stored the raw text directly
                    # in the table, so assume it's in that format.
                    generations.extend(
                        _load_generations_from_json(document.metadata["return_val"])
                    )
        return generations if generations else None

    def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None:
        """Update cache based on prompt and llm_string.

        Raises:
            ValueError: if any element of ``return_val`` is not a Generation.
        """
        for gen in return_val:
            if not isinstance(gen, Generation):
                raise ValueError(
                    "CosmosDBMongoVCoreSemanticCache only supports caching of "
                    f"normal LLM generations, got {type(gen)}"
                )
        llm_cache = self._get_llm_cache(llm_string)
        metadata = {
            "llm_string": llm_string,
            "prompt": prompt,
            # serialize the full generation list for later `loads` in lookup()
            "return_val": dumps(list(return_val)),
        }
        llm_cache.add_texts(texts=[prompt], metadatas=[metadata])

    def clear(self, **kwargs: Any) -> None:
        """Clear semantic cache for a given llm_string.

        Requires ``llm_string`` in kwargs; only documents in that LLM's
        collection are removed, and only if its vectorstore was already built.
        """
        index_name = self._index_name(kwargs["llm_string"])
        if index_name in self._cache_dict:
            self._cache_dict[index_name].get_collection().delete_many({})

    @staticmethod
    def _validate_enum_value(value: Any, enum_type: Type[Enum]) -> None:
        """Raise ValueError unless ``value`` is a member of ``enum_type``."""
        if not isinstance(value, enum_type):
            raise ValueError(f"Invalid enum value: {value}. Expected {enum_type}.")