mirror of https://github.com/hwchase17/langchain.git (synced 2025-09-25 04:49:17 +00:00)

community[minor]: Add async methods to CassandraVectorStore (#20602)

Co-authored-by: Eugene Yurtsev <eyurtsev@gmail.com>
Committed by: GitHub
parent: 06d18c106d
commit: c909ae0152
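A minimal usage sketch of the async surface added by this commit. This is not part of the diff: the `session` and `embeddings` objects, the `demo` helper, and the keyspace/table names are illustrative placeholders; only `afrom_texts`, `aadd_texts`, and `asimilarity_search` come from the commit itself.

# Sketch only: `session` and `embeddings` are assumed to already exist.
from langchain_community.vectorstores import Cassandra

async def demo(session, embeddings):
    # afrom_texts provisions the table asynchronously (SetupMode.ASYNC)
    # and embeds/inserts the texts via aadd_texts under the hood.
    store = await Cassandra.afrom_texts(
        texts=["alpha", "beta"],
        embedding=embeddings,
        session=session,
        keyspace="demo_ks",
        table_name="demo_table",
    )
    # The async similarity search mirrors the sync API.
    docs = await store.asimilarity_search("alpha", k=1)
    return docs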
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
from enum import Enum
from typing import TYPE_CHECKING, Any, Callable

if TYPE_CHECKING:
@@ -22,3 +23,9 @@ async def wrapped_response_future(
    response_future.add_callbacks(success_handler, error_handler)
    return await asyncio_future


class SetupMode(Enum):
    SYNC = 1
    ASYNC = 2
    OFF = 3
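The SetupMode enum above controls how the vector store provisions its backing cassio table: SYNC provisions during construction, ASYNC defers provisioning to the event loop (the constructor passes async_setup=True and an awaitable embedding dimension), and OFF skips provisioning for a pre-created table. A minimal constructor sketch under those assumptions; `session` and `embeddings` and the keyspace/table names are placeholders, not part of the diff.

# Sketch only: `session` and `embeddings` are assumed to already exist.
from langchain_community.utilities.cassandra import SetupMode
from langchain_community.vectorstores import Cassandra

store = Cassandra(
    embedding=embeddings,
    session=session,
    keyspace="demo_ks",
    table_name="demo_table",
    setup_mode=SetupMode.OFF,  # table already exists; skip provisioning
)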
@@ -1,9 +1,11 @@
from __future__ import annotations

import asyncio
import typing
import uuid
from typing import (
    Any,
    Awaitable,
    Callable,
    Dict,
    Iterable,
@@ -24,6 +26,7 @@ from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

from langchain_community.utilities.cassandra import SetupMode
from langchain_community.vectorstores.utils import maximal_marginal_relevance

CVST = TypeVar("CVST", bound="Cassandra")
@@ -70,6 +73,13 @@ class Cassandra(VectorStore):
            )
        return self._embedding_dimension

    async def _aget_embedding_dimension(self) -> int:
        if self._embedding_dimension is None:
            self._embedding_dimension = len(
                await self.embedding.aembed_query("This is a sample sentence.")
            )
        return self._embedding_dimension

    def __init__(
        self,
        embedding: Embeddings,
@@ -79,6 +89,7 @@ class Cassandra(VectorStore):
        ttl_seconds: Optional[int] = None,
        *,
        body_index_options: Optional[List[Tuple[str, Any]]] = None,
        setup_mode: SetupMode = SetupMode.SYNC,
    ) -> None:
        try:
            from cassio.table import MetadataVectorCassandraTable
@@ -96,17 +107,26 @@ class Cassandra(VectorStore):
        #
        self._embedding_dimension = None
        #
        kwargs = {}
        kwargs: Dict[str, Any] = {}
        if body_index_options is not None:
            kwargs["body_index_options"] = body_index_options
        if setup_mode == SetupMode.ASYNC:
            kwargs["async_setup"] = True

        embedding_dimension: Union[int, Awaitable[int], None] = None
        if setup_mode == SetupMode.ASYNC:
            embedding_dimension = self._aget_embedding_dimension()
        elif setup_mode == SetupMode.SYNC:
            embedding_dimension = self._get_embedding_dimension()

        self.table = MetadataVectorCassandraTable(
            session=session,
            keyspace=keyspace,
            table=table_name,
            vector_dimension=self._get_embedding_dimension(),
            vector_dimension=embedding_dimension,
            metadata_indexing="all",
            primary_key_type="TEXT",
            skip_provisioning=setup_mode == SetupMode.OFF,
            **kwargs,
        )
@@ -129,17 +149,30 @@ class Cassandra(VectorStore):
        """
        self.clear()

    async def adelete_collection(self) -> None:
        """
        Just an alias for `aclear`
        (to better align with other VectorStore implementations).
        """
        await self.aclear()

    def clear(self) -> None:
        """Empty the table."""
        self.table.clear()

    async def aclear(self) -> None:
        """Empty the table."""
        await self.table.aclear()

    def delete_by_document_id(self, document_id: str) -> None:
        return self.table.delete(row_id=document_id)

    async def adelete_by_document_id(self, document_id: str) -> None:
        return await self.table.adelete(row_id=document_id)

    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
        """Delete by vector IDs.

        Args:
            ids: List of ids to delete.
@@ -155,6 +188,26 @@ class Cassandra(VectorStore):
            self.delete_by_document_id(document_id)
        return True

    async def adelete(
        self, ids: Optional[List[str]] = None, **kwargs: Any
    ) -> Optional[bool]:
        """Delete by vector IDs.

        Args:
            ids: List of ids to delete.

        Returns:
            Optional[bool]: True if deletion is successful,
            False otherwise, None if not implemented.
        """

        if ids is None:
            raise ValueError("No ids provided to delete.")

        for document_id in ids:
            await self.adelete_by_document_id(document_id)
        return True

    def add_texts(
        self,
        texts: Iterable[str],
@@ -176,16 +229,12 @@ class Cassandra(VectorStore):
        Returns:
            List[str]: List of IDs of the added texts.
        """
        _texts = list(texts)  # lest it be a generator or something
        if ids is None:
            ids = [uuid.uuid4().hex for _ in _texts]
        if metadatas is None:
            metadatas = [{} for _ in _texts]
        #
        _texts = list(texts)
        ids = ids or [uuid.uuid4().hex for _ in _texts]
        metadatas = metadatas or [{}] * len(_texts)
        ttl_seconds = ttl_seconds or self.ttl_seconds
        #
        embedding_vectors = self.embedding.embed_documents(_texts)
        #

        for i in range(0, len(_texts), batch_size):
            batch_texts = _texts[i : i + batch_size]
            batch_embedding_vectors = embedding_vectors[i : i + batch_size]
@@ -208,6 +257,77 @@ class Cassandra(VectorStore):
                future.result()
        return ids

    async def aadd_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        concurrency: int = 16,
        ttl_seconds: Optional[int] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Texts to add to the vectorstore.
            metadatas: Optional list of metadatas.
            ids: Optional list of IDs.
            concurrency: Number of concurrent queries to the database.
                Defaults to 16.
            ttl_seconds: Optional time-to-live for the added texts.

        Returns:
            List[str]: List of IDs of the added texts.
        """
        _texts = list(texts)
        ids = ids or [uuid.uuid4().hex for _ in _texts]
        _metadatas: List[dict] = metadatas or [{}] * len(_texts)
        ttl_seconds = ttl_seconds or self.ttl_seconds
        embedding_vectors = await self.embedding.aembed_documents(_texts)

        sem = asyncio.Semaphore(concurrency)

        async def send_concurrently(
            row_id: str, text: str, embedding_vector: List[float], metadata: dict
        ) -> None:
            async with sem:
                await self.table.aput(
                    row_id=row_id,
                    body_blob=text,
                    vector=embedding_vector,
                    metadata=metadata or {},
                    ttl_seconds=ttl_seconds,
                )

        for i in range(0, len(_texts)):
            tasks = [
                asyncio.create_task(
                    send_concurrently(
                        ids[i], _texts[i], embedding_vectors[i], _metadatas[i]
                    )
                )
            ]
            await asyncio.gather(*tasks)
        return ids

    @staticmethod
    def _search_to_documents(
        hits: Iterable[Dict[str, Any]],
    ) -> List[Tuple[Document, float, str]]:
        # We stick to 'cos' distance as it can be normalized on a 0-1 axis
        # (1=most relevant), as required by this class' contract.
        return [
            (
                Document(
                    page_content=hit["body_blob"],
                    metadata=hit["metadata"],
                ),
                0.5 + 0.5 * hit["distance"],
                hit["row_id"],
            )
            for hit in hits
        ]

    # id-returning search facilities
    def similarity_search_with_score_id_by_vector(
        self,
@@ -232,26 +352,46 @@ class Cassandra(VectorStore):
            kwargs["metadata"] = filter
        if body_search is not None:
            kwargs["body_search"] = body_search
        #

        hits = self.table.metric_ann_search(
            vector=embedding,
            n=k,
            metric="cos",
            **kwargs,
        )
        # We stick to 'cos' distance as it can be normalized on a 0-1 axis
        # (1=most relevant), as required by this class' contract.
        return [
            (
                Document(
                    page_content=hit["body_blob"],
                    metadata=hit["metadata"],
                ),
                0.5 + 0.5 * hit["distance"],
                hit["row_id"],
            )
            for hit in hits
        ]
        return self._search_to_documents(hits)

    async def asimilarity_search_with_score_id_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Dict[str, str]] = None,
        body_search: Optional[Union[str, List[str]]] = None,
    ) -> List[Tuple[Document, float, str]]:
        """Return docs most similar to embedding vector.

        Args:
            embedding (str): Embedding to look up documents similar to.
            k (int): Number of Documents to return. Defaults to 4.
            filter: Filter on the metadata to apply.
            body_search: Document textual search terms to apply.
                Only supported by Astra DB at the moment.
        Returns:
            List of (Document, score, id), the most similar to the query vector.
        """
        kwargs: Dict[str, Any] = {}
        if filter is not None:
            kwargs["metadata"] = filter
        if body_search is not None:
            kwargs["body_search"] = body_search

        hits = await self.table.ametric_ann_search(
            vector=embedding,
            n=k,
            metric="cos",
            **kwargs,
        )
        return self._search_to_documents(hits)

    def similarity_search_with_score_id(
        self,
@@ -268,6 +408,21 @@ class Cassandra(VectorStore):
            body_search=body_search,
        )

    async def asimilarity_search_with_score_id(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Dict[str, str]] = None,
        body_search: Optional[Union[str, List[str]]] = None,
    ) -> List[Tuple[Document, float, str]]:
        embedding_vector = await self.embedding.aembed_query(query)
        return await self.asimilarity_search_with_score_id_by_vector(
            embedding=embedding_vector,
            k=k,
            filter=filter,
            body_search=body_search,
        )

    # id-unaware search facilities
    def similarity_search_with_score_by_vector(
        self,
@@ -297,6 +452,38 @@ class Cassandra(VectorStore):
            )
        ]

    async def asimilarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Dict[str, str]] = None,
        body_search: Optional[Union[str, List[str]]] = None,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to embedding vector.

        Args:
            embedding (str): Embedding to look up documents similar to.
            k (int): Number of Documents to return. Defaults to 4.
            filter: Filter on the metadata to apply.
            body_search: Document textual search terms to apply.
                Only supported by Astra DB at the moment.
        Returns:
            List of (Document, score), the most similar to the query vector.
        """
        return [
            (doc, score)
            for (
                doc,
                score,
                _,
            ) in await self.asimilarity_search_with_score_id_by_vector(
                embedding=embedding,
                k=k,
                filter=filter,
                body_search=body_search,
            )
        ]

    def similarity_search(
        self,
        query: str,
@@ -313,6 +500,22 @@ class Cassandra(VectorStore):
            body_search=body_search,
        )

    async def asimilarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Dict[str, str]] = None,
        body_search: Optional[Union[str, List[str]]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        embedding_vector = await self.embedding.aembed_query(query)
        return await self.asimilarity_search_by_vector(
            embedding_vector,
            k,
            filter=filter,
            body_search=body_search,
        )

    def similarity_search_by_vector(
        self,
        embedding: List[float],
@@ -331,6 +534,24 @@ class Cassandra(VectorStore):
            )
        ]

    async def asimilarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Dict[str, str]] = None,
        body_search: Optional[Union[str, List[str]]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        return [
            doc
            for doc, _ in await self.asimilarity_search_with_score_by_vector(
                embedding,
                k,
                filter=filter,
                body_search=body_search,
            )
        ]

    def similarity_search_with_score(
        self,
        query: str,
@@ -346,6 +567,48 @@ class Cassandra(VectorStore):
            body_search=body_search,
        )

    async def asimilarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Dict[str, str]] = None,
        body_search: Optional[Union[str, List[str]]] = None,
    ) -> List[Tuple[Document, float]]:
        embedding_vector = await self.embedding.aembed_query(query)
        return await self.asimilarity_search_with_score_by_vector(
            embedding_vector,
            k,
            filter=filter,
            body_search=body_search,
        )

    @staticmethod
    def _mmr_search_to_documents(
        prefetch_hits: List[Dict[str, Any]],
        embedding: List[float],
        k: int,
        lambda_mult: float,
    ) -> List[Document]:
        # let the mmr utility pick the *indices* in the above array
        mmr_chosen_indices = maximal_marginal_relevance(
            np.array(embedding, dtype=np.float32),
            [pf_hit["vector"] for pf_hit in prefetch_hits],
            k=k,
            lambda_mult=lambda_mult,
        )
        mmr_hits = [
            pf_hit
            for pf_index, pf_hit in enumerate(prefetch_hits)
            if pf_index in mmr_chosen_indices
        ]
        return [
            Document(
                page_content=hit["body_blob"],
                metadata=hit["metadata"],
            )
            for hit in mmr_hits
        ]

    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
@@ -388,25 +651,51 @@ class Cassandra(VectorStore):
                **_kwargs,
            )
        )
        # let the mmr utility pick the *indices* in the above array
        mmr_chosen_indices = maximal_marginal_relevance(
            np.array(embedding, dtype=np.float32),
            [pf_hit["vector"] for pf_hit in prefetch_hits],
            k=k,
            lambda_mult=lambda_mult,
        )
        mmr_hits = [
            pf_hit
            for pf_index, pf_hit in enumerate(prefetch_hits)
            if pf_index in mmr_chosen_indices
        ]
        return [
            Document(
                page_content=hit["body_blob"],
                metadata=hit["metadata"],
            )
            for hit in mmr_hits
        ]
        return self._mmr_search_to_documents(prefetch_hits, embedding, k, lambda_mult)

    async def amax_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, str]] = None,
        body_search: Optional[Union[str, List[str]]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.
        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.
        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
                Defaults to 20.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results with 0 corresponding to maximum
                diversity and 1 to minimum diversity.
                Defaults to 0.5.
            filter: Filter on the metadata to apply.
            body_search: Document textual search terms to apply.
                Only supported by Astra DB at the moment.
        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        _kwargs: Dict[str, Any] = {}
        if filter is not None:
            _kwargs["metadata"] = filter
        if body_search is not None:
            _kwargs["body_search"] = body_search

        prefetch_hits = list(
            await self.table.ametric_ann_search(
                vector=embedding,
                n=fetch_k,
                metric="cos",
                **_kwargs,
            )
        )
        return self._mmr_search_to_documents(prefetch_hits, embedding, k, lambda_mult)

    def max_marginal_relevance_search(
        self,
@@ -446,6 +735,43 @@ class Cassandra(VectorStore):
            body_search=body_search,
        )

    async def amax_marginal_relevance_search(
        self,
        query: str,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, str]] = None,
        body_search: Optional[Union[str, List[str]]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.
        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.
        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results with 0 corresponding to maximum
                diversity and 1 to minimum diversity.
                Defaults to 0.5.
            filter: Filter on the metadata to apply.
            body_search: Document textual search terms to apply.
                Only supported by Astra DB at the moment.
        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        embedding_vector = await self.embedding.aembed_query(query)
        return await self.amax_marginal_relevance_search_by_vector(
            embedding_vector,
            k,
            fetch_k,
            lambda_mult=lambda_mult,
            filter=filter,
            body_search=body_search,
        )

    @classmethod
    def from_texts(
        cls: Type[CVST],
@@ -500,6 +826,61 @@ class Cassandra(VectorStore):
        )
        return store

    @classmethod
    async def afrom_texts(
        cls: Type[CVST],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        *,
        session: Session = _NOT_SET,
        keyspace: str = "",
        table_name: str = "",
        ids: Optional[List[str]] = None,
        concurrency: int = 16,
        ttl_seconds: Optional[int] = None,
        body_index_options: Optional[List[Tuple[str, Any]]] = None,
        **kwargs: Any,
    ) -> CVST:
        """Create a Cassandra vectorstore from raw texts.

        Args:
            texts: Texts to add to the vectorstore.
            embedding: Embedding function to use.
            metadatas: Optional list of metadatas associated with the texts.
            session: Cassandra driver session (required).
            keyspace: Cassandra key space (required).
            table_name: Cassandra table (required).
            ids: Optional list of IDs associated with the texts.
            concurrency: Number of concurrent queries to send to the database.
                Defaults to 16.
            ttl_seconds: Optional time-to-live for the added texts.
            body_index_options: Optional options used to create the body index.
                Eg. body_index_options = [cassio.table.cql.STANDARD_ANALYZER]

        Returns:
            a Cassandra vectorstore.
        """
        if session is _NOT_SET:
            raise ValueError("session parameter is required")
        if not keyspace:
            raise ValueError("keyspace parameter is required")
        if not table_name:
            raise ValueError("table_name parameter is required")
        store = cls(
            embedding=embedding,
            session=session,
            keyspace=keyspace,
            table_name=table_name,
            ttl_seconds=ttl_seconds,
            setup_mode=SetupMode.ASYNC,
            body_index_options=body_index_options,
        )
        await store.aadd_texts(
            texts=texts, metadatas=metadatas, ids=ids, concurrency=concurrency
        )
        return store

    @classmethod
    def from_documents(
        cls: Type[CVST],
@@ -548,3 +929,52 @@ class Cassandra(VectorStore):
            body_index_options=body_index_options,
            **kwargs,
        )

    @classmethod
    async def afrom_documents(
        cls: Type[CVST],
        documents: List[Document],
        embedding: Embeddings,
        *,
        session: Session = _NOT_SET,
        keyspace: str = "",
        table_name: str = "",
        ids: Optional[List[str]] = None,
        concurrency: int = 16,
        ttl_seconds: Optional[int] = None,
        body_index_options: Optional[List[Tuple[str, Any]]] = None,
        **kwargs: Any,
    ) -> CVST:
        """Create a Cassandra vectorstore from a document list.

        Args:
            documents: Documents to add to the vectorstore.
            embedding: Embedding function to use.
            session: Cassandra driver session (required).
            keyspace: Cassandra key space (required).
            table_name: Cassandra table (required).
            ids: Optional list of IDs associated with the documents.
            concurrency: Number of concurrent queries to send to the database.
                Defaults to 16.
            ttl_seconds: Optional time-to-live for the added documents.
            body_index_options: Optional options used to create the body index.
                Eg. body_index_options = [cassio.table.cql.STANDARD_ANALYZER]

        Returns:
            a Cassandra vectorstore.
        """
        texts = [doc.page_content for doc in documents]
        metadatas = [doc.metadata for doc in documents]
        return await cls.afrom_texts(
            texts=texts,
            embedding=embedding,
            metadatas=metadatas,
            session=session,
            keyspace=keyspace,
            table_name=table_name,
            ids=ids,
            concurrency=concurrency,
            ttl_seconds=ttl_seconds,
            body_index_options=body_index_options,
            **kwargs,
        )
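For completeness, a sketch of the document-based async constructor defined above. Again, this is not part of the diff: the `build_from_docs` helper, the `session` and `embeddings` objects, and the keyspace/table names are placeholders; `afrom_documents` and `amax_marginal_relevance_search` are the methods the commit adds.

# Sketch only: `session` and `embeddings` are assumed to already exist.
from langchain_core.documents import Document
from langchain_community.vectorstores import Cassandra

async def build_from_docs(session, embeddings):
    docs = [Document(page_content="alpha", metadata={"tag": "a"})]
    store = await Cassandra.afrom_documents(
        docs,
        embedding=embeddings,
        session=session,
        keyspace="demo_ks",
        table_name="demo_table",
    )
    # MMR search also gains an async counterpart in this commit.
    return await store.amax_marginal_relevance_search("alpha", k=1, fetch_k=5)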