From 98c0b093bbd4b4e1797660c938976f51e8bff7b3 Mon Sep 17 00:00:00 2001 From: Oleksii Pokotylo Date: Wed, 22 May 2024 22:36:06 +0200 Subject: [PATCH] community[patch]: Extend AzureSearch with `maximal_marginal_relevance`, `from_embeddings` (#21065) **Description:** - Extend AzureSearch with `maximal_marginal_relevance` (for vector and hybrid search) - Add construction `from_embeddings` - if the user has already embedded the texts - Add `add_embeddings` - Refactor common parts (`_simple_search`, `_results_to_documents`, `_reorder_results_with_maximal_marginal_relevance`) - Add `vector_search_dimensions` as a parameter to the constructor to avoid extra calls to `embed_query` (most of the time the user applies the same model and knows the dimension) **Issue:** none **Dependencies:** none - [x] **Add tests and docs**: The docstrings have been added to the new functions, and unified for the existing ones. The example notebook is great in illustrating the main usage of AzureSearch, adding the new methods would only dilute the main content. - [x] **Lint and test** --------- Co-authored-by: Oleksii Pokotylo Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com> Co-authored-by: Bagatur --- .../vectorstores/azuresearch.py | 359 ++++++++++++++---- 1 file changed, 282 insertions(+), 77 deletions(-) diff --git a/libs/community/langchain_community/vectorstores/azuresearch.py b/libs/community/langchain_community/vectorstores/azuresearch.py index a49bf03984a..8ff5a9583b3 100644 --- a/libs/community/langchain_community/vectorstores/azuresearch.py +++ b/libs/community/langchain_community/vectorstores/azuresearch.py @@ -1,6 +1,7 @@ from __future__ import annotations import base64 +import itertools import json import logging import uuid @@ -29,10 +30,12 @@ from langchain_core.retrievers import BaseRetriever from langchain_core.utils import get_from_env from langchain_core.vectorstores import VectorStore +from langchain_community.vectorstores.utils import maximal_marginal_relevance + logger = logging.getLogger() if TYPE_CHECKING: - from azure.search.documents import SearchClient + from azure.search.documents import SearchClient, SearchItemPaged from azure.search.documents.indexes.models import ( CorsOptions, ScoringProfile, @@ -236,6 +239,8 @@ class AzureSearch(VectorStore): scoring_profiles: Optional[List[ScoringProfile]] = None, default_scoring_profile: Optional[str] = None, cors_options: Optional[CorsOptions] = None, + *, + vector_search_dimensions: Optional[int] = None, **kwargs: Any, ): from azure.search.documents.indexes.models import ( @@ -269,7 +274,8 @@ class AzureSearch(VectorStore): name=FIELDS_CONTENT_VECTOR, type=SearchFieldDataType.Collection(SearchFieldDataType.Single), searchable=True, - vector_search_dimensions=len(self.embed_query("Text")), + vector_search_dimensions=vector_search_dimensions + or len(self.embed_query("Text")), vector_search_profile_name="myHnswProfile", ), SearchableField( @@ -311,7 +317,6 @@ class AzureSearch(VectorStore): ) -> List[str]: """Add texts data to an existing index.""" keys = kwargs.get("keys") - ids = [] # batching support if embedding function is an Embeddings object if isinstance(self.embedding_function, Embeddings): @@ -326,9 +331,21 @@ class AzureSearch(VectorStore): logger.debug("Nothing to insert, skipping.") return [] + return self.add_embeddings(zip(texts, embeddings), metadatas, keys=keys) + + def add_embeddings( + self, + text_embeddings: Iterable[Tuple[str, List[float]]], + metadatas: Optional[List[dict]] = None, + *, + keys: Optional[List[str]] = None, + ) -> List[str]: + """Add embeddings to an existing index.""" + ids = [] + # Write data to index data = [] - for i, text in enumerate(texts): + for i, (text, embedding) in enumerate(text_embeddings): # Use provided key otherwise use default key key = keys[i] if keys else str(uuid.uuid4()) # Encoding key for Azure Search valid characters @@ -340,9 +357,7 @@ class AzureSearch(VectorStore): "@search.action": "upload", FIELDS_ID: key, FIELDS_CONTENT: text, - FIELDS_CONTENT_VECTOR: np.array( - embeddings[i], dtype=np.float32 - ).tolist(), + FIELDS_CONTENT_VECTOR: np.array(embedding, dtype=np.float32).tolist(), FIELDS_METADATA: json.dumps(metadata), } if metadata: @@ -358,7 +373,7 @@ class AzureSearch(VectorStore): if len(data) == MAX_UPLOAD_BATCH_SIZE: response = self.client.upload_documents(documents=data) # Check if all documents were successfully uploaded - if not all([r.succeeded for r in response]): + if not all(r.succeeded for r in response): raise Exception(response) # Reset data data = [] @@ -370,7 +385,7 @@ class AzureSearch(VectorStore): # Upload data to index response = self.client.upload_documents(documents=data) # Check if all documents were successfully uploaded - if all([r.succeeded for r in response]): + if all(r.succeeded for r in response): return ids else: raise Exception(response) @@ -433,48 +448,61 @@ class AzureSearch(VectorStore): return [doc for doc, _ in docs_and_scores] def vector_search_with_score( - self, query: str, k: int = 4, filters: Optional[str] = None + self, + query: str, + k: int = 4, + filters: Optional[str] = None, + **kwargs: Any, ) -> List[Tuple[Document, float]]: """Return docs most similar to query. Args: - query: Text to look up documents similar to. - k: Number of Documents to return. Defaults to 4. + query (str): Text to look up documents similar to. + k (int, optional): Number of Documents to return. Defaults to 4. + filters (str, optional): Filtering expression. Defaults to None. Returns: - List of Documents most similar to the query and score for each + List[Tuple[Document, float]]: List of Documents most similar + to the query and score for each """ + embedding = self.embed_query(query) + results = self._simple_search(embedding, "", k, filters=filters, **kwargs) - from azure.search.documents.models import VectorizedQuery + return _results_to_documents(results) - results = self.client.search( - search_text="", - vector_queries=[ - VectorizedQuery( - vector=np.array(self.embed_query(query), dtype=np.float32).tolist(), - k_nearest_neighbors=k, - fields=FIELDS_CONTENT_VECTOR, - ) - ], - filter=filters, - top=k, + def max_marginal_relevance_search_with_score( + self, + query: str, + k: int = 4, + fetch_k: int = 20, + lambda_mult: float = 0.5, + *, + filters: Optional[str] = None, + **kwargs: Any, + ) -> List[Tuple[Document, float]]: + """Perform a search and return results that are reordered by MMR. + + Args: + query (str): Text to look up documents similar to. + k (int, optional): How many results to give. Defaults to 4. + fetch_k (int, optional): Total results to select k from. + Defaults to 20. + lambda_mult: Number between 0 and 1 that determines the degree + of diversity among the results with 0 corresponding + to maximum diversity and 1 to minimum diversity. + Defaults to 0.5 + filters (str, optional): Filtering expression. Defaults to None. + + Returns: + List[Tuple[Document, float]]: List of Documents most similar + to the query and score for each + """ + embedding = self.embed_query(query) + results = self._simple_search(embedding, "", fetch_k, filters=filters, **kwargs) + + return _reorder_results_with_maximal_marginal_relevance( + results, query_embedding=np.array(embedding), lambda_mult=lambda_mult, k=k ) - # Convert results to Document objects - docs = [ - ( - Document( - page_content=result.pop(FIELDS_CONTENT), - metadata=json.loads(result[FIELDS_METADATA]) - if FIELDS_METADATA in result - else { - k: v for k, v in result.items() if k != FIELDS_CONTENT_VECTOR - }, - ), - float(result["@search.score"]), - ) - for result in results - ] - return docs def hybrid_search(self, query: str, k: int = 4, **kwargs: Any) -> List[Document]: """ @@ -487,13 +515,15 @@ class AzureSearch(VectorStore): Returns: List[Document]: A list of documents that are most similar to the query text. """ - docs_and_scores = self.hybrid_search_with_score( - query, k=k, filters=kwargs.get("filters", None) - ) + docs_and_scores = self.hybrid_search_with_score(query, k=k, **kwargs) return [doc for doc, _ in docs_and_scores] def hybrid_search_with_score( - self, query: str, k: int = 4, filters: Optional[str] = None + self, + query: str, + k: int = 4, + filters: Optional[str] = None, + **kwargs: Any, ) -> List[Tuple[Document, float]]: """Return docs most similar to query with a hybrid query. @@ -504,36 +534,11 @@ class AzureSearch(VectorStore): Returns: List of Documents most similar to the query and score for each """ - from azure.search.documents.models import VectorizedQuery - results = self.client.search( - search_text=query, - vector_queries=[ - VectorizedQuery( - vector=np.array(self.embed_query(query), dtype=np.float32).tolist(), - k_nearest_neighbors=k, - fields=FIELDS_CONTENT_VECTOR, - ) - ], - filter=filters, - top=k, - ) - # Convert results to Document objects - docs = [ - ( - Document( - page_content=result.pop(FIELDS_CONTENT), - metadata=json.loads(result[FIELDS_METADATA]) - if FIELDS_METADATA in result - else { - k: v for k, v in result.items() if k != FIELDS_CONTENT_VECTOR - }, - ), - float(result["@search.score"]), - ) - for result in results - ] - return docs + embedding = self.embed_query(query) + results = self._simple_search(embedding, query, k, filters=filters, **kwargs) + + return _results_to_documents(results) def hybrid_search_with_relevance_scores( self, query: str, k: int = 4, **kwargs: Any @@ -546,6 +551,79 @@ class AzureSearch(VectorStore): else [r for r in result if r[1] >= score_threshold] ) + def hybrid_max_marginal_relevance_search_with_score( + self, + query: str, + k: int = 4, + fetch_k: int = 20, + lambda_mult: float = 0.5, + *, + filters: Optional[str] = None, + **kwargs: Any, + ) -> List[Tuple[Document, float]]: + """Return docs most similar to query with a hybrid query + and reorder results by MMR. + + Args: + query (str): Text to look up documents similar to. + k (int, optional): Number of Documents to return. Defaults to 4. + fetch_k (int, optional): Total results to select k from. + Defaults to 20. + lambda_mult: Number between 0 and 1 that determines the degree + of diversity among the results with 0 corresponding + to maximum diversity and 1 to minimum diversity. + Defaults to 0.5 + filters (str, optional): Filtering expression. Defaults to None. + + Returns: + List of Documents most similar to the query and score for each + """ + + embedding = self.embed_query(query) + results = self._simple_search( + embedding, query, fetch_k, filters=filters, **kwargs + ) + + return _reorder_results_with_maximal_marginal_relevance( + results, query_embedding=np.array(embedding), lambda_mult=lambda_mult, k=k + ) + + def _simple_search( + self, + embedding: List[float], + text_query: str, + k: int, + *, + filters: Optional[str] = None, + **kwargs: Any, + ) -> SearchItemPaged[dict]: + """Perform vector or hybrid search in the Azure search index. + + Args: + embedding: A vector embedding to search in the vector space. + text_query: A full-text search query expression; + Use "*" or omit this parameter to perform only vector search. + k: Number of documents to return. + filters: Filtering expression. + Returns: + Search items + """ + from azure.search.documents.models import VectorizedQuery + + return self.client.search( + search_text=text_query, + vector_queries=[ + VectorizedQuery( + vector=np.array(embedding, dtype=np.float32).tolist(), + k_nearest_neighbors=k, + fields=FIELDS_CONTENT_VECTOR, + ) + ], + filter=filters, + top=k, + **kwargs, + ) + def semantic_hybrid_search( self, query: str, k: int = 4, **kwargs: Any ) -> List[Document]: @@ -555,12 +633,13 @@ class AzureSearch(VectorStore): Args: query (str): The query text for which to find similar documents. k (int): The number of documents to return. Default is 4. + filters: Filtering expression. Returns: List[Document]: A list of documents that are most similar to the query text. """ docs_and_scores = self.semantic_hybrid_search_with_score_and_rerank( - query, k=k, filters=kwargs.get("filters", None) + query, k=k, **kwargs ) return [doc for doc, _, _ in docs_and_scores] @@ -579,6 +658,7 @@ class AzureSearch(VectorStore): k (int): The number of documents to return. Default is 4. score_type: Must either be "score" or "reranker_score". Defaulted to "score". + filters: Filtering expression. Returns: List[Tuple[Document, float]]: A list of documents and their @@ -586,7 +666,7 @@ class AzureSearch(VectorStore): """ score_threshold = kwargs.pop("score_threshold", None) docs_and_scores = self.semantic_hybrid_search_with_score_and_rerank( - query, k=k, filters=kwargs.get("filters", None) + query, k=k, **kwargs ) if score_type == "score": return [ @@ -602,13 +682,14 @@ class AzureSearch(VectorStore): ] def semantic_hybrid_search_with_score_and_rerank( - self, query: str, k: int = 4, filters: Optional[str] = None + self, query: str, k: int = 4, *, filters: Optional[str] = None, **kwargs: Any ) -> List[Tuple[Document, float, float]]: """Return docs most similar to query with a hybrid query. Args: query: Text to look up documents similar to. k: Number of Documents to return. Defaults to 4. + filters: Filtering expression. Returns: List of Documents most similar to the query and score for each @@ -630,6 +711,7 @@ class AzureSearch(VectorStore): query_caption="extractive", query_answer="extractive", top=k, + **kwargs, ) # Get Semantic Answers semantic_answers = results.get_answers() or [] @@ -696,10 +778,66 @@ class AzureSearch(VectorStore): index_name, embedding, fields=fields, + **kwargs, ) azure_search.add_texts(texts, metadatas, **kwargs) return azure_search + @classmethod + async def afrom_embeddings( + cls: Type[AzureSearch], + text_embeddings: Iterable[Tuple[str, List[float]]], + embedding: Embeddings, + metadatas: Optional[List[dict]] = None, + *, + azure_search_endpoint: str = "", + azure_search_key: str = "", + index_name: str = "langchain-index", + fields: Optional[List[SearchField]] = None, + **kwargs: Any, + ) -> AzureSearch: + return cls.from_embeddings( + text_embeddings, + embedding, + metadatas=metadatas, + azure_search_endpoint=azure_search_endpoint, + azure_search_key=azure_search_key, + index_name=index_name, + fields=fields, + **kwargs, + ) + + @classmethod + def from_embeddings( + cls: Type[AzureSearch], + text_embeddings: Iterable[Tuple[str, List[float]]], + embedding: Embeddings, + metadatas: Optional[List[dict]] = None, + *, + azure_search_endpoint: str = "", + azure_search_key: str = "", + index_name: str = "langchain-index", + fields: Optional[List[SearchField]] = None, + **kwargs: Any, + ) -> AzureSearch: + # Creating a new Azure Search instance + text_embeddings, first_text_embedding = _peek(text_embeddings) + if first_text_embedding is None: + raise ValueError("Cannot create AzureSearch from empty embeddings.") + vector_search_dimensions = len(first_text_embedding[1]) + + azure_search = cls( + azure_search_endpoint=azure_search_endpoint, + azure_search_key=azure_search_key, + index_name=index_name, + embedding_function=embedding, + fields=fields, + vector_search_dimensions=vector_search_dimensions, + **kwargs, + ) + azure_search.add_embeddings(text_embeddings, metadatas, **kwargs) + return azure_search + def as_retriever(self, **kwargs: Any) -> AzureSearchVectorStoreRetriever: # type: ignore """Return AzureSearchVectorStoreRetriever initialized from this VectorStore. @@ -799,3 +937,70 @@ class AzureSearchVectorStoreRetriever(BaseRetriever): else: raise ValueError(f"search_type of {self.search_type} not allowed.") return docs + + +def _results_to_documents( + results: SearchItemPaged[Dict], +) -> List[Tuple[Document, float]]: + docs = [ + ( + _result_to_document(result), + float(result["@search.score"]), + ) + for result in results + ] + return docs + + +def _reorder_results_with_maximal_marginal_relevance( + results: SearchItemPaged[Dict], + query_embedding: np.ndarray, + lambda_mult: float = 0.5, + k: int = 4, +) -> List[Tuple[Document, float]]: + # Convert results to Document objects + docs = [ + ( + _result_to_document(result), + float(result["@search.score"]), + result[FIELDS_CONTENT_VECTOR], + ) + for result in results + ] + documents, scores, vectors = map(list, zip(*docs)) + + # Get the new order of results. + new_ordering = maximal_marginal_relevance( + query_embedding, vectors, k=k, lambda_mult=lambda_mult + ) + + # Reorder the values and return. + ret: List[Tuple[Document, float]] = [] + for x in new_ordering: + # Function can return -1 index + if x == -1: + break + ret.append((documents[x], scores[x])) # type: ignore + + return ret + + +def _result_to_document(result: Dict) -> Document: + return Document( + page_content=result.pop(FIELDS_CONTENT), + metadata=json.loads(result[FIELDS_METADATA]) + if FIELDS_METADATA in result + else { + key: value for key, value in result.items() if key != FIELDS_CONTENT_VECTOR + }, + ) + + +def _peek(iterable: Iterable, default: Optional[Any] = None) -> Tuple[Iterable, Any]: + try: + iterator = iter(iterable) + value = next(iterator) + iterable = itertools.chain([value], iterator) + return iterable, value + except StopIteration: + return iterable, default