fix: enable Elasticsearch full text search support (#2947)

zhangyj21-lh
2025-12-18 17:24:20 +08:00
committed by GitHub
parent bfa1b11cec
commit 4b52cfa670
3 changed files with 148 additions and 13 deletions

View File

@@ -20,6 +20,37 @@ class FullTextStoreBase(IndexStoreBase):
"""Initialize vector store."""
super().__init__(executor)
def is_support_full_text_search(self) -> bool:
# 重写,新增抽象类
"""Support full text search.
Full text store should support full text search by default.
Return:
bool: True, full text stores always support full text search.
"""
return True # 全文检索存储类应该始终支持全文检索
def full_text_search(
self, text: str, topk: int, filters: Optional[MetadataFilters] = None
) -> List[Chunk]:
# 重写,新增抽象类
"""Full text search.
Args:
text (str): The query text.
topk (int): Number of results to return. Default is 10.
Returns:
List[Chunk]: Search results as chunks.
"""
# 调用抽象方法 similar_search_with_scores但可以忽略分数阈值
# 或者子类需要实现具体的全文检索逻辑
return self.similar_search_with_scores(
text, topk, score_threshold=0.0, filters=filters
)
@abstractmethod
def load_document(self, chunks: List[Chunk]) -> List[str]:
"""Load document in index database.
@@ -30,7 +61,9 @@ class FullTextStoreBase(IndexStoreBase):
            List[str]: chunk ids.
        """

    async def aload_document(self, chunks: List[Chunk]) -> List[str]:
    async def aload_document(
        self, chunks: List[Chunk], file_id: Optional[str] = None
    ) -> List[str]:
        """Async load document in index database.

        Args:
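With this default in place, any store that implements similar_search_with_scores gets full text search for free. A minimal usage sketch (the `store` instance and query text are hypothetical):

    results = store.full_text_search("connection timeout", topk=10)
    for chunk in results:
        print(chunk.chunk_id, chunk.content[:80])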

View File

@@ -8,7 +8,11 @@ from typing import List, Optional
from dbgpt.core import Chunk
from dbgpt.storage.base import IndexStoreConfig, logger
from dbgpt.storage.full_text.base import FullTextStoreBase
from dbgpt.storage.vector_store.filters import MetadataFilters
from dbgpt.storage.vector_store.filters import (
    FilterCondition,
    FilterOperator,
    MetadataFilters,
)
from dbgpt.util import string_utils
from dbgpt.util.executor_utils import blocking_func_to_async
from dbgpt_ext.storage.vector_store.elastic_store import ElasticsearchStoreConfig
@@ -81,7 +85,9 @@ class ElasticDocumentStore(FullTextStoreBase):
"similarity": "custom_bm25",
},
"metadata": {
"type": "keyword",
# Use object so metadata fields stay queryable for filters
"type": "object",
"dynamic": True,
},
}
}
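Switching `metadata` from a flat `keyword` field to a dynamic `object` is what makes per-field filtering possible: each metadata key (for example `file_id`) becomes its own sub-field such as `metadata.file_id`. A sketch of the kind of query this unlocks (index name and value are hypothetical):

    es_query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"content": "connection timeout"}},
                    {"term": {"metadata.file_id": "doc-123"}},
                ]
            }
        }
    }
    res = es_client.search(index="my_index", body=es_query, size=10)

One caveat worth noting: under dynamic mapping, Elasticsearch typically indexes string values as analyzed `text` with a `.keyword` sub-field, so exact-match `term` filters on multi-token values may need to target `metadata.<key>.keyword` instead.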
@@ -94,6 +100,35 @@ class ElasticDocumentStore(FullTextStoreBase):
        )
        self._executor = executor or ThreadPoolExecutor()
    # Override to make support explicit rather than relying on the parent
    # class's default implementation.
    def is_support_full_text_search(self) -> bool:
        """Support full text search.

        Elasticsearch supports full text search.

        Return:
            bool: True if full text search is supported.
        """
        return True  # Elasticsearch supports full text search

    # Override: implement full text search on top of the existing
    # similar_search_with_scores method.
    def full_text_search(
        self, text: str, topk: int, filters: Optional[MetadataFilters] = None
    ) -> List[Chunk]:
        """Full text search in Elasticsearch.

        Args:
            text (str): The query text.
            topk (int): Number of results to return. Default is 10.
            filters (Optional[MetadataFilters]): Metadata filters. Defaults to None.

        Returns:
            List[Chunk]: Search results as chunks.
        """
        score_threshold = 0.0
        return self.similar_search_with_scores(
            text=text, top_k=topk, score_threshold=score_threshold, filters=filters
        )

    def get_config(self) -> IndexStoreConfig:
        """Get the es store config."""
        return self._es_config
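Together, this override and the filter-aware query builder (added further down) let callers combine BM25 relevance with metadata constraints. A sketch, assuming a `MetadataFilter` class in `dbgpt.storage.vector_store.filters` with `key`/`value`/`operator` fields and a hypothetical file id:

    from dbgpt.storage.vector_store.filters import (
        FilterOperator,
        MetadataFilter,
        MetadataFilters,
    )

    filters = MetadataFilters(
        filters=[
            MetadataFilter(key="file_id", value="doc-123", operator=FilterOperator.EQ)
        ]
    )
    chunks = es_store.full_text_search("connection timeout", topk=5, filters=filters)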
@@ -114,7 +149,7 @@ class ElasticDocumentStore(FullTextStoreBase):
        es_requests = []
        ids = []
        contents = [chunk.content for chunk in chunks]
        metadatas = [json.dumps(chunk.metadata) for chunk in chunks]
        metadatas = [self._normalize_metadata(chunk.metadata) for chunk in chunks]
        chunk_ids = [chunk.chunk_id for chunk in chunks]
        for i, content in enumerate(contents):
            es_request = {
@@ -143,19 +178,22 @@ class ElasticDocumentStore(FullTextStoreBase):
        Return:
            List[Chunk]: similar text.
        """
        es_query = {"query": {"match": {"content": text}}}
        res = self._es_client.search(index=self._index_name, body=es_query)
        es_query = self._build_query(text, filters)
        res = self._es_client.search(
            index=self._index_name, body=es_query, size=topk, track_total_hits=False
        )
        chunks = []
        for r in res["hits"]["hits"]:
            metadata = self._normalize_metadata(r["_source"].get("metadata"))
            chunks.append(
                Chunk(
                    chunk_id=r["_id"],
                    content=r["_source"]["content"],
                    metadata=json.loads(r["_source"]["metadata"]),
                    metadata=metadata,
                )
            )
        return chunks[:topk]
        return chunks

    def similar_search_with_scores(
        self,
@@ -175,17 +213,20 @@ class ElasticDocumentStore(FullTextStoreBase):
        Return:
            List[Tuple[str, float]]: similar text with scores.
        """
        es_query = {"query": {"match": {"content": text}}}
        res = self._es_client.search(index=self._index_name, body=es_query)
        es_query = self._build_query(text, filters)
        res = self._es_client.search(
            index=self._index_name, body=es_query, size=top_k, track_total_hits=False
        )
        chunks_with_scores = []
        for r in res["hits"]["hits"]:
            if r["_score"] >= score_threshold:
                metadata = self._normalize_metadata(r["_source"].get("metadata"))
                chunks_with_scores.append(
                    Chunk(
                        chunk_id=r["_id"],
                        content=r["_source"]["content"],
                        metadata=json.loads(r["_source"]["metadata"]),
                        metadata=metadata,
                        score=r["_score"],
                    )
                )
@@ -196,7 +237,9 @@ class ElasticDocumentStore(FullTextStoreBase):
            )
        return chunks_with_scores[:top_k]
    async def aload_document(self, chunks: List[Chunk]) -> List[str]:
    async def aload_document(
        self, chunks: List[Chunk], file_id: Optional[str] = None
    ) -> List[str]:
        """Async load document in elasticsearch.

        Args:
@@ -204,6 +247,13 @@ class ElasticDocumentStore(FullTextStoreBase):
        Return:
            List[str]: chunk ids.
        """
        # Inject file_id into each chunk's metadata so results can later be
        # filtered back to their source file.
        if file_id:
            for chunk in chunks:
                # Make sure the metadata field exists, then add or update file_id.
                if not hasattr(chunk, "metadata") or chunk.metadata is None:
                    chunk.metadata = {}
                chunk.metadata["file_id"] = file_id
        return await blocking_func_to_async(self._executor, self.load_document, chunks)

    def delete_by_ids(self, ids: str) -> List[str]:
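End to end, the new `file_id` parameter plus the object-mapped metadata make per-file retrieval straightforward. Inside an async context, and reusing the `filters` object from the earlier sketch (ids and chunks are hypothetical):

    chunk_ids = await es_store.aload_document(chunks, file_id="doc-123")
    hits = es_store.full_text_search("connection timeout", topk=5, filters=filters)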
@@ -229,3 +279,53 @@ class ElasticDocumentStore(FullTextStoreBase):
            index_name(str): The name of index to delete.
        """
        self._es_client.indices.delete(index=self._index_name)
    def _build_query(self, text: str, filters: Optional[MetadataFilters]):
        """Build the es query: a BM25 match plus optional metadata filters."""
        must_clauses = [{"match": {"content": text}}]
        filter_clause = self._build_metadata_filter(filters)
        if filter_clause:
            must_clauses.append(filter_clause)
        return {"query": {"bool": {"must": must_clauses}}}

    def _build_metadata_filter(self, filters: Optional[MetadataFilters]):
        """Translate MetadataFilters to an elasticsearch bool clause."""
        if not filters or not filters.filters:
            return None
        clauses = []
        for f in filters.filters:
            field_name = f"metadata.{f.key}"
            if f.operator == FilterOperator.EQ:
                clauses.append({"term": {field_name: f.value}})
            elif f.operator == FilterOperator.IN:
                values = f.value if isinstance(f.value, list) else [f.value]
                clauses.append({"terms": {field_name: values}})
            elif f.operator == FilterOperator.NE:
                clauses.append(
                    {"bool": {"must_not": {"term": {field_name: f.value}}}}
                )
            else:
                logger.warning(
                    "Unsupported filter operator %s for elastic full text search",
                    f.operator,
                )
        if not clauses:
            return None
        if filters.condition == FilterCondition.OR:
            return {"bool": {"should": clauses, "minimum_should_match": 1}}
        return {"bool": {"must": clauses}}

    def _normalize_metadata(self, metadata):
        """Ensure metadata is stored as a dict for downstream consumers."""
        if metadata is None:
            return {}
        if isinstance(metadata, dict):
            return metadata
        if isinstance(metadata, str):
            try:
                return json.loads(metadata)
            except Exception:
                # Fall back to wrapping the raw string to avoid breaking callers
                return {"value": metadata}
        try:
            return dict(metadata)
        except Exception:
            return {"value": metadata}
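For concreteness, here is the query `_build_query` produces for an AND of an EQ filter and an IN filter (keys and values hypothetical):

    # filters: file_id == "doc-123" AND lang in ["en", "zh"]
    {
        "query": {
            "bool": {
                "must": [
                    {"match": {"content": "connection timeout"}},
                    {
                        "bool": {
                            "must": [
                                {"term": {"metadata.file_id": "doc-123"}},
                                {"terms": {"metadata.lang": ["en", "zh"]}},
                            ]
                        }
                    },
                ]
            }
        }
    }

With `FilterCondition.OR`, the inner `bool` switches to `should` clauses with `minimum_should_match: 1`.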

View File

@@ -20,7 +20,9 @@ class OpenSearch(FullTextStoreBase):
"""
pass
def aload_document(self, chunks: List[Chunk]) -> List[str]:
def aload_document(
self, chunks: List[Chunk], file_id: Optional[str] = None
) -> List[str]:
"""Async load document in index database.
Args:
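The OpenSearch store only gains the updated stub signature in this commit. A hypothetical sketch of what a concrete override could look like, assuming the class mirrors the Elasticsearch store's `_executor` and synchronous `load_document`:

    async def aload_document(
        self, chunks: List[Chunk], file_id: Optional[str] = None
    ) -> List[str]:
        """Async load document in index database."""
        if file_id:
            for chunk in chunks:
                # Tag each chunk with its source file, as the elastic store does.
                if chunk.metadata is None:
                    chunk.metadata = {}
                chunk.metadata["file_id"] = file_id
        return await blocking_func_to_async(
            self._executor, self.load_document, chunks
        )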