Add dashvector vectorstore (#9163)

## Description
Add the `DashVector` vectorstore for LangChain

- [DashVector quick start](https://help.aliyun.com/document_detail/2510223.html)
- [dashvector package description](https://pypi.org/project/dashvector/)

## How to use
```python
from langchain.vectorstores.dashvector import DashVector

dashvector = DashVector.from_documents(docs, embeddings)
```
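
A fuller sketch of typical usage, assuming `dashvector` and `openai` are installed, `DASHVECTOR_API_KEY` is set in the environment, and a hypothetical `state_of_the_union.txt` as the input file:

```python
from langchain.document_loaders import TextLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores.dashvector import DashVector

# hypothetical input file; any list of Documents works here
raw_documents = TextLoader("state_of_the_union.txt").load()
docs = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0).split_documents(
    raw_documents
)

embeddings = OpenAIEmbeddings()

# the API key is read from the DASHVECTOR_API_KEY environment variable
dashvector = DashVector.from_documents(docs, embeddings)

# plain similarity search
results = dashvector.similarity_search("What did the president say?", k=4)

# scored variant; for DashVector a lower score means a closer match
results_with_scores = dashvector.similarity_search_with_relevance_scores(
    "What did the president say?"
)
```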

---------

Co-authored-by: smallrain.xuxy <smallrain.xuxy@alibaba-inc.com>
Co-authored-by: Bagatur <baskaryan@gmail.com>
Xiaoyu Xee
2023-08-16 07:19:30 +08:00
committed by GitHub
parent bfbb97b74c
commit b30f449dae
5 changed files with 702 additions and 0 deletions


@@ -33,6 +33,7 @@ from langchain.vectorstores.cassandra import Cassandra
 from langchain.vectorstores.chroma import Chroma
 from langchain.vectorstores.clarifai import Clarifai
 from langchain.vectorstores.clickhouse import Clickhouse, ClickhouseSettings
+from langchain.vectorstores.dashvector import DashVector
 from langchain.vectorstores.deeplake import DeepLake
 from langchain.vectorstores.dingo import Dingo
 from langchain.vectorstores.docarray import DocArrayHnswSearch, DocArrayInMemorySearch
@@ -83,6 +84,7 @@ __all__ = [
     "Chroma",
     "Clickhouse",
     "ClickhouseSettings",
+    "DashVector",
     "DeepLake",
     "Dingo",
     "DocArrayHnswSearch",


@@ -0,0 +1,365 @@
"""Wrapper around DashVector vector database."""
from __future__ import annotations
import logging
import uuid
from typing import (
Any,
Iterable,
List,
Optional,
Tuple,
)
import numpy as np
from langchain.docstore.document import Document
from langchain.embeddings.base import Embeddings
from langchain.utils import get_from_env
from langchain.vectorstores.base import VectorStore
from langchain.vectorstores.utils import maximal_marginal_relevance
logger = logging.getLogger(__name__)


class DashVector(VectorStore):
    """Wrapper around DashVector vector database.

    To use, you should have the ``dashvector`` python package installed.

    Example:
        .. code-block:: python

            from langchain.vectorstores import DashVector
            from langchain.embeddings.openai import OpenAIEmbeddings
            import dashvector

            client = dashvector.Client(api_key="***")
            client.create("langchain")
            collection = client.get("langchain")
            embeddings = OpenAIEmbeddings()
            vectorstore = DashVector(collection, embeddings, "text")
    """

    def __init__(
        self,
        collection: Any,
        embedding: Embeddings,
        text_field: str,
    ):
        """Initialize with DashVector collection."""
        try:
            import dashvector
        except ImportError:
            raise ImportError(
                "Could not import dashvector python package. "
                "Please install it with `pip install dashvector`."
            )

        if not isinstance(collection, dashvector.Collection):
            raise ValueError(
                f"collection should be an instance of dashvector.Collection, "
                f"but got {type(collection)}"
            )

        self._collection = collection
        self._embedding = embedding
        self._text_field = text_field

    def _similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[str] = None,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query vector, along with scores."""
        # query by vector
        ret = self._collection.query(embedding, topk=k, filter=filter)
        if not ret:
            raise ValueError(
                f"Failed to query docs by vector, error: {ret.message}"
            )

        docs = []
        for doc in ret:
            metadata = doc.fields
            text = metadata.pop(self._text_field)
            score = doc.score
            docs.append((Document(page_content=text, metadata=metadata), score))
        return docs

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        batch_size: int = 25,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of ids associated with the texts.
            batch_size: Optional batch size to upsert docs.
            kwargs: vectorstore specific parameters

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        # materialize the iterable first so a generator is not consumed twice
        text_list = list(texts)
        ids = ids or [str(uuid.uuid4().hex) for _ in text_list]

        for i in range(0, len(text_list), batch_size):
            # batch end
            end = min(i + batch_size, len(text_list))

            batch_texts = text_list[i:end]
            batch_ids = ids[i:end]
            batch_embeddings = self._embedding.embed_documents(batch_texts)

            # batch metadatas
            if metadatas:
                batch_metadatas = metadatas[i:end]
            else:
                batch_metadatas = [{} for _ in range(i, end)]
            # store the raw text in a doc field so searches can recover it
            for metadata, text in zip(batch_metadatas, batch_texts):
                metadata[self._text_field] = text

            # batch upsert to collection
            docs = list(zip(batch_ids, batch_embeddings, batch_metadatas))
            ret = self._collection.upsert(docs)
            if not ret:
                raise ValueError(
                    f"Failed to upsert docs to dashvector vector database, "
                    f"error: {ret.message}"
                )

        return ids

    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> bool:
        """Delete by vector ID.

        Args:
            ids: List of ids to delete.

        Returns:
            True if deletion is successful, False otherwise.
        """
        return bool(self._collection.delete(ids))

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[str] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to query.

        Args:
            query: Text to search documents similar to.
            k: Number of documents to return. Defaults to 4.
            filter: Doc fields filter conditions that meet the SQL where clause
                specification.

        Returns:
            List of Documents most similar to the query text.
        """
        docs_and_scores = self.similarity_search_with_relevance_scores(
            query, k, filter
        )
        return [doc for doc, _ in docs_and_scores]

    def similarity_search_with_relevance_scores(
        self,
        query: str,
        k: int = 4,
        filter: Optional[str] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to the query text, along with relevance scores.

        A lower score indicates a more similar document; a higher score indicates
        a more dissimilar one.

        Args:
            query: input text
            k: Number of Documents to return. Defaults to 4.
            filter: Doc fields filter conditions that meet the SQL where clause
                specification.

        Returns:
            List of Tuples of (doc, similarity_score)
        """
        embedding = self._embedding.embed_query(query)
        return self._similarity_search_with_score_by_vector(
            embedding, k=k, filter=filter
        )

    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[str] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Doc fields filter conditions that meet the SQL where clause
                specification.

        Returns:
            List of Documents most similar to the query vector.
        """
        docs_and_scores = self._similarity_search_with_score_by_vector(
            embedding, k, filter
        )
        return [doc for doc, _ in docs_and_scores]

    def max_marginal_relevance_search(
        self,
        query: str,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[str] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.
            filter: Doc fields filter conditions that meet the SQL where clause
                specification.

        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        embedding = self._embedding.embed_query(query)
        return self.max_marginal_relevance_search_by_vector(
            embedding, k, fetch_k, lambda_mult, filter
        )

    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[str] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.
            filter: Doc fields filter conditions that meet the SQL where clause
                specification.

        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        # query by vector, with vectors included so MMR can re-rank candidates
        ret = self._collection.query(
            embedding, topk=fetch_k, filter=filter, include_vector=True
        )
        if not ret:
            raise ValueError(
                f"Failed to query docs by vector, error: {ret.message}"
            )

        candidate_embeddings = [doc.vector for doc in ret]
        mmr_selected = maximal_marginal_relevance(
            np.array(embedding), candidate_embeddings, lambda_mult, k
        )
        metadatas = [ret.output[i].fields for i in mmr_selected]
        return [
            Document(page_content=metadata.pop(self._text_field), metadata=metadata)
            for metadata in metadatas
        ]

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        dashvector_api_key: Optional[str] = None,
        collection_name: str = "langchain",
        text_field: str = "text",
        batch_size: int = 25,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> DashVector:
        """Return DashVector VectorStore initialized from texts and embeddings.

        This is the quickest way to get started with the DashVector vector store.

        Example:
            .. code-block:: python

                from langchain.vectorstores import DashVector
                from langchain.embeddings import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                dashvector = DashVector.from_texts(
                    texts,
                    embeddings,
                    dashvector_api_key="{DASHVECTOR_API_KEY}"
                )
        """
        try:
            import dashvector
        except ImportError:
            raise ImportError(
                "Could not import dashvector python package. "
                "Please install it with `pip install dashvector`."
            )

        dashvector_api_key = dashvector_api_key or get_from_env(
            "dashvector_api_key", "DASHVECTOR_API_KEY"
        )

        dashvector_client = dashvector.Client(api_key=dashvector_api_key)
        # drop any existing collection with the same name so we start fresh
        dashvector_client.delete(collection_name)
        collection = dashvector_client.get(collection_name)
        if not collection:
            # create the collection, sized to the embedding dimension
            dim = len(embedding.embed_query(texts[0]))
            resp = dashvector_client.create(collection_name, dimension=dim)
            if resp:
                collection = dashvector_client.get(collection_name)
            else:
                raise ValueError(
                    f"Failed to create collection. Error: {resp.message}."
                )

        dashvector_vector_db = cls(collection, embedding, text_field)
        dashvector_vector_db.add_texts(texts, metadatas, ids, batch_size)
        return dashvector_vector_db
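
The `filter` parameter accepted by the search methods above is a SQL-WHERE-style condition over doc fields. A small sketch of filtered search (the `year` field and the texts are illustrative, not part of the API):

```python
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import DashVector

embeddings = OpenAIEmbeddings()

# "year" is an illustrative metadata field stored alongside each text
vectorstore = DashVector.from_texts(
    texts=["report from 2021", "report from 2023"],
    embedding=embeddings,
    metadatas=[{"year": 2021}, {"year": 2023}],
)

# only docs whose fields satisfy the SQL-style condition are considered
docs = vectorstore.similarity_search("report", k=1, filter="year=2023")
```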


@@ -0,0 +1,75 @@
from time import sleep

from langchain.schema import Document
from langchain.vectorstores import DashVector
from tests.integration_tests.vectorstores.fake_embeddings import FakeEmbeddings

texts = ["foo", "bar", "baz"]
ids = ["1", "2", "3"]


def test_dashvector_from_texts() -> None:
    dashvector = DashVector.from_texts(
        texts=texts,
        embedding=FakeEmbeddings(),
        ids=ids,
    )
    # the vector insert operation is async by design; wait a bit for the
    # insertion to complete.
    sleep(0.5)
    output = dashvector.similarity_search("foo", k=1)
    assert output == [Document(page_content="foo")]


def test_dashvector_with_text_with_metadatas() -> None:
    metadatas = [{"meta": i} for i in range(len(texts))]
    dashvector = DashVector.from_texts(
        texts=texts,
        embedding=FakeEmbeddings(),
        metadatas=metadatas,
        ids=ids,
    )
    # the vector insert operation is async by design; wait a bit for the
    # insertion to complete.
    sleep(0.5)
    output = dashvector.similarity_search("foo", k=1)
    assert output == [Document(page_content="foo", metadata={"meta": 0})]


def test_dashvector_search_with_filter() -> None:
    metadatas = [{"meta": i} for i in range(len(texts))]
    dashvector = DashVector.from_texts(
        texts=texts,
        embedding=FakeEmbeddings(),
        metadatas=metadatas,
        ids=ids,
    )
    # the vector insert operation is async by design; wait a bit for the
    # insertion to complete.
    sleep(0.5)
    output = dashvector.similarity_search("foo", filter="meta=2")
    assert output == [Document(page_content="baz", metadata={"meta": 2})]


def test_dashvector_search_with_scores() -> None:
    dashvector = DashVector.from_texts(
        texts=texts,
        embedding=FakeEmbeddings(),
        ids=ids,
    )
    # the vector insert operation is async by design; wait a bit for the
    # insertion to complete.
    sleep(0.5)
    output = dashvector.similarity_search_with_relevance_scores("foo")
    docs, scores = zip(*output)
    assert scores[0] < scores[1] < scores[2]
    assert list(docs) == [
        Document(page_content="foo"),
        Document(page_content="bar"),
        Document(page_content="baz"),
    ]