Add Neo4j vector index hybrid search (#10442)

Adding support for Neo4j vector index hybrid search option. In Neo4j,
you can achieve hybrid search by using a combination of vector and
fulltext indexes.

---------

Co-authored-by: Bagatur <baskaryan@gmail.com>
This commit is contained in:
Tomaz Bratanic
2023-09-14 17:29:16 +02:00
committed by GitHub
parent 596f294b01
commit e1e01d6586
3 changed files with 371 additions and 76 deletions

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import enum
import logging
import uuid
from typing import (
@@ -20,13 +21,44 @@ from langchain.vectorstores.base import VectorStore
from langchain.vectorstores.utils import DistanceStrategy
DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.COSINE
distance_mapping = {
DISTANCE_MAPPING = {
DistanceStrategy.EUCLIDEAN_DISTANCE: "euclidean",
DistanceStrategy.COSINE: "cosine",
}
class SearchType(str, enum.Enum):
    """Enumerator of the supported search types."""

    # Search the vector index only.
    VECTOR = "vector"
    # Combine vector index results with fulltext (keyword) index results.
    HYBRID = "hybrid"


# Search type used when the caller does not pick one explicitly.
DEFAULT_SEARCH_TYPE = SearchType.VECTOR
def _get_search_index_query(search_type: SearchType) -> str:
    """Return the Cypher prefix that yields ``node, score`` rows for a search type.

    ``SearchType.VECTOR`` queries the vector index only. ``SearchType.HYBRID``
    unions the vector index results with fulltext index results whose scores
    are divided by the largest fulltext score, then keeps the best score per
    node and limits the output to ``$k`` rows.

    Raises:
        KeyError: if ``search_type`` is not one of the mapped search types.
    """
    vector_only = (
        "CALL db.index.vector.queryNodes($index, $k, $embedding) YIELD node, score "
    )
    vector_plus_fulltext = (
        "CALL { "
        "CALL db.index.vector.queryNodes($index, $k, $embedding) "
        "YIELD node, score "
        "RETURN node, score UNION "
        "CALL db.index.fulltext.queryNodes($keyword_index, $query, {limit: $k}) "
        "YIELD node, score "
        "WITH collect({node:node, score:score}) AS nodes, max(score) AS max "
        "UNWIND nodes AS n "
        # Fulltext scores are scaled by their maximum; 0 is taken as the minimum.
        "RETURN n.node AS node, (n.score / max) AS score "
        "} "
        # A node returned by both indexes is collapsed to its highest score.
        "WITH node, max(score) AS score ORDER BY score DESC LIMIT $k "
    )
    query_for = {
        SearchType.VECTOR: vector_only,
        SearchType.HYBRID: vector_plus_fulltext,
    }
    return query_for[search_type]
def check_if_not_null(props: List[str], values: List[Any]) -> None:
for prop, value in zip(props, values):
if not value:
@@ -82,9 +114,11 @@ class Neo4jVector(VectorStore):
self,
embedding: Embeddings,
*,
search_type: SearchType = SearchType.VECTOR,
username: Optional[str] = None,
password: Optional[str] = None,
url: Optional[str] = None,
keyword_index_name: Optional[str] = "keyword",
database: str = "neo4j",
index_name: str = "vector",
node_label: str = "Chunk",
@@ -153,12 +187,14 @@ class Neo4jVector(VectorStore):
self.embedding = embedding
self._distance_strategy = distance_strategy
self.index_name = index_name
self.keyword_index_name = keyword_index_name
self.node_label = node_label
self.embedding_node_property = embedding_node_property
self.text_node_property = text_node_property
self.logger = logger or logging.getLogger(__name__)
self.override_relevance_score_fn = relevance_score_fn
self.retrieval_query = retrieval_query
self.search_type = search_type
# Calculate embedding dimension
self.embedding_dimension = len(embedding.embed_query("foo"))
@@ -263,6 +299,39 @@ class Neo4jVector(VectorStore):
except IndexError:
return None
def retrieve_existing_fts_index(self) -> Optional[str]:
    """
    Check if the fulltext index exists in the Neo4j database

    This method queries the Neo4j database for FULLTEXT indexes that either
    carry the configured keyword index name or cover exactly the configured
    node label and text property. When a match is found,
    ``keyword_index_name`` and ``text_node_property`` on this instance are
    overwritten with the values of the first match.

    Returns:
        (Optional[str]): node label of the matched fulltext index, or
        None when no fulltext index matched.
    """

    index_information = self.query(
        "SHOW INDEXES YIELD name, type, labelsOrTypes, properties, options "
        "WHERE type = 'FULLTEXT' AND (name = $keyword_index_name "
        "OR (labelsOrTypes = [$node_label] AND "
        "properties = [$text_node_property])) "
        "RETURN name, labelsOrTypes, properties, options ",
        params={
            "keyword_index_name": self.keyword_index_name,
            "node_label": self.node_label,
            "text_node_property": self.text_node_property,
        },
    )
    # Sort by the *keyword* index name (not the vector index name) so that,
    # when several fulltext indexes match, the explicitly requested one is
    # preferred. NOTE(review): relies on sort_by_index_name moving the entry
    # with the given name to the front -- confirm against the helper.
    index_information = sort_by_index_name(
        index_information, self.keyword_index_name
    )
    try:
        # Read everything first so a malformed row cannot leave the
        # instance attributes half-updated.
        first_match = index_information[0]
        keyword_index_name = first_match["name"]
        text_node_property = first_match["properties"][0]
        node_label = first_match["labelsOrTypes"][0]
    except IndexError:
        return None

    self.keyword_index_name = keyword_index_name
    self.text_node_property = text_node_property
    return node_label
def create_new_index(self) -> None:
"""
This method constructs a Cypher query and executes it
@@ -282,10 +351,23 @@ class Neo4jVector(VectorStore):
"node_label": self.node_label,
"embedding_node_property": self.embedding_node_property,
"embedding_dimension": self.embedding_dimension,
"similarity_metric": distance_mapping[self._distance_strategy],
"similarity_metric": DISTANCE_MAPPING[self._distance_strategy],
}
self.query(index_query, params=parameters)
def create_new_keyword_index(self) -> None:
    """
    This method constructs a Cypher query and executes it
    to create a new full text index in Neo4j.

    The index is named ``self.keyword_index_name`` and covers the
    ``self.text_node_property`` property of ``self.node_label`` nodes.
    """
    # The index name is backtick-quoted like the label and property so that
    # names containing characters such as "-" or spaces form valid Cypher.
    fts_index_query = (
        f"CREATE FULLTEXT INDEX `{self.keyword_index_name}` "
        f"FOR (n:`{self.node_label}`) ON EACH "
        f"[n.`{self.text_node_property}`]"
    )
    self.query(fts_index_query)
@property
def embeddings(self) -> Embeddings:
    """Expose the embedding object stored on this instance as ``embedding``."""
    return self.embedding
@@ -299,6 +381,7 @@ class Neo4jVector(VectorStore):
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
create_id_index: bool = True,
search_type: SearchType = SearchType.VECTOR,
**kwargs: Any,
) -> Neo4jVector:
if ids is None:
@@ -309,13 +392,13 @@ class Neo4jVector(VectorStore):
store = cls(
embedding=embedding,
search_type=search_type,
**kwargs,
)
# Check if the index already exists
# Check if the vector index already exists
embedding_dimension = store.retrieve_existing_index()
# If the index doesn't exist yet
# If the vector index doesn't exist yet
if not embedding_dimension:
store.create_new_index()
# If the index already exists, check if embedding dimensions match
@@ -328,6 +411,17 @@ class Neo4jVector(VectorStore):
f"Vector index dimension: {embedding_dimension}"
)
if search_type == SearchType.HYBRID:
fts_node_label = store.retrieve_existing_fts_index()
# If the FTS index doesn't exist yet
if not fts_node_label:
store.create_new_keyword_index()
else: # Validate that FTS and Vector index use the same information
if not fts_node_label == store.node_label:
raise ValueError(
"Vector and keyword index don't index the same node label"
)
# Create unique constraint for faster import
if create_id_index:
store.query(
@@ -429,6 +523,7 @@ class Neo4jVector(VectorStore):
return self.similarity_search_by_vector(
embedding=embedding,
k=k,
query=query,
)
def similarity_search_with_score(
@@ -444,11 +539,13 @@ class Neo4jVector(VectorStore):
List of Documents most similar to the query and score for each
"""
embedding = self.embedding.embed_query(query)
docs = self.similarity_search_with_score_by_vector(embedding=embedding, k=k)
docs = self.similarity_search_with_score_by_vector(
embedding=embedding, k=k, query=query
)
return docs
def similarity_search_with_score_by_vector(
self, embedding: List[float], k: int = 4
self, embedding: List[float], k: int = 4, **kwargs: Any
) -> List[Tuple[Document, float]]:
"""
Perform a similarity search in the Neo4j database using a
@@ -478,12 +575,14 @@ class Neo4jVector(VectorStore):
self.retrieval_query if self.retrieval_query else default_retrieval
)
read_query = (
"CALL db.index.vector.queryNodes($index, $k, $embedding) "
"YIELD node, score "
) + retrieval_query
parameters = {"index": self.index_name, "k": k, "embedding": embedding}
read_query = _get_search_index_query(self.search_type) + retrieval_query
parameters = {
"index": self.index_name,
"k": k,
"embedding": embedding,
"keyword_index": self.keyword_index_name,
"query": kwargs["query"],
}
results = self.query(read_query, params=parameters)
@@ -517,7 +616,7 @@ class Neo4jVector(VectorStore):
List of Documents most similar to the query vector.
"""
docs_and_scores = self.similarity_search_with_score_by_vector(
embedding=embedding, k=k
embedding=embedding, k=k, **kwargs
)
return [doc for doc, _ in docs_and_scores]
@@ -596,6 +695,8 @@ class Neo4jVector(VectorStore):
cls: Type[Neo4jVector],
embedding: Embeddings,
index_name: str,
search_type: SearchType = DEFAULT_SEARCH_TYPE,
keyword_index_name: Optional[str] = None,
**kwargs: Any,
) -> Neo4jVector:
"""
@@ -607,9 +708,17 @@ class Neo4jVector(VectorStore):
the `index_name` definition.
"""
if search_type == SearchType.HYBRID and not keyword_index_name:
raise ValueError(
"keyword_index name has to be specified "
"when using hybrid search option"
)
store = cls(
embedding=embedding,
index_name=index_name,
keyword_index_name=keyword_index_name,
search_type=search_type,
**kwargs,
)
@@ -630,6 +739,20 @@ class Neo4jVector(VectorStore):
f"Vector index dimension: {embedding_dimension}"
)
if search_type == SearchType.HYBRID:
fts_node_label = store.retrieve_existing_fts_index()
# If the FTS index doesn't exist yet
if not fts_node_label:
raise ValueError(
"The specified keyword index name does not exist. "
"Make sure to check if you spelled it correctly"
)
else: # Validate that FTS and Vector index use the same information
if not fts_node_label == store.node_label:
raise ValueError(
"Vector and keyword index don't index the same node label"
)
return store
@classmethod

View File

@@ -3,7 +3,7 @@ import os
from typing import List
from langchain.docstore.document import Document
from langchain.vectorstores import Neo4jVector
from langchain.vectorstores.neo4j_vector import Neo4jVector, SearchType
from langchain.vectorstores.utils import DistanceStrategy
from tests.integration_tests.vectorstores.fake_embeddings import FakeEmbeddings
@@ -26,7 +26,7 @@ def drop_vector_indexes(store: Neo4jVector) -> None:
all_indexes = store.query(
"""
SHOW INDEXES YIELD name, type
WHERE type = "VECTOR"
WHERE type IN ["VECTOR", "FULLTEXT"]
RETURN name
"""
)
@@ -331,3 +331,142 @@ def test_neo4jvector_prefer_indexname_insert() -> None:
Document(page_content="foo", metadata={}),
]
drop_vector_indexes(existing_index)
def test_neo4jvector_hybrid() -> None:
    """Build a hybrid-search store end to end and fetch the single best hit."""
    vectors = FakeEmbeddingsWithOsDimension().embed_documents(texts)
    store = Neo4jVector.from_embeddings(
        text_embeddings=list(zip(texts, vectors)),
        embedding=FakeEmbeddingsWithOsDimension(),
        url=url,
        username=username,
        password=password,
        pre_delete_collection=True,
        search_type=SearchType.HYBRID,
    )

    hits = store.similarity_search("foo", k=1)
    assert hits == [Document(page_content="foo")]

    # Remove the indexes created by this test.
    drop_vector_indexes(store)
