# DB-GPT/dbgpt/rag/index/base.py

"""Index store base class."""
import logging
import time
from abc import ABC, abstractmethod
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, Dict, List, Optional

from dbgpt._private.pydantic import BaseModel, ConfigDict, Field, model_to_dict
from dbgpt.core import Chunk, Embeddings
from dbgpt.storage.vector_store.filters import MetadataFilters
from dbgpt.util.executor_utils import (
blocking_func_to_async,
blocking_func_to_async_no_executor,
)

logger = logging.getLogger(__name__)


class IndexStoreConfig(BaseModel):
"""Index store config."""
model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow")
name: str = Field(
default="dbgpt_collection",
description="The name of index store, if not set, will use the default name.",
)
embedding_fn: Optional[Embeddings] = Field(
default=None,
description="The embedding function of vector store, if not set, will use the "
"default embedding function.",
)
max_chunks_once_load: int = Field(
default=10,
description="The max number of chunks to load at once. If your document is "
"large, you can set this value to a larger number to speed up the loading "
"process. Default is 10.",
)
max_threads: int = Field(
default=1,
description="The max number of threads to use. Default is 1. If you set this "
"bigger than 1, please make sure your vector store is thread-safe.",
)

    def to_dict(self, **kwargs) -> Dict[str, Any]:
        """Convert to dict."""
        return model_to_dict(self, **kwargs)
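

# Illustrative usage of ``IndexStoreConfig`` (the names here are examples, not
# part of the API; ``embedding_fn`` would normally be a concrete ``Embeddings``
# implementation supplied by the caller). Note that ``extra="allow"`` lets
# subclass configs carry additional fields:
#
#     config = IndexStoreConfig(name="my_collection", max_threads=4)
#     assert config.to_dict()["name"] == "my_collection"
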
class IndexStoreBase(ABC):
"""Index store base class."""
def __init__(self, executor: Optional[Executor] = None):
"""Init index store."""
self._executor = executor or ThreadPoolExecutor()

    @abstractmethod
    def load_document(self, chunks: List[Chunk]) -> List[str]:
        """Load document in index database.

        Args:
            chunks(List[Chunk]): document chunks.

        Returns:
            List[str]: chunk ids.
        """

    @abstractmethod
    async def aload_document(self, chunks: List[Chunk]) -> List[str]:
        """Load document in index database asynchronously.

        Args:
            chunks(List[Chunk]): document chunks.

        Returns:
            List[str]: chunk ids.
        """

    @abstractmethod
    def similar_search_with_scores(
        self,
        text: str,
        topk: int,
        score_threshold: float,
        filters: Optional[MetadataFilters] = None,
    ) -> List[Chunk]:
        """Similar search with scores in index database.

        Args:
            text(str): The query text.
            topk(int): The number of similar documents to return.
            score_threshold(float): Optional, a floating point value between
                0 and 1.
            filters(Optional[MetadataFilters]): metadata filters.

        Returns:
            List[Chunk]: The similar documents.
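
        Example (illustrative, assuming ``store`` is a concrete instance):
            ``store.similar_search_with_scores("query", topk=5, score_threshold=0.3)``
            returns at most five of the most similar chunks, subject to the
            score threshold.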
"""

    @abstractmethod
    def delete_by_ids(self, ids: str) -> List[str]:
        """Delete docs.

        Args:
            ids(str): The vector ids to delete, separated by comma.
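
        Example (illustrative):
            ``store.delete_by_ids("id_1,id_2")`` deletes the vectors whose ids
            are "id_1" and "id_2".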
"""

    @abstractmethod
    def delete_vector_name(self, index_name: str):
        """Delete index by name.

        Args:
            index_name(str): The name of the index to delete.
        """

    def vector_name_exists(self) -> bool:
        """Whether the index name exists."""
        return True

    def load_document_with_limit(
        self, chunks: List[Chunk], max_chunks_once_load: int = 10, max_threads: int = 1
    ) -> List[str]:
        """Load document in index database with specified limit.

        Args:
            chunks(List[Chunk]): Document chunks.
            max_chunks_once_load(int): Max number of chunks to load at once.
            max_threads(int): Max number of threads to use.

        Returns:
            List[str]: Chunk ids.
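
        Example (illustrative): loading 25 chunks with
        ``max_chunks_once_load=10`` submits three batches of sizes 10, 10
        and 5 to the thread pool.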
"""
        # Group the chunks into batches of at most max_chunks_once_load chunks
chunk_groups = [
chunks[i : i + max_chunks_once_load]
for i in range(0, len(chunks), max_chunks_once_load)
]
logger.info(
f"Loading {len(chunks)} chunks in {len(chunk_groups)} groups with "
f"{max_threads} threads."
)
ids = []
loaded_cnt = 0
start_time = time.time()
with ThreadPoolExecutor(max_workers=max_threads) as executor:
tasks = []
for chunk_group in chunk_groups:
tasks.append(executor.submit(self.load_document, chunk_group))
for future in tasks:
success_ids = future.result()
ids.extend(success_ids)
loaded_cnt += len(success_ids)
logger.info(f"Loaded {loaded_cnt} chunks, total {len(chunks)} chunks.")
logger.info(
f"Loaded {len(chunks)} chunks in {time.time() - start_time} seconds"
)
return ids

    async def aload_document_with_limit(
        self, chunks: List[Chunk], max_chunks_once_load: int = 10, max_threads: int = 1
    ) -> List[str]:
        """Load document in index database with specified limit asynchronously.

        The blocking ``load_document_with_limit`` call runs in the store's
        executor so the event loop is not blocked.

        Args:
            chunks(List[Chunk]): Document chunks.
            max_chunks_once_load(int): Max number of chunks to load at once.
            max_threads(int): Max number of threads to use.

        Returns:
            List[str]: Chunk ids.
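
        Example (illustrative, inside an async context):
            ``ids = await store.aload_document_with_limit(chunks, 10, 4)``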
"""
return await blocking_func_to_async(
self._executor,
self.load_document_with_limit,
chunks,
max_chunks_once_load,
max_threads,
)

    def similar_search(
        self, text: str, topk: int, filters: Optional[MetadataFilters] = None
    ) -> List[Chunk]:
        """Similar search in index database.

        Args:
            text(str): The query text.
            topk(int): The number of similar documents to return.
            filters(Optional[MetadataFilters]): metadata filters.

        Returns:
            List[Chunk]: The similar documents.
        """
return self.similar_search_with_scores(text, topk, 1.0, filters)

    async def asimilar_search_with_scores(
        self,
        doc: str,
        topk: int,
        score_threshold: float,
        filters: Optional[MetadataFilters] = None,
    ) -> List[Chunk]:
        """Async similar_search_with_scores in vector database."""
return await blocking_func_to_async_no_executor(
self.similar_search_with_scores, doc, topk, score_threshold, filters
)
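

# A minimal in-memory subclass sketch (illustrative only; real implementations
# such as the Chroma or Milvus stores live in ``dbgpt.storage.vector_store``).
# Shown as a comment so this module's import behavior is unchanged; the class
# and its naive keyword search are assumptions for illustration, not part of
# the API:
#
#     class InMemoryIndexStore(IndexStoreBase):
#         def __init__(self):
#             super().__init__()
#             self._chunks: Dict[str, Chunk] = {}
#
#         def load_document(self, chunks: List[Chunk]) -> List[str]:
#             for chunk in chunks:
#                 self._chunks[chunk.chunk_id] = chunk
#             return [chunk.chunk_id for chunk in chunks]
#
#         async def aload_document(self, chunks: List[Chunk]) -> List[str]:
#             return await blocking_func_to_async_no_executor(
#                 self.load_document, chunks
#             )
#
#         def similar_search_with_scores(
#             self, text, topk, score_threshold, filters=None
#         ) -> List[Chunk]:
#             # Naive keyword match instead of a real vector search.
#             return [c for c in self._chunks.values() if text in c.content][:topk]
#
#         def delete_by_ids(self, ids: str) -> List[str]:
#             return [i for i in ids.split(",") if self._chunks.pop(i, None)]
#
#         def delete_vector_name(self, index_name: str):
#             self._chunks.clear()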