diff --git a/libs/community/langchain_community/vectorstores/opensearch_vector_search.py b/libs/community/langchain_community/vectorstores/opensearch_vector_search.py index 734153d989a..9cb60913e21 100644 --- a/libs/community/langchain_community/vectorstores/opensearch_vector_search.py +++ b/libs/community/langchain_community/vectorstores/opensearch_vector_search.py @@ -22,6 +22,7 @@ Please install it with `pip install opensearch-py`.""" SCRIPT_SCORING_SEARCH = "script_scoring" PAINLESS_SCRIPTING_SEARCH = "painless_scripting" MATCH_ALL_QUERY = {"match_all": {}} # type: Dict +HYBRID_SEARCH = "hybrid_search" if TYPE_CHECKING: from opensearchpy import AsyncOpenSearch, OpenSearch @@ -372,6 +373,65 @@ def _default_painless_scripting_query( } +def _default_hybrid_search_query( + query_text: str, query_vector: List[float], k: int = 4 +) -> Dict: + """Returns payload for performing hybrid search for given options. + + Args: + query_text: The query text to search for. + query_vector: The embedding vector (query) to search for. + k: Number of Documents to return. Defaults to 4. + + Returns: + dict: The payload for hybrid search. + """ + payload = { + "_source": {"exclude": ["vector_field"]}, + "query": { + "hybrid": { + "queries": [ + { + "match": { + "text": { + "query": query_text, + } + } + }, + {"knn": {"vector_field": {"vector": query_vector, "k": k}}}, + ] + } + }, + "size": k, + } + + return payload + + +def _hybrid_search_query_with_post_filter( + query_text: str, + query_vector: List[float], + k: int, + post_filter: Dict, +) -> Dict: + """Returns payload for performing hybrid search with post filter. + + Args: + query_text: The query text to search for. + query_vector: The embedding vector to search for. + k: Number of Documents to return. + post_filter: The post filter to apply. + + Returns: + dict: The payload for hybrid search with post filter. + """ + search_query = _default_hybrid_search_query(query_text, query_vector, k) + + search_query["post_filter"] = post_filter + + return search_query + + class OpenSearchVectorSearch(VectorStore): """`Amazon OpenSearch Vector Engine` vector store. @@ -713,6 +773,122 @@ class OpenSearchVectorSearch(VectorStore): item.get("delete", {}).get("error") for item in response["items"] ) + def configure_search_pipelines( + self, + pipeline_name: str, + keyword_weight: float = 0.7, + vector_weight: float = 0.3, + ) -> dict: + """ + Configures a search pipeline for hybrid search. + Args: + pipeline_name: Name of the pipeline + keyword_weight: Weight for keyword search + vector_weight: Weight for vector search + Returns: + response: Acknowledgement of the pipeline creation. + (if there is any error while configuring the pipeline, it will return None) + Raises: + Exception: If an error occurs + """ + if not pipeline_name.isidentifier(): + raise ValueError(f"Invalid pipeline name: {pipeline_name}") + + path = f"/_search/pipeline/{pipeline_name}" + + payload = { + "description": "Post processor for hybrid search", + "phase_results_processors": [ + { + "normalization-processor": { + "normalization": {"technique": "min_max"}, + "combination": { + "technique": "arithmetic_mean", + "parameters": {"weights": [keyword_weight, vector_weight]}, + }, + } + } + ], + } + + response = self.client.transport.perform_request( + method="PUT", url=path, body=payload + ) + return response + + def search_pipeline_exists(self, pipeline_name: str) -> bool: + """ + Checks if a search pipeline exists. + + Args: + pipeline_name: Name of the pipeline + + Returns: + bool: True if the pipeline exists, False otherwise + + Raises: + Exception: If an error occurs + + Example: + >>> search_pipeline_exists("my_pipeline_1") + True + >>> search_pipeline_exists("my_pipeline_2") + False + """ + if not pipeline_name.isidentifier(): + raise ValueError(f"Invalid pipeline name: {pipeline_name}") + + existed_pipelines = self.client.transport.perform_request( + method="GET", url="/_search/pipeline/" + ) + + return pipeline_name in existed_pipelines + + def get_search_pipeline_info(self, pipeline_name: str) -> Optional[Dict]: + """ + Get information about a search pipeline. + + Args: + pipeline_name: Name of the pipeline + + Returns: + dict: Information about the pipeline + None: If pipeline does not exist + + Raises: + Exception: If an error occurs + + Example: + >>> get_search_pipeline_info("my_pipeline_1") + {'search_pipeline_1': { + "description": "Post processor for hybrid search", + "phase_results_processors": [ + { + "normalization-processor": { + "normalization": {"technique": "min_max"}, + "combination": { + "technique": "arithmetic_mean", + "parameters": {"weights": [0.7, 0.3]} + } + } + } + ] + } + } + >>> get_search_pipeline_info("my_pipeline_2") + None + """ + response = None + + if not pipeline_name.isidentifier(): + raise ValueError(f"Invalid pipeline name: {pipeline_name}") + + response = self.client.transport.perform_request( + method="GET", url=f"/_search/pipeline/{pipeline_name}" + ) + + return response + @staticmethod def _identity_fn(score: float) -> float: return score @@ -837,6 +1013,8 @@ class OpenSearchVectorSearch(VectorStore): Optional Args: same as `similarity_search` """ + # added query_text to kwargs for Hybrid Search + kwargs["query_text"] = query embedding = self.embedding_function.embed_query(query) return self.similarity_search_with_score_by_vector( embedding, k, score_threshold, **kwargs @@ -1024,6 +1202,38 @@ class OpenSearchVectorSearch(VectorStore): vector_field, score_threshold=score_threshold, ) + + elif search_type == HYBRID_SEARCH: + search_pipeline = kwargs.get("search_pipeline") + post_filter = kwargs.get("post_filter", {}) + query_text = kwargs.get("query_text") + path = f"/{index_name}/_search?search_pipeline={search_pipeline}" + + if query_text is None: + raise ValueError("query_text must be provided for hybrid search") + + if search_pipeline is None: + raise ValueError("search_pipeline must be provided for hybrid search") + + # embedding the query_text + embeded_query = self.embedding_function.embed_query(query_text) + + # if post filter is provided + if post_filter != {}: + # hybrid search with post filter + payload = _hybrid_search_query_with_post_filter( + query_text, embeded_query, k, post_filter + ) + else: + # hybrid search without post filter + payload = _default_hybrid_search_query(query_text, embeded_query, k) + + response = self.client.transport.perform_request( + method="GET", url=path, body=payload + ) + + return [hit for hit in response["hits"]["hits"]] + else: raise ValueError("Invalid `search_type` provided as an argument") diff --git a/libs/community/tests/integration_tests/vectorstores/test_opensearch.py b/libs/community/tests/integration_tests/vectorstores/test_opensearch.py index 22bff520073..bae4e0454a7 100644 --- a/libs/community/tests/integration_tests/vectorstores/test_opensearch.py +++ b/libs/community/tests/integration_tests/vectorstores/test_opensearch.py @@ -4,6 +4,7 @@ import pytest from langchain_core.documents import Document from langchain_community.vectorstores.opensearch_vector_search import ( + HYBRID_SEARCH, PAINLESS_SCRIPTING_SEARCH, SCRIPT_SCORING_SEARCH, OpenSearchVectorSearch, @@ -75,6 +76,112 @@ def test_opensearch_with_custom_field_name() -> None: assert output == [Document(page_content="foo", id="id_foo")] +def test_configure_search_pipeline() -> None: + """Test configure search pipeline functionality.""" + test_search_pipeline_name = "test_search_pipeline" + keyword_weight = 0.7 + vector_weight = 0.3 + + docsearch = OpenSearchVectorSearch.from_texts( + texts, FakeEmbeddings(), opensearch_url=DEFAULT_OPENSEARCH_URL + ) + docsearch.configure_search_pipelines( + pipeline_name=test_search_pipeline_name, + keyword_weight=keyword_weight, + vector_weight=vector_weight, + ) + assert docsearch.search_pipeline_exists(test_search_pipeline_name) + + +def test_get_search_pipeline_info() -> None: + """Test get search pipeline info functionality.""" + test_search_pipeline_name = "test_search_pipeline" + + docsearch = OpenSearchVectorSearch.from_texts( + texts, FakeEmbeddings(), opensearch_url=DEFAULT_OPENSEARCH_URL + ) + test_pipeline_info = docsearch.get_search_pipeline_info(test_search_pipeline_name) + assert test_pipeline_info == { + "test_search_pipeline": { + "description": "Post processor for hybrid search", + "phase_results_processors": [ + { + "normalization-processor": { + "normalization": {"technique": "min_max"}, + "combination": { + "technique": "arithmetic_mean", + "parameters": {"weights": [0.7, 0.3]}, + }, + } + } + ], + } + } + + +def test_hybrid_search() -> None: + """Test hybrid search functionality.""" + metadatas = [{"page": i} for i in range(len(texts))] + docsearch = OpenSearchVectorSearch.from_texts( + texts, + ConsistentFakeEmbeddings(), + metadatas=metadatas, + opensearch_url=DEFAULT_OPENSEARCH_URL, + ) + output = docsearch.similarity_search( + query="foo", + k=2, + search_type=HYBRID_SEARCH, + search_pipeline="test_search_pipeline", + ) + + assert output == [ + Document(page_content="foo", metadata={"page": 0}), + Document(page_content="bar", metadata={"page": 1}), + ] + + +def test_hybrid_search_with_score() -> None: + """Test hybrid search with score functionality.""" + metadatas = [{"page": i} for i in range(len(texts))] + docsearch = OpenSearchVectorSearch.from_texts( + texts, + ConsistentFakeEmbeddings(), + metadatas=metadatas, + opensearch_url=DEFAULT_OPENSEARCH_URL, + ) + output = docsearch.similarity_search_with_score( + query="foo", + k=2, + search_type=HYBRID_SEARCH, + search_pipeline="test_search_pipeline", + ) + assert output == [ + (Document(page_content="foo", metadata={"page": 0}), 1.0), + (Document(page_content="bar", metadata={"page": 1}), 0.0003), + ] + + +def test_hybrid_search_with_post_filter() -> None: + """Test hybrid search with post filter functionality.""" + metadatas = [{"page": i} for i in range(len(texts))] + docsearch = OpenSearchVectorSearch.from_texts( + texts, + ConsistentFakeEmbeddings(), + metadatas=metadatas, + opensearch_url=DEFAULT_OPENSEARCH_URL, + ) + output = docsearch.similarity_search( + query="foo", + k=2, + search_type="hybrid_search", + search_pipeline="test_search_pipeline", + post_filter={"bool": {"filter": {"term": {"metadata.page": 1}}}}, + ) + + assert output == [Document(page_content="bar", metadata={"page": 1})] + + def test_opensearch_with_metadatas() -> None: """Test end to end indexing and search with metadata.""" metadatas = [{"page": i} for i in range(len(texts))]