Community: Azure CosmosDB No Sql Vector Store: Full Text and Hybrid Search Support (#28716)

Thank you for contributing to LangChain!

- Added [full
text](https://learn.microsoft.com/en-us/azure/cosmos-db/gen-ai/full-text-search)
and [hybrid
search](https://learn.microsoft.com/en-us/azure/cosmos-db/gen-ai/hybrid-search)
support for Azure CosmosDB NoSql Vector Store
- Added a new enum called CosmosDBQueryType which supports the following
values:
    - VECTOR = "vector"
    - FULL_TEXT_SEARCH = "full_text_search"
    - FULL_TEXT_RANK = "full_text_rank"
    - HYBRID = "hybrid"
- User now needs to provide this query_type to the similarity_search
method for the vectorStore to make the correct query api call.
- Added a couple of work arounds as for the FULL_TEXT_RANK and HYBRID
query functions we don't support parameterized queries right now. I have
added TODO's in place, and will remove these work arounds by end of
January.
- Added necessary test cases and updated the 


- [x] **Add tests and docs**: If you're adding a new integration, please
include
1. a test for the integration, preferably unit tests that do not rely on
network access,
2. an example notebook showing its use. It lives in
`docs/docs/integrations` directory.


- [x] **Lint and test**: Run `make format`, `make lint` and `make test`
from the root of the package(s) you've modified. See contribution
guidelines for more: https://python.langchain.com/docs/contributing/

Additional guidelines:
- Make sure optional dependencies are imported within a function.
- Please do not add dependencies to pyproject.toml files (even optional
ones) unless they are required for unit tests.
- Most PRs should not touch more than one package.
- Changes should be backwards compatible.
- If you are adding something to community, do not re-import it in
langchain.

If no one reviews your PR within a few days, please @-mention one of
baskaryan, efriis, eyurtsev, ccurme, vbarda, hwchase17.

---------

Co-authored-by: Erick Friis <erickfriis@gmail.com>
This commit is contained in:
Aayush Kataria 2024-12-15 13:26:32 -08:00 committed by GitHub
parent 4c1871d9a8
commit d417e4b372
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 1269 additions and 170 deletions

File diff suppressed because one or more lines are too long

View File

@ -131,6 +131,7 @@ class AzureCosmosDBVectorSearch(VectorStore):
connection_string: The MongoDB vCore instance connection string
namespace: The namespace (database.collection)
embedding: The embedding utility
application_name: The user agent for telemetry
**kwargs: Dynamic keyword arguments
Returns:

View File

@ -2,17 +2,42 @@ from __future__ import annotations
import uuid
import warnings
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple
import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from pydantic import BaseModel, Field
from langchain_community.vectorstores.utils import maximal_marginal_relevance
if TYPE_CHECKING:
from azure.cosmos.cosmos_client import CosmosClient
from azure.cosmos import CosmosClient
from azure.identity import DefaultAzureCredential
USER_AGENT = ("LangChain-CDBNoSql-VectorStore-Python",)
class Condition(BaseModel):
property: str
operator: str
value: Any
class PreFilter(BaseModel):
conditions: List[Condition] = Field(default_factory=list)
logical_operator: Optional[str] = None
class CosmosDBQueryType(str, Enum):
"""CosmosDB Query Type"""
VECTOR = "vector"
FULL_TEXT_SEARCH = "full_text_search"
FULL_TEXT_RANK = "full_text_rank"
HYBRID = "hybrid"
class AzureCosmosDBNoSqlVectorSearch(VectorStore):
@ -21,8 +46,11 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
To use, you should have both:
- the ``azure-cosmos`` python package installed
You can read more about vector search using AzureCosmosDBNoSQL here:
You can read more about vector search, full text search
and hybrid search using AzureCosmosDBNoSQL here:
https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/vector-search
https://learn.microsoft.com/en-us/azure/cosmos-db/gen-ai/full-text-search
https://learn.microsoft.com/en-us/azure/cosmos-db/gen-ai/hybrid-search
"""
def __init__(
@ -34,9 +62,14 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
indexing_policy: Dict[str, Any],
cosmos_container_properties: Dict[str, Any],
cosmos_database_properties: Dict[str, Any],
full_text_policy: Optional[Dict[str, Any]] = None,
database_name: str = "vectorSearchDB",
container_name: str = "vectorSearchContainer",
text_key: str = "text",
embedding_key: str = "embedding",
metadata_key: str = "metadata",
create_container: bool = True,
full_text_search_enabled: bool = False,
):
"""
Constructor for AzureCosmosDBNoSqlVectorSearch
@ -47,30 +80,42 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
container_name: Name of the container to be created.
embedding: Text embedding model to use.
vector_embedding_policy: Vector Embedding Policy for the container.
full_text_policy: Full Text Policy for the container.
indexing_policy: Indexing Policy for the container.
cosmos_container_properties: Container Properties for the container.
cosmos_database_properties: Database Properties for the container.
text_key: Text key to use for text property which will be
embedded in the data schema.
embedding_key: Embedding key to use for vector embedding.
metadata_key: Metadata key to use for data schema.
create_container: Set to true if the container does not exist.
full_text_search_enabled: Set to true if the full text search is enabled.
"""
self._cosmos_client = cosmos_client
self._database_name = database_name
self._container_name = container_name
self._embedding = embedding
self._vector_embedding_policy = vector_embedding_policy
self._full_text_policy = full_text_policy
self._indexing_policy = indexing_policy
self._cosmos_container_properties = cosmos_container_properties
self._cosmos_database_properties = cosmos_database_properties
self._text_key = text_key
self._embedding_key = embedding_key
self._metadata_key = metadata_key
self._create_container = create_container
self._full_text_search_enabled = full_text_search_enabled
if self._create_container:
if (
indexing_policy["vectorIndexes"] is None
or len(indexing_policy["vectorIndexes"]) == 0
self._indexing_policy["vectorIndexes"] is None
or len(self._indexing_policy["vectorIndexes"]) == 0
):
raise ValueError(
"vectorIndexes cannot be null or empty in the indexing_policy."
)
if (
vector_embedding_policy is None
self._vector_embedding_policy is None
or len(vector_embedding_policy["vectorEmbeddings"]) == 0
):
raise ValueError(
@ -81,6 +126,23 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
raise ValueError(
"partition_key cannot be null or empty for a container."
)
if self._full_text_search_enabled:
if (
self._indexing_policy["fullTextIndexes"] is None
or len(self._indexing_policy["fullTextIndexes"]) == 0
):
raise ValueError(
"fullTextIndexes cannot be null or empty in the "
"indexing_policy if full text search is enabled."
)
if (
self._full_text_policy is None
or len(self._full_text_policy["fullTextPaths"]) == 0
):
raise ValueError(
"fullTextPaths cannot be null or empty in the "
"full_text_policy if full text search is enabled."
)
# Create the database if it already doesn't exist
self._database = self._cosmos_client.create_database_if_not_exists(
@ -116,12 +178,9 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
session_token=self._cosmos_container_properties.get("session_token"),
initial_headers=self._cosmos_container_properties.get("initial_headers"),
vector_embedding_policy=self._vector_embedding_policy,
full_text_policy=self._full_text_policy,
)
self._embedding_key = self._vector_embedding_policy["vectorEmbeddings"][0][
"path"
][1:]
def add_texts(
self,
texts: Iterable[str],
@ -187,9 +246,14 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
indexing_policy: Dict[str, Any],
cosmos_container_properties: Dict[str, Any],
cosmos_database_properties: Dict[str, Any],
full_text_policy: Optional[Dict[str, Any]] = None,
database_name: str = "vectorSearchDB",
container_name: str = "vectorSearchContainer",
text_key: str = "text",
embedding_key: str = "embedding",
metadata_key: str = "metadata",
create_container: bool = True,
full_text_search_enabled: bool = False,
**kwargs: Any,
) -> AzureCosmosDBNoSqlVectorSearch:
if kwargs:
@ -205,12 +269,17 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
embedding=embedding,
cosmos_client=cosmos_client,
vector_embedding_policy=vector_embedding_policy,
full_text_policy=full_text_policy,
indexing_policy=indexing_policy,
cosmos_container_properties=cosmos_container_properties,
cosmos_database_properties=cosmos_database_properties,
database_name=database_name,
container_name=container_name,
text_key=text_key,
embedding_key=embedding_key,
metadata_key=metadata_key,
create_container=create_container,
full_text_search_enabled=full_text_search_enabled,
)
@classmethod
@ -242,6 +311,46 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
)
return vectorstore
@classmethod
def from_connection_string_and_aad(
cls,
connection_string: str,
defaultAzureCredential: DefaultAzureCredential,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> AzureCosmosDBNoSqlVectorSearch:
cosmos_client = CosmosClient(
connection_string, defaultAzureCredential, user_agent=USER_AGENT
)
kwargs["cosmos_client"] = cosmos_client
vectorstore = cls._from_kwargs(embedding, **kwargs)
vectorstore.add_texts(
texts=texts,
metadatas=metadatas,
)
return vectorstore
@classmethod
def from_connection_string_and_key(
cls,
connection_string: str,
key: str,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> AzureCosmosDBNoSqlVectorSearch:
cosmos_client = CosmosClient(connection_string, key, user_agent=USER_AGENT)
kwargs["cosmos_client"] = cosmos_client
vectorstore = cls._from_kwargs(embedding, **kwargs)
vectorstore.add_texts(
texts=texts,
metadatas=metadatas,
)
return vectorstore
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
if ids is None:
raise ValueError("No document ids provided to delete.")
@ -262,85 +371,169 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
def _similarity_search_with_score(
self,
query_type: CosmosDBQueryType,
embeddings: List[float],
k: int = 4,
pre_filter: Optional[Dict] = None,
pre_filter: Optional[PreFilter] = None,
with_embedding: bool = False,
offset_limit: Optional[str] = None,
*,
projection_mapping: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
query = "SELECT "
# If limit_offset_clause is not specified, add TOP clause
if pre_filter is None or pre_filter.get("limit_offset_clause") is None:
query += "TOP @limit "
query += (
"c.id, c[@embeddingKey], c.text, c.metadata, "
"VectorDistance(c[@embeddingKey], @embeddings) AS SimilarityScore FROM c"
query, parameters = self._construct_query(
k=k,
query_type=query_type,
embeddings=embeddings,
pre_filter=pre_filter,
offset_limit=offset_limit,
projection_mapping=projection_mapping,
)
# Add where_clause if specified
if pre_filter is not None and pre_filter.get("where_clause") is not None:
query += " {}".format(pre_filter["where_clause"])
query += " ORDER BY VectorDistance(c[@embeddingKey], @embeddings)"
# Add limit_offset_clause if specified
if pre_filter is not None and pre_filter.get("limit_offset_clause") is not None:
query += " {}".format(pre_filter["limit_offset_clause"])
parameters = [
{"name": "@limit", "value": k},
{"name": "@embeddingKey", "value": self._embedding_key},
{"name": "@embeddings", "value": embeddings},
]
docs_and_scores = []
items = list(
self._container.query_items(
query=query, parameters=parameters, enable_cross_partition_query=True
)
return self._execute_query(
query=query,
query_type=query_type,
parameters=parameters,
with_embedding=with_embedding,
projection_mapping=projection_mapping,
)
def _full_text_search(
self,
query_type: CosmosDBQueryType,
search_text: Optional[str] = None,
k: int = 4,
pre_filter: Optional[PreFilter] = None,
offset_limit: Optional[str] = None,
*,
projection_mapping: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
query, parameters = self._construct_query(
k=k,
query_type=query_type,
search_text=search_text,
pre_filter=pre_filter,
offset_limit=offset_limit,
projection_mapping=projection_mapping,
)
return self._execute_query(
query=query,
query_type=query_type,
parameters=parameters,
with_embedding=False,
projection_mapping=projection_mapping,
)
def _hybrid_search_with_score(
self,
query_type: CosmosDBQueryType,
embeddings: List[float],
search_text: str,
k: int = 4,
pre_filter: Optional[PreFilter] = None,
with_embedding: bool = False,
offset_limit: Optional[str] = None,
*,
projection_mapping: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
query, parameters = self._construct_query(
k=k,
query_type=query_type,
embeddings=embeddings,
search_text=search_text,
pre_filter=pre_filter,
offset_limit=offset_limit,
projection_mapping=projection_mapping,
)
return self._execute_query(
query=query,
query_type=query_type,
parameters=parameters,
with_embedding=with_embedding,
projection_mapping=projection_mapping,
)
for item in items:
text = item["text"]
metadata = item["metadata"]
score = item["SimilarityScore"]
if with_embedding:
metadata[self._embedding_key] = item[self._embedding_key]
docs_and_scores.append(
(Document(page_content=text, metadata=metadata), score)
)
return docs_and_scores
def similarity_search_with_score(
self,
query: str,
k: int = 4,
pre_filter: Optional[Dict] = None,
pre_filter: Optional[PreFilter] = None,
with_embedding: bool = False,
query_type: CosmosDBQueryType = CosmosDBQueryType.VECTOR,
offset_limit: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
embeddings = self._embedding.embed_query(query)
docs_and_scores = self._similarity_search_with_score(
embeddings=embeddings,
k=k,
pre_filter=pre_filter,
with_embedding=with_embedding,
)
docs_and_scores = []
if query_type == CosmosDBQueryType.VECTOR:
docs_and_scores = self._similarity_search_with_score(
query_type=query_type,
embeddings=embeddings,
k=k,
pre_filter=pre_filter,
with_embedding=with_embedding,
offset_limit=offset_limit,
**kwargs,
)
elif query_type == CosmosDBQueryType.FULL_TEXT_SEARCH:
docs_and_scores = self._full_text_search(
k=k,
query_type=query_type,
pre_filter=pre_filter,
offset_limit=offset_limit,
**kwargs,
)
elif query_type == CosmosDBQueryType.FULL_TEXT_RANK:
docs_and_scores = self._full_text_search(
search_text=query,
k=k,
query_type=query_type,
pre_filter=pre_filter,
offset_limit=offset_limit,
**kwargs,
)
elif query_type == CosmosDBQueryType.HYBRID:
docs_and_scores = self._hybrid_search_with_score(
query_type=query_type,
embeddings=embeddings,
search_text=query,
k=k,
pre_filter=pre_filter,
with_embedding=with_embedding,
offset_limit=offset_limit,
**kwargs,
)
return docs_and_scores
def similarity_search(
self,
query: str,
k: int = 4,
pre_filter: Optional[Dict] = None,
pre_filter: Optional[PreFilter] = None,
with_embedding: bool = False,
query_type: CosmosDBQueryType = CosmosDBQueryType.VECTOR,
offset_limit: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
docs_and_scores = self.similarity_search_with_score(
query,
k=k,
pre_filter=pre_filter,
with_embedding=with_embedding,
)
if query_type not in CosmosDBQueryType.__members__.values():
raise ValueError(
f"Invalid query_type: {query_type}. "
f"Expected one of: {', '.join(t.value for t in CosmosDBQueryType)}."
)
else:
docs_and_scores = self.similarity_search_with_score(
query,
k=k,
pre_filter=pre_filter,
with_embedding=with_embedding,
query_type=query_type,
offset_limit=offset_limit,
kwargs=kwargs,
)
return [doc for doc, _ in docs_and_scores]
@ -350,18 +543,20 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
query_type: CosmosDBQueryType = CosmosDBQueryType.VECTOR,
pre_filter: Optional[PreFilter] = None,
with_embedding: bool = False,
**kwargs: Any,
) -> List[Document]:
# Retrieves the docs with similarity scores
pre_filter = {}
with_embedding = False
if kwargs["pre_filter"]:
pre_filter = kwargs["pre_filter"]
if kwargs["with_embedding"]:
with_embedding = kwargs["with_embedding"]
# if kwargs["pre_filter"]:
# pre_filter = kwargs["pre_filter"]
# if kwargs["with_embedding"]:
# with_embedding = kwargs["with_embedding"]
docs = self._similarity_search_with_score(
embeddings=embedding,
k=fetch_k,
query_type=query_type,
pre_filter=pre_filter,
with_embedding=with_embedding,
)
@ -383,15 +578,16 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
query_type: CosmosDBQueryType = CosmosDBQueryType.VECTOR,
pre_filter: Optional[PreFilter] = None,
with_embedding: bool = False,
**kwargs: Any,
) -> List[Document]:
# compute the embeddings vector from the query string
pre_filter = {}
with_embedding = False
if kwargs["pre_filter"]:
pre_filter = kwargs["pre_filter"]
if kwargs["with_embedding"]:
with_embedding = kwargs["with_embedding"]
# if kwargs["pre_filter"]:
# pre_filter = kwargs["pre_filter"]
# if kwargs["with_embedding"]:
# with_embedding = kwargs["with_embedding"]
embeddings = self._embedding.embed_query(query)
docs = self.max_marginal_relevance_search_by_vector(
@ -400,6 +596,266 @@ class AzureCosmosDBNoSqlVectorSearch(VectorStore):
fetch_k=fetch_k,
lambda_mult=lambda_mult,
pre_filter=pre_filter,
query_type=query_type,
with_embedding=with_embedding,
)
return docs
def _construct_query(
self,
k: int,
query_type: CosmosDBQueryType,
embeddings: Optional[List[float]] = None,
search_text: Optional[str] = None,
pre_filter: Optional[PreFilter] = None,
offset_limit: Optional[str] = None,
projection_mapping: Optional[Dict[str, Any]] = None,
) -> Tuple[str, List[Dict[str, Any]]]:
if (
query_type == CosmosDBQueryType.FULL_TEXT_RANK
or query_type == CosmosDBQueryType.HYBRID
):
query = f"SELECT {'TOP ' + str(k) + ' ' if not offset_limit else ''}"
else:
query = f"""SELECT {'TOP @limit ' if not offset_limit else ''}"""
query += self._generate_projection_fields(
projection_mapping, query_type, embeddings
)
query += " FROM c "
# Add where_clause if specified
if pre_filter:
where_clause = self._build_where_clause(pre_filter)
query += f"""{where_clause}"""
# TODO: Update the code to use parameters once parametrized queries
# are allowed for these query functions
if query_type == CosmosDBQueryType.FULL_TEXT_RANK:
if search_text is None:
raise ValueError(
"search text cannot be None for FULL_TEXT_RANK queries."
)
query += f""" ORDER BY RANK FullTextScore(c.{self._text_key},
[{", ".join(f"'{term}'" for term in search_text.split())}])"""
elif query_type == CosmosDBQueryType.VECTOR:
query += " ORDER BY VectorDistance(c[@embeddingKey], @embeddings)"
elif query_type == CosmosDBQueryType.HYBRID:
if search_text is None:
raise ValueError("search text cannot be None for HYBRID queries.")
query += f""" ORDER BY RANK RRF(FullTextScore(c.{self._text_key},
[{", ".join(f"'{term}'" for term in search_text.split())}]),
VectorDistance(c.{self._embedding_key}, {embeddings}))"""
else:
query += ""
# Add limit_offset_clause if specified
if offset_limit is not None:
query += f""" {offset_limit}"""
# TODO: Remove this if check once parametrized queries
# are allowed for these query functions
parameters = []
if (
query_type == CosmosDBQueryType.FULL_TEXT_SEARCH
or query_type == CosmosDBQueryType.VECTOR
):
parameters = self._build_parameters(
k=k,
query_type=query_type,
embeddings=embeddings,
projection_mapping=projection_mapping,
)
return query, parameters
def _generate_projection_fields(
self,
projection_mapping: Optional[Dict[str, Any]],
query_type: CosmosDBQueryType,
embeddings: Optional[List[float]] = None,
) -> str:
# TODO: Remove this if check once parametrized queries
# are allowed for these query functions
if (
query_type == CosmosDBQueryType.FULL_TEXT_RANK
or query_type == CosmosDBQueryType.HYBRID
):
if projection_mapping:
projection = ", ".join(
f"c.{key} as {alias}" for key, alias in projection_mapping.items()
)
else:
projection = (
f"c.id, c.{self._text_key} as text, "
f"c.{self._metadata_key} as metadata"
)
if query_type == CosmosDBQueryType.HYBRID:
projection += (
f", c.{self._embedding_key} as embedding, "
f"VectorDistance(c.{self._embedding_key}, "
f"{embeddings}) as SimilarityScore"
)
else:
if projection_mapping:
projection = ", ".join(
f"c.[@{key}] as {alias}"
for key, alias in projection_mapping.items()
)
else:
projection = "c.id, c[@textKey] as text, c[@metadataKey] as metadata"
if (
query_type == CosmosDBQueryType.VECTOR
or query_type == CosmosDBQueryType.HYBRID
):
projection += (
", c[@embeddingKey] as embedding, "
"VectorDistance(c[@embeddingKey], "
"@embeddings) as SimilarityScore"
)
return projection
def _build_parameters(
self,
k: int,
query_type: CosmosDBQueryType,
embeddings: Optional[List[float]],
search_terms: Optional[List[str]] = None,
projection_mapping: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
parameters: List[Dict[str, Any]] = [
{"name": "@limit", "value": k},
{"name": "@textKey", "value": self._text_key},
]
if projection_mapping:
for key in projection_mapping.keys():
parameters.append({"name": f"@{key}", "value": key})
else:
parameters.append({"name": "@metadataKey", "value": self._metadata_key})
if (
query_type == CosmosDBQueryType.FULL_TEXT_RANK
or query_type == CosmosDBQueryType.HYBRID
):
parameters.append({"name": "@searchTerms", "value": search_terms})
elif (
query_type == CosmosDBQueryType.VECTOR
or query_type == CosmosDBQueryType.HYBRID
):
parameters.append({"name": "@embeddingKey", "value": self._embedding_key})
parameters.append({"name": "@embeddings", "value": embeddings})
return parameters
def _build_where_clause(self, pre_filter: PreFilter) -> str:
"""
Builds a where clause based on the given pre_filter.
"""
operator_map = self._where_clause_operator_map()
if (
pre_filter.logical_operator
and pre_filter.logical_operator not in operator_map
):
raise ValueError(
f"unsupported logical_operator: {pre_filter.logical_operator}"
)
sql_logical_operator = operator_map.get(pre_filter.logical_operator or "", "")
clauses = []
for condition in pre_filter.conditions:
if condition.operator not in operator_map:
raise ValueError(f"Unsupported operator: {condition.operator}")
if "full_text" in condition.operator:
if not isinstance(condition.value, str):
raise ValueError(
f"Expected a string for {condition.operator}, "
f"got {type(condition.value)}"
)
search_terms = ", ".join(
f"'{term}'" for term in condition.value.split()
)
sql_function = operator_map[condition.operator]
clauses.append(
f"{sql_function}(c.{condition.property}, {search_terms})"
)
else:
sql_operator = operator_map[condition.operator]
if isinstance(condition.value, str):
value = f"'{condition.value}'"
elif isinstance(condition.value, list):
# e.g., for IN clauses
value = f"({', '.join(map(str, condition.value))})"
clauses.append(f"c.{condition.property} {sql_operator} {value}")
return f""" WHERE {' {} '.format(sql_logical_operator).join(clauses)}""".strip()
def _execute_query(
self,
query: str,
query_type: CosmosDBQueryType,
parameters: List[Dict[str, Any]],
with_embedding: bool,
projection_mapping: Optional[Dict[str, Any]],
) -> List[Tuple[Document, float]]:
docs_and_scores = []
items = list(
self._container.query_items(
query=query, parameters=parameters, enable_cross_partition_query=True
)
)
for item in items:
text = item[self._text_key]
metadata = item.pop(self._metadata_key, {})
score = 0.0
if projection_mapping:
for key, alias in projection_mapping.items():
if key == self._text_key:
continue
metadata[alias] = item[alias]
else:
metadata["id"] = item["id"]
if (
query_type == CosmosDBQueryType.VECTOR
or query_type == CosmosDBQueryType.HYBRID
):
score = item["SimilarityScore"]
if with_embedding:
metadata[self._embedding_key] = item[self._embedding_key]
docs_and_scores.append(
(Document(page_content=text, metadata=metadata), score)
)
return docs_and_scores
def _where_clause_operator_map(self) -> Dict[str, str]:
operator_map = {
"$eq": "=",
"$ne": "!=",
"$lt": "<",
"$lte": "<=",
"$gt": ">",
"$gte": ">=",
"$add": "+",
"$sub": "-",
"$mul": "*",
"$div": "/",
"$mod": "%",
"$or": "OR",
"$and": "AND",
"$not": "NOT",
"$concat": "||",
"$bit_or": "|",
"$bit_and": "&",
"$bit_xor": "^",
"$bit_lshift": "<<",
"$bit_rshift": ">>",
"$bit_zerofill_rshift": ">>>",
"$full_text_contains": "FullTextContains",
"$full_text_contains_all": "FullTextContainsAll",
"$full_text_contains_any": "FullTextContainsAny",
}
return operator_map

View File

@ -3,7 +3,7 @@
import logging
import os
from time import sleep
from typing import Any
from typing import Any, Dict, List, Tuple
import pytest
from langchain_core.documents import Document
@ -11,6 +11,9 @@ from langchain_core.documents import Document
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores.azure_cosmos_db_no_sql import (
AzureCosmosDBNoSqlVectorSearch,
Condition,
CosmosDBQueryType,
PreFilter,
)
logging.basicConfig(level=logging.DEBUG)
@ -60,6 +63,7 @@ def get_vector_indexing_policy(embedding_type: str) -> dict:
"includedPaths": [{"path": "/*"}],
"excludedPaths": [{"path": '/"_etag"/?'}],
"vectorIndexes": [{"path": "/embedding", "type": embedding_type}],
"fullTextIndexes": [{"path": "/text"}],
}
@ -78,6 +82,13 @@ def get_vector_embedding_policy(
}
def get_full_text_policy() -> dict:
return {
"defaultLanguage": "en-US",
"fullTextPaths": [{"path": "/text", "language": "en-US"}],
}
class TestAzureCosmosDBNoSqlVectorSearch:
def test_from_documents_cosine_distance(
self,
@ -86,12 +97,7 @@ class TestAzureCosmosDBNoSqlVectorSearch:
azure_openai_embeddings: OpenAIEmbeddings,
) -> None:
"""Test end to end construction and search."""
documents = [
Document(page_content="Dogs are tough.", metadata={"a": 1}),
Document(page_content="Cats have fluff.", metadata={"b": 1}),
Document(page_content="What is a sandwich?", metadata={"c": 1}),
Document(page_content="That fence is purple.", metadata={"d": 1, "e": 2}),
]
documents = self._get_documents()
store = AzureCosmosDBNoSqlVectorSearch.from_documents(
documents,
@ -105,13 +111,16 @@ class TestAzureCosmosDBNoSqlVectorSearch:
indexing_policy=get_vector_indexing_policy("flat"),
cosmos_container_properties={"partition_key": partition_key},
cosmos_database_properties={},
full_text_policy=get_full_text_policy(),
full_text_search_enabled=True,
)
sleep(1) # waits for Cosmos DB to save contents to the collection
output = store.similarity_search("Dogs", k=2)
output = store.similarity_search("intelligent herders", k=5)
assert output
assert output[0].page_content == "Dogs are tough."
assert len(output) == 5
assert "Border Collies" in output[0].page_content
safe_delete_database(cosmos_client)
def test_from_texts_cosine_distance_delete_one(
@ -120,13 +129,7 @@ class TestAzureCosmosDBNoSqlVectorSearch:
partition_key: Any,
azure_openai_embeddings: OpenAIEmbeddings,
) -> None:
texts = [
"Dogs are tough.",
"Cats have fluff.",
"What is a sandwich?",
"That fence is purple.",
]
metadatas = [{"a": 1}, {"b": 1}, {"c": 1}, {"d": 1, "e": 2}]
texts, metadatas = self._get_texts_and_metadata()
store = AzureCosmosDBNoSqlVectorSearch.from_texts(
texts,
@ -141,20 +144,24 @@ class TestAzureCosmosDBNoSqlVectorSearch:
indexing_policy=get_vector_indexing_policy("flat"),
cosmos_container_properties={"partition_key": partition_key},
cosmos_database_properties={},
full_text_policy=get_full_text_policy(),
full_text_search_enabled=True,
)
sleep(1) # waits for Cosmos DB to save contents to the collection
output = store.similarity_search("Dogs", k=1)
output = store.similarity_search("intelligent herders", k=1)
assert output
assert output[0].page_content == "Dogs are tough."
assert len(output) == 1
assert "Border Collies" in output[0].page_content
# delete one document
store.delete_document_by_id(str(output[0].metadata["id"]))
sleep(2)
output2 = store.similarity_search("Dogs", k=1)
output2 = store.similarity_search("intelligent herders", k=1)
assert output2
assert output2[0].page_content != "Dogs are tough."
assert len(output2) == 1
assert "Border Collies" not in output2[0].page_content
safe_delete_database(cosmos_client)
def test_from_documents_cosine_distance_with_filtering(
@ -164,12 +171,7 @@ class TestAzureCosmosDBNoSqlVectorSearch:
azure_openai_embeddings: OpenAIEmbeddings,
) -> None:
"""Test end to end construction and search."""
documents = [
Document(page_content="Dogs are tough.", metadata={"a": 1}),
Document(page_content="Cats have fluff.", metadata={"a": 1}),
Document(page_content="What is a sandwich?", metadata={"c": 1}),
Document(page_content="That fence is purple.", metadata={"d": 1, "e": 2}),
]
documents = self._get_documents()
store = AzureCosmosDBNoSqlVectorSearch.from_documents(
documents,
@ -183,33 +185,321 @@ class TestAzureCosmosDBNoSqlVectorSearch:
indexing_policy=get_vector_indexing_policy("flat"),
cosmos_container_properties={"partition_key": partition_key},
cosmos_database_properties={},
full_text_policy=get_full_text_policy(),
full_text_search_enabled=True,
)
sleep(1) # waits for Cosmos DB to save contents to the collection
output = store.similarity_search("Dogs", k=4)
output = store.similarity_search("intelligent herders", k=4)
assert len(output) == 4
assert output[0].page_content == "Dogs are tough."
assert "Border Collies" in output[0].page_content
assert output[0].metadata["a"] == 1
pre_filter = {
"where_clause": "WHERE c.metadata.a=1",
}
# pre_filter = {
# "conditions": [
# {"property": "metadata.a", "operator": "$eq", "value": 1},
# ],
# }
pre_filter = PreFilter(
conditions=[
Condition(property="metadata.a", operator="$eq", value=1),
],
)
output = store.similarity_search(
"Dogs", k=4, pre_filter=pre_filter, with_embedding=True
"intelligent herders", k=4, pre_filter=pre_filter, with_embedding=True
)
assert len(output) == 2
assert output[0].page_content == "Dogs are tough."
assert len(output) == 3
assert "Border Collies" in output[0].page_content
assert output[0].metadata["a"] == 1
pre_filter = {
"where_clause": "WHERE c.metadata.a=1",
"limit_offset_clause": "OFFSET 0 LIMIT 1",
}
# pre_filter = {
# "conditions": [
# {"property": "metadata.a", "operator": "$eq", "value": 1},
# ],
# }
pre_filter = PreFilter(
conditions=[
Condition(property="metadata.a", operator="$eq", value=1),
],
)
offset_limit = "OFFSET 0 LIMIT 1"
output = store.similarity_search("Dogs", k=4, pre_filter=pre_filter)
output = store.similarity_search(
"intelligent herders", k=4, pre_filter=pre_filter, offset_limit=offset_limit
)
assert len(output) == 1
assert output[0].page_content == "Dogs are tough."
assert "Border Collies" in output[0].page_content
assert output[0].metadata["a"] == 1
safe_delete_database(cosmos_client)
def test_from_documents_full_text_and_hybrid(
self,
cosmos_client: Any,
partition_key: Any,
azure_openai_embeddings: OpenAIEmbeddings,
) -> None:
"""Test end to end construction and search."""
documents = self._get_documents()
store = AzureCosmosDBNoSqlVectorSearch.from_documents(
documents,
embedding=azure_openai_embeddings,
cosmos_client=cosmos_client,
database_name=database_name,
container_name=container_name,
vector_embedding_policy=get_vector_embedding_policy(
"cosine", "float32", 1536
),
full_text_policy=get_full_text_policy(),
indexing_policy=get_vector_indexing_policy("diskANN"),
cosmos_container_properties={"partition_key": partition_key},
cosmos_database_properties={},
full_text_search_enabled=True,
)
sleep(480) # waits for Cosmos DB to save contents to the collection
# Full text search contains any
# pre_filter = {
# "conditions": [
# {
# "property": "text",
# "operator": "$full_text_contains_any",
# "value": "intelligent herders",
# },
# ],
# }
pre_filter = PreFilter(
conditions=[
Condition(
property="text",
operator="$full_text_contains_all",
value="intelligent herders",
),
],
)
output = store.similarity_search(
"intelligent herders",
k=5,
pre_filter=pre_filter,
query_type=CosmosDBQueryType.FULL_TEXT_SEARCH,
)
assert output
assert len(output) == 3
assert "Border Collies" in output[0].page_content
# Full text search contains all
# pre_filter = {
# "conditions": [
# {
# "property": "text",
# "operator": "$full_text_contains_all",
# "value": "intelligent herders",
# },
# ],
# }
pre_filter = PreFilter(
conditions=[
Condition(
property="text",
operator="$full_text_contains_all",
value="intelligent herders",
),
],
)
output = store.similarity_search(
"intelligent herders",
k=5,
pre_filter=pre_filter,
query_type=CosmosDBQueryType.FULL_TEXT_SEARCH,
)
assert output
assert len(output) == 1
assert "Border Collies" in output[0].page_content
# Full text search BM25 ranking
output = store.similarity_search(
"intelligent herders", k=5, query_type=CosmosDBQueryType.FULL_TEXT_RANK
)
assert output
assert len(output) == 5
assert "Standard Poodles" in output[0].page_content
# Full text search BM25 ranking with filtering
# pre_filter = {
# "conditions": [
# {"property": "metadata.a", "operator": "$eq", "value": 1},
# ],
# }
pre_filter = PreFilter(
conditions=[
Condition(property="metadata.a", operator="$eq", value=1),
],
)
output = store.similarity_search(
"intelligent herders",
k=5,
pre_filter=pre_filter,
query_type=CosmosDBQueryType.FULL_TEXT_RANK,
)
assert output
assert len(output) == 3
assert "Border Collies" in output[0].page_content
# Hybrid search RRF ranking combination of full text search and vector search
output = store.similarity_search(
"intelligent herders", k=5, query_type=CosmosDBQueryType.HYBRID
)
assert output
assert len(output) == 5
assert "Border Collies" in output[0].page_content
# Hybrid search RRF ranking with filtering
# pre_filter = {
# "conditions": [
# {"property": "metadata.a", "operator": "$eq", "value": 1},
# ],
# }
pre_filter = PreFilter(
conditions=[
Condition(property="metadata.a", operator="$eq", value=1),
],
)
output = store.similarity_search(
"intelligent herders",
k=5,
pre_filter=pre_filter,
query_type=CosmosDBQueryType.HYBRID,
)
assert output
assert len(output) == 3
assert "Border Collies" in output[0].page_content
# Full text search BM25 ranking with full text filtering
# pre_filter = {
# "conditions": [
# {
# "property": "text",
# "operator": "$full_text_contains",
# "value": "energetic",
# },
# ]
# }
pre_filter = PreFilter(
conditions=[
Condition(
property="text", operator="$full_text_contains", value="energetic"
),
],
)
output = store.similarity_search(
"intelligent herders",
k=5,
pre_filter=pre_filter,
query_type=CosmosDBQueryType.FULL_TEXT_RANK,
)
assert output
assert len(output) == 3
assert "Border Collies" in output[0].page_content
# Full text search BM25 ranking with full text filtering
# pre_filter = {
# "conditions": [
# {
# "property": "text",
# "operator": "$full_text_contains",
# "value": "energetic",
# },
# {"property": "metadata.a", "operator": "$eq", "value": 2},
# ],
# "logical_operator": "$and",
# }
pre_filter = PreFilter(
conditions=[
Condition(
property="text", operator="$full_text_contains", value="energetic"
),
Condition(property="metadata.a", operator="$eq", value=2),
],
logical_operator="$and",
)
output = store.similarity_search(
"intelligent herders",
k=5,
pre_filter=pre_filter,
query_type=CosmosDBQueryType.FULL_TEXT_RANK,
)
assert output
assert len(output) == 2
assert "Standard Poodles" in output[0].page_content
def _get_documents(self) -> List[Document]:
return [
Document(
page_content="Border Collies are intelligent, energetic "
"herders skilled in outdoor activities.",
metadata={"a": 1},
),
Document(
page_content="Golden Retrievers are friendly, loyal companions "
"with excellent retrieving skills.",
metadata={"a": 2},
),
Document(
page_content="Labrador Retrievers are playful, eager "
"learners and skilled retrievers.",
metadata={"a": 1},
),
Document(
page_content="Australian Shepherds are agile, energetic "
"herders excelling in outdoor tasks.",
metadata={"a": 2, "b": 1},
),
Document(
page_content="German Shepherds are brave, loyal protectors "
"excelling in versatile tasks.",
metadata={"a": 1, "b": 2},
),
Document(
page_content="Standard Poodles are intelligent, energetic "
"learners excelling in agility.",
metadata={"a": 2, "b": 3},
),
]
def _get_texts_and_metadata(self) -> Tuple[List[str], List[Dict[str, Any]]]:
texts = [
"Border Collies are intelligent, "
"energetic herders skilled in outdoor activities.",
"Golden Retrievers are friendly, "
"loyal companions with excellent retrieving skills.",
"Labrador Retrievers are playful, "
"eager learners and skilled retrievers.",
"Australian Shepherds are agile, "
"energetic herders excelling in outdoor tasks.",
"German Shepherds are brave, "
"loyal protectors excelling in versatile tasks.",
"Standard Poodles are intelligent, "
"energetic learners excelling in agility.",
]
metadatas = [
{"a": 1},
{"a": 2},
{"a": 1},
{"a": 2, "b": 1},
{"a": 1, "b": 2},
{"a": 2, "b": 1},
]
return texts, metadatas