community[patch]: Milvus supports add & delete texts by ids (#16256)

# Description

To support [langchain
indexing](https://python.langchain.com/docs/modules/data_connection/indexing)
as requested by users, vectorstore Milvus needs to support:
- document addition by id (`add_documents` method with `ids` argument)
- delete by id (`delete` method with `ids` argument)

Example usage:

```python
from langchain.indexes import SQLRecordManager, index
from langchain.schema import Document
from langchain_community.vectorstores import Milvus
from langchain_openai import OpenAIEmbeddings

collection_name = "test_index"
embedding = OpenAIEmbeddings()
vectorstore = Milvus(embedding_function=embedding, collection_name=collection_name)

namespace = f"milvus/{collection_name}"
record_manager = SQLRecordManager(
    namespace, db_url="sqlite:///record_manager_cache.sql"
)
record_manager.create_schema()

doc1 = Document(page_content="kitty", metadata={"source": "kitty.txt"})
doc2 = Document(page_content="doggy", metadata={"source": "doggy.txt"})

index(
    [doc1, doc1, doc2],
    record_manager,
    vectorstore,
    cleanup="incremental",  # None, "incremental", or "full"
    source_id_key="source",
)
```

# Fix issues

Fix https://github.com/milvus-io/milvus/issues/30112

---------

Signed-off-by: Jael Gu <mengjia.gu@zilliz.com>
Co-authored-by: Bagatur <baskaryan@gmail.com>
This commit is contained in:
Jael Gu 2024-01-30 03:19:50 +08:00 committed by GitHub
parent e9d3527b79
commit a1aa3a657c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 124 additions and 13 deletions

View File

@ -60,7 +60,7 @@
" * document addition by id (`add_documents` method with `ids` argument)\n", " * document addition by id (`add_documents` method with `ids` argument)\n",
" * delete by id (`delete` method with `ids` argument)\n", " * delete by id (`delete` method with `ids` argument)\n",
"\n", "\n",
"Compatible Vectorstores: `AnalyticDB`, `AstraDB`, `AwaDB`, `Bagel`, `Cassandra`, `Chroma`, `DashVector`, `DatabricksVectorSearch`, `DeepLake`, `Dingo`, `ElasticVectorSearch`, `ElasticsearchStore`, `FAISS`, `HanaDB`, `MyScale`, `PGVector`, `Pinecone`, `Qdrant`, `Redis`, `ScaNN`, `SupabaseVectorStore`, `SurrealDBStore`, `TimescaleVector`, `Vald`, `Vearch`, `VespaStore`, `Weaviate`, `ZepVectorStore`.\n", "Compatible Vectorstores: `AnalyticDB`, `AstraDB`, `AwaDB`, `Bagel`, `Cassandra`, `Chroma`, `DashVector`, `DatabricksVectorSearch`, `DeepLake`, `Dingo`, `ElasticVectorSearch`, `ElasticsearchStore`, `FAISS`, `HanaDB`, `Milvus`, `MyScale`, `PGVector`, `Pinecone`, `Qdrant`, `Redis`, `ScaNN`, `SupabaseVectorStore`, `SurrealDBStore`, `TimescaleVector`, `Vald`, `Vearch`, `VespaStore`, `Weaviate`, `ZepVectorStore`.\n",
" \n", " \n",
"## Caution\n", "## Caution\n",
"\n", "\n",

View File

@ -56,6 +56,9 @@ class Milvus(VectorStore):
default of index. default of index.
drop_old (Optional[bool]): Whether to drop the current collection. Defaults drop_old (Optional[bool]): Whether to drop the current collection. Defaults
to False. to False.
auto_id (bool): Whether to enable auto id for primary key. Defaults to False.
If False, you needs to provide text ids (string less than 65535 bytes).
If True, Milvus will generate unique integers as primary keys.
primary_field (str): Name of the primary key field. Defaults to "pk". primary_field (str): Name of the primary key field. Defaults to "pk".
text_field (str): Name of the text field. Defaults to "text". text_field (str): Name of the text field. Defaults to "text".
vector_field (str): Name of the vector field. Defaults to "vector". vector_field (str): Name of the vector field. Defaults to "vector".
@ -102,6 +105,7 @@ class Milvus(VectorStore):
embedding_function = Embeddings, embedding_function = Embeddings,
collection_name = "LangChainCollection", collection_name = "LangChainCollection",
drop_old = True, drop_old = True,
auto_id = True
) )
Raises: Raises:
@ -119,6 +123,7 @@ class Milvus(VectorStore):
index_params: Optional[dict] = None, index_params: Optional[dict] = None,
search_params: Optional[dict] = None, search_params: Optional[dict] = None,
drop_old: Optional[bool] = False, drop_old: Optional[bool] = False,
auto_id: bool = False,
*, *,
primary_field: str = "pk", primary_field: str = "pk",
text_field: str = "text", text_field: str = "text",
@ -159,8 +164,9 @@ class Milvus(VectorStore):
self.index_params = index_params self.index_params = index_params
self.search_params = search_params self.search_params = search_params
self.consistency_level = consistency_level self.consistency_level = consistency_level
self.auto_id = auto_id
# In order for a collection to be compatible, pk needs to be auto'id and int # In order for a collection to be compatible, pk needs to be varchar
self._primary_field = primary_field self._primary_field = primary_field
# In order for compatibility, the text field will need to be called "text" # In order for compatibility, the text field will need to be called "text"
self._text_field = text_field self._text_field = text_field
@ -327,11 +333,22 @@ class Milvus(VectorStore):
FieldSchema(self._text_field, DataType.VARCHAR, max_length=65_535) FieldSchema(self._text_field, DataType.VARCHAR, max_length=65_535)
) )
# Create the primary key field # Create the primary key field
if self.auto_id:
fields.append( fields.append(
FieldSchema( FieldSchema(
self._primary_field, DataType.INT64, is_primary=True, auto_id=True self._primary_field, DataType.INT64, is_primary=True, auto_id=True
) )
) )
else:
fields.append(
FieldSchema(
self._primary_field,
DataType.VARCHAR,
is_primary=True,
auto_id=False,
max_length=65_535,
)
)
# Create the vector field, supports binary or float vectors # Create the vector field, supports binary or float vectors
fields.append( fields.append(
FieldSchema(self._vector_field, infer_dtype_bydata(embeddings[0]), dim=dim) FieldSchema(self._vector_field, infer_dtype_bydata(embeddings[0]), dim=dim)
@ -369,8 +386,6 @@ class Milvus(VectorStore):
schema = self.col.schema schema = self.col.schema
for x in schema.fields: for x in schema.fields:
self.fields.append(x.name) self.fields.append(x.name)
# Since primary field is auto-id, no need to track it
self.fields.remove(self._primary_field)
def _get_index(self) -> Optional[dict[str, Any]]: def _get_index(self) -> Optional[dict[str, Any]]:
"""Return the vector index information if it exists""" """Return the vector index information if it exists"""
@ -467,6 +482,8 @@ class Milvus(VectorStore):
metadatas: Optional[List[dict]] = None, metadatas: Optional[List[dict]] = None,
timeout: Optional[int] = None, timeout: Optional[int] = None,
batch_size: int = 1000, batch_size: int = 1000,
*,
ids: Optional[List[str]] = None,
**kwargs: Any, **kwargs: Any,
) -> List[str]: ) -> List[str]:
"""Insert text data into Milvus. """Insert text data into Milvus.
@ -483,10 +500,12 @@ class Milvus(VectorStore):
that they all fit in memory. that they all fit in memory.
metadatas (Optional[List[dict]]): Metadata dicts attached to each of metadatas (Optional[List[dict]]): Metadata dicts attached to each of
the texts. Defaults to None. the texts. Defaults to None.
should be less than 65535 bytes. Required and work when auto_id is False.
timeout (Optional[int]): Timeout for each batch insert. Defaults timeout (Optional[int]): Timeout for each batch insert. Defaults
to None. to None.
batch_size (int, optional): Batch size to use for insertion. batch_size (int, optional): Batch size to use for insertion.
Defaults to 1000. Defaults to 1000.
ids (Optional[List[str]]): List of text ids. The length of each item
Raises: Raises:
MilvusException: Failure to add texts MilvusException: Failure to add texts
@ -497,6 +516,16 @@ class Milvus(VectorStore):
from pymilvus import Collection, MilvusException from pymilvus import Collection, MilvusException
texts = list(texts) texts = list(texts)
if not self.auto_id:
assert isinstance(
ids, list
), "A list of valid ids are required when auto_id is False."
assert len(set(ids)) == len(
texts
), "Different lengths of texts and unique ids are provided."
assert all(
len(x.encode()) <= 65_535 for x in ids
), "Each id should be a string less than 65535 bytes."
try: try:
embeddings = self.embedding_func.embed_documents(texts) embeddings = self.embedding_func.embed_documents(texts)
@ -524,6 +553,9 @@ class Milvus(VectorStore):
self._vector_field: embeddings, self._vector_field: embeddings,
} }
if not self.auto_id:
insert_dict[self._primary_field] = ids
if self._metadata_field is not None: if self._metadata_field is not None:
for d in metadatas: for d in metadatas:
insert_dict.setdefault(self._metadata_field, []).append(d) insert_dict.setdefault(self._metadata_field, []).append(d)
@ -532,7 +564,12 @@ class Milvus(VectorStore):
if metadatas is not None: if metadatas is not None:
for d in metadatas: for d in metadatas:
for key, value in d.items(): for key, value in d.items():
if key in self.fields: keys = (
[x for x in self.fields if x != self._primary_field]
if self.auto_id
else [x for x in self.fields]
)
for key in keys:
insert_dict.setdefault(key, []).append(value) insert_dict.setdefault(key, []).append(value)
# Total insert count # Total insert count
@ -700,7 +737,7 @@ class Milvus(VectorStore):
param = self.search_params param = self.search_params
# Determine result metadata fields. # Determine result metadata fields.
output_fields = self.fields[:] output_fields = [x for x in self.fields if x != self._primary_field]
output_fields.remove(self._vector_field) output_fields.remove(self._vector_field)
# Perform the search. # Perform the search.
@ -864,6 +901,30 @@ class Milvus(VectorStore):
ret.append(documents[x]) ret.append(documents[x])
return ret return ret
def delete(
self, ids: Optional[List[str]] = None, expr: Optional[str] = None, **kwargs: str
):
"""Delete by vector ID or boolean expression.
Refer to [Milvus documentation](https://milvus.io/docs/delete_data.md)
for notes and examples of expressions.
Args:
ids: List of ids to delete.
expr: Boolean expression that specifies the entities to delete.
kwargs: Other parameters in Milvus delete api.
"""
if isinstance(ids, list) and len(ids) > 0:
expr = f"{self._primary_field} in {ids}"
if expr is not None:
logger.warning(
"Both ids and expr are provided. " "Ignore expr and delete by ids."
)
else:
assert isinstance(
expr, str
), "Either ids list or expr string must be provided."
return self.col.delete(expr=expr, **kwargs)
@classmethod @classmethod
def from_texts( def from_texts(
cls, cls,
@ -876,6 +937,8 @@ class Milvus(VectorStore):
index_params: Optional[dict] = None, index_params: Optional[dict] = None,
search_params: Optional[dict] = None, search_params: Optional[dict] = None,
drop_old: bool = False, drop_old: bool = False,
*,
ids: Optional[List[str]] = None,
**kwargs: Any, **kwargs: Any,
) -> Milvus: ) -> Milvus:
"""Create a Milvus collection, indexes it with HNSW, and insert data. """Create a Milvus collection, indexes it with HNSW, and insert data.
@ -897,10 +960,16 @@ class Milvus(VectorStore):
Defaults to None. Defaults to None.
drop_old (Optional[bool], optional): Whether to drop the collection with drop_old (Optional[bool], optional): Whether to drop the collection with
that name if it exists. Defaults to False. that name if it exists. Defaults to False.
ids (Optional[List[str]]): List of text ids. Defaults to None.
Returns: Returns:
Milvus: Milvus Vector Store Milvus: Milvus Vector Store
""" """
if isinstance(ids, list) and len(ids) > 0:
auto_id = False
else:
auto_id = True
vector_db = cls( vector_db = cls(
embedding_function=embedding, embedding_function=embedding,
collection_name=collection_name, collection_name=collection_name,
@ -909,9 +978,10 @@ class Milvus(VectorStore):
index_params=index_params, index_params=index_params,
search_params=search_params, search_params=search_params,
drop_old=drop_old, drop_old=drop_old,
auto_id=auto_id,
**kwargs, **kwargs,
) )
vector_db.add_texts(texts=texts, metadatas=metadatas) vector_db.add_texts(texts=texts, metadatas=metadatas, ids=ids)
return vector_db return vector_db
def _parse_document(self, data: dict) -> Document: def _parse_document(self, data: dict) -> Document:

