[OpenSearch] Add Self Query Retriever Support to OpenSearch (#11184)

### Description
Add Self Query Retriever Support to OpenSearch

### Maintainers
@rlancemartin, @eyurtsev, @navneet1v

### Twitter Handle
@OpenSearchProj

Signed-off-by: Naveen Tatikonda <navtat@amazon.com>
This commit is contained in:
Naveen Tatikonda
2023-09-28 14:36:52 -05:00
committed by GitHub
parent 0da484be2c
commit 9b0029b9c2
5 changed files with 628 additions and 0 deletions

View File

@@ -14,6 +14,7 @@ from langchain.retrievers.self_query.deeplake import DeepLakeTranslator
from langchain.retrievers.self_query.elasticsearch import ElasticsearchTranslator
from langchain.retrievers.self_query.milvus import MilvusTranslator
from langchain.retrievers.self_query.myscale import MyScaleTranslator
from langchain.retrievers.self_query.opensearch import OpenSearchTranslator
from langchain.retrievers.self_query.pinecone import PineconeTranslator
from langchain.retrievers.self_query.qdrant import QdrantTranslator
from langchain.retrievers.self_query.redis import RedisTranslator
@@ -30,6 +31,7 @@ from langchain.vectorstores import (
ElasticsearchStore,
Milvus,
MyScale,
OpenSearchVectorSearch,
Pinecone,
Qdrant,
Redis,
@@ -56,6 +58,7 @@ def _get_builtin_translator(vectorstore: VectorStore) -> Visitor:
Milvus: MilvusTranslator,
SupabaseVectorStore: SupabaseVectorTranslator,
TimescaleVector: TimescaleVectorTranslator,
OpenSearchVectorSearch: OpenSearchTranslator,
}
if isinstance(vectorstore, Qdrant):
return QdrantTranslator(metadata_key=vectorstore.metadata_payload_key)

View File

@@ -0,0 +1,84 @@
from typing import Dict, Tuple, Union
from langchain.chains.query_constructor.ir import (
Comparator,
Comparison,
Operation,
Operator,
StructuredQuery,
Visitor,
)
class OpenSearchTranslator(Visitor):
"""Translate `OpenSearch` internal query domain-specific
language elements to valid filters."""
allowed_comparators = [
Comparator.EQ,
Comparator.LT,
Comparator.LTE,
Comparator.GT,
Comparator.GTE,
Comparator.CONTAIN,
Comparator.LIKE,
]
"""Subset of allowed logical comparators."""
allowed_operators = [Operator.AND, Operator.OR, Operator.NOT]
"""Subset of allowed logical operators."""
def _format_func(self, func: Union[Operator, Comparator]) -> str:
self._validate_func(func)
comp_operator_map = {
Comparator.EQ: "term",
Comparator.LT: "lt",
Comparator.LTE: "lte",
Comparator.GT: "gt",
Comparator.GTE: "gte",
Comparator.CONTAIN: "match",
Comparator.LIKE: "fuzzy",
Operator.AND: "must",
Operator.OR: "should",
Operator.NOT: "must_not",
}
return comp_operator_map[func]
def visit_operation(self, operation: Operation) -> Dict:
args = [arg.accept(self) for arg in operation.arguments]
return {"bool": {self._format_func(operation.operator): args}}
def visit_comparison(self, comparison: Comparison) -> Dict:
field = f"metadata.{comparison.attribute}"
if comparison.comparator in [
Comparator.LT,
Comparator.LTE,
Comparator.GT,
Comparator.GTE,
]:
return {
"range": {
field: {self._format_func(comparison.comparator): comparison.value}
}
}
if comparison.comparator == Comparator.LIKE:
return {
self._format_func(comparison.comparator): {
field: {"value": comparison.value}
}
}
field = f"{field}.keyword" if isinstance(comparison.value, str) else field
return {self._format_func(comparison.comparator): {field: comparison.value}}
def visit_structured_query(
self, structured_query: StructuredQuery
) -> Tuple[str, dict]:
if structured_query.filter is None:
kwargs = {}
else:
kwargs = {"filter": structured_query.filter.accept(self)}
return structured_query.query, kwargs

View File

@@ -341,6 +341,7 @@ class OpenSearchVectorSearch(VectorStore):
http_auth = _get_kwargs_value(kwargs, "http_auth", None)
self.is_aoss = _is_aoss_enabled(http_auth=http_auth)
self.client = _get_opensearch_client(opensearch_url, **kwargs)
self.engine = _get_kwargs_value(kwargs, "engine", None)
@property
def embeddings(self) -> Embeddings:
@@ -528,6 +529,7 @@ class OpenSearchVectorSearch(VectorStore):
search_type = _get_kwargs_value(kwargs, "search_type", "approximate_search")
vector_field = _get_kwargs_value(kwargs, "vector_field", "vector_field")
index_name = _get_kwargs_value(kwargs, "index_name", self.index_name)
filter = _get_kwargs_value(kwargs, "filter", {})
if (
self.is_aoss
@@ -564,6 +566,17 @@ class OpenSearchVectorSearch(VectorStore):
"is invalid. `lucene_filter` is deprecated"
)
if (
efficient_filter == {}
and boolean_filter == {}
and lucene_filter == {}
and filter != {}
):
if self.engine in ["faiss", "lucene"]:
efficient_filter = filter
else:
boolean_filter = filter
if boolean_filter != {}:
search_query = _approximate_search_query_with_boolean_filter(
embedding,
@@ -745,6 +758,7 @@ class OpenSearchVectorSearch(VectorStore):
max_chunk_bytes = _get_kwargs_value(kwargs, "max_chunk_bytes", 1 * 1024 * 1024)
http_auth = _get_kwargs_value(kwargs, "http_auth", None)
is_aoss = _is_aoss_enabled(http_auth=http_auth)
engine = None
if is_aoss and not is_appx_search:
raise ValueError(
@@ -782,4 +796,5 @@ class OpenSearchVectorSearch(VectorStore):
max_chunk_bytes=max_chunk_bytes,
is_aoss=is_aoss,
)
kwargs["engine"] = engine
return cls(opensearch_url, index_name, embedding, **kwargs)

View File

@@ -0,0 +1,87 @@
from langchain.chains.query_constructor.ir import (
Comparator,
Comparison,
Operation,
Operator,
StructuredQuery,
)
from langchain.retrievers.self_query.opensearch import OpenSearchTranslator
DEFAULT_TRANSLATOR = OpenSearchTranslator()
def test_visit_comparison() -> None:
comp = Comparison(comparator=Comparator.EQ, attribute="foo", value="10")
expected = {"term": {"metadata.foo.keyword": "10"}}
actual = DEFAULT_TRANSLATOR.visit_comparison(comp)
assert expected == actual
def test_visit_operation() -> None:
op = Operation(
operator=Operator.AND,
arguments=[
Comparison(comparator=Comparator.GTE, attribute="bar", value=5),
Comparison(comparator=Comparator.LT, attribute="bar", value=10),
Comparison(comparator=Comparator.EQ, attribute="baz", value="abcd"),
],
)
expected = {
"bool": {
"must": [
{"range": {"metadata.bar": {"gte": 5}}},
{"range": {"metadata.bar": {"lt": 10}}},
{"term": {"metadata.baz.keyword": "abcd"}},
]
}
}
actual = DEFAULT_TRANSLATOR.visit_operation(op)
assert expected == actual
def test_visit_structured_query() -> None:
query = "What is the capital of France?"
operation = Operation(
operator=Operator.AND,
arguments=[
Comparison(comparator=Comparator.EQ, attribute="foo", value="20"),
Operation(
operator=Operator.OR,
arguments=[
Comparison(comparator=Comparator.LTE, attribute="bar", value=7),
Comparison(
comparator=Comparator.LIKE, attribute="baz", value="abc"
),
],
),
],
)
structured_query = StructuredQuery(query=query, filter=operation, limit=None)
expected = (
query,
{
"filter": {
"bool": {
"must": [
{"term": {"metadata.foo.keyword": "20"}},
{
"bool": {
"should": [
{"range": {"metadata.bar": {"lte": 7}}},
{
"fuzzy": {
"metadata.baz": {
"value": "abc",
}
}
},
]
}
},
]
}
}
},
)
actual = DEFAULT_TRANSLATOR.visit_structured_query(structured_query)
assert expected == actual