LangChain Community: VectorStores: Azure Cosmos DB Filtered Vector Search (#24087)

Thank you for contributing to LangChain!

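- This PR adds filtered vector search to the Azure Cosmos DB Mongo vCore
and NoSQL vector stores. Search methods accept a new `pre_filter`
argument and a `with_embedding` flag that returns the stored embedding in
the result metadata; document metadata is now stored under a nested
`metadata` field; the Mongo vCore store gains `create_filter_index` for
indexing a filterable property; and the NoSQL store accepts a
`create_container` flag.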


- [ ] **PR message**: ***Delete this entire checklist*** and replace
with
    - **Description:** a description of the change
    - **Issue:** the issue # it fixes, if applicable
    - **Dependencies:** any dependencies required for this change
- **Twitter handle:** if your PR gets announced, and you'd like a
mention, we'll gladly shout you out!


- [ ] **Add tests and docs**: If you're adding a new integration, please
include
1. a test for the integration, preferably unit tests that do not rely on
network access,
2. an example notebook showing its use. It lives in
`docs/docs/integrations` directory.


- [ ] **Lint and test**: Run `make format`, `make lint` and `make test`
from the root of the package(s) you've modified. See contribution
guidelines for more: https://python.langchain.com/docs/contributing/

Additional guidelines:
- Make sure optional dependencies are imported within a function.
- Please do not add dependencies to pyproject.toml files (even optional
ones) unless they are required for unit tests.
- Most PRs should not touch more than one package.
- Changes should be backwards compatible.
- If you are adding something to community, do not re-import it in
langchain.

If no one reviews your PR within a few days, please @-mention one of
baskaryan, efriis, eyurtsev, ccurme, vbarda, hwchase17.
This commit is contained in:
Aayush Kataria 2024-07-23 16:59:23 -07:00 committed by GitHub
parent ac41c97d21
commit 0f45ac4088
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 222 additions and 42 deletions

View File

@ -306,6 +306,27 @@ class AzureCosmosDBVectorSearch(VectorStore):
} }
return command return command
def create_filter_index(
    self,
    property_to_filter: str,
    index_name: str,
) -> dict[str, Any]:
    """Create a single-field index on the collection for a filterable property.

    Builds a ``createIndexes`` command for ``self._collection`` with one
    index whose key is ``{property_to_filter: 1}`` and runs it against the
    database that owns the collection.

    Args:
        property_to_filter: Name of the document field to index. It is used
            verbatim as the index key.
        index_name: Name to give the new index.

    Returns:
        The response returned by the database ``command`` call.
    """
    command = {
        "createIndexes": self._collection.name,
        "indexes": [
            {
                # 1 is the index direction; ascending in MongoDB index specs.
                "key": {property_to_filter: 1},
                "name": index_name,
            }
        ],
    }
    # The command is issued on the database object, not on the collection,
    # so fetch the database the collection belongs to.
    current_database = self._collection.database
    create_index_responses: dict[str, Any] = current_database.command(command)
    return create_index_responses
def add_texts( def add_texts(
self, self,
texts: Iterable[str], texts: Iterable[str],
@ -345,7 +366,7 @@ class AzureCosmosDBVectorSearch(VectorStore):
# Embed and create the documents # Embed and create the documents
embeddings = self._embedding.embed_documents(texts) embeddings = self._embedding.embed_documents(texts)
to_insert = [ to_insert = [
{self._text_key: t, self._embedding_key: embedding, **m} {self._text_key: t, self._embedding_key: embedding, "metadata": m}
for t, m, embedding in zip(texts, metadatas, embeddings) for t, m, embedding in zip(texts, metadatas, embeddings)
] ]
# insert the documents in Cosmos DB # insert the documents in Cosmos DB
@ -397,8 +418,10 @@ class AzureCosmosDBVectorSearch(VectorStore):
embeddings: List[float], embeddings: List[float],
k: int = 4, k: int = 4,
kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF, kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF,
pre_filter: Optional[Dict] = None,
ef_search: int = 40, ef_search: int = 40,
score_threshold: float = 0.0, score_threshold: float = 0.0,
with_embedding: bool = False,
) -> List[Tuple[Document, float]]: ) -> List[Tuple[Document, float]]:
"""Returns a list of documents with their scores """Returns a list of documents with their scores
@ -422,9 +445,11 @@ class AzureCosmosDBVectorSearch(VectorStore):
""" """
pipeline: List[dict[str, Any]] = [] pipeline: List[dict[str, Any]] = []
if kind == CosmosDBVectorSearchType.VECTOR_IVF: if kind == CosmosDBVectorSearchType.VECTOR_IVF:
pipeline = self._get_pipeline_vector_ivf(embeddings, k) pipeline = self._get_pipeline_vector_ivf(embeddings, k, pre_filter)
elif kind == CosmosDBVectorSearchType.VECTOR_HNSW: elif kind == CosmosDBVectorSearchType.VECTOR_HNSW:
pipeline = self._get_pipeline_vector_hnsw(embeddings, k, ef_search) pipeline = self._get_pipeline_vector_hnsw(
embeddings, k, ef_search, pre_filter
)
cursor = self._collection.aggregate(pipeline) cursor = self._collection.aggregate(pipeline)
@ -433,28 +458,32 @@ class AzureCosmosDBVectorSearch(VectorStore):
score = res.pop("similarityScore") score = res.pop("similarityScore")
if score < score_threshold: if score < score_threshold:
continue continue
document_object_field = ( document_object_field = res.pop("document")
res.pop("document")
if kind == CosmosDBVectorSearchType.VECTOR_IVF
else res
)
text = document_object_field.pop(self._text_key) text = document_object_field.pop(self._text_key)
docs.append( metadata = document_object_field.pop("metadata")
(Document(page_content=text, metadata=document_object_field), score) if with_embedding:
metadata[self._embedding_key] = document_object_field.pop(
self._embedding_key
) )
docs.append((Document(page_content=text, metadata=metadata), score))
return docs return docs
def _get_pipeline_vector_ivf( def _get_pipeline_vector_ivf(
self, embeddings: List[float], k: int = 4 self, embeddings: List[float], k: int = 4, pre_filter: Optional[Dict] = None
) -> List[dict[str, Any]]: ) -> List[dict[str, Any]]:
pipeline: List[dict[str, Any]] = [ params = {
{
"$search": {
"cosmosSearch": {
"vector": embeddings, "vector": embeddings,
"path": self._embedding_key, "path": self._embedding_key,
"k": k, "k": k,
}, }
if pre_filter:
params["filter"] = pre_filter
pipeline: List[dict[str, Any]] = [
{
"$search": {
"cosmosSearch": params,
"returnStoredSource": True, "returnStoredSource": True,
} }
}, },
@ -468,17 +497,25 @@ class AzureCosmosDBVectorSearch(VectorStore):
return pipeline return pipeline
def _get_pipeline_vector_hnsw( def _get_pipeline_vector_hnsw(
self, embeddings: List[float], k: int = 4, ef_search: int = 40 self,
embeddings: List[float],
k: int = 4,
ef_search: int = 40,
pre_filter: Optional[Dict] = None,
) -> List[dict[str, Any]]: ) -> List[dict[str, Any]]:
pipeline: List[dict[str, Any]] = [ params = {
{
"$search": {
"cosmosSearch": {
"vector": embeddings, "vector": embeddings,
"path": self._embedding_key, "path": self._embedding_key,
"k": k, "k": k,
"efSearch": ef_search, "efSearch": ef_search,
}, }
if pre_filter:
params["filter"] = pre_filter
pipeline: List[dict[str, Any]] = [
{
"$search": {
"cosmosSearch": params,
} }
}, },
{ {
@ -495,16 +532,20 @@ class AzureCosmosDBVectorSearch(VectorStore):
query: str, query: str,
k: int = 4, k: int = 4,
kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF, kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF,
pre_filter: Optional[Dict] = None,
ef_search: int = 40, ef_search: int = 40,
score_threshold: float = 0.0, score_threshold: float = 0.0,
with_embedding: bool = False,
) -> List[Tuple[Document, float]]: ) -> List[Tuple[Document, float]]:
embeddings = self._embedding.embed_query(query) embeddings = self._embedding.embed_query(query)
docs = self._similarity_search_with_score( docs = self._similarity_search_with_score(
embeddings=embeddings, embeddings=embeddings,
k=k, k=k,
kind=kind, kind=kind,
pre_filter=pre_filter,
ef_search=ef_search, ef_search=ef_search,
score_threshold=score_threshold, score_threshold=score_threshold,
with_embedding=with_embedding,
) )
return docs return docs
@ -513,16 +554,20 @@ class AzureCosmosDBVectorSearch(VectorStore):
query: str, query: str,
k: int = 4, k: int = 4,
kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF, kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF,
pre_filter: Optional[Dict] = None,
ef_search: int = 40, ef_search: int = 40,
score_threshold: float = 0.0, score_threshold: float = 0.0,
with_embedding: bool = False,
**kwargs: Any, **kwargs: Any,
) -> List[Document]: ) -> List[Document]:
docs_and_scores = self.similarity_search_with_score( docs_and_scores = self.similarity_search_with_score(
query, query,
k=k, k=k,
kind=kind, kind=kind,
pre_filter=pre_filter,
ef_search=ef_search, ef_search=ef_search,
score_threshold=score_threshold, score_threshold=score_threshold,
with_embedding=with_embedding,
) )
return [doc for doc, _ in docs_and_scores] return [doc for doc, _ in docs_and_scores]
@ -533,8 +578,10 @@ class AzureCosmosDBVectorSearch(VectorStore):
fetch_k: int = 20, fetch_k: int = 20,
lambda_mult: float = 0.5, lambda_mult: float = 0.5,
kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF, kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF,
pre_filter: Optional[Dict] = None,
ef_search: int = 40, ef_search: int = 40,
score_threshold: float = 0.0, score_threshold: float = 0.0,
with_embedding: bool = False,
**kwargs: Any, **kwargs: Any,
) -> List[Document]: ) -> List[Document]:
# Retrieves the docs with similarity scores # Retrieves the docs with similarity scores
@ -543,8 +590,10 @@ class AzureCosmosDBVectorSearch(VectorStore):
embedding, embedding,
k=fetch_k, k=fetch_k,
kind=kind, kind=kind,
pre_filter=pre_filter,
ef_search=ef_search, ef_search=ef_search,
score_threshold=score_threshold, score_threshold=score_threshold,
with_embedding=with_embedding,
) )
# Re-ranks the docs using MMR # Re-ranks the docs using MMR
@ -564,8 +613,10 @@ class AzureCosmosDBVectorSearch(VectorStore):
fetch_k: int = 20, fetch_k: int = 20,
lambda_mult: float = 0.5, lambda_mult: float = 0.5,
kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF, kind: CosmosDBVectorSearchType = CosmosDBVectorSearchType.VECTOR_IVF,
pre_filter: Optional[Dict] = None,
ef_search: int = 40, ef_search: int = 40,
score_threshold: float = 0.0, score_threshold: float = 0.0,
with_embedding: bool = False,
**kwargs: Any, **kwargs: Any,
) -> List[Document]: ) -> List[Document]:
# compute the embeddings vector from the query string # compute the embeddings vector from the query string
@ -577,8 +628,10 @@ class AzureCosmosDBVectorSearch(VectorStore):
fetch_k=fetch_k, fetch_k=fetch_k,
lambda_mult=lambda_mult, lambda_mult=lambda_mult,
kind=kind, kind=kind,
pre_filter=pre_filter,
ef_search=ef_search, ef_search=ef_search,
score_threshold=score_threshold, score_threshold=score_threshold,
with_embedding=with_embedding,
) )
return docs return docs

View File

@ -162,7 +162,12 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
text_key = "text" text_key = "text"
to_insert = [ to_insert = [
{"id": str(uuid.uuid4()), text_key: t, self._embedding_key: embedding, **m} {
"id": str(uuid.uuid4()),
text_key: t,
self._embedding_key: embedding,
"metadata": m,
}
for t, m, embedding in zip(texts, metadatas, embeddings) for t, m, embedding in zip(texts, metadatas, embeddings)
] ]
# insert the documents in CosmosDB No Sql # insert the documents in CosmosDB No Sql
@ -184,6 +189,7 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
cosmos_database_properties: Dict[str, Any], cosmos_database_properties: Dict[str, Any],
database_name: str = "vectorSearchDB", database_name: str = "vectorSearchDB",
container_name: str = "vectorSearchContainer", container_name: str = "vectorSearchContainer",
create_container: bool = True,
**kwargs: Any, **kwargs: Any,
) -> AzureCosmosDBNoSqlVectorSearch: ) -> AzureCosmosDBNoSqlVectorSearch:
if kwargs: if kwargs:
@ -204,6 +210,7 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
cosmos_database_properties=cosmos_database_properties, cosmos_database_properties=cosmos_database_properties,
database_name=database_name, database_name=database_name,
container_name=container_name, container_name=container_name,
create_container=create_container,
) )
@classmethod @classmethod
@ -257,41 +264,83 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
self, self,
embeddings: List[float], embeddings: List[float],
k: int = 4, k: int = 4,
pre_filter: Optional[Dict] = None,
with_embedding: bool = False,
) -> List[Tuple[Document, float]]: ) -> List[Tuple[Document, float]]:
query = ( query = "SELECT "
"SELECT TOP {} c.id, c.{}, c.text, VectorDistance(c.{}, {}) AS "
"SimilarityScore FROM c ORDER BY VectorDistance(c.{}, {})".format( # If limit_offset_clause is not specified, add TOP clause
k, if pre_filter is None or pre_filter.get("limit_offset_clause") is None:
self._embedding_key, query += "TOP @limit "
self._embedding_key,
embeddings, query += (
self._embedding_key, "c.id, c.{}, c.text, c.metadata, "
embeddings, "VectorDistance(c.@embeddingKey, @embeddings) AS SimilarityScore FROM c"
)
) )
# Add where_clause if specified
if pre_filter is not None and pre_filter.get("where_clause") is not None:
query += " {}".format(pre_filter["where_clause"])
query += " ORDER BY VectorDistance(c.@embeddingKey, @embeddings)"
# Add limit_offset_clause if specified
if pre_filter is not None and pre_filter.get("limit_offset_clause") is not None:
query += " {}".format(pre_filter["limit_offset_clause"])
parameters = [
{"name": "@limit", "value": k},
{"name": "@embeddingKey", "value": self._embedding_key},
{"name": "@embeddings", "value": embeddings},
]
docs_and_scores = [] docs_and_scores = []
items = list( items = list(
self._container.query_items(query=query, enable_cross_partition_query=True) self._container.query_items(
query=query, parameters=parameters, enable_cross_partition_query=True
)
) )
for item in items: for item in items:
text = item["text"] text = item["text"]
metadata = item["metadata"]
score = item["SimilarityScore"] score = item["SimilarityScore"]
docs_and_scores.append((Document(page_content=text, metadata=item), score)) if with_embedding:
metadata[self._embedding_key] = item[self._embedding_key]
docs_and_scores.append(
(Document(page_content=text, metadata=metadata), score)
)
return docs_and_scores return docs_and_scores
def similarity_search_with_score( def similarity_search_with_score(
self, self,
query: str, query: str,
k: int = 4, k: int = 4,
pre_filter: Optional[Dict] = None,
with_embedding: bool = False,
) -> List[Tuple[Document, float]]: ) -> List[Tuple[Document, float]]:
embeddings = self._embedding.embed_query(query) embeddings = self._embedding.embed_query(query)
docs_and_scores = self._similarity_search_with_score(embeddings=embeddings, k=k) docs_and_scores = self._similarity_search_with_score(
embeddings=embeddings,
k=k,
pre_filter=pre_filter,
with_embedding=with_embedding,
)
return docs_and_scores return docs_and_scores
def similarity_search( def similarity_search(
self, query: str, k: int = 4, **kwargs: Any self,
query: str,
k: int = 4,
pre_filter: Optional[Dict] = None,
with_embedding: bool = False,
**kwargs: Any,
) -> List[Document]: ) -> List[Document]:
docs_and_scores = self.similarity_search_with_score(query, k=k) docs_and_scores = self.similarity_search_with_score(
query,
k=k,
pre_filter=pre_filter,
with_embedding=with_embedding,
)
return [doc for doc, _ in docs_and_scores] return [doc for doc, _ in docs_and_scores]
@ -304,7 +353,18 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
**kwargs: Any, **kwargs: Any,
) -> List[Document]: ) -> List[Document]:
# Retrieves the docs with similarity scores # Retrieves the docs with similarity scores
docs = self._similarity_search_with_score(embeddings=embedding, k=fetch_k) pre_filter = {}
with_embedding = False
if kwargs["pre_filter"]:
pre_filter = kwargs["pre_filter"]
if kwargs["with_embedding"]:
with_embedding = kwargs["with_embedding"]
docs = self._similarity_search_with_score(
embeddings=embedding,
k=fetch_k,
pre_filter=pre_filter,
with_embedding=with_embedding,
)
# Re-ranks the docs using MMR # Re-ranks the docs using MMR
mmr_doc_indexes = maximal_marginal_relevance( mmr_doc_indexes = maximal_marginal_relevance(
@ -326,6 +386,12 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
**kwargs: Any, **kwargs: Any,
) -> List[Document]: ) -> List[Document]:
# compute the embeddings vector from the query string # compute the embeddings vector from the query string
pre_filter = {}
with_embedding = False
if kwargs["pre_filter"]:
pre_filter = kwargs["pre_filter"]
if kwargs["with_embedding"]:
with_embedding = kwargs["with_embedding"]
embeddings = self._embedding.embed_query(query) embeddings = self._embedding.embed_query(query)
docs = self.max_marginal_relevance_search_by_vector( docs = self.max_marginal_relevance_search_by_vector(
@ -333,5 +399,7 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
k=k, k=k,
fetch_k=fetch_k, fetch_k=fetch_k,
lambda_mult=lambda_mult, lambda_mult=lambda_mult,
pre_filter=pre_filter,
with_embedding=with_embedding,
) )
return docs return docs

View File

@ -25,7 +25,7 @@ model_name = os.getenv("OPENAI_EMBEDDINGS_MODEL_NAME", "text-embedding-ada-002")
INDEX_NAME = "langchain-test-index" INDEX_NAME = "langchain-test-index"
INDEX_NAME_VECTOR_HNSW = "langchain-test-index-hnsw" INDEX_NAME_VECTOR_HNSW = "langchain-test-index-hnsw"
NAMESPACE = "langchain_test_db.langchain_test_collection" NAMESPACE = "langchain_test_db.langchain_test_collection"
# Read the Mongo vCore connection string from the environment. Never commit a
# literal URI here: it embeds the cluster user name and password.
CONNECTION_STRING: str = os.environ.get("MONGODB_VCORE_URI", "")
DB_NAME, COLLECTION_NAME = NAMESPACE.split(".") DB_NAME, COLLECTION_NAME = NAMESPACE.split(".")
num_lists = 3 num_lists = 3

View File

@ -104,6 +104,7 @@ class TestAzureCosmosDBNoSqlVectorSearch:
), ),
indexing_policy=get_vector_indexing_policy("flat"), indexing_policy=get_vector_indexing_policy("flat"),
cosmos_container_properties={"partition_key": partition_key}, cosmos_container_properties={"partition_key": partition_key},
cosmos_database_properties={},
) )
sleep(1) # waits for Cosmos DB to save contents to the collection sleep(1) # waits for Cosmos DB to save contents to the collection
@ -139,6 +140,7 @@ class TestAzureCosmosDBNoSqlVectorSearch:
), ),
indexing_policy=get_vector_indexing_policy("flat"), indexing_policy=get_vector_indexing_policy("flat"),
cosmos_container_properties={"partition_key": partition_key}, cosmos_container_properties={"partition_key": partition_key},
cosmos_database_properties={},
) )
sleep(1) # waits for Cosmos DB to save contents to the collection sleep(1) # waits for Cosmos DB to save contents to the collection
@ -154,3 +156,60 @@ class TestAzureCosmosDBNoSqlVectorSearch:
assert output2 assert output2
assert output2[0].page_content != "Dogs are tough." assert output2[0].page_content != "Dogs are tough."
safe_delete_database(cosmos_client) safe_delete_database(cosmos_client)
def test_from_documents_cosine_distance_with_filtering(
    self,
    cosmos_client: Any,
    partition_key: Any,
    azure_openai_embeddings: OpenAIEmbeddings,
) -> None:
    """Test end-to-end construction and filtered similarity search.

    Builds a store from four documents and checks three searches for
    "Dogs": unfiltered, filtered with a ``where_clause`` on ``metadata.a``,
    and filtered with both a ``where_clause`` and a ``limit_offset_clause``.
    """
    documents = [
        Document(page_content="Dogs are tough.", metadata={"a": 1}),
        Document(page_content="Cats have fluff.", metadata={"a": 1}),
        Document(page_content="What is a sandwich?", metadata={"c": 1}),
        Document(page_content="That fence is purple.", metadata={"d": 1, "e": 2}),
    ]

    # Clean up in ``finally`` so a failing assertion does not leave the test
    # database behind for later tests.
    # NOTE(review): assumes safe_delete_database tolerates a database that was
    # never created (e.g. if from_documents itself fails) - confirm.
    try:
        store = AzureCosmosDBNoSqlVectorSearch.from_documents(
            documents,
            azure_openai_embeddings,
            cosmos_client=cosmos_client,
            database_name=database_name,
            container_name=container_name,
            vector_embedding_policy=get_vector_embedding_policy(
                "cosine", "float32", 400
            ),
            indexing_policy=get_vector_indexing_policy("flat"),
            cosmos_container_properties={"partition_key": partition_key},
            cosmos_database_properties={},
        )
        sleep(1)  # waits for Cosmos DB to save contents to the collection

        # Unfiltered search returns every document, best match first.
        output = store.similarity_search("Dogs", k=4)
        assert len(output) == 4
        assert output[0].page_content == "Dogs are tough."
        assert output[0].metadata["a"] == 1

        # WHERE clause restricts results to the two documents with a == 1.
        pre_filter = {
            "where_clause": "WHERE c.metadata.a=1",
        }
        output = store.similarity_search(
            "Dogs", k=4, pre_filter=pre_filter, with_embedding=True
        )
        assert len(output) == 2
        assert output[0].page_content == "Dogs are tough."
        assert output[0].metadata["a"] == 1

        # OFFSET/LIMIT further caps the filtered results to a single document.
        pre_filter = {
            "where_clause": "WHERE c.metadata.a=1",
            "limit_offset_clause": "OFFSET 0 LIMIT 1",
        }
        output = store.similarity_search("Dogs", k=4, pre_filter=pre_filter)
        assert len(output) == 1
        assert output[0].page_content == "Dogs are tough."
        assert output[0].metadata["a"] == 1
    finally:
        safe_delete_database(cosmos_client)