mirror of
https://github.com/csunny/DB-GPT.git
synced 2025-07-22 11:51:42 +00:00
303 lines
9.8 KiB
Python
303 lines
9.8 KiB
Python
"""Connector for vector store."""
|
|
|
|
import copy
|
|
import logging
|
|
import os
|
|
from collections import defaultdict
|
|
from typing import Any, DefaultDict, Dict, List, Optional, Tuple, Type, cast
|
|
|
|
from dbgpt.core import Chunk, Embeddings
|
|
from dbgpt.core.awel.flow import (
|
|
FunctionDynamicOptions,
|
|
OptionValue,
|
|
Parameter,
|
|
ResourceCategory,
|
|
register_resource,
|
|
)
|
|
from dbgpt.rag.index.base import IndexStoreBase, IndexStoreConfig
|
|
from dbgpt.storage.vector_store.base import VectorStoreConfig
|
|
from dbgpt.storage.vector_store.filters import MetadataFilters
|
|
from dbgpt.util.i18n_utils import _
|
|
|
|
logger = logging.getLogger(__name__)

# Registry of vector store implementations:
# type name -> (store class, config class).
# Populated lazily by VectorStoreConnector._register().
connector: Dict[str, Tuple[Type, Type]] = {}
# Pool of shared store clients, keyed first by store type, then by the
# config's `name`, so connectors for the same store reuse one client.
pools: DefaultDict[str, Dict] = defaultdict(dict)
|
|
|
|
|
|
def _load_vector_options() -> List[OptionValue]:
    """Build the dynamic option list of available vector store types.

    Scans the public names of ``dbgpt.storage.vector_store`` and keeps the
    ones whose store class is an ``IndexStoreBase`` subclass.
    """
    from dbgpt.storage import vector_store

    options: List[OptionValue] = []
    for type_name in vector_store.__all__:
        store_cls = getattr(vector_store, type_name)[0]
        if issubclass(store_cls, IndexStoreBase):
            options.append(
                OptionValue(label=type_name, name=type_name, value=type_name)
            )
    return options
