community[minor]: Opensearch hybridsearch implementation (#25375)

community: add hybrid search in opensearch

# Langchain OpenSearch Hybrid Search Implementation

## Implementation of Hybrid Search

I have added hybrid search to LangChain's OpenSearch integration.
Building on the existing `OpenSearchVectorSearch` class, the new search
type combines keyword (lexical) search with semantic (vector) search, so
users can call OpenSearch's hybrid search features without leaving
LangChain. Blending text matching with vector-based similarity can
return results that match the query terms and are also contextually
relevant. The feature plugs into the existing `similarity_search`
methods, so current LangChain workflows only need a search pipeline and
a new `search_type` argument.

In implementing the hybrid search for OpenSearch within the LangChain
framework, I also incorporated filtering capabilities. It's important to
note that according to the OpenSearch hybrid search documentation, only
post-filtering is supported for hybrid queries. This means that the
filtering is applied after the hybrid search results are obtained,
rather than during the initial search process.
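
To illustrate what post-filtering means in practice, here is a minimal
sketch of the request body that a hybrid query with a `post_filter`
might carry. The field names `text`, `vector_field`, and `metadata.page`
are assumptions based on this integration's defaults and tests, and the
helper name `build_hybrid_payload` is hypothetical.

```python
from typing import Any, Dict, List, Optional


def build_hybrid_payload(
    query_text: str,
    query_vector: List[float],
    k: int = 4,
    post_filter: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build a hybrid query body; the filter is attached outside the query."""
    payload: Dict[str, Any] = {
        "query": {
            "hybrid": {
                "queries": [
                    # Lexical sub-query (assumed text field name: "text").
                    {"match": {"text": {"query": query_text}}},
                    # Vector sub-query (assumed field name: "vector_field").
                    {"knn": {"vector_field": {"vector": query_vector, "k": k}}},
                ]
            }
        },
        "size": k,
    }
    if post_filter:
        # post_filter sits at the top level of the body, so it is applied
        # to the hits after the hybrid query has produced them.
        payload["post_filter"] = post_filter
    return payload


payload = build_hybrid_payload(
    "foo",
    [0.1, 0.2, 0.3],
    k=2,
    post_filter={"bool": {"filter": {"term": {"metadata.page": 1}}}},
)
print(sorted(payload))  # ['post_filter', 'query', 'size']
```

Because the filter runs after the hybrid query, a search with `k=2` can
return fewer than two documents when some of the top hits are filtered
out.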

**Note:** For the implementation of hybrid search, I followed the
official OpenSearch hybrid search documentation and took inspiration
from
https://github.com/AndreasThinks/langchain/tree/feature/opensearch_hybrid_search
Thanks, mate!

### Experiments

I conducted a few experiments to verify that the hybrid search
implementation is accurate and can reproduce the results of both plain
keyword search and plain vector search.

**Experiment-1: Hybrid Search**

keyword_weight = 1.0, vector_weight = 0.0

I verified the hybrid search implementation by comparing it to a plain
keyword search. For this test, I set `keyword_weight` to 1 and
`vector_weight` to 0 in the hybrid search, giving full weight to the
keyword component. The documents returned by this configuration matched
those of a plain keyword search, confirming that the implementation can
reproduce keyword-only results. The scores, however, differed between
the two methods. This difference is expected: plain keyword search in
OpenSearch reports BM25 scores, whereas hybrid search still runs both
the keyword and vector queries and normalizes their scores, even when
the vector component has zero weight. The experiment shows that the
keyword component and the weighting are applied correctly.


**Experiment-2: Hybrid Search**

keyword_weight = 0.0, vector_weight = 1.0

For experiment-2, I took the inverse approach. I set `keyword_weight` to
0 and `vector_weight` to 1, giving full weight to the vector search
component (k-NN search), and compared the results with a pure vector
search. The documents returned by the hybrid search with these settings
exactly matched those from a standalone vector search, confirming that
the implementation can reproduce vector-only results. As in the first
experiment, the scores differed between the two methods even though the
documents were identical. This is expected and comes from the
normalization step in hybrid search, which still processes both
components even when one has zero weight.
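
The comparison used in experiments 1 and 2 can be sketched as a check
that two result lists contain the same documents in the same order while
ignoring the scores. The `(text, score)` tuples and the sample values
below are made up for illustration; they are not output from the
notebook.

```python
from typing import List, Sequence, Tuple

Hit = Tuple[str, float]  # (document text, score); a simplified stand-in


def same_ranking(a: Sequence[Hit], b: Sequence[Hit]) -> bool:
    """True when both result lists hold the same documents in the same order."""
    return [text for text, _ in a] == [text for text, _ in b]


def same_scores(a: Sequence[Hit], b: Sequence[Hit], tol: float = 1e-6) -> bool:
    """True when the lists are equally long and scores agree pairwise."""
    return len(a) == len(b) and all(
        abs(sa - sb) <= tol for (_, sa), (_, sb) in zip(a, b)
    )


# Illustrative values only: raw scores vs. normalized hybrid scores.
plain_hits: List[Hit] = [("doc A", 7.2), ("doc B", 3.1), ("doc C", 1.4)]
hybrid_hits: List[Hit] = [("doc A", 1.0), ("doc B", 0.29), ("doc C", 0.0)]

print(same_ranking(plain_hits, hybrid_hits))  # True
print(same_scores(plain_hits, hybrid_hits))   # False
```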



**Experiment-3: Hybrid Search (balanced)**

keyword_weight = 0.5, vector_weight = 0.5

For experiment-3, I took a balanced approach. I set both
`keyword_weight` and `vector_weight` to 0.5, so that lexical matches and
semantic similarity contribute equally to the final score. A balanced
configuration is a reasonable starting point for many applications,
because it can surface both exact keyword matches and contextually
relevant results that do not contain the exact search terms.

Please see the notebook for the full experiments.

**Notebook:**
https://github.com/karthikbharadhwajKB/Langchain_OpenSearch_Hybrid_search/blob/main/Opensearch_Hybridsearch.ipynb

### Instructions to follow for Performing Hybrid Search:

**Step-1: Instantiating OpenSearchVectorSearch Class:**
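```python
import os

from langchain_community.vectorstores import OpenSearchVectorSearch

# `embedding_model` is assumed to be an embeddings object defined elsewhere.
opensearch_vectorstore = OpenSearchVectorSearch(
    index_name=os.getenv("INDEX_NAME"),
    embedding_function=embedding_model,
    opensearch_url=os.getenv("OPENSEARCH_URL"),
    http_auth=(os.getenv("OPENSEARCH_USERNAME"), os.getenv("OPENSEARCH_PASSWORD")),
    # The SSL settings below suit local development only.
    use_ssl=False,
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
)
```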

**Parameters:**
1. **index_name:** The name of the OpenSearch index to use.
2. **embedding_function:** The function or model used to generate
embeddings for the documents. It's assumed that embedding_model is
defined elsewhere in the code.
3. **opensearch_url:** The URL of the OpenSearch instance.
4. **http_auth:** A tuple containing the username and password for
authentication.
5. **use_ssl:** Set to False, indicating that the connection to
OpenSearch is not using SSL/TLS encryption.
6. **verify_certs:** Set to False, which means the SSL certificates are
not being verified. This is often used in development environments but
is not recommended for production.
7. **ssl_assert_hostname:** Set to False, disabling hostname
verification in SSL certificates.
8. **ssl_show_warn:** Set to False, suppressing SSL-related warnings.
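
The settings above disable TLS, which suits a local development
cluster. As a hedged sketch of how the same keyword arguments might be
assembled with TLS enabled, the hypothetical helper below builds the
argument dictionary from environment-style values. The variable names
match the example above; the helper name `connection_kwargs` and its
`secure` flag are invented for illustration.

```python
from typing import Any, Dict, Mapping


def connection_kwargs(env: Mapping[str, str], secure: bool = True) -> Dict[str, Any]:
    """Collect OpenSearchVectorSearch keyword arguments from env-style values."""
    missing = [key for key in ("INDEX_NAME", "OPENSEARCH_URL") if not env.get(key)]
    if missing:
        raise KeyError(f"missing settings: {', '.join(missing)}")
    kwargs: Dict[str, Any] = {
        "index_name": env["INDEX_NAME"],
        "opensearch_url": env["OPENSEARCH_URL"],
        # Enable TLS and certificate verification together.
        "use_ssl": secure,
        "verify_certs": secure,
    }
    if not secure:
        # Development only: mirror the relaxed settings from the example above.
        kwargs["ssl_assert_hostname"] = False
        kwargs["ssl_show_warn"] = False
    user = env.get("OPENSEARCH_USERNAME")
    password = env.get("OPENSEARCH_PASSWORD")
    if user and password:
        kwargs["http_auth"] = (user, password)
    return kwargs


settings = connection_kwargs(
    {"INDEX_NAME": "docs", "OPENSEARCH_URL": "https://localhost:9200"}
)
print(settings["use_ssl"], "http_auth" in settings)  # True False
```

The resulting dictionary could then be passed as
`OpenSearchVectorSearch(embedding_function=embedding_model, **settings)`.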

**Step-2: Configure Search Pipeline:**

To use hybrid search, you first need to configure a search pipeline.

**Implementation Details:**

The `configure_search_pipelines` method configures a search pipeline in
OpenSearch that:
1. Normalizes the scores from both keyword and vector searches using the
min-max technique.
2. Applies the specified weights to the normalized scores.
3. Calculates the final score using an arithmetic mean of the weighted,
normalized scores.
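
As a rough model of these three steps, the sketch below normalizes two
sets of raw scores with min-max scaling and combines them with a
weighted arithmetic mean. It is a simplified approximation written for
illustration, not OpenSearch's actual processor code; the function
names, the treatment of documents missing from one sub-query (scored as
0), and the sample scores are all assumptions.

```python
from typing import Dict


def min_max(scores: Dict[str, float]) -> Dict[str, float]:
    """Scale scores into [0, 1]; a single distinct value maps to 1.0."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {doc: 1.0 for doc in scores}
    return {doc: (s - lo) / (hi - lo) for doc, s in scores.items()}


def combine(
    keyword: Dict[str, float],
    vector: Dict[str, float],
    keyword_weight: float,
    vector_weight: float,
) -> Dict[str, float]:
    """Weighted arithmetic mean of the normalized scores per document."""
    total = keyword_weight + vector_weight
    if total <= 0:
        raise ValueError("weights must sum to a positive number")
    kw, vec = min_max(keyword), min_max(vector)
    return {
        doc: (keyword_weight * kw.get(doc, 0.0)
              + vector_weight * vec.get(doc, 0.0)) / total
        for doc in set(kw) | set(vec)
    }


# Made-up raw scores: BM25-like for keyword, similarity-like for vector.
keyword_scores = {"a": 6.0, "b": 2.0}
vector_scores = {"a": 0.5, "b": 0.9, "c": 0.7}

final = combine(keyword_scores, vector_scores, keyword_weight=0.3, vector_weight=0.7)
ranked = sorted(final, key=lambda doc: final[doc], reverse=True)
print(ranked)  # ['b', 'c', 'a']
```

With `keyword_weight=1` and `vector_weight=0`, the combined score in
this model reduces to the normalized keyword score. That is consistent
with the first experiment: the order matches a plain keyword search
while the score values differ.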


**Parameters:**

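* **pipeline_name (str):** A unique identifier for the search pipeline.
It's recommended to use a descriptive name that indicates the weights
used for keyword and vector searches. The name must be a valid Python
identifier (letters, digits, and underscores only), because the method
rejects any name that fails `str.isidentifier()`.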
* **keyword_weight (float):** The weight assigned to the keyword search
component. This should be a float value between 0 and 1. In this
example, 0.3 gives 30% importance to traditional text matching.
* **vector_weight (float):** The weight assigned to the vector search
component. This should be a float value between 0 and 1. In this
example, 0.7 gives 70% importance to semantic similarity.
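
The code in this commit rejects pipeline names that fail Python's
`str.isidentifier()` check, so a name that embeds weights such as `0.3`
needs its dots replaced. A small hypothetical helper (the name
`pipeline_name_for` is not part of the commit) could derive an accepted
name from the weights:

```python
def pipeline_name_for(keyword_weight: float, vector_weight: float) -> str:
    """Build a descriptive pipeline name that passes str.isidentifier()."""
    name = f"search_pipeline_keyword_{keyword_weight}_vector_{vector_weight}"
    # Dots (and a minus sign, if any) are not allowed in identifiers.
    name = name.replace(".", "_").replace("-", "_")
    if not name.isidentifier():
        raise ValueError(f"Invalid pipeline name: {name}")
    return name


print(pipeline_name_for(0.3, 0.7))  # search_pipeline_keyword_0_3_vector_0_7
```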

```python
opensearch_vectorstore.configure_search_pipelines(
    # Names must pass str.isidentifier(), so the dots in the weights
    # are written as underscores.
    pipeline_name="search_pipeline_keyword_0_3_vector_0_7",
    keyword_weight=0.3,
    vector_weight=0.7,
)
```

**Step-3: Performing Hybrid Search:**

After creating the search pipeline, you can perform a hybrid search
using `similarity_search()`, `similarity_search_with_score()`, or the
other search methods that LangChain supports for this vector store.
Hybrid search runs both a keyword-based query and a semantic similarity
query against your OpenSearch index, combining traditional information
retrieval with vector embedding techniques.

**Parameters:**
* **query:** The search query string.
* **k:** The number of top results to return (in this case, 3).
* **search_type:** Set to `"hybrid_search"` to use both keyword and
vector search capabilities.
* **search_pipeline:** The name of the previously created search
pipeline.

```python
query = "what are the country named in our database?"

top_k = 3

pipeline_name = "search_pipeline_keyword_0_3_vector_0_7"

matched_docs = opensearch_vectorstore.similarity_search_with_score(
    query=query,
    k=top_k,
    search_type="hybrid_search",
    search_pipeline=pipeline_name,
)

matched_docs
```
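
A hybrid search needs its pipeline to exist first (Step-2), so it can
help to create the pipeline only when it is missing. The sketch below
relies on the `search_pipeline_exists` and `configure_search_pipelines`
methods added in this commit; the `ensure_pipeline` helper and the
`FakeStore` stand-in (used so the example runs without a cluster) are
hypothetical.

```python
from typing import Any, Dict, Protocol


class PipelineStore(Protocol):
    """The two methods this sketch relies on, as added in this commit."""

    def search_pipeline_exists(self, pipeline_name: str) -> bool: ...

    def configure_search_pipelines(
        self, pipeline_name: str, keyword_weight: float = 0.7, vector_weight: float = 0.3
    ) -> dict: ...


def ensure_pipeline(
    store: PipelineStore, name: str, keyword_weight: float, vector_weight: float
) -> bool:
    """Create the pipeline if absent; return True when it was created."""
    if store.search_pipeline_exists(name):
        return False
    store.configure_search_pipelines(
        pipeline_name=name,
        keyword_weight=keyword_weight,
        vector_weight=vector_weight,
    )
    return True


class FakeStore:
    """In-memory stand-in for OpenSearchVectorSearch (illustration only)."""

    def __init__(self) -> None:
        self.pipelines: Dict[str, Any] = {}

    def search_pipeline_exists(self, pipeline_name: str) -> bool:
        return pipeline_name in self.pipelines

    def configure_search_pipelines(
        self, pipeline_name: str, keyword_weight: float = 0.7, vector_weight: float = 0.3
    ) -> dict:
        self.pipelines[pipeline_name] = [keyword_weight, vector_weight]
        return {"acknowledged": True}


store = FakeStore()
print(ensure_pipeline(store, "hybrid_pipeline", 0.3, 0.7))  # True
print(ensure_pipeline(store, "hybrid_pipeline", 0.3, 0.7))  # False
```

Note that this helper leaves an existing pipeline untouched, so changing
the weights of a pipeline that already exists would still require an
explicit `configure_search_pipelines` call.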

twitter handle: @iamkarthik98

---------

Co-authored-by: Karthik Kolluri <karthik.kolluri@eidosmedia.com>
Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
Karthik Bharadhwaj 2024-12-13 22:34:12 +01:00 committed by GitHub
parent f3fb5a9c68
commit 498f0249e2
2 changed files with 317 additions and 0 deletions

@ -22,6 +22,7 @@ Please install it with `pip install opensearch-py`."""
SCRIPT_SCORING_SEARCH = "script_scoring"
PAINLESS_SCRIPTING_SEARCH = "painless_scripting"
MATCH_ALL_QUERY = {"match_all": {}}  # type: Dict
HYBRID_SEARCH = "hybrid_search"

if TYPE_CHECKING:
    from opensearchpy import AsyncOpenSearch, OpenSearch
@ -372,6 +373,65 @@ def _default_painless_scripting_query(
    }


def _default_hybrid_search_query(
    query_text: str, query_vector: List[float], k: int = 4
) -> Dict:
    """Returns payload for performing hybrid search for given options.
    Args:
        query_text: The query text to search for.
        query_vector: The embedding vector (query) to search for.
        k: Number of Documents to return. Defaults to 4.
    Returns:
        dict: The payload for hybrid search.
    """
    payload = {
        "_source": {"exclude": ["vector_field"]},
        "query": {
            "hybrid": {
                "queries": [
                    {
                        "match": {
                            "text": {
                                "query": query_text,
                            }
                        }
                    },
                    {"knn": {"vector_field": {"vector": query_vector, "k": k}}},
                ]
            }
        },
        "size": k,
    }
    return payload


def _hybrid_search_query_with_post_filter(
    query_text: str,
    query_vector: List[float],
    k: int,
    post_filter: Dict,
) -> Dict:
    """Returns payload for performing hybrid search with post filter.
    Args:
        query_text: The query text to search for.
        query_vector: The embedding vector to search for.
        k: Number of Documents to return.
        post_filter: The post filter to apply.
    Returns:
        dict: The payload for hybrid search with post filter.
    """
    search_query = _default_hybrid_search_query(query_text, query_vector, k)
    search_query["post_filter"] = post_filter
    return search_query


class OpenSearchVectorSearch(VectorStore):
    """`Amazon OpenSearch Vector Engine` vector store.
@ -713,6 +773,122 @@ class OpenSearchVectorSearch(VectorStore):
            item.get("delete", {}).get("error") for item in response["items"]
        )

    def configure_search_pipelines(
        self,
        pipeline_name: str,
        keyword_weight: float = 0.7,
        vector_weight: float = 0.3,
    ) -> dict:
        """
        Configures a search pipeline for hybrid search.
        Args:
            pipeline_name: Name of the pipeline
            keyword_weight: Weight for keyword search
            vector_weight: Weight for vector search
        Returns:
            response: Acknowledgement of the pipeline creation.
            (if there is any error while configuring the pipeline, it will return None)
        Raises:
            Exception: If an error occurs
        """
        if not pipeline_name.isidentifier():
            raise ValueError(f"Invalid pipeline name: {pipeline_name}")
        path = f"/_search/pipeline/{pipeline_name}"
        payload = {
            "description": "Post processor for hybrid search",
            "phase_results_processors": [
                {
                    "normalization-processor": {
                        "normalization": {"technique": "min_max"},
                        "combination": {
                            "technique": "arithmetic_mean",
                            "parameters": {"weights": [keyword_weight, vector_weight]},
                        },
                    }
                }
            ],
        }
        response = self.client.transport.perform_request(
            method="PUT", url=path, body=payload
        )
        return response

    def search_pipeline_exists(self, pipeline_name: str) -> bool:
        """
        Checks if a search pipeline exists.
        Args:
            pipeline_name: Name of the pipeline
        Returns:
            bool: True if the pipeline exists, False otherwise
        Raises:
            Exception: If an error occurs
        Example:
            >>> search_pipeline_exists("my_pipeline_1")
            True
            >>> search_pipeline_exists("my_pipeline_2")
            False
        """
        if not pipeline_name.isidentifier():
            raise ValueError(f"Invalid pipeline name: {pipeline_name}")
        existed_pipelines = self.client.transport.perform_request(
            method="GET", url="/_search/pipeline/"
        )
        return pipeline_name in existed_pipelines

    def get_search_pipeline_info(self, pipeline_name: str) -> Optional[Dict]:
        """
        Get information about a search pipeline.
        Args:
            pipeline_name: Name of the pipeline
        Returns:
            dict: Information about the pipeline
            None: If pipeline does not exist
        Raises:
            Exception: If an error occurs
        Example:
            >>> get_search_pipeline_info("my_pipeline_1")
            {'search_pipeline_1': {
                "description": "Post processor for hybrid search",
                "phase_results_processors": [
                    {
                        "normalization-processor": {
                            "normalization": {"technique": "min_max"},
                            "combination": {
                                "technique": "arithmetic_mean",
                                "parameters": {"weights": [0.7, 0.3]}
                            }
                        }
                    }
                ]
                }
            }
            >>> get_search_pipeline_info("my_pipeline_2")
            None
        """
        response = None
        if not pipeline_name.isidentifier():
            raise ValueError(f"Invalid pipeline name: {pipeline_name}")
        response = self.client.transport.perform_request(
            method="GET", url=f"/_search/pipeline/{pipeline_name}"
        )
        return response

    @staticmethod
    def _identity_fn(score: float) -> float:
        return score
@ -837,6 +1013,8 @@ class OpenSearchVectorSearch(VectorStore):
        Optional Args:
            same as `similarity_search`
        """
        # added query_text to kwargs for Hybrid Search
        kwargs["query_text"] = query
        embedding = self.embedding_function.embed_query(query)
        return self.similarity_search_with_score_by_vector(
            embedding, k, score_threshold, **kwargs
@ -1024,6 +1202,38 @@ class OpenSearchVectorSearch(VectorStore):
                vector_field,
                score_threshold=score_threshold,
            )
        elif search_type == HYBRID_SEARCH:
            search_pipeline = kwargs.get("search_pipeline")
            post_filter = kwargs.get("post_filter", {})
            query_text = kwargs.get("query_text")
            path = f"/{index_name}/_search?search_pipeline={search_pipeline}"
            if query_text is None:
                raise ValueError("query_text must be provided for hybrid search")
            if search_pipeline is None:
                raise ValueError("search_pipeline must be provided for hybrid search")
            # embedding the query_text
            embeded_query = self.embedding_function.embed_query(query_text)
            # if post filter is provided
            if post_filter != {}:
                # hybrid search with post filter
                payload = _hybrid_search_query_with_post_filter(
                    query_text, embeded_query, k, post_filter
                )
            else:
                # hybrid search without post filter
                payload = _default_hybrid_search_query(query_text, embeded_query, k)
            response = self.client.transport.perform_request(
                method="GET", url=path, body=payload
            )
            return [hit for hit in response["hits"]["hits"]]
        else:
            raise ValueError("Invalid `search_type` provided as an argument")

@ -4,6 +4,7 @@ import pytest
from langchain_core.documents import Document

from langchain_community.vectorstores.opensearch_vector_search import (
    HYBRID_SEARCH,
    PAINLESS_SCRIPTING_SEARCH,
    SCRIPT_SCORING_SEARCH,
    OpenSearchVectorSearch,
@ -75,6 +76,112 @@ def test_opensearch_with_custom_field_name() -> None:
    assert output == [Document(page_content="foo", id="id_foo")]


def test_configure_search_pipeline() -> None:
    """Test configure search pipeline functionality."""
    test_search_pipeline_name = "test_search_pipeline"
    keyword_weight = 0.7
    vector_weight = 0.3
    docsearch = OpenSearchVectorSearch.from_texts(
        texts, FakeEmbeddings(), opensearch_url=DEFAULT_OPENSEARCH_URL
    )
    docsearch.configure_search_pipelines(
        pipeline_name=test_search_pipeline_name,
        keyword_weight=keyword_weight,
        vector_weight=vector_weight,
    )
    assert docsearch.search_pipeline_exists(test_search_pipeline_name)


def test_get_search_pipeline_info() -> None:
    """Test get search pipeline info functionality."""
    test_search_pipeline_name = "test_search_pipeline"
    docsearch = OpenSearchVectorSearch.from_texts(
        texts, FakeEmbeddings(), opensearch_url=DEFAULT_OPENSEARCH_URL
    )
    test_pipeline_info = docsearch.get_search_pipeline_info(test_search_pipeline_name)
    assert test_pipeline_info == {
        "test_search_pipeline": {
            "description": "Post processor for hybrid search",
            "phase_results_processors": [
                {
                    "normalization-processor": {
                        "normalization": {"technique": "min_max"},
                        "combination": {
                            "technique": "arithmetic_mean",
                            "parameters": {"weights": [0.7, 0.3]},
                        },
                    }
                }
            ],
        }
    }


def test_hybrid_search() -> None:
    """Test hybrid search functionality."""
    metadatas = [{"page": i} for i in range(len(texts))]
    docsearch = OpenSearchVectorSearch.from_texts(
        texts,
        ConsistentFakeEmbeddings(),
        metadatas=metadatas,
        opensearch_url=DEFAULT_OPENSEARCH_URL,
    )
    output = docsearch.similarity_search(
        query="foo",
        k=2,
        search_type=HYBRID_SEARCH,
        search_pipeline="test_search_pipeline",
    )
    assert output == [
        Document(page_content="foo", metadata={"page": 0}),
        Document(page_content="bar", metadata={"page": 1}),
    ]


def test_hybrid_search_with_score() -> None:
    """Test hybrid search with score functionality."""
    metadatas = [{"page": i} for i in range(len(texts))]
    docsearch = OpenSearchVectorSearch.from_texts(
        texts,
        ConsistentFakeEmbeddings(),
        metadatas=metadatas,
        opensearch_url=DEFAULT_OPENSEARCH_URL,
    )
    output = docsearch.similarity_search_with_score(
        query="foo",
        k=2,
        search_type=HYBRID_SEARCH,
        search_pipeline="test_search_pipeline",
    )
    assert output == [
        (Document(page_content="foo", metadata={"page": 0}), 1.0),
        (Document(page_content="bar", metadata={"page": 1}), 0.0003),
    ]


def test_hybrid_search_with_post_filter() -> None:
    """Test hybrid search with post filter functionality."""
    metadatas = [{"page": i} for i in range(len(texts))]
    docsearch = OpenSearchVectorSearch.from_texts(
        texts,
        ConsistentFakeEmbeddings(),
        metadatas=metadatas,
        opensearch_url=DEFAULT_OPENSEARCH_URL,
    )
    output = docsearch.similarity_search(
        query="foo",
        k=2,
        search_type="hybrid_search",
        search_pipeline="test_search_pipeline",
        post_filter={"bool": {"filter": {"term": {"metadata.page": 1}}}},
    )
    assert output == [Document(page_content="bar", metadata={"page": 1})]


def test_opensearch_with_metadatas() -> None:
    """Test end to end indexing and search with metadata."""
    metadatas = [{"page": i} for i in range(len(texts))]