def test_neo4jvector_hybrid_deduplicate() -> None:
    """Hybrid search must return each document once even if both indexes hit it."""
    vectors = FakeEmbeddingsWithOsDimension().embed_documents(texts)
    store = Neo4jVector.from_embeddings(
        text_embeddings=list(zip(texts, vectors)),
        embedding=FakeEmbeddingsWithOsDimension(),
        url=url,
        username=username,
        password=password,
        pre_delete_collection=True,
        search_type=SearchType.HYBRID,
    )

    hits = store.similarity_search("foo", k=3)
    expected = [Document(page_content=content) for content in ("foo", "bar", "baz")]
    assert hits == expected

    # Remove the indexes created by this test.
    drop_vector_indexes(store)
def test_neo4jvector_hybrid_retrieval_query() -> None:
    """A custom retrieval_query returning constants is honoured in hybrid mode."""
    vectors = FakeEmbeddingsWithOsDimension().embed_documents(texts)
    custom_retrieval = "RETURN 'moo' AS text, score, {test: 'test'} AS metadata"
    store = Neo4jVector.from_embeddings(
        text_embeddings=list(zip(texts, vectors)),
        embedding=FakeEmbeddingsWithOsDimension(),
        url=url,
        username=username,
        password=password,
        pre_delete_collection=True,
        search_type=SearchType.HYBRID,
        retrieval_query=custom_retrieval,
    )

    hits = store.similarity_search("foo", k=1)
    assert hits == [Document(page_content="moo", metadata={"test": "test"})]

    # Remove the indexes created by this test.
    drop_vector_indexes(store)