View File

@ -36,6 +36,9 @@ class Zilliz(Milvus):
default of index. default of index.
drop_old (Optional[bool]): Whether to drop the current collection. Defaults drop_old (Optional[bool]): Whether to drop the current collection. Defaults
to False. to False.
auto_id (bool): Whether to enable auto id for primary key. Defaults to False.
If False, you needs to provide text ids (string less than 65535 bytes).
If True, Milvus will generate unique integers as primary keys.
The connection args used for this class comes in the form of a dict, The connection args used for this class comes in the form of a dict,
here are a few of the options: here are a few of the options:
@ -146,6 +149,9 @@ class Zilliz(Milvus):
index_params: Optional[dict] = None, index_params: Optional[dict] = None,
search_params: Optional[dict] = None, search_params: Optional[dict] = None,
drop_old: bool = False, drop_old: bool = False,
*,
ids: Optional[List[str]] = None,
auto_id: bool = False,
**kwargs: Any, **kwargs: Any,
) -> Zilliz: ) -> Zilliz:
"""Create a Zilliz collection, indexes it with HNSW, and insert data. """Create a Zilliz collection, indexes it with HNSW, and insert data.
@ -167,6 +173,10 @@ class Zilliz(Milvus):
Defaults to None. Defaults to None.
drop_old (Optional[bool], optional): Whether to drop the collection with drop_old (Optional[bool], optional): Whether to drop the collection with
that name if it exists. Defaults to False. that name if it exists. Defaults to False.
ids (Optional[List[str]]): List of text ids.
auto_id (bool): Whether to enable auto id for primary key. Defaults to
False. If False, you needs to provide text ids (string less than 65535
bytes). If True, Milvus will generate unique integers as primary keys.
Returns: Returns:
Zilliz: Zilliz Vector Store Zilliz: Zilliz Vector Store
@ -179,7 +189,8 @@ class Zilliz(Milvus):
index_params=index_params, index_params=index_params,
search_params=search_params, search_params=search_params,
drop_old=drop_old, drop_old=drop_old,
auto_id=auto_id,
**kwargs, **kwargs,
) )
vector_db.add_texts(texts=texts, metadatas=metadatas) vector_db.add_texts(texts=texts, metadatas=metadatas, ids=ids)
return vector_db return vector_db

