Mirror of https://github.com/hwchase17/langchain.git, synced 2025-06-24 15:43:54 +00:00
community[minor]: Add relyt vector database (#20316)
Co-authored-by: kaka <kaka@zbyte-inc.cloud>
Co-authored-by: Bagatur <baskaryan@gmail.com>
Co-authored-by: jingsi <jingsi@leadincloud.com>
Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com>
This commit is contained in:
parent f386f71bb3
commit 1202017c56
docs/docs/integrations/vectorstores/relyt.ipynb: 166 lines (new file)
@@ -0,0 +1,166 @@
{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Relyt\n",
"\n",
">[Relyt](https://docs.relyt.cn/docs/vector-engine/use/) is a cloud-native data warehousing service designed to analyze large volumes of data online.\n",
"\n",
">`Relyt` is compatible with ANSI SQL 2003 syntax and the PostgreSQL and Oracle database ecosystems. Relyt also supports both row store and column store, processes petabytes of data offline with high performance, and supports highly concurrent online queries.\n",
"\n",
"This notebook shows how to use functionality related to the `Relyt` vector database.\n",
"To run it, you should have a [Relyt](https://docs.relyt.cn/) instance up and running:\n",
"- Use the [Relyt Vector Database](https://docs.relyt.cn/docs/vector-engine/use/). Follow the link to deploy one quickly."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%pip install \"pgvecto_rs[sdk]\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from langchain_community.document_loaders import TextLoader\n",
"from langchain_community.embeddings.fake import FakeEmbeddings\n",
"from langchain_community.vectorstores import Relyt\n",
"from langchain_text_splitters import CharacterTextSplitter"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Split the documents and get embeddings by calling the community API"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"loader = TextLoader(\"../../modules/state_of_the_union.txt\")\n",
"documents = loader.load()\n",
"text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)\n",
"docs = text_splitter.split_documents(documents)\n",
"\n",
"embeddings = FakeEmbeddings(size=1536)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Connect to Relyt by setting the related environment variables:\n",
"```\n",
"export PG_HOST={your_relyt_hostname}\n",
"export PG_PORT={your_relyt_port} # Optional, default is 5432\n",
"export PG_DATABASE={your_database} # Optional, default is postgres\n",
"export PG_USER={database_username}\n",
"export PG_PASSWORD={database_password}\n",
"```\n",
"\n",
"Then store your embeddings and documents in Relyt."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"connection_string = Relyt.connection_string_from_db_params(\n",
"    driver=os.environ.get(\"PG_DRIVER\", \"psycopg2cffi\"),\n",
"    host=os.environ.get(\"PG_HOST\", \"localhost\"),\n",
"    port=int(os.environ.get(\"PG_PORT\", \"5432\")),\n",
"    database=os.environ.get(\"PG_DATABASE\", \"postgres\"),\n",
"    user=os.environ.get(\"PG_USER\", \"postgres\"),\n",
"    password=os.environ.get(\"PG_PASSWORD\", \"postgres\"),\n",
")\n",
"\n",
"vector_db = Relyt.from_documents(\n",
"    docs,\n",
"    embeddings,\n",
"    connection_string=connection_string,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Query and retrieve data"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"query = \"What did the president say about Ketanji Brown Jackson\"\n",
"docs = vector_db.similarity_search(query)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Tonight. I call on the Senate to: Pass the Freedom to Vote Act. Pass the John Lewis Voting Rights Act. And while you’re at it, pass the Disclose Act so Americans can know who is funding our elections. \n",
"\n",
"Tonight, I’d like to honor someone who has dedicated his life to serve this country: Justice Stephen Breyer—an Army veteran, Constitutional scholar, and retiring Justice of the United States Supreme Court. Justice Breyer, thank you for your service. \n",
"\n",
"One of the most serious constitutional responsibilities a President has is nominating someone to serve on the United States Supreme Court. \n",
"\n",
"And I did that 4 days ago, when I nominated Circuit Court of Appeals Judge Ketanji Brown Jackson. One of our nation’s top legal minds, who will continue Justice Breyer’s legacy of excellence.\n"
]
}
],
"source": [
"print(docs[0].page_content)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
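Since `Relyt` subclasses the LangChain `VectorStore` interface, the store built in the notebook above can also be exposed through the generic retriever API. A minimal sketch (not part of the committed notebook; it assumes the `vector_db` object created above):

```python
# Wrap the vector store as a retriever and fetch the top-4 matches for a query.
retriever = vector_db.as_retriever(search_kwargs={"k": 4})
relevant_docs = retriever.invoke("What did the president say about Ketanji Brown Jackson")
print(relevant_docs[0].page_content)
```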
libs/community/langchain_community/vectorstores/__init__.py
@@ -196,6 +196,9 @@ if TYPE_CHECKING:
    from langchain_community.vectorstores.redis import (
        Redis,  # noqa: F401
    )
    from langchain_community.vectorstores.relyt import (
        Relyt,  # noqa: F401
    )
    from langchain_community.vectorstores.rocksetdb import (
        Rockset,  # noqa: F401
    )
@@ -344,6 +347,7 @@ __all__ = [
    "Pinecone",
    "Qdrant",
    "Redis",
    "Relyt",
    "Rockset",
    "SKLearnVectorStore",
    "SQLiteVSS",
@@ -437,6 +441,7 @@ _module_lookup = {
    "Pinecone": "langchain_community.vectorstores.pinecone",
    "Qdrant": "langchain_community.vectorstores.qdrant",
    "Redis": "langchain_community.vectorstores.redis",
    "Relyt": "langchain_community.vectorstores.relyt",
    "Rockset": "langchain_community.vectorstores.rocksetdb",
    "SKLearnVectorStore": "langchain_community.vectorstores.sklearn",
    "SQLiteVSS": "langchain_community.vectorstores.sqlitevss",
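The `_module_lookup` table above maps each exported name to the module that defines it, so importing `Relyt` stays cheap until it is actually used. A minimal sketch of how such a table is typically consumed by a module-level `__getattr__` (this mirrors the lazy-import pattern used in `langchain_community`, not the verbatim implementation):

```python
import importlib
from typing import Any

_module_lookup = {
    "Relyt": "langchain_community.vectorstores.relyt",
}


def __getattr__(name: str) -> Any:
    # Resolve the attribute lazily: import the backing module only on first access.
    if name in _module_lookup:
        module = importlib.import_module(_module_lookup[name])
        return getattr(module, name)
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
```

With this in place, `from langchain_community.vectorstores import Relyt` only triggers the import of `langchain_community.vectorstores.relyt` when the name is first resolved.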
libs/community/langchain_community/vectorstores/relyt.py: 518 lines (new file)
@@ -0,0 +1,518 @@
from __future__ import annotations

import logging
import uuid
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Type

from sqlalchemy import Column, String, Table, create_engine, insert, text
from sqlalchemy.dialects.postgresql import JSON, TEXT

try:
    from sqlalchemy.orm import declarative_base
except ImportError:
    from sqlalchemy.ext.declarative import declarative_base

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils import get_from_dict_or_env
from langchain_core.vectorstores import VectorStore

_LANGCHAIN_DEFAULT_EMBEDDING_DIM = 1536
_LANGCHAIN_DEFAULT_COLLECTION_NAME = "langchain_document"

Base = declarative_base()  # type: Any


class Relyt(VectorStore):
    """`Relyt` (distributed PostgreSQL) vector store.

    Relyt is a cloud-native, distributed database with full PostgreSQL syntax support.
    - `connection_string` is a postgres connection string.
    - `embedding_function` is any embedding function implementing the
        `langchain.embeddings.base.Embeddings` interface.
    - `collection_name` is the name of the collection to use. (default: langchain_document)
        - NOTE: This is not the name of the table, but the name of the collection.
            The tables will be created when initializing the store (if they do not
            exist), so make sure the user has the right permissions to create tables.
    - `pre_delete_collection` if True, will delete the collection if it exists.
        (default: False)
        - Useful for testing.

    """

    def __init__(
        self,
        connection_string: str,
        embedding_function: Embeddings,
        embedding_dimension: int = _LANGCHAIN_DEFAULT_EMBEDDING_DIM,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        pre_delete_collection: bool = False,
        logger: Optional[logging.Logger] = None,
        engine_args: Optional[dict] = None,
    ) -> None:
        """Initialize a Relyt vectorstore backed by pgvecto_rs.

        Args:
            connection_string: Postgres connection string.
            embedding_function: Embeddings to use.
            embedding_dimension: Dimension of the embeddings.
            collection_name: Name of the collection (and of the backing table).
            pre_delete_collection: Whether to drop an existing collection before
                recreating it. Defaults to False.
            logger: Optional logger; a module-level logger is used by default.
            engine_args: Optional SQLAlchemy engine arguments.
        """
        try:
            from pgvecto_rs.sdk import PGVectoRs

            PGVectoRs(
                db_url=connection_string,
                collection_name=collection_name,
                dimension=embedding_dimension,
                recreate=pre_delete_collection,
            )
        except ImportError as e:
            raise ImportError(
                "Unable to import pgvecto_rs.sdk, please install with "
                '`pip install "pgvecto_rs[sdk]"`.'
            ) from e

        self.connection_string = connection_string
        self.embedding_function = embedding_function
        self.embedding_dimension = embedding_dimension
        self.collection_name = collection_name
        self.pre_delete_collection = pre_delete_collection
        self.logger = logger or logging.getLogger(__name__)
        self.__post_init__(engine_args)

    def __post_init__(
        self,
        engine_args: Optional[dict] = None,
    ) -> None:
        """
        Initialize the store.
        """

        _engine_args = engine_args or {}

        if (
            "pool_recycle" not in _engine_args
        ):  # Check if pool_recycle is not in _engine_args
            _engine_args[
                "pool_recycle"
            ] = 3600  # Set pool_recycle to 3600s if not present

        self.engine = create_engine(self.connection_string, **_engine_args)
        self.create_collection()

    @property
    def embeddings(self) -> Embeddings:
        return self.embedding_function

    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        return self._euclidean_relevance_score_fn

    def create_table_if_not_exists(self) -> None:
        # Define the dynamic table
        """
        Table(
            self.collection_name,
            Base.metadata,
            Column("id", TEXT, primary_key=True, default=uuid.uuid4),
            Column("embedding", Vector(self.embedding_dimension)),
            Column("document", String, nullable=True),
            Column("metadata", JSON, nullable=True),
            extend_existing=True,
        )
        """
        with self.engine.connect() as conn:
            with conn.begin():
                # create vectors
                conn.execute(text("CREATE EXTENSION IF NOT EXISTS vectors"))
                conn.execute(text('CREATE EXTENSION IF NOT EXISTS "uuid-ossp"'))

                # Create the table
                # Base.metadata.create_all(conn)
                table_name = f"{self.collection_name}"
                table_query = text(
                    f"""
                        SELECT 1
                        FROM pg_class
                        WHERE relname = '{table_name}';
                    """
                )
                result = conn.execute(table_query).scalar()
                if not result:
                    table_statement = text(
                        f"""
                            CREATE TABLE {table_name} (
                                id TEXT PRIMARY KEY DEFAULT uuid_generate_v4(),
                                embedding vector({self.embedding_dimension}),
                                document TEXT,
                                metadata JSON
                            ) USING heap;
                        """
                    )
                    conn.execute(table_statement)

                # Check if the index exists
                index_name = f"{self.collection_name}_embedding_idx"
                index_query = text(
                    f"""
                        SELECT 1
                        FROM pg_indexes
                        WHERE indexname = '{index_name}';
                    """
                )
                result = conn.execute(index_query).scalar()

                # Create the index if it doesn't exist
                if not result:
                    index_statement = text(
                        f"""
                            CREATE INDEX {index_name}
                            ON {self.collection_name}
                            USING vectors (embedding vector_l2_ops)
                            WITH (options = $$
                            optimizing.optimizing_threads = 30
                            segment.max_growing_segment_size = 600
                            segment.max_sealed_segment_size = 30000000
                            [indexing.hnsw]
                            m=30
                            ef_construction=500
                            $$);
                        """
                    )
                    conn.execute(index_statement)
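                # Note on the index options above: "vectors" is the pgvecto.rs
                # extension, and vector_l2_ops selects L2 (Euclidean) distance for
                # the HNSW index. Larger m / ef_construction values generally trade
                # index size and build time for better recall; the optimizing.* and
                # segment.* options tune the extension's background index building.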

    def create_collection(self) -> None:
        if self.pre_delete_collection:
            self.delete_collection()
        self.create_table_if_not_exists()

    def delete_collection(self) -> None:
        self.logger.debug("Trying to delete collection")
        drop_statement = text(f"DROP TABLE IF EXISTS {self.collection_name};")
        with self.engine.connect() as conn:
            with conn.begin():
                conn.execute(drop_statement)

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        batch_size: int = 500,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            kwargs: vectorstore specific parameters

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        from pgvecto_rs.sqlalchemy import Vector

        if ids is None:
            ids = [str(uuid.uuid1()) for _ in texts]

        embeddings = self.embedding_function.embed_documents(list(texts))

        if not metadatas:
            metadatas = [{} for _ in texts]

        # Define the table schema
        chunks_table = Table(
            self.collection_name,
            Base.metadata,
            Column("id", TEXT, primary_key=True),
            Column("embedding", Vector(self.embedding_dimension)),
            Column("document", String, nullable=True),
            Column("metadata", JSON, nullable=True),
            extend_existing=True,
        )

        chunks_table_data = []
        with self.engine.connect() as conn:
            with conn.begin():
                for document, metadata, chunk_id, embedding in zip(
                    texts, metadatas, ids, embeddings
                ):
                    chunks_table_data.append(
                        {
                            "id": chunk_id,
                            "embedding": embedding,
                            "document": document,
                            "metadata": metadata,
                        }
                    )

                    # Execute the batch insert when the batch size is reached
                    if len(chunks_table_data) == batch_size:
                        conn.execute(insert(chunks_table).values(chunks_table_data))
                        # Clear the chunks_table_data list for the next batch
                        chunks_table_data.clear()

                # Insert any remaining records that didn't make up a full batch
                if chunks_table_data:
                    conn.execute(insert(chunks_table).values(chunks_table_data))

        return ids

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[dict] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Run similarity search with Relyt with distance.

        Args:
            query (str): Query text to search for.
            k (int): Number of results to return. Defaults to 4.
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

        Returns:
            List of Documents most similar to the query.
        """
        embedding = self.embedding_function.embed_query(text=query)
        return self.similarity_search_by_vector(
            embedding=embedding,
            k=k,
            filter=filter,
        )

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[dict] = None,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

        Returns:
            List of Documents most similar to the query and score for each
        """
        embedding = self.embedding_function.embed_query(query)
        docs = self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, filter=filter
        )
        return docs

    def similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[dict] = None,
    ) -> List[Tuple[Document, float]]:
        # Add the filter if provided
        try:
            from sqlalchemy.engine import Row
        except ImportError:
            raise ImportError(
                "Could not import Row from sqlalchemy.engine. "
                "Please 'pip install sqlalchemy>=1.4'."
            )

        filter_condition = ""
        if filter is not None:
            conditions = [
                f"metadata->>{key!r} = {value!r}" for key, value in filter.items()
            ]
            filter_condition = f"WHERE {' AND '.join(conditions)}"

        # Define the base query
        sql_query = f"""
            set vectors.enable_search_growing = on;
            set vectors.enable_search_write = on;
            SELECT document, metadata, embedding <-> :embedding as distance
            FROM {self.collection_name}
            {filter_condition}
            ORDER BY embedding <-> :embedding
            LIMIT :k
        """

        # Set up the query parameters
        embedding_str = ", ".join(format(x) for x in embedding)
        embedding_str = "[" + embedding_str + "]"
        params = {"embedding": embedding_str, "k": k}

        # Execute the query and fetch the results
        with self.engine.connect() as conn:
            results: Sequence[Row] = conn.execute(text(sql_query), params).fetchall()

        documents_with_scores = [
            (
                Document(
                    page_content=result.document,
                    metadata=result.metadata,
                ),
                result.distance if self.embedding_function is not None else None,
            )
            for result in results
        ]
        return documents_with_scores

    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[dict] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

        Returns:
            List of Documents most similar to the query vector.
        """
        docs_and_scores = self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, filter=filter
        )
        return [doc for doc, _ in docs_and_scores]

    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
        """Delete by vector IDs.

        Args:
            ids: List of ids to delete.
        """
        from pgvecto_rs.sqlalchemy import Vector

        if ids is None:
            raise ValueError("No ids provided to delete.")

        # Define the table schema
        chunks_table = Table(
            self.collection_name,
            Base.metadata,
            Column("id", TEXT, primary_key=True),
            Column("embedding", Vector(self.embedding_dimension)),
            Column("document", String, nullable=True),
            Column("metadata", JSON, nullable=True),
            extend_existing=True,
        )

        try:
            with self.engine.connect() as conn:
                with conn.begin():
                    delete_condition = chunks_table.c.id.in_(ids)
                    conn.execute(chunks_table.delete().where(delete_condition))
                    return True
        except Exception as e:
            print("Delete operation failed:", str(e))  # noqa: T201
            return False

    @classmethod
    def from_texts(
        cls: Type[Relyt],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        embedding_dimension: int = _LANGCHAIN_DEFAULT_EMBEDDING_DIM,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        ids: Optional[List[str]] = None,
        pre_delete_collection: bool = False,
        engine_args: Optional[dict] = None,
        **kwargs: Any,
    ) -> Relyt:
        """
        Return VectorStore initialized from texts and embeddings.
        Postgres Connection string is required
        Either pass it as a parameter
        or set the PG_CONNECTION_STRING environment variable.
        """

        connection_string = cls.get_connection_string(kwargs)

        store = cls(
            connection_string=connection_string,
            collection_name=collection_name,
            embedding_function=embedding,
            embedding_dimension=embedding_dimension,
            pre_delete_collection=pre_delete_collection,
            engine_args=engine_args,
        )

        store.add_texts(texts=texts, metadatas=metadatas, ids=ids, **kwargs)
        return store

    @classmethod
    def get_connection_string(cls, kwargs: Dict[str, Any]) -> str:
        connection_string: str = get_from_dict_or_env(
            data=kwargs,
            key="connection_string",
            env_key="PG_CONNECTION_STRING",
        )

        if not connection_string:
            raise ValueError(
                "Postgres connection string is required. "
                "Either pass it as a parameter "
                "or set the PG_CONNECTION_STRING environment variable."
            )

        return connection_string

    @classmethod
    def from_documents(
        cls: Type[Relyt],
        documents: List[Document],
        embedding: Embeddings,
        embedding_dimension: int = _LANGCHAIN_DEFAULT_EMBEDDING_DIM,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        ids: Optional[List[str]] = None,
        pre_delete_collection: bool = False,
        engine_args: Optional[dict] = None,
        **kwargs: Any,
    ) -> Relyt:
        """
        Return VectorStore initialized from documents and embeddings.
        Postgres Connection string is required
        Either pass it as a parameter
        or set the PG_CONNECTION_STRING environment variable.
        """

        texts = [d.page_content for d in documents]
        metadatas = [d.metadata for d in documents]
        connection_string = cls.get_connection_string(kwargs)

        kwargs["connection_string"] = connection_string

        return cls.from_texts(
            texts=texts,
            pre_delete_collection=pre_delete_collection,
            embedding=embedding,
            embedding_dimension=embedding_dimension,
            metadatas=metadatas,
            ids=ids,
            collection_name=collection_name,
            engine_args=engine_args,
            **kwargs,
        )

    @classmethod
    def connection_string_from_db_params(
        cls,
        driver: str,
        host: str,
        port: int,
        database: str,
        user: str,
        password: str,
    ) -> str:
        """Return connection string from database parameters."""
        return f"postgresql+{driver}://{user}:{password}@{host}:{port}/{database}"
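`connection_string_from_db_params` simply interpolates the parameters into a SQLAlchemy-style URL. A quick illustration with placeholder values (not real credentials), matching the defaults used in the notebook above:

```python
url = Relyt.connection_string_from_db_params(
    driver="psycopg2cffi",
    host="localhost",
    port=5432,
    database="postgres",
    user="postgres",
    password="postgres",
)
assert url == "postgresql+psycopg2cffi://postgres:postgres@localhost:5432/postgres"
```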
libs/community/tests/integration_tests/vectorstores/test_relyt.py (new file)
@@ -0,0 +1,167 @@
"""Test Relyt functionality."""
import os
from typing import List

from langchain_core.documents import Document

from langchain_community.vectorstores.relyt import Relyt
from tests.integration_tests.vectorstores.fake_embeddings import FakeEmbeddings

CONNECTION_STRING = Relyt.connection_string_from_db_params(
    driver=os.environ.get("PG_DRIVER", "psycopg2cffi"),
    host=os.environ.get("PG_HOST", "localhost"),
    port=int(os.environ.get("PG_PORT", "5432")),
    database=os.environ.get("PG_DATABASE", "postgres"),
    user=os.environ.get("PG_USER", "postgres"),
    password=os.environ.get("PG_PASSWORD", "postgres"),
)


ADA_TOKEN_COUNT = 1536


class FakeEmbeddingsWithAdaDimension(FakeEmbeddings):
    """Fake embeddings functionality for testing."""

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Return simple embeddings."""
        return [
            [float(1.0)] * (ADA_TOKEN_COUNT - 1) + [float(i)] for i in range(len(texts))
        ]

    def embed_query(self, text: str) -> List[float]:
        """Return simple embeddings."""
        return [float(1.0)] * (ADA_TOKEN_COUNT - 1) + [float(0.0)]


def test_relyt() -> None:
    """Test end to end construction and search."""
    texts = ["foo", "bar", "baz"]
    docsearch = Relyt.from_texts(
        texts=texts,
        collection_name="test_collection",
        embedding=FakeEmbeddingsWithAdaDimension(),
        connection_string=CONNECTION_STRING,
        pre_delete_collection=True,
    )
    output = docsearch.similarity_search("foo", k=1)
    assert output == [Document(page_content="foo")]


def test_relyt_with_engine_args() -> None:
    engine_args = {"pool_recycle": 3600, "pool_size": 50}
    """Test end to end construction and search."""
    texts = ["foo", "bar", "baz"]
    docsearch = Relyt.from_texts(
        texts=texts,
        collection_name="test_collection",
        embedding=FakeEmbeddingsWithAdaDimension(),
        connection_string=CONNECTION_STRING,
        pre_delete_collection=True,
        engine_args=engine_args,
    )
    output = docsearch.similarity_search("foo", k=1)
    assert output == [Document(page_content="foo")]


def test_relyt_with_metadatas() -> None:
    """Test end to end construction and search."""
    texts = ["foo", "bar", "baz"]
    metadatas = [{"page": str(i)} for i in range(len(texts))]
    docsearch = Relyt.from_texts(
        texts=texts,
        collection_name="test_collection",
        embedding=FakeEmbeddingsWithAdaDimension(),
        metadatas=metadatas,
        connection_string=CONNECTION_STRING,
        pre_delete_collection=True,
    )
    output = docsearch.similarity_search("foo", k=1)
    assert output == [Document(page_content="foo", metadata={"page": "0"})]


def test_relyt_with_metadatas_with_scores() -> None:
    """Test end to end construction and search."""
    texts = ["foo", "bar", "baz"]
    metadatas = [{"page": str(i)} for i in range(len(texts))]
    docsearch = Relyt.from_texts(
        texts=texts,
        collection_name="test_collection",
        embedding=FakeEmbeddingsWithAdaDimension(),
        metadatas=metadatas,
        connection_string=CONNECTION_STRING,
        pre_delete_collection=True,
    )
    output = docsearch.similarity_search_with_score("foo", k=1)
    assert output == [(Document(page_content="foo", metadata={"page": "0"}), 0.0)]


def test_relyt_with_filter_match() -> None:
    """Test end to end construction and search."""
    texts = ["foo", "bar", "baz"]
    metadatas = [{"page": str(i)} for i in range(len(texts))]
    docsearch = Relyt.from_texts(
        texts=texts,
        collection_name="test_collection_filter",
        embedding=FakeEmbeddingsWithAdaDimension(),
        metadatas=metadatas,
        connection_string=CONNECTION_STRING,
        pre_delete_collection=True,
    )
    output = docsearch.similarity_search_with_score("foo", k=1, filter={"page": "0"})
    assert output == [(Document(page_content="foo", metadata={"page": "0"}), 0.0)]


def test_relyt_with_filter_distant_match() -> None:
    """Test end to end construction and search."""
    texts = ["foo", "bar", "baz"]
    metadatas = [{"page": str(i)} for i in range(len(texts))]
    docsearch = Relyt.from_texts(
        texts=texts,
        collection_name="test_collection_filter",
        embedding=FakeEmbeddingsWithAdaDimension(),
        metadatas=metadatas,
        connection_string=CONNECTION_STRING,
        pre_delete_collection=True,
    )
    output = docsearch.similarity_search_with_score("foo", k=1, filter={"page": "2"})
    print(output)  # noqa: T201
    assert output == [(Document(page_content="baz", metadata={"page": "2"}), 4.0)]
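    # Why 4.0: with FakeEmbeddingsWithAdaDimension, "baz" (index 2) embeds as
    # [1.0, ..., 1.0, 2.0] and the query "foo" embeds as [1.0, ..., 1.0, 0.0],
    # so the vectors differ only in the last dimension, by 2.0. The asserted
    # score of 2.0 ** 2 == 4.0 is consistent with the index returning a
    # squared-L2 distance (an assumption about the `<->` semantics, not
    # something stated elsewhere in this commit).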

def test_relyt_with_filter_no_match() -> None:
    """Test end to end construction and search."""
    texts = ["foo", "bar", "baz"]
    metadatas = [{"page": str(i)} for i in range(len(texts))]
    docsearch = Relyt.from_texts(
        texts=texts,
        collection_name="test_collection_filter",
        embedding=FakeEmbeddingsWithAdaDimension(),
        metadatas=metadatas,
        connection_string=CONNECTION_STRING,
        pre_delete_collection=True,
    )
    output = docsearch.similarity_search_with_score("foo", k=1, filter={"page": "5"})
    assert output == []


def test_relyt_delete() -> None:
    """Test end to end construction and search."""
    texts = ["foo", "bar", "baz"]
    ids = ["fooid", "barid", "bazid"]
    metadatas = [{"page": str(i)} for i in range(len(texts))]
    docsearch = Relyt.from_texts(
        texts=texts,
        collection_name="test_collection_delete",
        embedding=FakeEmbeddingsWithAdaDimension(),
        metadatas=metadatas,
        connection_string=CONNECTION_STRING,
        ids=ids,
        pre_delete_collection=True,
    )
    output = docsearch.similarity_search_with_score("foo", k=1, filter={"page": "2"})
    print(output)  # noqa: T201
    assert output == [(Document(page_content="baz", metadata={"page": "2"}), 4.0)]
    docsearch.delete(ids=ids)
    output = docsearch.similarity_search_with_score("foo", k=1, filter={"page": "2"})
    assert output == []
@@ -66,6 +66,7 @@ EXPECTED_ALL = [
    "Pinecone",
    "Qdrant",
    "Redis",
    "Relyt",
    "Rockset",
    "SKLearnVectorStore",
    "SQLiteVSS",

@@ -76,6 +76,7 @@ def test_compatible_vectorstore_documentation() -> None:
    "Pinecone",
    "Qdrant",
    "Redis",
    "Relyt",
    "Rockset",
    "ScaNN",
    "SemaDB",

@@ -61,6 +61,7 @@ _EXPECTED = [
    "Pinecone",
    "Qdrant",
    "Redis",
    "Relyt",
    "Rockset",
    "SKLearnVectorStore",
    "ScaNN",