|
|
|
|
|
|
@register_resource(
    _("Vector Store Connector"),
    "vector_store_connector",
    category=ResourceCategory.VECTOR_STORE,
    parameters=[
        Parameter.build_from(
            _("Vector Store Type"),
            "vector_store_type",
            str,
            description=_("The type of vector store."),
            options=FunctionDynamicOptions(func=_load_vector_options),
        ),
        Parameter.build_from(
            _("Vector Store Implementation"),
            "vector_store_config",
            VectorStoreConfig,
            description=_("The vector store implementation."),
            optional=True,
            default=None,
        ),
    ],
    # Compatible with the old version
    alias=["dbgpt.storage.vector_store.connector.VectorStoreConnector"],
)
class VectorStoreConnector:
    """The connector for vector store.

    VectorStoreConnector, can connect different vector db provided load document api_v1
    and similar search api_v1.

    1.load_document:knowledge document source into vector store.(Chroma, Milvus,
    Weaviate).
    2.similar_search: similarity search from vector_store.
    3.similar_search_with_scores: similarity search with similarity score from
    vector_store

    code example:
    >>> from dbgpt.serve.rag.connector import VectorStoreConnector

    >>> vector_store_config = VectorStoreConfig()
    >>> vector_store_connector = VectorStoreConnector(vector_store_type="Chroma")
    """

    def __init__(
        self,
        vector_store_type: str,
        vector_store_config: Optional[IndexStoreConfig] = None,
    ) -> None:
        """Create a VectorStoreConnector instance.

        Args:
            vector_store_type: vector store type (e.g. Milvus, Chroma, Weaviate).
            vector_store_config: vector store config params.

        Raises:
            ValueError: if no config is given or the type is not registered.
            Exception: if connecting to the underlying store fails.
        """
        if vector_store_config is None:
            # ValueError (a subclass of Exception) keeps existing
            # `except Exception` callers working while being more precise.
            raise ValueError("vector_store_config is required")

        self._index_store_config = vector_store_config
        self._register()

        if self._match(vector_store_type):
            self.connector_class, self.config_class = connector[vector_store_type]
        else:
            raise ValueError(f"Vector store {vector_store_type} not supported")

        # Lazy %-formatting: the message is only built when INFO is enabled.
        logger.info("VectorStore:%s", self.connector_class)

        self._vector_store_type = vector_store_type
        self._embeddings = vector_store_config.embedding_fn

        config = self.config_class(**self._build_config_dict(vector_store_config))
        try:
            # Reuse a pooled client for the same (type, name) pair when possible.
            client = pools[vector_store_type].get(config.name)
            if client is None:
                client = self.connector_class(config)
                pools[vector_store_type][config.name] = client
            self.client = client
        except Exception as e:
            logger.error("connect vector store failed: %s", e)
            # Bare `raise` preserves the original traceback.
            raise

    @staticmethod
    def _build_config_dict(vector_store_config: IndexStoreConfig) -> Dict[str, Any]:
        """Collect all non-None declared and extra fields of the config."""
        config_dict: Dict[str, Any] = {}
        for key in vector_store_config.to_dict().keys():
            value = getattr(vector_store_config, key)
            if value is not None:
                config_dict[key] = value
        # `model_extra` is None on pydantic models that forbid extra fields;
        # guard so iteration never crashes.
        for key, value in (vector_store_config.model_extra or {}).items():
            if value is not None:
                config_dict[key] = value
        return config_dict

    @classmethod
    def from_default(
        cls,
        vector_store_type: Optional[str] = None,
        embedding_fn: Optional[Any] = None,
        vector_store_config: Optional[VectorStoreConfig] = None,
    ) -> "VectorStoreConnector":
        """Initialize default vector store connector.

        Args:
            vector_store_type: vector store type; falls back to the
                VECTOR_STORE_TYPE environment variable, then "Chroma".
            embedding_fn: embedding function to attach to the config.
            vector_store_config: existing config to reuse; a default
                ChromaVectorConfig is created when omitted.
        """
        vector_store_type = vector_store_type or os.getenv(
            "VECTOR_STORE_TYPE", "Chroma"
        )
        from dbgpt.storage.vector_store.chroma_store import ChromaVectorConfig

        vector_store_config = vector_store_config or ChromaVectorConfig()
        if embedding_fn is not None:
            # Don't clobber an embedding_fn already set on a caller's config.
            vector_store_config.embedding_fn = embedding_fn
        real_vector_store_type = cast(str, vector_store_type)
        return cls(real_vector_store_type, vector_store_config)

    @property
    def index_client(self):
        """Return the underlying index store client."""
        return self.client

    def _load_limits(self) -> Tuple[int, int]:
        """Return (max_chunks_once_load, max_threads), with safe defaults."""
        if self._index_store_config:
            return (
                self._index_store_config.max_chunks_once_load,
                self._index_store_config.max_threads,
            )
        return 10, 1

    def load_document(self, chunks: List[Chunk]) -> List[str]:
        """Load document in vector database.

        Args:
            - chunks: document chunks.
        Return chunk ids.
        """
        max_chunks_once_load, max_threads = self._load_limits()
        return self.client.load_document_with_limit(
            chunks,
            max_chunks_once_load,
            max_threads,
        )

    async def aload_document(self, chunks: List[Chunk]) -> List[str]:
        """Async load document in vector database.

        Args:
            - chunks: document chunks.
        Return chunk ids.
        """
        max_chunks_once_load, max_threads = self._load_limits()
        return await self.client.aload_document_with_limit(
            chunks, max_chunks_once_load, max_threads
        )

    def similar_search(
        self, doc: str, topk: int, filters: Optional[MetadataFilters] = None
    ) -> List[Chunk]:
        """Similar search in vector database.

        Args:
            - doc: query text
            - topk: topk
            - filters: metadata filters.
        Return:
            - chunks: chunks.
        """
        return self.client.similar_search(doc, topk, filters)

    def similar_search_with_scores(
        self,
        doc: str,
        topk: int,
        score_threshold: float,
        filters: Optional[MetadataFilters] = None,
    ) -> List[Chunk]:
        """Similar_search_with_score in vector database.

        Return docs and relevance scores in the range [0, 1].

        Args:
            doc(str): query text
            topk(int): return docs nums. Defaults to 4.
            score_threshold(float): score_threshold: Optional, a floating point value
                between 0 to 1 to filter the resulting set of retrieved docs,0 is
                dissimilar, 1 is most similar.
            filters: metadata filters.
        Return:
            - chunks: Return docs and relevance scores in the range [0, 1].
        """
        return self.client.similar_search_with_scores(
            doc, topk, score_threshold, filters
        )

    async def asimilar_search_with_scores(
        self,
        doc: str,
        topk: int,
        score_threshold: float,
        filters: Optional[MetadataFilters] = None,
    ) -> List[Chunk]:
        """Async similar_search_with_score in vector database."""
        return await self.client.asimilar_search_with_scores(
            doc, topk, score_threshold, filters
        )

    @property
    def vector_store_config(self) -> IndexStoreConfig:
        """Return the vector store config."""
        if not self._index_store_config:
            raise ValueError("vector store config not set.")
        return self._index_store_config

    def vector_name_exists(self):
        """Whether vector name exists."""
        return self.client.vector_name_exists()

    def delete_vector_name(self, vector_name: str):
        """Delete vector name.

        Args:
            - vector_name: vector store name
        """
        try:
            if self.vector_name_exists():
                self.client.delete_vector_name(vector_name)
        except Exception as e:
            logger.error("delete vector name %s failed: %s", vector_name, e)
            # Chain the original error so the root cause is not lost.
            raise Exception(f"delete name {vector_name} failed") from e
        return True

    def delete_by_ids(self, ids):
        """Delete vector by ids.

        Args:
            - ids: vector ids
        """
        return self.client.delete_by_ids(ids=ids)

    @property
    def current_embeddings(self) -> Optional[Embeddings]:
        """Return the current embeddings."""
        return self._embeddings

    def new_connector(self, name: str, **kwargs) -> "VectorStoreConnector":
        """Create a new connector.

        New connector based on the current connector: the current config is
        shallow-copied, overridden with the given kwargs, and renamed.
        """
        config = copy.copy(self.vector_store_config)
        for k, v in kwargs.items():
            if v is not None:
                setattr(config, k, v)
        config.name = name

        return self.__class__(self._vector_store_type, config)

    def _match(self, vector_store_type) -> bool:
        """Whether the given type is present in the connector registry."""
        return bool(connector.get(vector_store_type))

    def _register(self):
        """Populate the module-level registry from dbgpt.storage.vector_store."""
        from dbgpt.storage import vector_store

        for cls in vector_store.__all__:
            store_cls, config_cls = getattr(vector_store, cls)
            if issubclass(store_cls, IndexStoreBase) and issubclass(
                config_cls, IndexStoreConfig
            ):
                connector[cls] = (store_cls, config_cls)
|