Add integration for Timescale Vector(Postgres) (#10650)

**Description:**
This commit adds a vector store for the Postgres-based vector database
(`TimescaleVector`).

Timescale Vector(https://www.timescale.com/ai) is PostgreSQL++ for AI
applications. It enables you to efficiently store and query billions of
vector embeddings in `PostgreSQL`:
- Enhances `pgvector` with faster and more accurate similarity search on
1B+ vectors via DiskANN inspired indexing algorithm.
- Enables fast time-based vector search via automatic time-based
partitioning and indexing.
- Provides a familiar SQL interface for querying vector embeddings and
relational data.

Timescale Vector scales with you from POC to production:
- Simplifies operations by enabling you to store relational metadata,
vector embeddings, and time-series data in a single database.
- Benefits from rock-solid PostgreSQL foundation with enterprise-grade
feature liked streaming backups and replication, high-availability and
row-level security.
- Enables a worry-free experience with enterprise-grade security and
compliance.

Timescale Vector is available on Timescale, the cloud PostgreSQL
platform. (There is no self-hosted version at this time.) LangChain
users get a 90-day free trial for Timescale Vector.

---------

Co-authored-by: Bagatur <baskaryan@gmail.com>
Co-authored-by: Avthar Sewrathan <avthar@timescale.com>
This commit is contained in:
Matvey Arye
2023-09-21 09:33:37 -05:00
committed by GitHub
parent 55570e54e1
commit 6e02c45ca4
10 changed files with 3863 additions and 470 deletions

View File

@@ -18,6 +18,7 @@ from langchain.retrievers.self_query.pinecone import PineconeTranslator
from langchain.retrievers.self_query.qdrant import QdrantTranslator
from langchain.retrievers.self_query.redis import RedisTranslator
from langchain.retrievers.self_query.supabase import SupabaseVectorTranslator
from langchain.retrievers.self_query.timescalevector import TimescaleVectorTranslator
from langchain.retrievers.self_query.vectara import VectaraTranslator
from langchain.retrievers.self_query.weaviate import WeaviateTranslator
from langchain.schema import BaseRetriever, Document
@@ -33,6 +34,7 @@ from langchain.vectorstores import (
Qdrant,
Redis,
SupabaseVectorStore,
TimescaleVector,
Vectara,
VectorStore,
Weaviate,
@@ -53,6 +55,7 @@ def _get_builtin_translator(vectorstore: VectorStore) -> Visitor:
ElasticsearchStore: ElasticsearchTranslator,
Milvus: MilvusTranslator,
SupabaseVectorStore: SupabaseVectorTranslator,
TimescaleVector: TimescaleVectorTranslator,
}
if isinstance(vectorstore, Qdrant):
return QdrantTranslator(metadata_key=vectorstore.metadata_payload_key)

View File

@@ -0,0 +1,84 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Tuple, Union
from langchain.chains.query_constructor.ir import (
Comparator,
Comparison,
Operation,
Operator,
StructuredQuery,
Visitor,
)
if TYPE_CHECKING:
from timescale_vector import client
class TimescaleVectorTranslator(Visitor):
"""Translate the internal query language elements to valid filters."""
allowed_operators = [Operator.AND, Operator.OR, Operator.NOT]
"""Subset of allowed logical operators."""
allowed_comparators = [
Comparator.EQ,
Comparator.GT,
Comparator.GTE,
Comparator.LT,
Comparator.LTE,
]
COMPARATOR_MAP = {
Comparator.EQ: "==",
Comparator.GT: ">",
Comparator.GTE: ">=",
Comparator.LT: "<",
Comparator.LTE: "<=",
}
OPERATOR_MAP = {Operator.AND: "AND", Operator.OR: "OR", Operator.NOT: "NOT"}
def _format_func(self, func: Union[Operator, Comparator]) -> str:
self._validate_func(func)
if isinstance(func, Operator):
value = self.OPERATOR_MAP[func.value] # type: ignore
elif isinstance(func, Comparator):
value = self.COMPARATOR_MAP[func.value] # type: ignore
return f"{value}"
def visit_operation(self, operation: Operation) -> client.Predicates:
try:
from timescale_vector import client
except ImportError as e:
raise ImportError(
"Cannot import timescale-vector. Please install with `pip install "
"timescale-vector`."
) from e
args = [arg.accept(self) for arg in operation.arguments]
return client.Predicates(*args, operator=self._format_func(operation.operator))
def visit_comparison(self, comparison: Comparison) -> client.Predicates:
try:
from timescale_vector import client
except ImportError as e:
raise ImportError(
"Cannot import timescale-vector. Please install with `pip install "
"timescale-vector`."
) from e
return client.Predicates(
(
comparison.attribute,
self._format_func(comparison.comparator),
comparison.value,
)
)
def visit_structured_query(
self, structured_query: StructuredQuery
) -> Tuple[str, dict]:
if structured_query.filter is None:
kwargs = {}
else:
kwargs = {"predicates": structured_query.filter.accept(self)}
return structured_query.query, kwargs

View File

@@ -70,6 +70,7 @@ from langchain.vectorstores.supabase import SupabaseVectorStore
from langchain.vectorstores.tair import Tair
from langchain.vectorstores.tencentvectordb import TencentVectorDB
from langchain.vectorstores.tigris import Tigris
from langchain.vectorstores.timescalevector import TimescaleVector
from langchain.vectorstores.typesense import Typesense
from langchain.vectorstores.usearch import USearch
from langchain.vectorstores.vald import Vald
@@ -135,6 +136,7 @@ __all__ = [
"SupabaseVectorStore",
"Tair",
"Tigris",
"TimescaleVector",
"Typesense",
"USearch",
"Vald",

View File

@@ -0,0 +1,871 @@
"""VectorStore wrapper around a Postgres-TimescaleVector database."""
from __future__ import annotations
import enum
import logging
import uuid
from datetime import timedelta
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Tuple,
Type,
Union,
)
from langchain.docstore.document import Document
from langchain.embeddings.base import Embeddings
from langchain.utils import get_from_dict_or_env
from langchain.vectorstores.base import VectorStore
from langchain.vectorstores.utils import DistanceStrategy
if TYPE_CHECKING:
from timescale_vector import Predicates
DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.COSINE
ADA_TOKEN_COUNT = 1536
_LANGCHAIN_DEFAULT_COLLECTION_NAME = "langchain_store"
class TimescaleVector(VectorStore):
"""VectorStore implementation using the timescale vector client to store vectors
in Postgres.
To use, you should have the ``timescale_vector`` python package installed.
Args:
service_url: Service url on timescale cloud.
embedding: Any embedding function implementing
`langchain.embeddings.base.Embeddings` interface.
collection_name: The name of the collection to use. (default: langchain_store)
This will become the table name used for the collection.
distance_strategy: The distance strategy to use. (default: COSINE)
pre_delete_collection: If True, will delete the collection if it exists.
(default: False). Useful for testing.
Example:
.. code-block:: python
from langchain.vectorstores import TimescaleVector
from langchain.embeddings.openai import OpenAIEmbeddings
SERVICE_URL = "postgres://tsdbadmin:<password>@<id>.tsdb.cloud.timescale.com:<port>/tsdb?sslmode=require"
COLLECTION_NAME = "state_of_the_union_test"
embeddings = OpenAIEmbeddings()
vectorestore = TimescaleVector.from_documents(
embedding=embeddings,
documents=docs,
collection_name=COLLECTION_NAME,
service_url=SERVICE_URL,
)
""" # noqa: E501
def __init__(
self,
service_url: str,
embedding: Embeddings,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
num_dimensions: int = ADA_TOKEN_COUNT,
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
pre_delete_collection: bool = False,
logger: Optional[logging.Logger] = None,
relevance_score_fn: Optional[Callable[[float], float]] = None,
time_partition_interval: Optional[timedelta] = None,
) -> None:
try:
from timescale_vector import client
except ImportError:
raise ImportError(
"Could not import timescale_vector python package. "
"Please install it with `pip install timescale-vector`."
)
self.service_url = service_url
self.embedding = embedding
self.collection_name = collection_name
self.num_dimensions = num_dimensions
self._distance_strategy = distance_strategy
self.pre_delete_collection = pre_delete_collection
self.logger = logger or logging.getLogger(__name__)
self.override_relevance_score_fn = relevance_score_fn
self._time_partition_interval = time_partition_interval
self.sync_client = client.Sync(
self.service_url,
self.collection_name,
self.num_dimensions,
self._distance_strategy.value.lower(),
time_partition_interval=self._time_partition_interval,
)
self.async_client = client.Async(
self.service_url,
self.collection_name,
self.num_dimensions,
self._distance_strategy.value.lower(),
time_partition_interval=self._time_partition_interval,
)
self.__post_init__()
def __post_init__(
self,
) -> None:
"""
Initialize the store.
"""
self.sync_client.create_tables()
if self.pre_delete_collection:
self.sync_client.delete_all()
@property
def embeddings(self) -> Embeddings:
return self.embedding
def drop_tables(self) -> None:
self.sync_client.drop_table()
@classmethod
def __from(
cls,
texts: List[str],
embeddings: List[List[float]],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
service_url: Optional[str] = None,
pre_delete_collection: bool = False,
**kwargs: Any,
) -> TimescaleVector:
num_dimensions = len(embeddings[0])
if ids is None:
ids = [str(uuid.uuid1()) for _ in texts]
if not metadatas:
metadatas = [{} for _ in texts]
if service_url is None:
service_url = cls.get_service_url(kwargs)
store = cls(
service_url=service_url,
num_dimensions=num_dimensions,
collection_name=collection_name,
embedding=embedding,
distance_strategy=distance_strategy,
pre_delete_collection=pre_delete_collection,
**kwargs,
)
store.add_embeddings(
texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
)
return store
@classmethod
async def __afrom(
cls,
texts: List[str],
embeddings: List[List[float]],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
service_url: Optional[str] = None,
pre_delete_collection: bool = False,
**kwargs: Any,
) -> TimescaleVector:
num_dimensions = len(embeddings[0])
if ids is None:
ids = [str(uuid.uuid1()) for _ in texts]
if not metadatas:
metadatas = [{} for _ in texts]
if service_url is None:
service_url = cls.get_service_url(kwargs)
store = cls(
service_url=service_url,
num_dimensions=num_dimensions,
collection_name=collection_name,
embedding=embedding,
distance_strategy=distance_strategy,
pre_delete_collection=pre_delete_collection,
**kwargs,
)
await store.aadd_embeddings(
texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
)
return store
def add_embeddings(
self,
texts: Iterable[str],
embeddings: List[List[float]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Add embeddings to the vectorstore.
Args:
texts: Iterable of strings to add to the vectorstore.
embeddings: List of list of embedding vectors.
metadatas: List of metadatas associated with the texts.
kwargs: vectorstore specific parameters
"""
if ids is None:
ids = [str(uuid.uuid1()) for _ in texts]
if not metadatas:
metadatas = [{} for _ in texts]
records = list(zip(ids, metadatas, texts, embeddings))
self.sync_client.upsert(records)
return ids
async def aadd_embeddings(
self,
texts: Iterable[str],
embeddings: List[List[float]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Add embeddings to the vectorstore.
Args:
texts: Iterable of strings to add to the vectorstore.
embeddings: List of list of embedding vectors.
metadatas: List of metadatas associated with the texts.
kwargs: vectorstore specific parameters
"""
if ids is None:
ids = [str(uuid.uuid1()) for _ in texts]
if not metadatas:
metadatas = [{} for _ in texts]
records = list(zip(ids, metadatas, texts, embeddings))
await self.async_client.upsert(records)
return ids
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
kwargs: vectorstore specific parameters
Returns:
List of ids from adding the texts into the vectorstore.
"""
embeddings = self.embedding.embed_documents(list(texts))
return self.add_embeddings(
texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
)
async def aadd_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
kwargs: vectorstore specific parameters
Returns:
List of ids from adding the texts into the vectorstore.
"""
embeddings = self.embedding.embed_documents(list(texts))
return await self.aadd_embeddings(
texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
)
def similarity_search(
self,
query: str,
k: int = 4,
filter: Optional[Union[dict, list]] = None,
predicates: Optional[Predicates] = None,
**kwargs: Any,
) -> List[Document]:
"""Run similarity search with TimescaleVector with distance.
Args:
query (str): Query text to search for.
k (int): Number of results to return. Defaults to 4.
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
Returns:
List of Documents most similar to the query.
"""
embedding = self.embedding.embed_query(text=query)
return self.similarity_search_by_vector(
embedding=embedding,
k=k,
filter=filter,
predicates=predicates,
**kwargs,
)
async def asimilarity_search(
self,
query: str,
k: int = 4,
filter: Optional[Union[dict, list]] = None,
predicates: Optional[Predicates] = None,
**kwargs: Any,
) -> List[Document]:
"""Run similarity search with TimescaleVector with distance.
Args:
query (str): Query text to search for.
k (int): Number of results to return. Defaults to 4.
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
Returns:
List of Documents most similar to the query.
"""
embedding = self.embedding.embed_query(text=query)
return await self.asimilarity_search_by_vector(
embedding=embedding,
k=k,
filter=filter,
predicates=predicates,
**kwargs,
)
def similarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[Union[dict, list]] = None,
predicates: Optional[Predicates] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return docs most similar to query.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
Returns:
List of Documents most similar to the query and score for each
"""
embedding = self.embedding.embed_query(query)
docs = self.similarity_search_with_score_by_vector(
embedding=embedding,
k=k,
filter=filter,
predicates=predicates,
**kwargs,
)
return docs
async def asimilarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[Union[dict, list]] = None,
predicates: Optional[Predicates] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return docs most similar to query.
Args:
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
Returns:
List of Documents most similar to the query and score for each
"""
embedding = self.embedding.embed_query(query)
return await self.asimilarity_search_with_score_by_vector(
embedding=embedding,
k=k,
filter=filter,
predicates=predicates,
**kwargs,
)
def date_to_range_filter(self, **kwargs: Any) -> Any:
constructor_args = {
key: kwargs[key]
for key in [
"start_date",
"end_date",
"time_delta",
"start_inclusive",
"end_inclusive",
]
if key in kwargs
}
if not constructor_args or len(constructor_args) == 0:
return None
try:
from timescale_vector import client
except ImportError:
raise ImportError(
"Could not import timescale_vector python package. "
"Please install it with `pip install timescale-vector`."
)
return client.UUIDTimeRange(**constructor_args)
def similarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Union[dict, list]] = None,
predicates: Optional[Predicates] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
try:
from timescale_vector import client
except ImportError:
raise ImportError(
"Could not import timescale_vector python package. "
"Please install it with `pip install timescale-vector`."
)
results = self.sync_client.search(
embedding,
limit=k,
filter=filter,
predicates=predicates,
uuid_time_filter=self.date_to_range_filter(**kwargs),
)
docs = [
(
Document(
page_content=result[client.SEARCH_RESULT_CONTENTS_IDX],
metadata=result[client.SEARCH_RESULT_METADATA_IDX],
),
result[client.SEARCH_RESULT_DISTANCE_IDX],
)
for result in results
]
return docs
async def asimilarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Union[dict, list]] = None,
predicates: Optional[Predicates] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
try:
from timescale_vector import client
except ImportError:
raise ImportError(
"Could not import timescale_vector python package. "
"Please install it with `pip install timescale-vector`."
)
results = await self.async_client.search(
embedding,
limit=k,
filter=filter,
predicates=predicates,
uuid_time_filter=self.date_to_range_filter(**kwargs),
)
docs = [
(
Document(
page_content=result[client.SEARCH_RESULT_CONTENTS_IDX],
metadata=result[client.SEARCH_RESULT_METADATA_IDX],
),
result[client.SEARCH_RESULT_DISTANCE_IDX],
)
for result in results
]
return docs
def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Union[dict, list]] = None,
predicates: Optional[Predicates] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs most similar to embedding vector.
Args:
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
Returns:
List of Documents most similar to the query vector.
"""
docs_and_scores = self.similarity_search_with_score_by_vector(
embedding=embedding, k=k, filter=filter, predicates=predicates, **kwargs
)
return [doc for doc, _ in docs_and_scores]
async def asimilarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Union[dict, list]] = None,
predicates: Optional[Predicates] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs most similar to embedding vector.
Args:
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
Returns:
List of Documents most similar to the query vector.
"""
docs_and_scores = await self.asimilarity_search_with_score_by_vector(
embedding=embedding, k=k, filter=filter, predicates=predicates, **kwargs
)
return [doc for doc, _ in docs_and_scores]
@classmethod
def from_texts(
cls: Type[TimescaleVector],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
ids: Optional[List[str]] = None,
pre_delete_collection: bool = False,
**kwargs: Any,
) -> TimescaleVector:
"""
Return VectorStore initialized from texts and embeddings.
Postgres connection string is required
"Either pass it as a parameter
or set the TIMESCALE_SERVICE_URL environment variable.
"""
embeddings = embedding.embed_documents(list(texts))
return cls.__from(
texts,
embeddings,
embedding,
metadatas=metadatas,
ids=ids,
collection_name=collection_name,
distance_strategy=distance_strategy,
pre_delete_collection=pre_delete_collection,
**kwargs,
)
@classmethod
async def afrom_texts(
cls: Type[TimescaleVector],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
ids: Optional[List[str]] = None,
pre_delete_collection: bool = False,
**kwargs: Any,
) -> TimescaleVector:
"""
Return VectorStore initialized from texts and embeddings.
Postgres connection string is required
"Either pass it as a parameter
or set the TIMESCALE_SERVICE_URL environment variable.
"""
embeddings = embedding.embed_documents(list(texts))
return await cls.__afrom(
texts,
embeddings,
embedding,
metadatas=metadatas,
ids=ids,
collection_name=collection_name,
distance_strategy=distance_strategy,
pre_delete_collection=pre_delete_collection,
**kwargs,
)
@classmethod
def from_embeddings(
cls,
text_embeddings: List[Tuple[str, List[float]]],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
ids: Optional[List[str]] = None,
pre_delete_collection: bool = False,
**kwargs: Any,
) -> TimescaleVector:
"""Construct TimescaleVector wrapper from raw documents and pre-
generated embeddings.
Return VectorStore initialized from documents and embeddings.
Postgres connection string is required
"Either pass it as a parameter
or set the TIMESCALE_SERVICE_URL environment variable.
Example:
.. code-block:: python
from langchain.vectorstores import TimescaleVector
from langchain.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
text_embeddings = embeddings.embed_documents(texts)
text_embedding_pairs = list(zip(texts, text_embeddings))
tvs = TimescaleVector.from_embeddings(text_embedding_pairs, embeddings)
"""
texts = [t[0] for t in text_embeddings]
embeddings = [t[1] for t in text_embeddings]
return cls.__from(
texts,
embeddings,
embedding,
metadatas=metadatas,
ids=ids,
collection_name=collection_name,
distance_strategy=distance_strategy,
pre_delete_collection=pre_delete_collection,
**kwargs,
)
@classmethod
async def afrom_embeddings(
cls,
text_embeddings: List[Tuple[str, List[float]]],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
ids: Optional[List[str]] = None,
pre_delete_collection: bool = False,
**kwargs: Any,
) -> TimescaleVector:
"""Construct TimescaleVector wrapper from raw documents and pre-
generated embeddings.
Return VectorStore initialized from documents and embeddings.
Postgres connection string is required
"Either pass it as a parameter
or set the TIMESCALE_SERVICE_URL environment variable.
Example:
.. code-block:: python
from langchain.vectorstores import TimescaleVector
from langchain.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
text_embeddings = embeddings.embed_documents(texts)
text_embedding_pairs = list(zip(texts, text_embeddings))
tvs = TimescaleVector.from_embeddings(text_embedding_pairs, embeddings)
"""
texts = [t[0] for t in text_embeddings]
embeddings = [t[1] for t in text_embeddings]
return await cls.__afrom(
texts,
embeddings,
embedding,
metadatas=metadatas,
ids=ids,
collection_name=collection_name,
distance_strategy=distance_strategy,
pre_delete_collection=pre_delete_collection,
**kwargs,
)
@classmethod
def from_existing_index(
cls: Type[TimescaleVector],
embedding: Embeddings,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
pre_delete_collection: bool = False,
**kwargs: Any,
) -> TimescaleVector:
"""
Get intsance of an existing TimescaleVector store.This method will
return the instance of the store without inserting any new
embeddings
"""
service_url = cls.get_service_url(kwargs)
store = cls(
service_url=service_url,
collection_name=collection_name,
embedding=embedding,
distance_strategy=distance_strategy,
pre_delete_collection=pre_delete_collection,
)
return store
@classmethod
def get_service_url(cls, kwargs: Dict[str, Any]) -> str:
service_url: str = get_from_dict_or_env(
data=kwargs,
key="service_url",
env_key="TIMESCALE_SERVICE_URL",
)
if not service_url:
raise ValueError(
"Postgres connection string is required"
"Either pass it as a parameter"
"or set the TIMESCALE_SERVICE_URL environment variable."
)
return service_url
@classmethod
def service_url_from_db_params(
cls,
host: str,
port: int,
database: str,
user: str,
password: str,
) -> str:
"""Return connection string from database parameters."""
return f"postgresql://{user}:{password}@{host}:{port}/{database}"
def _select_relevance_score_fn(self) -> Callable[[float], float]:
"""
The 'correct' relevance function
may differ depending on a few things, including:
- the distance / similarity metric used by the VectorStore
- the scale of your embeddings (OpenAI's are unit normed. Many others are not!)
- embedding dimensionality
- etc.
"""
if self.override_relevance_score_fn is not None:
return self.override_relevance_score_fn
# Default strategy is to rely on distance strategy provided
# in vectorstore constructor
if self._distance_strategy == DistanceStrategy.COSINE:
return self._cosine_relevance_score_fn
elif self._distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
return self._euclidean_relevance_score_fn
elif self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
return self._max_inner_product_relevance_score_fn
else:
raise ValueError(
"No supported normalization function"
f" for distance_strategy of {self._distance_strategy}."
"Consider providing relevance_score_fn to TimescaleVector constructor."
)
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
"""Delete by vector ID or other criteria.
Args:
ids: List of ids to delete.
**kwargs: Other keyword arguments that subclasses might use.
Returns:
Optional[bool]: True if deletion is successful,
False otherwise, None if not implemented.
"""
if ids is None:
raise ValueError("No ids provided to delete.")
self.sync_client.delete_by_ids(ids)
return True
# todo should this be part of delete|()?
def delete_by_metadata(
self, filter: Union[Dict[str, str], List[Dict[str, str]]], **kwargs: Any
) -> Optional[bool]:
"""Delete by vector ID or other criteria.
Args:
ids: List of ids to delete.
**kwargs: Other keyword arguments that subclasses might use.
Returns:
Optional[bool]: True if deletion is successful,
False otherwise, None if not implemented.
"""
self.sync_client.delete_by_metadata(filter)
return True
class IndexType(str, enum.Enum):
"""Enumerator for the supported Index types"""
TIMESCALE_VECTOR = "tsv"
PGVECTOR_IVFFLAT = "ivfflat"
PGVECTOR_HNSW = "hnsw"
DEFAULT_INDEX_TYPE = IndexType.TIMESCALE_VECTOR
def create_index(
self, index_type: Union[IndexType, str] = DEFAULT_INDEX_TYPE, **kwargs: Any
) -> None:
try:
from timescale_vector import client
except ImportError:
raise ImportError(
"Could not import timescale_vector python package. "
"Please install it with `pip install timescale-vector`."
)
index_type = (
index_type.value if isinstance(index_type, self.IndexType) else index_type
)
if index_type == self.IndexType.PGVECTOR_IVFFLAT.value:
self.sync_client.create_embedding_index(client.IvfflatIndex(**kwargs))
if index_type == self.IndexType.PGVECTOR_HNSW.value:
self.sync_client.create_embedding_index(client.HNSWIndex(**kwargs))
if index_type == self.IndexType.TIMESCALE_VECTOR.value:
self.sync_client.create_embedding_index(
client.TimescaleVectorIndex(**kwargs)
)
def drop_index(self) -> None:
self.sync_client.drop_embedding_index()

File diff suppressed because it is too large Load Diff

View File

@@ -129,6 +129,7 @@ markdownify = {version = "^0.11.6", optional = true}
assemblyai = {version = "^0.17.0", optional = true}
dashvector = {version = "^1.0.1", optional = true}
sqlite-vss = {version = "^0.1.2", optional = true}
timescale-vector = {version = "^0.0.1", optional = true}
[tool.poetry.group.test.dependencies]
@@ -345,6 +346,7 @@ extended_testing = [
"markdownify",
"dashvector",
"sqlite-vss",
"timescale-vector",
]
[tool.ruff]

View File

@@ -0,0 +1,433 @@
"""Test TimescaleVector functionality."""
import os
from datetime import datetime, timedelta
from typing import List
import pytest
from langchain.docstore.document import Document
from langchain.vectorstores.timescalevector import TimescaleVector
from tests.integration_tests.vectorstores.fake_embeddings import FakeEmbeddings
SERVICE_URL = TimescaleVector.service_url_from_db_params(
host=os.environ.get("TEST_TIMESCALE_HOST", "localhost"),
port=int(os.environ.get("TEST_TIMESCALE_PORT", "5432")),
database=os.environ.get("TEST_TIMESCALE_DATABASE", "postgres"),
user=os.environ.get("TEST_TIMESCALE_USER", "postgres"),
password=os.environ.get("TEST_TIMESCALE_PASSWORD", "postgres"),
)
ADA_TOKEN_COUNT = 1536
class FakeEmbeddingsWithAdaDimension(FakeEmbeddings):
"""Fake embeddings functionality for testing."""
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Return simple embeddings."""
return [
[float(1.0)] * (ADA_TOKEN_COUNT - 1) + [float(i)] for i in range(len(texts))
]
def embed_query(self, text: str) -> List[float]:
"""Return simple embeddings."""
return [float(1.0)] * (ADA_TOKEN_COUNT - 1) + [float(0.0)]
def test_timescalevector() -> None:
"""Test end to end construction and search."""
texts = ["foo", "bar", "baz"]
docsearch = TimescaleVector.from_texts(
texts=texts,
collection_name="test_collection",
embedding=FakeEmbeddingsWithAdaDimension(),
service_url=SERVICE_URL,
pre_delete_collection=True,
)
output = docsearch.similarity_search("foo", k=1)
assert output == [Document(page_content="foo")]
def test_timescalevector_from_documents() -> None:
"""Test end to end construction and search."""
texts = ["foo", "bar", "baz"]
docs = [Document(page_content=t, metadata={"a": "b"}) for t in texts]
docsearch = TimescaleVector.from_documents(
documents=docs,
collection_name="test_collection",
embedding=FakeEmbeddingsWithAdaDimension(),
service_url=SERVICE_URL,
pre_delete_collection=True,
)
output = docsearch.similarity_search("foo", k=1)
assert output == [Document(page_content="foo", metadata={"a": "b"})]
@pytest.mark.asyncio
async def test_timescalevector_afrom_documents() -> None:
"""Test end to end construction and search."""
texts = ["foo", "bar", "baz"]
docs = [Document(page_content=t, metadata={"a": "b"}) for t in texts]
docsearch = await TimescaleVector.afrom_documents(
documents=docs,
collection_name="test_collection",
embedding=FakeEmbeddingsWithAdaDimension(),
service_url=SERVICE_URL,
pre_delete_collection=True,
)
output = await docsearch.asimilarity_search("foo", k=1)
assert output == [Document(page_content="foo", metadata={"a": "b"})]
def test_timescalevector_embeddings() -> None:
"""Test end to end construction with embeddings and search."""
texts = ["foo", "bar", "baz"]
text_embeddings = FakeEmbeddingsWithAdaDimension().embed_documents(texts)
text_embedding_pairs = list(zip(texts, text_embeddings))
docsearch = TimescaleVector.from_embeddings(
text_embeddings=text_embedding_pairs,
collection_name="test_collection",
embedding=FakeEmbeddingsWithAdaDimension(),
service_url=SERVICE_URL,
pre_delete_collection=True,
)
output = docsearch.similarity_search("foo", k=1)
assert output == [Document(page_content="foo")]
@pytest.mark.asyncio
async def test_timescalevector_aembeddings() -> None:
"""Test end to end construction with embeddings and search."""
texts = ["foo", "bar", "baz"]
text_embeddings = FakeEmbeddingsWithAdaDimension().embed_documents(texts)
text_embedding_pairs = list(zip(texts, text_embeddings))
docsearch = await TimescaleVector.afrom_embeddings(
text_embeddings=text_embedding_pairs,
collection_name="test_collection",
embedding=FakeEmbeddingsWithAdaDimension(),
service_url=SERVICE_URL,
pre_delete_collection=True,
)
output = await docsearch.asimilarity_search("foo", k=1)
assert output == [Document(page_content="foo")]
def test_timescalevector_with_metadatas() -> None:
"""Test end to end construction and search."""
texts = ["foo", "bar", "baz"]
metadatas = [{"page": str(i)} for i in range(len(texts))]
docsearch = TimescaleVector.from_texts(
texts=texts,
collection_name="test_collection",
embedding=FakeEmbeddingsWithAdaDimension(),
metadatas=metadatas,
service_url=SERVICE_URL,
pre_delete_collection=True,
)
output = docsearch.similarity_search("foo", k=1)
assert output == [Document(page_content="foo", metadata={"page": "0"})]
def test_timescalevector_with_metadatas_with_scores() -> None:
"""Test end to end construction and search."""
texts = ["foo", "bar", "baz"]
metadatas = [{"page": str(i)} for i in range(len(texts))]
docsearch = TimescaleVector.from_texts(
texts=texts,
collection_name="test_collection",
embedding=FakeEmbeddingsWithAdaDimension(),
metadatas=metadatas,
service_url=SERVICE_URL,
pre_delete_collection=True,
)
output = docsearch.similarity_search_with_score("foo", k=1)
assert output == [(Document(page_content="foo", metadata={"page": "0"}), 0.0)]
@pytest.mark.asyncio
async def test_timescalevector_awith_metadatas_with_scores() -> None:
"""Test end to end construction and search."""
texts = ["foo", "bar", "baz"]
metadatas = [{"page": str(i)} for i in range(len(texts))]
docsearch = await TimescaleVector.afrom_texts(
texts=texts,
collection_name="test_collection",
embedding=FakeEmbeddingsWithAdaDimension(),
metadatas=metadatas,
service_url=SERVICE_URL,
pre_delete_collection=True,
)
output = await docsearch.asimilarity_search_with_score("foo", k=1)
assert output == [(Document(page_content="foo", metadata={"page": "0"}), 0.0)]
def test_timescalevector_with_filter_match() -> None:
"""Test end to end construction and search."""
texts = ["foo", "bar", "baz"]
metadatas = [{"page": str(i)} for i in range(len(texts))]
docsearch = TimescaleVector.from_texts(
texts=texts,
collection_name="test_collection_filter",
embedding=FakeEmbeddingsWithAdaDimension(),
metadatas=metadatas,
service_url=SERVICE_URL,
pre_delete_collection=True,
)
output = docsearch.similarity_search_with_score("foo", k=1, filter={"page": "0"})
assert output == [(Document(page_content="foo", metadata={"page": "0"}), 0.0)]
def test_timescalevector_with_filter_distant_match() -> None:
"""Test end to end construction and search."""
texts = ["foo", "bar", "baz"]
metadatas = [{"page": str(i)} for i in range(len(texts))]
docsearch = TimescaleVector.from_texts(
texts=texts,
collection_name="test_collection_filter",
embedding=FakeEmbeddingsWithAdaDimension(),
metadatas=metadatas,
service_url=SERVICE_URL,
pre_delete_collection=True,
)
output = docsearch.similarity_search_with_score("foo", k=1, filter={"page": "2"})
assert output == [
(Document(page_content="baz", metadata={"page": "2"}), 0.0013003906671379406)
]
def test_timescalevector_with_filter_no_match() -> None:
"""Test end to end construction and search."""
texts = ["foo", "bar", "baz"]
metadatas = [{"page": str(i)} for i in range(len(texts))]
docsearch = TimescaleVector.from_texts(
texts=texts,
collection_name="test_collection_filter",
embedding=FakeEmbeddingsWithAdaDimension(),
metadatas=metadatas,
service_url=SERVICE_URL,
pre_delete_collection=True,
)
output = docsearch.similarity_search_with_score("foo", k=1, filter={"page": "5"})
assert output == []
def test_timescalevector_with_filter_in_set() -> None:
"""Test end to end construction and search."""
texts = ["foo", "bar", "baz"]
metadatas = [{"page": str(i)} for i in range(len(texts))]
docsearch = TimescaleVector.from_texts(
texts=texts,
collection_name="test_collection_filter",
embedding=FakeEmbeddingsWithAdaDimension(),
metadatas=metadatas,
service_url=SERVICE_URL,
pre_delete_collection=True,
)
output = docsearch.similarity_search_with_score(
"foo", k=2, filter=[{"page": "0"}, {"page": "2"}]
)
assert output == [
(Document(page_content="foo", metadata={"page": "0"}), 0.0),
(Document(page_content="baz", metadata={"page": "2"}), 0.0013003906671379406),
]
def test_timescalevector_relevance_score() -> None:
"""Test to make sure the relevance score is scaled to 0-1."""
texts = ["foo", "bar", "baz"]
metadatas = [{"page": str(i)} for i in range(len(texts))]
docsearch = TimescaleVector.from_texts(
texts=texts,
collection_name="test_collection",
embedding=FakeEmbeddingsWithAdaDimension(),
metadatas=metadatas,
service_url=SERVICE_URL,
pre_delete_collection=True,
)
output = docsearch.similarity_search_with_relevance_scores("foo", k=3)
assert output == [
(Document(page_content="foo", metadata={"page": "0"}), 1.0),
(Document(page_content="bar", metadata={"page": "1"}), 0.9996744261675065),
(Document(page_content="baz", metadata={"page": "2"}), 0.9986996093328621),
]
@pytest.mark.asyncio
async def test_timescalevector_relevance_score_async() -> None:
"""Test to make sure the relevance score is scaled to 0-1."""
texts = ["foo", "bar", "baz"]
metadatas = [{"page": str(i)} for i in range(len(texts))]
docsearch = await TimescaleVector.afrom_texts(
texts=texts,
collection_name="test_collection",
embedding=FakeEmbeddingsWithAdaDimension(),
metadatas=metadatas,
service_url=SERVICE_URL,
pre_delete_collection=True,
)
output = await docsearch.asimilarity_search_with_relevance_scores("foo", k=3)
assert output == [
(Document(page_content="foo", metadata={"page": "0"}), 1.0),
(Document(page_content="bar", metadata={"page": "1"}), 0.9996744261675065),
(Document(page_content="baz", metadata={"page": "2"}), 0.9986996093328621),
]
def test_timescalevector_retriever_search_threshold() -> None:
"""Test using retriever for searching with threshold."""
texts = ["foo", "bar", "baz"]
metadatas = [{"page": str(i)} for i in range(len(texts))]
docsearch = TimescaleVector.from_texts(
texts=texts,
collection_name="test_collection",
embedding=FakeEmbeddingsWithAdaDimension(),
metadatas=metadatas,
service_url=SERVICE_URL,
pre_delete_collection=True,
)
retriever = docsearch.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"k": 3, "score_threshold": 0.999},
)
output = retriever.get_relevant_documents("summer")
assert output == [
Document(page_content="foo", metadata={"page": "0"}),
Document(page_content="bar", metadata={"page": "1"}),
]
def test_timescalevector_retriever_search_threshold_custom_normalization_fn() -> None:
"""Test searching with threshold and custom normalization function"""
texts = ["foo", "bar", "baz"]
metadatas = [{"page": str(i)} for i in range(len(texts))]
docsearch = TimescaleVector.from_texts(
texts=texts,
collection_name="test_collection",
embedding=FakeEmbeddingsWithAdaDimension(),
metadatas=metadatas,
service_url=SERVICE_URL,
pre_delete_collection=True,
relevance_score_fn=lambda d: d * 0,
)
retriever = docsearch.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"k": 3, "score_threshold": 0.5},
)
output = retriever.get_relevant_documents("foo")
assert output == []
def test_timescalevector_delete() -> None:
"""Test deleting functionality."""
texts = ["bar", "baz"]
docs = [Document(page_content=t, metadata={"a": "b"}) for t in texts]
docsearch = TimescaleVector.from_documents(
documents=docs,
collection_name="test_collection",
embedding=FakeEmbeddingsWithAdaDimension(),
service_url=SERVICE_URL,
pre_delete_collection=True,
)
texts = ["foo"]
meta = [{"b": "c"}]
ids = docsearch.add_texts(texts, meta)
output = docsearch.similarity_search("bar", k=10)
assert len(output) == 3
docsearch.delete(ids)
output = docsearch.similarity_search("bar", k=10)
assert len(output) == 2
docsearch.delete_by_metadata({"a": "b"})
output = docsearch.similarity_search("bar", k=10)
assert len(output) == 0
def test_timescalevector_with_index() -> None:
"""Test deleting functionality."""
texts = ["bar", "baz"]
docs = [Document(page_content=t, metadata={"a": "b"}) for t in texts]
docsearch = TimescaleVector.from_documents(
documents=docs,
collection_name="test_collection",
embedding=FakeEmbeddingsWithAdaDimension(),
service_url=SERVICE_URL,
pre_delete_collection=True,
)
texts = ["foo"]
meta = [{"b": "c"}]
docsearch.add_texts(texts, meta)
docsearch.create_index()
output = docsearch.similarity_search("bar", k=10)
assert len(output) == 3
docsearch.drop_index()
docsearch.create_index(
index_type=TimescaleVector.IndexType.TIMESCALE_VECTOR,
max_alpha=1.0,
num_neighbors=50,
)
docsearch.drop_index()
docsearch.create_index("tsv", max_alpha=1.0, num_neighbors=50)
docsearch.drop_index()
docsearch.create_index("ivfflat", num_lists=20, num_records=1000)
docsearch.drop_index()
docsearch.create_index("hnsw", m=16, ef_construction=64)
def test_timescalevector_time_partitioning() -> None:
"""Test deleting functionality."""
from timescale_vector import client
texts = ["bar", "baz"]
docs = [Document(page_content=t, metadata={"a": "b"}) for t in texts]
docsearch = TimescaleVector.from_documents(
documents=docs,
collection_name="test_collection_time_partitioning",
embedding=FakeEmbeddingsWithAdaDimension(),
service_url=SERVICE_URL,
pre_delete_collection=True,
time_partition_interval=timedelta(hours=1),
)
texts = ["foo"]
meta = [{"b": "c"}]
ids = [client.uuid_from_time(datetime.now() - timedelta(hours=3))]
docsearch.add_texts(texts, meta, ids)
output = docsearch.similarity_search("bar", k=10)
assert len(output) == 3
output = docsearch.similarity_search(
"bar", k=10, start_date=datetime.now() - timedelta(hours=1)
)
assert len(output) == 2
output = docsearch.similarity_search(
"bar", k=10, end_date=datetime.now() - timedelta(hours=1)
)
assert len(output) == 1
output = docsearch.similarity_search(
"bar", k=10, start_date=datetime.now() - timedelta(minutes=200)
)
assert len(output) == 3
output = docsearch.similarity_search(
"bar",
k=10,
start_date=datetime.now() - timedelta(minutes=200),
time_delta=timedelta(hours=1),
)
assert len(output) == 1

View File

@@ -0,0 +1,97 @@
from typing import Dict, Tuple
import pytest as pytest
from langchain.chains.query_constructor.ir import (
Comparator,
Comparison,
Operation,
Operator,
StructuredQuery,
)
from langchain.retrievers.self_query.timescalevector import TimescaleVectorTranslator
DEFAULT_TRANSLATOR = TimescaleVectorTranslator()
@pytest.mark.requires("timescale_vector")
def test_visit_comparison() -> None:
from timescale_vector import client
comp = Comparison(comparator=Comparator.LT, attribute="foo", value=1)
expected = client.Predicates(("foo", "<", 1))
actual = DEFAULT_TRANSLATOR.visit_comparison(comp)
assert expected == actual
@pytest.mark.requires("timescale_vector")
def test_visit_operation() -> None:
from timescale_vector import client
op = Operation(
operator=Operator.AND,
arguments=[
Comparison(comparator=Comparator.LT, attribute="foo", value=2),
Comparison(comparator=Comparator.EQ, attribute="bar", value="baz"),
Comparison(comparator=Comparator.GT, attribute="abc", value=2.0),
],
)
expected = client.Predicates(
client.Predicates(("foo", "<", 2)),
client.Predicates(("bar", "==", "baz")),
client.Predicates(("abc", ">", 2.0)),
)
actual = DEFAULT_TRANSLATOR.visit_operation(op)
assert expected == actual
@pytest.mark.requires("timescale_vector")
def test_visit_structured_query() -> None:
from timescale_vector import client
query = "What is the capital of France?"
structured_query = StructuredQuery(
query=query,
filter=None,
)
expected: Tuple[str, Dict] = (query, {})
actual = DEFAULT_TRANSLATOR.visit_structured_query(structured_query)
assert expected == actual
comp = Comparison(comparator=Comparator.LT, attribute="foo", value=1)
expected = (
query,
{"predicates": client.Predicates(("foo", "<", 1))},
)
structured_query = StructuredQuery(
query=query,
filter=comp,
)
actual = DEFAULT_TRANSLATOR.visit_structured_query(structured_query)
assert expected == actual
op = Operation(
operator=Operator.AND,
arguments=[
Comparison(comparator=Comparator.LT, attribute="foo", value=2),
Comparison(comparator=Comparator.EQ, attribute="bar", value="baz"),
Comparison(comparator=Comparator.GT, attribute="abc", value=2.0),
],
)
structured_query = StructuredQuery(
query=query,
filter=op,
)
expected = (
query,
{
"predicates": client.Predicates(
client.Predicates(("foo", "<", 2)),
client.Predicates(("bar", "==", "baz")),
client.Predicates(("abc", ">", 2.0)),
)
},
)
actual = DEFAULT_TRANSLATOR.visit_structured_query(structured_query)
assert expected == actual