View File

@ -11,12 +11,15 @@ from tests.integration_tests.vectorstores.fake_embeddings import (
def _milvus_from_texts( def _milvus_from_texts(
metadatas: Optional[List[dict]] = None, drop: bool = True metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
drop: bool = True,
) -> Milvus: ) -> Milvus:
return Milvus.from_texts( return Milvus.from_texts(
fake_texts, fake_texts,
FakeEmbeddings(), FakeEmbeddings(),
metadatas=metadatas, metadatas=metadatas,
ids=ids,
connection_args={"host": "127.0.0.1", "port": "19530"}, connection_args={"host": "127.0.0.1", "port": "19530"},
drop_old=drop, drop_old=drop,
) )
@ -29,6 +32,30 @@ def test_milvus() -> None:
assert output == [Document(page_content="foo")] assert output == [Document(page_content="foo")]
def test_milvus_with_metadata() -> None:
"""Test with metadata"""
docsearch = _milvus_from_texts(metadatas=[{"label": "test"}] * len(fake_texts))
output = docsearch.similarity_search("foo", k=1)
assert output == [Document(page_content="foo", metadata={"label": "test"})]
def test_milvus_with_id() -> None:
"""Test with ids"""
ids = ["id_" + str(i) for i in range(len(fake_texts))]
docsearch = _milvus_from_texts(ids=ids)
output = docsearch.similarity_search("foo", k=1)
assert output == [Document(page_content="foo")]
output = docsearch.delete(ids=ids)
assert output.delete_count == len(fake_texts)
try:
ids = ["dup_id" for _ in fake_texts]
_milvus_from_texts(ids=ids)
except Exception as e:
assert isinstance(e, AssertionError)
def test_milvus_with_score() -> None: def test_milvus_with_score() -> None:
"""Test end to end construction and search with scores and IDs.""" """Test end to end construction and search with scores and IDs."""
texts = ["foo", "bar", "baz"] texts = ["foo", "bar", "baz"]
@ -84,6 +111,7 @@ def test_milvus_no_drop() -> None:
# if __name__ == "__main__": # if __name__ == "__main__":
# test_milvus() # test_milvus()
# test_milvus_with_metadata()
# test_milvus_with_score() # test_milvus_with_score()
# test_milvus_max_marginal_relevance_search() # test_milvus_max_marginal_relevance_search()
# test_milvus_add_extra() # test_milvus_add_extra()

View File

@ -61,6 +61,7 @@ def test_compatible_vectorstore_documentation() -> None:
"ElasticsearchStore", "ElasticsearchStore",
"FAISS", "FAISS",
"HanaDB", "HanaDB",
"Milvus",
"MomentoVectorIndex", "MomentoVectorIndex",
"MyScale", "MyScale",
"PGVector", "PGVector",
@ -78,6 +79,7 @@ def test_compatible_vectorstore_documentation() -> None:
"VespaStore", "VespaStore",
"Weaviate", "Weaviate",
"ZepVectorStore", "ZepVectorStore",
"Zilliz",
"Lantern", "Lantern",
} }
assert compatible == documented assert compatible == documented