"""Index store base class."""
|
|
import logging
|
|
import time
|
|
from abc import ABC, abstractmethod
|
|
from concurrent.futures import Executor, ThreadPoolExecutor
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from dbgpt._private.pydantic import BaseModel, ConfigDict, Field, model_to_dict
|
|
from dbgpt.core import Chunk, Embeddings
|
|
from dbgpt.storage.vector_store.filters import MetadataFilters
|
|
from dbgpt.util.executor_utils import (
|
|
blocking_func_to_async,
|
|
blocking_func_to_async_no_executor,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)


class IndexStoreConfig(BaseModel):
    """Index store config."""

    model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow")

    name: str = Field(
        default="dbgpt_collection",
        description="The name of the index store. If not set, the default name is "
        "used.",
    )
    embedding_fn: Optional[Embeddings] = Field(
        default=None,
        description="The embedding function of the vector store. If not set, the "
        "default embedding function is used.",
    )
    max_chunks_once_load: int = Field(
        default=10,
        description="The max number of chunks to load at once. If your document is "
        "large, you can set this value to a larger number to speed up the loading "
        "process. Default is 10.",
    )
    max_threads: int = Field(
        default=1,
        description="The max number of threads to use. Default is 1. If you set this "
        "bigger than 1, please make sure your vector store is thread-safe.",
    )

    def to_dict(self, **kwargs) -> Dict[str, Any]:
        """Convert the config to a dict."""
        return model_to_dict(self, **kwargs)
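
# Usage sketch (comments only, not from the original source): because
# `extra="allow"` is set, store-specific fields pass through the config and
# survive `to_dict()`. The field values below are illustrative:
#
#     config = IndexStoreConfig(
#         name="my_collection",
#         max_chunks_once_load=20,
#         max_threads=4,
#     )
#     assert config.to_dict()["name"] == "my_collection"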


class IndexStoreBase(ABC):
    """Index store base class."""

    def __init__(self, executor: Optional[Executor] = None):
        """Init index store."""
        self._executor = executor or ThreadPoolExecutor()

    @abstractmethod
    def load_document(self, chunks: List[Chunk]) -> List[str]:
        """Load document chunks into the index database.

        Args:
            chunks(List[Chunk]): Document chunks.

        Return:
            List[str]: Chunk ids.
        """

    @abstractmethod
    async def aload_document(self, chunks: List[Chunk]) -> List[str]:
        """Async load document chunks into the index database.

        Args:
            chunks(List[Chunk]): Document chunks.

        Return:
            List[str]: Chunk ids.
        """

    @abstractmethod
    def similar_search_with_scores(
        self,
        text: str,
        topk: int,
        score_threshold: float,
        filters: Optional[MetadataFilters] = None,
    ) -> List[Chunk]:
        """Similar search with scores in the index database.

        Args:
            text(str): The query text.
            topk(int): The number of similar documents to return.
            score_threshold(float): Optional, a floating point value between 0 and 1
                used to filter results by relevance score.
            filters(Optional[MetadataFilters]): Metadata filters.

        Return:
            List[Chunk]: The similar documents.
        """

    @abstractmethod
    def delete_by_ids(self, ids: str) -> List[str]:
        """Delete documents by their vector ids.

        Args:
            ids(str): The vector ids to delete, separated by commas.
        """

    @abstractmethod
    def delete_vector_name(self, index_name: str):
        """Delete an index by name.

        Args:
            index_name(str): The name of the index to delete.
        """

    def vector_name_exists(self) -> bool:
        """Whether the vector name exists."""
        return True

    def load_document_with_limit(
        self, chunks: List[Chunk], max_chunks_once_load: int = 10, max_threads: int = 1
    ) -> List[str]:
        """Load document chunks into the index database with a specified limit.

        Args:
            chunks(List[Chunk]): Document chunks.
            max_chunks_once_load(int): Max number of chunks to load at once.
            max_threads(int): Max number of threads to use.

        Return:
            List[str]: Chunk ids.
        """
        # Group the chunks into batches of size max_chunks_once_load
        chunk_groups = [
            chunks[i : i + max_chunks_once_load]
            for i in range(0, len(chunks), max_chunks_once_load)
        ]
        logger.info(
            f"Loading {len(chunks)} chunks in {len(chunk_groups)} groups with "
            f"{max_threads} threads."
        )
        ids = []
        loaded_cnt = 0
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=max_threads) as executor:
            tasks = []
            for chunk_group in chunk_groups:
                tasks.append(executor.submit(self.load_document, chunk_group))
            for future in tasks:
                success_ids = future.result()
                ids.extend(success_ids)
                loaded_cnt += len(success_ids)
                logger.info(f"Loaded {loaded_cnt} chunks, total {len(chunks)} chunks.")
        logger.info(
            f"Loaded {len(chunks)} chunks in {time.time() - start_time:.2f} seconds"
        )
        return ids
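
    # Batching sketch (illustrative numbers, not from the original source):
    # with 25 chunks and max_chunks_once_load=10, the slicing above yields
    # groups of sizes 10, 10 and 5; with max_threads=2 the first two groups
    # are indexed concurrently. A runnable end-to-end example is sketched at
    # the bottom of this file.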

    async def aload_document_with_limit(
        self, chunks: List[Chunk], max_chunks_once_load: int = 10, max_threads: int = 1
    ) -> List[str]:
        """Async load document chunks into the index database with a specified limit.

        Args:
            chunks(List[Chunk]): Document chunks.
            max_chunks_once_load(int): Max number of chunks to load at once.
            max_threads(int): Max number of threads to use.

        Return:
            List[str]: Chunk ids.
        """
        return await blocking_func_to_async(
            self._executor,
            self.load_document_with_limit,
            chunks,
            max_chunks_once_load,
            max_threads,
        )
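
    # Usage sketch (comments only): from async code, offload the threaded
    # loader to the store's executor without blocking the event loop, e.g.
    #
    #     ids = await store.aload_document_with_limit(
    #         chunks, max_chunks_once_load=20, max_threads=4
    #     )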

    def similar_search(
        self, text: str, topk: int, filters: Optional[MetadataFilters] = None
    ) -> List[Chunk]:
        """Similar search in the index database.

        Args:
            text(str): The query text.
            topk(int): The number of similar documents to return.
            filters(Optional[MetadataFilters]): Metadata filters.

        Return:
            List[Chunk]: The similar documents.
        """
        return self.similar_search_with_scores(text, topk, 1.0, filters)

    async def asimilar_search_with_scores(
        self,
        doc: str,
        topk: int,
        score_threshold: float,
        filters: Optional[MetadataFilters] = None,
    ) -> List[Chunk]:
        """Async similar_search_with_scores in the vector database."""
        return await blocking_func_to_async_no_executor(
            self.similar_search_with_scores, doc, topk, score_threshold, filters
        )
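

# ---------------------------------------------------------------------------
# Minimal runnable sketch (illustrative, not part of the library API): a toy
# in-memory IndexStoreBase subclass demonstrating the batch-loading and
# search entry points. `InMemoryIndexStore` and its naive substring
# "similarity" are assumptions for demonstration only; real implementations
# live under dbgpt.storage.vector_store. Assumes Chunk exposes `content` and
# an auto-generated `chunk_id`.
# ---------------------------------------------------------------------------
if __name__ == "__main__":

    class InMemoryIndexStore(IndexStoreBase):
        """Toy store keeping chunks in a dict keyed by chunk id."""

        def __init__(self):
            super().__init__()
            self._chunks: Dict[str, Chunk] = {}

        def load_document(self, chunks: List[Chunk]) -> List[str]:
            for chunk in chunks:
                self._chunks[chunk.chunk_id] = chunk
            return [chunk.chunk_id for chunk in chunks]

        async def aload_document(self, chunks: List[Chunk]) -> List[str]:
            return self.load_document(chunks)

        def similar_search_with_scores(
            self,
            text: str,
            topk: int,
            score_threshold: float,
            filters: Optional[MetadataFilters] = None,
        ) -> List[Chunk]:
            # Naive "similarity": keep chunks whose content contains the query.
            matches = [c for c in self._chunks.values() if text in c.content]
            return matches[:topk]

        def delete_by_ids(self, ids: str) -> List[str]:
            deleted = []
            for chunk_id in ids.split(","):
                if self._chunks.pop(chunk_id, None) is not None:
                    deleted.append(chunk_id)
            return deleted

        def delete_vector_name(self, index_name: str):
            self._chunks.clear()

    store = InMemoryIndexStore()
    docs = [Chunk(content=f"document number {i}") for i in range(25)]
    # 25 chunks with max_chunks_once_load=10 -> 3 groups (10, 10, 5).
    ids = store.load_document_with_limit(docs, max_chunks_once_load=10)
    print(f"Loaded {len(ids)} chunks")
    print(store.similar_search("number 3", topk=2))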