diff --git a/libs/community/langchain_community/vectorstores/azuresearch.py b/libs/community/langchain_community/vectorstores/azuresearch.py index a49bf03984a..8ff5a9583b3 100644 --- a/libs/community/langchain_community/vectorstores/azuresearch.py +++ b/libs/community/langchain_community/vectorstores/azuresearch.py @@ -1,6 +1,7 @@ from __future__ import annotations import base64 +import itertools import json import logging import uuid @@ -29,10 +30,12 @@ from langchain_core.retrievers import BaseRetriever from langchain_core.utils import get_from_env from langchain_core.vectorstores import VectorStore +from langchain_community.vectorstores.utils import maximal_marginal_relevance + logger = logging.getLogger() if TYPE_CHECKING: - from azure.search.documents import SearchClient + from azure.search.documents import SearchClient, SearchItemPaged from azure.search.documents.indexes.models import ( CorsOptions, ScoringProfile, @@ -236,6 +239,8 @@ class AzureSearch(VectorStore): scoring_profiles: Optional[List[ScoringProfile]] = None, default_scoring_profile: Optional[str] = None, cors_options: Optional[CorsOptions] = None, + *, + vector_search_dimensions: Optional[int] = None, **kwargs: Any, ): from azure.search.documents.indexes.models import ( @@ -269,7 +274,8 @@ class AzureSearch(VectorStore): name=FIELDS_CONTENT_VECTOR, type=SearchFieldDataType.Collection(SearchFieldDataType.Single), searchable=True, - vector_search_dimensions=len(self.embed_query("Text")), + vector_search_dimensions=vector_search_dimensions + or len(self.embed_query("Text")), vector_search_profile_name="myHnswProfile", ), SearchableField( @@ -311,7 +317,6 @@ class AzureSearch(VectorStore): ) -> List[str]: """Add texts data to an existing index.""" keys = kwargs.get("keys") - ids = [] # batching support if embedding function is an Embeddings object if isinstance(self.embedding_function, Embeddings): @@ -326,9 +331,21 @@ class AzureSearch(VectorStore): logger.debug("Nothing to insert, skipping.") return [] + return self.add_embeddings(zip(texts, embeddings), metadatas, keys=keys) + + def add_embeddings( + self, + text_embeddings: Iterable[Tuple[str, List[float]]], + metadatas: Optional[List[dict]] = None, + *, + keys: Optional[List[str]] = None, + ) -> List[str]: + """Add embeddings to an existing index.""" + ids = [] + # Write data to index data = [] - for i, text in enumerate(texts): + for i, (text, embedding) in enumerate(text_embeddings): # Use provided key otherwise use default key key = keys[i] if keys else str(uuid.uuid4()) # Encoding key for Azure Search valid characters @@ -340,9 +357,7 @@ class AzureSearch(VectorStore): "@search.action": "upload", FIELDS_ID: key, FIELDS_CONTENT: text, - FIELDS_CONTENT_VECTOR: np.array( - embeddings[i], dtype=np.float32 - ).tolist(), + FIELDS_CONTENT_VECTOR: np.array(embedding, dtype=np.float32).tolist(), FIELDS_METADATA: json.dumps(metadata), } if metadata: @@ -358,7 +373,7 @@ class AzureSearch(VectorStore): if len(data) == MAX_UPLOAD_BATCH_SIZE: response = self.client.upload_documents(documents=data) # Check if all documents were successfully uploaded - if not all([r.succeeded for r in response]): + if not all(r.succeeded for r in response): raise Exception(response) # Reset data data = [] @@ -370,7 +385,7 @@ class AzureSearch(VectorStore): # Upload data to index response = self.client.upload_documents(documents=data) # Check if all documents were successfully uploaded - if all([r.succeeded for r in response]): + if all(r.succeeded for r in response): return ids else: raise Exception(response) @@ -433,48 +448,61 @@ class AzureSearch(VectorStore): return [doc for doc, _ in docs_and_scores] def vector_search_with_score( - self, query: str, k: int = 4, filters: Optional[str] = None + self, + query: str, + k: int = 4, + filters: Optional[str] = None, + **kwargs: Any, ) -> List[Tuple[Document, float]]: """Return docs most similar to query. Args: - query: Text to look up documents similar to. - k: Number of Documents to return. Defaults to 4. + query (str): Text to look up documents similar to. + k (int, optional): Number of Documents to return. Defaults to 4. + filters (str, optional): Filtering expression. Defaults to None. Returns: - List of Documents most similar to the query and score for each + List[Tuple[Document, float]]: List of Documents most similar + to the query and score for each """ + embedding = self.embed_query(query) + results = self._simple_search(embedding, "", k, filters=filters, **kwargs) - from azure.search.documents.models import VectorizedQuery + return _results_to_documents(results) - results = self.client.search( - search_text="", - vector_queries=[ - VectorizedQuery( - vector=np.array(self.embed_query(query), dtype=np.float32).tolist(), - k_nearest_neighbors=k, - fields=FIELDS_CONTENT_VECTOR, - ) - ], - filter=filters, - top=k, + def max_marginal_relevance_search_with_score( + self, + query: str, + k: int = 4, + fetch_k: int = 20, + lambda_mult: float = 0.5, + *, + filters: Optional[str] = None, + **kwargs: Any, + ) -> List[Tuple[Document, float]]: + """Perform a search and return results that are reordered by MMR. + + Args: + query (str): Text to look up documents similar to. + k (int, optional): How many results to give. Defaults to 4. + fetch_k (int, optional): Total results to select k from. + Defaults to 20. + lambda_mult: Number between 0 and 1 that determines the degree + of diversity among the results with 0 corresponding + to maximum diversity and 1 to minimum diversity. + Defaults to 0.5 + filters (str, optional): Filtering expression. Defaults to None. + + Returns: + List[Tuple[Document, float]]: List of Documents most similar + to the query and score for each + """ + embedding = self.embed_query(query) + results = self._simple_search(embedding, "", fetch_k, filters=filters, **kwargs) + + return _reorder_results_with_maximal_marginal_relevance( + results, query_embedding=np.array(embedding), lambda_mult=lambda_mult, k=k ) - # Convert results to Document objects - docs = [ - ( - Document( - page_content=result.pop(FIELDS_CONTENT), - metadata=json.loads(result[FIELDS_METADATA]) - if FIELDS_METADATA in result - else { - k: v for k, v in result.items() if k != FIELDS_CONTENT_VECTOR - }, - ), - float(result["@search.score"]), - ) - for result in results - ] - return docs def hybrid_search(self, query: str, k: int = 4, **kwargs: Any) -> List[Document]: """ @@ -487,13 +515,15 @@ class AzureSearch(VectorStore): Returns: List[Document]: A list of documents that are most similar to the query text. """ - docs_and_scores = self.hybrid_search_with_score( - query, k=k, filters=kwargs.get("filters", None) - ) + docs_and_scores = self.hybrid_search_with_score(query, k=k, **kwargs) return [doc for doc, _ in docs_and_scores] def hybrid_search_with_score( - self, query: str, k: int = 4, filters: Optional[str] = None + self, + query: str, + k: int = 4, + filters: Optional[str] = None, + **kwargs: Any, ) -> List[Tuple[Document, float]]: """Return docs most similar to query with a hybrid query. @@ -504,36 +534,11 @@ class AzureSearch(VectorStore): Returns: List of Documents most similar to the query and score for each """ - from azure.search.documents.models import VectorizedQuery - results = self.client.search( - search_text=query, - vector_queries=[ - VectorizedQuery( - vector=np.array(self.embed_query(query), dtype=np.float32).tolist(), - k_nearest_neighbors=k, - fields=FIELDS_CONTENT_VECTOR, - ) - ], - filter=filters, - top=k, - ) - # Convert results to Document objects - docs = [ - ( - Document( - page_content=result.pop(FIELDS_CONTENT), - metadata=json.loads(result[FIELDS_METADATA]) - if FIELDS_METADATA in result - else { - k: v for k, v in result.items() if k != FIELDS_CONTENT_VECTOR - }, - ), - float(result["@search.score"]), - ) - for result in results - ] - return docs + embedding = self.embed_query(query) + results = self._simple_search(embedding, query, k, filters=filters, **kwargs) + + return _results_to_documents(results) def hybrid_search_with_relevance_scores( self, query: str, k: int = 4, **kwargs: Any @@ -546,6 +551,79 @@ class AzureSearch(VectorStore): else [r for r in result if r[1] >= score_threshold] ) + def hybrid_max_marginal_relevance_search_with_score( + self, + query: str, + k: int = 4, + fetch_k: int = 20, + lambda_mult: float = 0.5, + *, + filters: Optional[str] = None, + **kwargs: Any, + ) -> List[Tuple[Document, float]]: + """Return docs most similar to query with a hybrid query + and reorder results by MMR. + + Args: + query (str): Text to look up documents similar to. + k (int, optional): Number of Documents to return. Defaults to 4. + fetch_k (int, optional): Total results to select k from. + Defaults to 20. + lambda_mult: Number between 0 and 1 that determines the degree + of diversity among the results with 0 corresponding + to maximum diversity and 1 to minimum diversity. + Defaults to 0.5 + filters (str, optional): Filtering expression. Defaults to None. + + Returns: + List of Documents most similar to the query and score for each + """ + + embedding = self.embed_query(query) + results = self._simple_search( + embedding, query, fetch_k, filters=filters, **kwargs + ) + + return _reorder_results_with_maximal_marginal_relevance( + results, query_embedding=np.array(embedding), lambda_mult=lambda_mult, k=k + ) + + def _simple_search( + self, + embedding: List[float], + text_query: str, + k: int, + *, + filters: Optional[str] = None, + **kwargs: Any, + ) -> SearchItemPaged[dict]: + """Perform vector or hybrid search in the Azure search index. + + Args: + embedding: A vector embedding to search in the vector space. + text_query: A full-text search query expression; + Use "*" or omit this parameter to perform only vector search. + k: Number of documents to return. + filters: Filtering expression. + Returns: + Search items + """ + from azure.search.documents.models import VectorizedQuery + + return self.client.search( + search_text=text_query, + vector_queries=[ + VectorizedQuery( + vector=np.array(embedding, dtype=np.float32).tolist(), + k_nearest_neighbors=k, + fields=FIELDS_CONTENT_VECTOR, + ) + ], + filter=filters, + top=k, + **kwargs, + ) + def semantic_hybrid_search( self, query: str, k: int = 4, **kwargs: Any ) -> List[Document]: @@ -555,12 +633,13 @@ class AzureSearch(VectorStore): Args: query (str): The query text for which to find similar documents. k (int): The number of documents to return. Default is 4. + filters: Filtering expression. Returns: List[Document]: A list of documents that are most similar to the query text. """ docs_and_scores = self.semantic_hybrid_search_with_score_and_rerank( - query, k=k, filters=kwargs.get("filters", None) + query, k=k, **kwargs ) return [doc for doc, _, _ in docs_and_scores] @@ -579,6 +658,7 @@ class AzureSearch(VectorStore): k (int): The number of documents to return. Default is 4. score_type: Must either be "score" or "reranker_score". Defaulted to "score". + filters: Filtering expression. Returns: List[Tuple[Document, float]]: A list of documents and their @@ -586,7 +666,7 @@ class AzureSearch(VectorStore): """ score_threshold = kwargs.pop("score_threshold", None) docs_and_scores = self.semantic_hybrid_search_with_score_and_rerank( - query, k=k, filters=kwargs.get("filters", None) + query, k=k, **kwargs ) if score_type == "score": return [ @@ -602,13 +682,14 @@ class AzureSearch(VectorStore): ] def semantic_hybrid_search_with_score_and_rerank( - self, query: str, k: int = 4, filters: Optional[str] = None + self, query: str, k: int = 4, *, filters: Optional[str] = None, **kwargs: Any ) -> List[Tuple[Document, float, float]]: """Return docs most similar to query with a hybrid query. Args: query: Text to look up documents similar to. k: Number of Documents to return. Defaults to 4. + filters: Filtering expression. Returns: List of Documents most similar to the query and score for each @@ -630,6 +711,7 @@ class AzureSearch(VectorStore): query_caption="extractive", query_answer="extractive", top=k, + **kwargs, ) # Get Semantic Answers semantic_answers = results.get_answers() or [] @@ -696,10 +778,66 @@ class AzureSearch(VectorStore): index_name, embedding, fields=fields, + **kwargs, ) azure_search.add_texts(texts, metadatas, **kwargs) return azure_search + @classmethod + async def afrom_embeddings( + cls: Type[AzureSearch], + text_embeddings: Iterable[Tuple[str, List[float]]], + embedding: Embeddings, + metadatas: Optional[List[dict]] = None, + *, + azure_search_endpoint: str = "", + azure_search_key: str = "", + index_name: str = "langchain-index", + fields: Optional[List[SearchField]] = None, + **kwargs: Any, + ) -> AzureSearch: + return cls.from_embeddings( + text_embeddings, + embedding, + metadatas=metadatas, + azure_search_endpoint=azure_search_endpoint, + azure_search_key=azure_search_key, + index_name=index_name, + fields=fields, + **kwargs, + ) + + @classmethod + def from_embeddings( + cls: Type[AzureSearch], + text_embeddings: Iterable[Tuple[str, List[float]]], + embedding: Embeddings, + metadatas: Optional[List[dict]] = None, + *, + azure_search_endpoint: str = "", + azure_search_key: str = "", + index_name: str = "langchain-index", + fields: Optional[List[SearchField]] = None, + **kwargs: Any, + ) -> AzureSearch: + # Creating a new Azure Search instance + text_embeddings, first_text_embedding = _peek(text_embeddings) + if first_text_embedding is None: + raise ValueError("Cannot create AzureSearch from empty embeddings.") + vector_search_dimensions = len(first_text_embedding[1]) + + azure_search = cls( + azure_search_endpoint=azure_search_endpoint, + azure_search_key=azure_search_key, + index_name=index_name, + embedding_function=embedding, + fields=fields, + vector_search_dimensions=vector_search_dimensions, + **kwargs, + ) + azure_search.add_embeddings(text_embeddings, metadatas, **kwargs) + return azure_search + def as_retriever(self, **kwargs: Any) -> AzureSearchVectorStoreRetriever: # type: ignore """Return AzureSearchVectorStoreRetriever initialized from this VectorStore. @@ -799,3 +937,70 @@ class AzureSearchVectorStoreRetriever(BaseRetriever): else: raise ValueError(f"search_type of {self.search_type} not allowed.") return docs + + +def _results_to_documents( + results: SearchItemPaged[Dict], +) -> List[Tuple[Document, float]]: + docs = [ + ( + _result_to_document(result), + float(result["@search.score"]), + ) + for result in results + ] + return docs + + +def _reorder_results_with_maximal_marginal_relevance( + results: SearchItemPaged[Dict], + query_embedding: np.ndarray, + lambda_mult: float = 0.5, + k: int = 4, +) -> List[Tuple[Document, float]]: + # Convert results to Document objects + docs = [ + ( + _result_to_document(result), + float(result["@search.score"]), + result[FIELDS_CONTENT_VECTOR], + ) + for result in results + ] + documents, scores, vectors = map(list, zip(*docs)) + + # Get the new order of results. + new_ordering = maximal_marginal_relevance( + query_embedding, vectors, k=k, lambda_mult=lambda_mult + ) + + # Reorder the values and return. + ret: List[Tuple[Document, float]] = [] + for x in new_ordering: + # Function can return -1 index + if x == -1: + break + ret.append((documents[x], scores[x])) # type: ignore + + return ret + + +def _result_to_document(result: Dict) -> Document: + return Document( + page_content=result.pop(FIELDS_CONTENT), + metadata=json.loads(result[FIELDS_METADATA]) + if FIELDS_METADATA in result + else { + key: value for key, value in result.items() if key != FIELDS_CONTENT_VECTOR + }, + ) + + +def _peek(iterable: Iterable, default: Optional[Any] = None) -> Tuple[Iterable, Any]: + try: + iterator = iter(iterable) + value = next(iterator) + iterable = itertools.chain([value], iterator) + return iterable, value + except StopIteration: + return iterable, default