community[patch]: Upstash Vector Store Namespace Support (#22251)

This PR introduces namespace support for the Upstash Vector Store, allowing users to partition their data within a single vector index.
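
As a rough usage sketch (not part of this diff; the index credentials and the "users" / "orders" namespace names are placeholders), two stores backed by the same index stay isolated by giving each its own namespace:

```python
from langchain_community.vectorstores import UpstashVectorStore

# embedding=True uses Upstash-hosted embeddings (the index must be created with
# an embedding model); a LangChain Embeddings object works here as well.
users = UpstashVectorStore(
    embedding=True,
    index_url="...",
    index_token="...",
    namespace="users",
)
orders = UpstashVectorStore(
    embedding=True,
    index_url="...",
    index_token="...",
    namespace="orders",
)

users.add_texts(["Alice prefers email support."])
orders.add_texts(["The latest order shipped on Monday."])

# Each query only sees vectors upserted into its own namespace.
users.similarity_search("support preferences", k=1)
```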

---------

Co-authored-by: Bagatur <baskaryan@gmail.com>
Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com>
Author: Fahreddin Özcan
Date: 2024-06-04 02:30:56 +02:00
Committed by: GitHub
Parent: 25cf1a74d5
Commit: 0061ded002
2 changed files with 154 additions and 19 deletions


@@ -2,7 +2,7 @@ from __future__ import annotations
import logging
import uuid
from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple, Union, cast
import numpy as np
from langchain_core.documents import Document
@@ -64,6 +64,8 @@ class UpstashVectorStore(VectorStore):
index_url: Optional[str] = None,
index_token: Optional[str] = None,
embedding: Optional[Union[Embeddings, bool]] = None,
*,
namespace: str = "",
):
"""
Constructor for UpstashVectorStore.
@@ -83,6 +85,7 @@ class UpstashVectorStore(VectorStore):
is applied. If true, Upstash embeddings are used. When Upstash
embeddings are used, text is sent directly to Upstash and
embedding is applied there instead of embedding in Langchain.
namespace: Namespace to use from the index.
Example:
.. code-block:: python
@@ -94,7 +97,8 @@ class UpstashVectorStore(VectorStore):
vectorstore = UpstashVectorStore(
embedding=embeddings,
index_url="...",
index_token="..."
index_token="...",
namespace="..."
)
# With an existing index
@@ -103,7 +107,8 @@ class UpstashVectorStore(VectorStore):
index = Index(url="...", token="...")
vectorstore = UpstashVectorStore(
embedding=embeddings,
index=index
index=index,
namespace="..."
)
"""
@@ -145,6 +150,7 @@ class UpstashVectorStore(VectorStore):
self._embeddings = embedding
self._text_key = text_key
self._namespace = namespace
@property
def embeddings(self) -> Optional[Union[Embeddings, bool]]: # type: ignore
@@ -187,6 +193,8 @@ class UpstashVectorStore(VectorStore):
ids: Optional[List[str]] = None,
batch_size: int = 32,
embedding_chunk_size: int = 1000,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[str]:
"""
@@ -202,6 +210,7 @@ class UpstashVectorStore(VectorStore):
batch_size: Batch size to use when upserting the embeddings.
Upstash supports at max 1000 vectors per request.
embedding_batch_size: Chunk size to use when embedding the texts.
namespace: Namespace to use from the index.
Returns:
List of ids from adding the texts into the vectorstore.
@@ -216,6 +225,7 @@ class UpstashVectorStore(VectorStore):
batch_size=batch_size,
ids=ids,
embedding_chunk_size=embedding_chunk_size,
namespace=namespace,
**kwargs,
)
@@ -225,6 +235,8 @@ class UpstashVectorStore(VectorStore):
ids: Optional[List[str]] = None,
batch_size: int = 32,
embedding_chunk_size: int = 1000,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[str]:
"""
@@ -240,6 +252,7 @@ class UpstashVectorStore(VectorStore):
batch_size: Batch size to use when upserting the embeddings.
Upstash supports at max 1000 vectors per request.
embedding_batch_size: Chunk size to use when embedding the texts.
namespace: Namespace to use from the index.
Returns:
List of ids from adding the texts into the vectorstore.
@@ -254,6 +267,7 @@ class UpstashVectorStore(VectorStore):
ids=ids,
batch_size=batch_size,
embedding_chunk_size=embedding_chunk_size,
namespace=namespace,
**kwargs,
)
@@ -264,6 +278,8 @@ class UpstashVectorStore(VectorStore):
ids: Optional[List[str]] = None,
batch_size: int = 32,
embedding_chunk_size: int = 1000,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[str]:
"""
@@ -281,11 +297,15 @@ class UpstashVectorStore(VectorStore):
batch_size: Batch size to use when upserting the embeddings.
Upstash supports at max 1000 vectors per request.
embedding_batch_size: Chunk size to use when embedding the texts.
namespace: Namespace to use from the index.
Returns:
List of ids from adding the texts into the vectorstore.
"""
if namespace is None:
namespace = self._namespace
texts = list(texts)
ids = ids or [str(uuid.uuid4()) for _ in texts]
@@ -308,7 +328,9 @@ class UpstashVectorStore(VectorStore):
for batch in batch_iterate(
batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
):
self._index.upsert(vectors=batch, **kwargs)
self._index.upsert(
vectors=batch, namespace=cast(str, namespace), **kwargs
)
return ids
@@ -319,6 +341,8 @@ class UpstashVectorStore(VectorStore):
ids: Optional[List[str]] = None,
batch_size: int = 32,
embedding_chunk_size: int = 1000,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[str]:
"""
@@ -336,11 +360,15 @@ class UpstashVectorStore(VectorStore):
batch_size: Batch size to use when upserting the embeddings.
Upstash supports at max 1000 vectors per request.
embedding_batch_size: Chunk size to use when embedding the texts.
namespace: Namespace to use from the index.
Returns:
List of ids from adding the texts into the vectorstore.
"""
if namespace is None:
namespace = self._namespace
texts = list(texts)
ids = ids or [str(uuid.uuid4()) for _ in texts]
@@ -363,7 +391,9 @@ class UpstashVectorStore(VectorStore):
for batch in batch_iterate(
batch_size, zip(chunk_ids, embeddings, chunk_metadatas)
):
await self._async_index.upsert(vectors=batch, **kwargs)
await self._async_index.upsert(
vectors=batch, namespace=cast(str, namespace), **kwargs
)
return ids
@@ -372,6 +402,8 @@ class UpstashVectorStore(VectorStore):
query: str,
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Retrieve texts most similar to query and
@@ -381,12 +413,13 @@ class UpstashVectorStore(VectorStore):
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents most similar to the query and score for each
"""
return self.similarity_search_by_vector_with_score(
self._embed_query(query), k=k, filter=filter, **kwargs
self._embed_query(query), k=k, filter=filter, namespace=namespace, **kwargs
)
async def asimilarity_search_with_score(
@@ -394,6 +427,8 @@ class UpstashVectorStore(VectorStore):
query: str,
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Retrieve texts most similar to query and
@@ -403,12 +438,13 @@ class UpstashVectorStore(VectorStore):
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents most similar to the query and score for each
"""
return await self.asimilarity_search_by_vector_with_score(
self._embed_query(query), k=k, filter=filter, **kwargs
self._embed_query(query), k=k, filter=filter, namespace=namespace, **kwargs
)
def _process_results(self, results: List) -> List[Tuple[Document, float]]:
@@ -430,15 +466,25 @@ class UpstashVectorStore(VectorStore):
embedding: Union[List[float], str],
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return texts whose embedding is closest to the given embedding"""
filter = filter or ""
if namespace is None:
namespace = self._namespace
if isinstance(embedding, str):
results = self._index.query(
data=embedding, top_k=k, include_metadata=True, filter=filter, **kwargs
data=embedding,
top_k=k,
include_metadata=True,
filter=filter,
namespace=namespace,
**kwargs,
)
else:
results = self._index.query(
@@ -446,6 +492,7 @@ class UpstashVectorStore(VectorStore):
top_k=k,
include_metadata=True,
filter=filter,
namespace=namespace,
**kwargs,
)
@@ -456,15 +503,25 @@ class UpstashVectorStore(VectorStore):
embedding: Union[List[float], str],
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Return texts whose embedding is closest to the given embedding"""
filter = filter or ""
if namespace is None:
namespace = self._namespace
if isinstance(embedding, str):
results = await self._async_index.query(
data=embedding, top_k=k, include_metadata=True, filter=filter, **kwargs
data=embedding,
top_k=k,
include_metadata=True,
filter=filter,
namespace=namespace,
**kwargs,
)
else:
results = await self._async_index.query(
@@ -472,6 +529,7 @@ class UpstashVectorStore(VectorStore):
top_k=k,
include_metadata=True,
filter=filter,
namespace=namespace,
**kwargs,
)
@@ -482,6 +540,8 @@ class UpstashVectorStore(VectorStore):
query: str,
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return documents most similar to query.
@@ -490,12 +550,13 @@ class UpstashVectorStore(VectorStore):
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents most similar to the query and score for each
"""
docs_and_scores = self.similarity_search_with_score(
query, k=k, filter=filter, **kwargs
query, k=k, filter=filter, namespace=namespace, **kwargs
)
return [doc for doc, _ in docs_and_scores]
@@ -504,6 +565,8 @@ class UpstashVectorStore(VectorStore):
query: str,
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return documents most similar to query.
@@ -512,12 +575,13 @@ class UpstashVectorStore(VectorStore):
query: Text to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents most similar to the query
"""
docs_and_scores = await self.asimilarity_search_with_score(
query, k=k, filter=filter, **kwargs
query, k=k, filter=filter, namespace=namespace, **kwargs
)
return [doc for doc, _ in docs_and_scores]
@@ -526,6 +590,8 @@ class UpstashVectorStore(VectorStore):
embedding: Union[List[float], str],
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return documents closest to the given embedding.
@@ -534,12 +600,13 @@ class UpstashVectorStore(VectorStore):
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents most similar to the query
"""
docs_and_scores = self.similarity_search_by_vector_with_score(
embedding, k=k, filter=filter, **kwargs
embedding, k=k, filter=filter, namespace=namespace, **kwargs
)
return [doc for doc, _ in docs_and_scores]
@@ -548,6 +615,8 @@ class UpstashVectorStore(VectorStore):
embedding: Union[List[float], str],
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return documents closest to the given embedding.
@@ -556,12 +625,13 @@ class UpstashVectorStore(VectorStore):
embedding: Embedding to look up documents similar to.
k: Number of Documents to return. Defaults to 4.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents most similar to the query
"""
docs_and_scores = await self.asimilarity_search_by_vector_with_score(
embedding, k=k, filter=filter, **kwargs
embedding, k=k, filter=filter, namespace=namespace, **kwargs
)
return [doc for doc, _ in docs_and_scores]
@@ -570,25 +640,31 @@ class UpstashVectorStore(VectorStore):
query: str,
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""
Since Upstash always returns relevance scores, default implementation is used.
"""
return self.similarity_search_with_score(query, k=k, filter=filter, **kwargs)
return self.similarity_search_with_score(
query, k=k, filter=filter, namespace=namespace, **kwargs
)
async def _asimilarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""
Since Upstash always returns relevance scores, default implementation is used.
"""
return await self.asimilarity_search_with_score(
query, k=k, filter=filter, **kwargs
query, k=k, filter=filter, namespace=namespace, **kwargs
)
def max_marginal_relevance_search_by_vector(
@@ -598,6 +674,8 @@ class UpstashVectorStore(VectorStore):
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
@@ -614,10 +692,14 @@ class UpstashVectorStore(VectorStore):
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents selected by maximal marginal relevance.
"""
if namespace is None:
namespace = self._namespace
assert isinstance(self.embeddings, Embeddings)
if isinstance(embedding, str):
results = self._index.query(
@@ -626,6 +708,7 @@ class UpstashVectorStore(VectorStore):
include_vectors=True,
include_metadata=True,
filter=filter or "",
namespace=namespace,
**kwargs,
)
else:
@@ -635,6 +718,7 @@ class UpstashVectorStore(VectorStore):
include_vectors=True,
include_metadata=True,
filter=filter or "",
namespace=namespace,
**kwargs,
)
@@ -657,6 +741,8 @@ class UpstashVectorStore(VectorStore):
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
@@ -673,10 +759,15 @@ class UpstashVectorStore(VectorStore):
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents selected by maximal marginal relevance.
"""
if namespace is None:
namespace = self._namespace
assert isinstance(self.embeddings, Embeddings)
if isinstance(embedding, str):
results = await self._async_index.query(
@@ -685,6 +776,7 @@ class UpstashVectorStore(VectorStore):
include_vectors=True,
include_metadata=True,
filter=filter or "",
namespace=namespace,
**kwargs,
)
else:
@@ -694,6 +786,7 @@ class UpstashVectorStore(VectorStore):
include_vectors=True,
include_metadata=True,
filter=filter or "",
namespace=namespace,
**kwargs,
)
@@ -716,6 +809,8 @@ class UpstashVectorStore(VectorStore):
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
@@ -732,6 +827,7 @@ class UpstashVectorStore(VectorStore):
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents selected by maximal marginal relevance.
@@ -743,6 +839,7 @@ class UpstashVectorStore(VectorStore):
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
namespace=namespace,
**kwargs,
)
@@ -753,6 +850,8 @@ class UpstashVectorStore(VectorStore):
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[str] = None,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> List[Document]:
"""Return docs selected using the maximal marginal relevance.
@@ -769,6 +868,7 @@ class UpstashVectorStore(VectorStore):
to maximum diversity and 1 to minimum diversity.
Defaults to 0.5.
filter: Optional metadata filter in str format
namespace: Namespace to use from the index.
Returns:
List of Documents selected by maximal marginal relevance.
@@ -780,6 +880,7 @@ class UpstashVectorStore(VectorStore):
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
namespace=namespace,
**kwargs,
)
@@ -797,6 +898,8 @@ class UpstashVectorStore(VectorStore):
async_index: Optional[AsyncIndex] = None,
index_url: Optional[str] = None,
index_token: Optional[str] = None,
*,
namespace: str = "",
**kwargs: Any,
) -> UpstashVectorStore:
"""Create a new UpstashVectorStore from a list of texts.
@@ -819,6 +922,7 @@ class UpstashVectorStore(VectorStore):
async_index=async_index,
index_url=index_url,
index_token=index_token,
namespace=namespace,
**kwargs,
)
@@ -828,6 +932,7 @@ class UpstashVectorStore(VectorStore):
ids=ids,
batch_size=batch_size,
embedding_chunk_size=embedding_chunk_size,
namespace=namespace,
)
return vector_store
@@ -845,6 +950,8 @@ class UpstashVectorStore(VectorStore):
async_index: Optional[AsyncIndex] = None,
index_url: Optional[str] = None,
index_token: Optional[str] = None,
*,
namespace: str = "",
**kwargs: Any,
) -> UpstashVectorStore:
"""Create a new UpstashVectorStore from a list of texts.
@@ -865,6 +972,7 @@ class UpstashVectorStore(VectorStore):
text_key=text_key,
index=index,
async_index=async_index,
namespace=namespace,
index_url=index_url,
index_token=index_token,
**kwargs,
@@ -875,6 +983,7 @@ class UpstashVectorStore(VectorStore):
metadatas=metadatas,
ids=ids,
batch_size=batch_size,
namespace=namespace,
embedding_chunk_size=embedding_chunk_size,
)
return vector_store
@@ -884,6 +993,8 @@ class UpstashVectorStore(VectorStore):
ids: Optional[List[str]] = None,
delete_all: Optional[bool] = None,
batch_size: Optional[int] = 1000,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> None:
"""Delete by vector IDs
@@ -892,14 +1003,17 @@ class UpstashVectorStore(VectorStore):
ids: List of ids to delete.
delete_all: Delete all vectors in the index.
batch_size: Batch size to use when deleting the embeddings.
namespace: Namespace to use from the index.
Upstash supports at max 1000 deletions per request.
"""
if namespace is None:
namespace = self._namespace
if delete_all:
self._index.reset()
self._index.reset(namespace=namespace)
elif ids is not None:
for batch in batch_iterate(batch_size, ids):
self._index.delete(ids=batch)
self._index.delete(ids=batch, namespace=namespace)
else:
raise ValueError("Either ids or delete_all should be provided")
@@ -910,6 +1024,8 @@ class UpstashVectorStore(VectorStore):
ids: Optional[List[str]] = None,
delete_all: Optional[bool] = None,
batch_size: Optional[int] = 1000,
*,
namespace: Optional[str] = None,
**kwargs: Any,
) -> None:
"""Delete by vector IDs
@@ -918,14 +1034,17 @@ class UpstashVectorStore(VectorStore):
ids: List of ids to delete.
delete_all: Delete all vectors in the index.
batch_size: Batch size to use when deleting the embeddings.
namespace: Namespace to use from the index.
Upstash supports at max 1000 deletions per request.
"""
if namespace is None:
namespace = self._namespace
if delete_all:
await self._async_index.reset()
await self._async_index.reset(namespace=namespace)
elif ids is not None:
for batch in batch_iterate(batch_size, ids):
await self._async_index.delete(ids=batch)
await self._async_index.delete(ids=batch, namespace=namespace)
else:
raise ValueError("Either ids or delete_all should be provided")
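
As a closing illustration (a hedged sketch, not taken from this commit or its tests; credentials and namespace names are placeholders), the per-call override works the same way across the upsert, query, and delete paths: each method now accepts an optional `namespace` keyword and falls back to the store's own namespace when it is left as None.

```python
from langchain_community.vectorstores import UpstashVectorStore

store = UpstashVectorStore(
    embedding=True,       # Upstash-hosted embeddings
    index_url="...",
    index_token="...",
    namespace="default",  # used whenever no per-call namespace is given
)

store.add_texts(["hello world"], namespace="scratch")      # upserts into "scratch"
store.similarity_search("hello", k=1)                       # searches "default"
store.similarity_search("hello", k=1, namespace="scratch")  # searches "scratch"

# delete_all resets only the targeted namespace, not the whole index.
store.delete(delete_all=True, namespace="scratch")
```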