def test_neo4jvector_hybrid_retrieval_query2() -> None:
    """A custom retrieval_query that reads ``node.text`` works in hybrid mode."""
    vectors = FakeEmbeddingsWithOsDimension().embed_documents(texts)
    custom_retrieval = "RETURN node.text AS text, score, {test: 'test'} AS metadata"
    store = Neo4jVector.from_embeddings(
        text_embeddings=list(zip(texts, vectors)),
        embedding=FakeEmbeddingsWithOsDimension(),
        url=url,
        username=username,
        password=password,
        pre_delete_collection=True,
        search_type=SearchType.HYBRID,
        retrieval_query=custom_retrieval,
    )

    hits = store.similarity_search("foo", k=1)
    assert hits == [Document(page_content="foo", metadata={"test": "test"})]

    # Remove the indexes created by this test.
    drop_vector_indexes(store)
def test_neo4jvector_missing_keyword() -> None:
    """Test hybrid search from an existing index without keyword_index_name.

    ``from_existing_index`` must raise ``ValueError`` when hybrid search is
    requested but no keyword index name is supplied.
    """
    text_embeddings = FakeEmbeddingsWithOsDimension().embed_documents(texts)
    text_embedding_pairs = list(zip(texts, text_embeddings))
    docsearch = Neo4jVector.from_embeddings(
        text_embeddings=text_embedding_pairs,
        embedding=FakeEmbeddingsWithOsDimension(),
        url=url,
        username=username,
        password=password,
        pre_delete_collection=True,
    )
    try:
        Neo4jVector.from_existing_index(
            embedding=FakeEmbeddingsWithOsDimension(),
            url=url,
            username=username,
            password=password,
            index_name="vector",
            search_type=SearchType.HYBRID,
        )
    except ValueError as e:
        assert str(e) == (
            "keyword_index name has to be specified when using hybrid search option"
        )
    else:
        # Without this branch the test would pass silently when the
        # validation is missing and no exception is raised at all.
        raise AssertionError(
            "from_existing_index did not raise ValueError for hybrid search "
            "without keyword_index_name"
        )
    finally:
        # Clean up the indexes even when an assertion above fails.
        drop_vector_indexes(docsearch)
def test_neo4jvector_hybrid_from_existing() -> None:
    """Hybrid search works when attaching to existing vector and keyword indexes."""
    vectors = FakeEmbeddingsWithOsDimension().embed_documents(texts)
    # First create both indexes by building a hybrid store from embeddings.
    Neo4jVector.from_embeddings(
        text_embeddings=list(zip(texts, vectors)),
        embedding=FakeEmbeddingsWithOsDimension(),
        url=url,
        username=username,
        password=password,
        pre_delete_collection=True,
        search_type=SearchType.HYBRID,
    )

    # Then attach to those indexes by name and search through them.
    reattached = Neo4jVector.from_existing_index(
        embedding=FakeEmbeddingsWithOsDimension(),
        url=url,
        username=username,
        password=password,
        index_name="vector",
        keyword_index_name="keyword",
        search_type=SearchType.HYBRID,
    )

    hits = reattached.similarity_search("foo", k=1)
    assert hits == [Document(page_content="foo")]

    # Remove the indexes created by this test.
    drop_vector_indexes(reattached)