add from_existing_graph to neo4j vector (#11124)

This PR adds the option to create a Neo4jvector instance from existing
graph, which embeds existing text in the database and creates relevant
indices.
This commit is contained in:
Tomaz Bratanic
2023-09-29 00:02:26 +02:00
committed by GitHub
parent 2c952de21a
commit 7d25a65b10
3 changed files with 424 additions and 70 deletions

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import enum
import logging
import os
import uuid
from typing import (
Any,
@@ -148,7 +149,10 @@ class Neo4jVector(VectorStore):
)
# Handle if the credentials are environment variables
url = get_from_env("url", "NEO4J_URL", url)
# Support URL for backwards compatibility
url = os.environ.get("NEO4J_URL", url)
url = get_from_env("url", "NEO4J_URI", url)
username = get_from_env("username", "NEO4J_USERNAME", username)
password = get_from_env("password", "NEO4J_PASSWORD", password)
database = get_from_env("database", "NEO4J_DATABASE", database)
@@ -299,7 +303,9 @@ class Neo4jVector(VectorStore):
except IndexError:
return None
def retrieve_existing_fts_index(self) -> Optional[str]:
def retrieve_existing_fts_index(
self, text_node_properties: List[str] = []
) -> Optional[str]:
"""
Check if the fulltext index exists in the Neo4j database
@@ -314,12 +320,12 @@ class Neo4jVector(VectorStore):
"SHOW INDEXES YIELD name, type, labelsOrTypes, properties, options "
"WHERE type = 'FULLTEXT' AND (name = $keyword_index_name "
"OR (labelsOrTypes = [$node_label] AND "
"properties = [$text_node_property])) "
"properties = $text_node_property)) "
"RETURN name, labelsOrTypes, properties, options ",
params={
"keyword_index_name": self.keyword_index_name,
"node_label": self.node_label,
"text_node_property": self.text_node_property,
"text_node_property": text_node_properties or [self.text_node_property],
},
)
# sort by index_name
@@ -355,17 +361,17 @@ class Neo4jVector(VectorStore):
}
self.query(index_query, params=parameters)
def create_new_keyword_index(self) -> None:
def create_new_keyword_index(self, text_node_properties: List[str] = []) -> None:
"""
This method constructs a Cypher query and executes it
to create a new full text index in Neo4j.
"""
node_props = text_node_properties or [self.text_node_property]
fts_index_query = (
f"CREATE FULLTEXT INDEX {self.keyword_index_name} "
f"FOR (n:`{self.node_label}`) ON EACH "
f"[n.`{self.text_node_property}`]"
f"[{', '.join(['n.`' + el + '`' for el in node_props])}]"
)
self.query(fts_index_query)
@property
@@ -782,6 +788,131 @@ class Neo4jVector(VectorStore):
**kwargs,
)
@classmethod
def from_existing_graph(
cls: Type[Neo4jVector],
embedding: Embeddings,
node_label: str,
embedding_node_property: str,
text_node_properties: List[str],
*,
keyword_index_name: Optional[str] = "keyword",
index_name: str = "vector",
search_type: SearchType = DEFAULT_SEARCH_TYPE,
retrieval_query: str = "",
**kwargs: Any,
) -> Neo4jVector:
"""
Initialize and return a Neo4jVector instance from an existing graph.
This method initializes a Neo4jVector instance using the provided
parameters and the existing graph. It validates the existence of
the indices and creates new ones if they don't exist.
Returns:
Neo4jVector: An instance of Neo4jVector initialized with the provided parameters
and existing graph.
Example:
>>> neo4j_vector = Neo4jVector.from_existing_graph(
... embedding=my_embedding,
... node_label="Document",
... embedding_node_property="embedding",
... text_node_properties=["title", "content"]
... )
Note:
Neo4j credentials are required in the form of `url`, `username`, and `password`,
and optional `database` parameters passed as additional keyword arguments.
"""
# Validate the list is not empty
if not text_node_properties:
raise ValueError(
"Parameter `text_node_properties` must not be an empty list"
)
# Prefer retrieval query from params, otherwise construct it
if not retrieval_query:
retrieval_query = (
f"RETURN reduce(str='', k IN {text_node_properties} |"
" str + '\\n' + k + ': ' + coalesce(node[k], '')) AS text, "
"node {.*, `"
+ embedding_node_property
+ "`: Null, id: Null, "
+ ", ".join([f"`{prop}`: Null" for prop in text_node_properties])
+ "} AS metadata, score"
)
store = cls(
embedding=embedding,
index_name=index_name,
keyword_index_name=keyword_index_name,
search_type=search_type,
retrieval_query=retrieval_query,
node_label=node_label,
embedding_node_property=embedding_node_property,
**kwargs,
)
# Check if the vector index already exists
embedding_dimension = store.retrieve_existing_index()
# If the vector index doesn't exist yet
if not embedding_dimension:
store.create_new_index()
# If the index already exists, check if embedding dimensions match
elif not store.embedding_dimension == embedding_dimension:
raise ValueError(
f"Index with name {store.index_name} already exists."
"The provided embedding function and vector index "
"dimensions do not match.\n"
f"Embedding function dimension: {store.embedding_dimension}\n"
f"Vector index dimension: {embedding_dimension}"
)
# FTS index for Hybrid search
if search_type == SearchType.HYBRID:
fts_node_label = store.retrieve_existing_fts_index(text_node_properties)
# If the FTS index doesn't exist yet
if not fts_node_label:
store.create_new_keyword_index(text_node_properties)
else: # Validate that FTS and Vector index use the same information
if not fts_node_label == store.node_label:
raise ValueError(
"Vector and keyword index don't index the same node label"
)
# Populate embeddings
while True:
fetch_query = (
f"MATCH (n:`{node_label}`) "
f"WHERE n.{embedding_node_property} IS null "
"AND any(k in $props WHERE n[k] IS NOT null) "
f"RETURN elementId(n) AS id, reduce(str='',"
"k IN $props | str + '\\n' + k + ':' + coalesce(n[k], '')) AS text "
"LIMIT 1000"
)
data = store.query(fetch_query, params={"props": text_node_properties})
text_embeddings = embedding.embed_documents([el["text"] for el in data])
params = {
"data": [
{"id": el["id"], "embedding": embedding}
for el, embedding in zip(data, text_embeddings)
]
}
store.query(
"UNWIND $data AS row "
f"MATCH (n:`{node_label}`) "
"WHERE elementId(n) = row.id "
f"CALL db.create.setVectorProperty(n, "
f"'{embedding_node_property}', row.embedding) "
"YIELD node RETURN count(*)",
params=params,
)
# If embedding calculation should be stopped
if len(data) < 1000:
break
return store
def _select_relevance_score_fn(self) -> Callable[[float], float]:
"""
The 'correct' relevance function

View File

@@ -470,3 +470,147 @@ def test_neo4jvector_hybrid_from_existing() -> None:
assert output == [Document(page_content="foo")]
drop_vector_indexes(existing)
def test_neo4jvector_from_existing_graph() -> None:
"""Test from_existing_graph with a single property."""
graph = Neo4jVector.from_texts(
texts=["test"],
embedding=FakeEmbeddingsWithOsDimension(),
url=url,
username=username,
password=password,
index_name="foo",
node_label="Foo",
embedding_node_property="vector",
text_node_property="info",
pre_delete_collection=True,
)
graph.query("MATCH (n) DETACH DELETE n")
graph.query("CREATE (:Test {name:'Foo'})," "(:Test {name:'Bar'})")
existing = Neo4jVector.from_existing_graph(
embedding=FakeEmbeddingsWithOsDimension(),
url=url,
username=username,
password=password,
index_name="vector",
node_label="Test",
text_node_properties=["name"],
embedding_node_property="embedding",
)
output = existing.similarity_search("foo", k=1)
assert output == [Document(page_content="\nname: Foo")]
drop_vector_indexes(existing)
def test_neo4jvector_from_existing_graph_hybrid() -> None:
"""Test from_existing_graph hybrid with a single property."""
graph = Neo4jVector.from_texts(
texts=["test"],
embedding=FakeEmbeddingsWithOsDimension(),
url=url,
username=username,
password=password,
index_name="foo",
node_label="Foo",
embedding_node_property="vector",
text_node_property="info",
pre_delete_collection=True,
)
graph.query("MATCH (n) DETACH DELETE n")
graph.query("CREATE (:Test {name:'foo'})," "(:Test {name:'Bar'})")
existing = Neo4jVector.from_existing_graph(
embedding=FakeEmbeddingsWithOsDimension(),
url=url,
username=username,
password=password,
index_name="vector",
node_label="Test",
text_node_properties=["name"],
embedding_node_property="embedding",
search_type=SearchType.HYBRID,
)
output = existing.similarity_search("foo", k=1)
assert output == [Document(page_content="\nname: foo")]
drop_vector_indexes(existing)
def test_neo4jvector_from_existing_graph_multiple_properties() -> None:
"""Test from_existing_graph with a two property."""
graph = Neo4jVector.from_texts(
texts=["test"],
embedding=FakeEmbeddingsWithOsDimension(),
url=url,
username=username,
password=password,
index_name="foo",
node_label="Foo",
embedding_node_property="vector",
text_node_property="info",
pre_delete_collection=True,
)
graph.query("MATCH (n) DETACH DELETE n")
graph.query("CREATE (:Test {name:'Foo', name2: 'Fooz'})," "(:Test {name:'Bar'})")
existing = Neo4jVector.from_existing_graph(
embedding=FakeEmbeddingsWithOsDimension(),
url=url,
username=username,
password=password,
index_name="vector",
node_label="Test",
text_node_properties=["name", "name2"],
embedding_node_property="embedding",
)
output = existing.similarity_search("foo", k=1)
assert output == [Document(page_content="\nname: Foo\nname2: Fooz")]
drop_vector_indexes(existing)
def test_neo4jvector_from_existing_graph_multiple_properties_hybrid() -> None:
"""Test from_existing_graph with a two property."""
graph = Neo4jVector.from_texts(
texts=["test"],
embedding=FakeEmbeddingsWithOsDimension(),
url=url,
username=username,
password=password,
index_name="foo",
node_label="Foo",
embedding_node_property="vector",
text_node_property="info",
pre_delete_collection=True,
)
graph.query("MATCH (n) DETACH DELETE n")
graph.query("CREATE (:Test {name:'Foo', name2: 'Fooz'})," "(:Test {name:'Bar'})")
existing = Neo4jVector.from_existing_graph(
embedding=FakeEmbeddingsWithOsDimension(),
url=url,
username=username,
password=password,
index_name="vector",
node_label="Test",
text_node_properties=["name", "name2"],
embedding_node_property="embedding",
search_type=SearchType.HYBRID,
)
output = existing.similarity_search("foo", k=1)
assert output == [Document(page_content="\nname: Foo\nname2: Fooz")]
drop_vector_indexes(existing)