From 4b52cfa670fcbb170ea0348e187d4c26e28b0c2b Mon Sep 17 00:00:00 2001
From: zhangyj21-lh <154875313+zhangyj21-lh@users.noreply.github.com>
Date: Thu, 18 Dec 2025 17:24:20 +0800
Subject: [PATCH] fix: enable Elasticsearch full text search support (#2947)

---
 .../src/dbgpt/storage/full_text/base.py       |  35 ++++-
 .../storage/full_text/elasticsearch.py        | 122 ++++++++++++++++--
 .../dbgpt_ext/storage/full_text/opensearch.py |   4 +-
 3 files changed, 148 insertions(+), 13 deletions(-)

diff --git a/packages/dbgpt-core/src/dbgpt/storage/full_text/base.py b/packages/dbgpt-core/src/dbgpt/storage/full_text/base.py
index 43d1641ce..d4886fc32 100644
--- a/packages/dbgpt-core/src/dbgpt/storage/full_text/base.py
+++ b/packages/dbgpt-core/src/dbgpt/storage/full_text/base.py
@@ -20,6 +20,37 @@ class FullTextStoreBase(IndexStoreBase):
         """Initialize vector store."""
         super().__init__(executor)
 
+    def is_support_full_text_search(self) -> bool:
+        # Override hook: newly added base-class method
+        """Support full text search.
+
+        Full text store should support full text search by default.
+
+        Return:
+            bool: True, full text stores always support full text search.
+        """
+        return True  # full text stores always support full text search
+
+    def full_text_search(
+        self, text: str, topk: int, filters: Optional[MetadataFilters] = None
+    ) -> List[Chunk]:
+        # Override hook: newly added base-class method
+        """Full text search.
+
+        Args:
+            text (str): The query text.
+            topk (int): Number of results to return. Default is 10.
+
+        Returns:
+            List[Chunk]: Search results as chunks.
+        """
+        # Delegate to the abstract similar_search_with_scores, ignoring the score threshold;
+        # subclasses may instead implement dedicated full text search logic.
+
+        return self.similar_search_with_scores(
+            text, topk, score_threshold=0.0, filters=filters
+        )
+
     @abstractmethod
     def load_document(self, chunks: List[Chunk]) -> List[str]:
         """Load document in index database.
@@ -30,7 +61,9 @@ class FullTextStoreBase(IndexStoreBase):
             List[str]: chunk ids.
         """
 
-    async def aload_document(self, chunks: List[Chunk]) -> List[str]:
+    async def aload_document(
+        self, chunks: List[Chunk], file_id: Optional[str] = None
+    ) -> List[str]:
         """Async load document in index database.
 
         Args:
diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/full_text/elasticsearch.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/full_text/elasticsearch.py
index b2c26a625..67c47fc23 100644
--- a/packages/dbgpt-ext/src/dbgpt_ext/storage/full_text/elasticsearch.py
+++ b/packages/dbgpt-ext/src/dbgpt_ext/storage/full_text/elasticsearch.py
@@ -8,7 +8,11 @@ from typing import List, Optional
 from dbgpt.core import Chunk
 from dbgpt.storage.base import IndexStoreConfig, logger
 from dbgpt.storage.full_text.base import FullTextStoreBase
-from dbgpt.storage.vector_store.filters import MetadataFilters
+from dbgpt.storage.vector_store.filters import (
+    FilterCondition,
+    FilterOperator,
+    MetadataFilters,
+)
 from dbgpt.util import string_utils
 from dbgpt.util.executor_utils import blocking_func_to_async
 from dbgpt_ext.storage.vector_store.elastic_store import ElasticsearchStoreConfig
@@ -81,7 +85,9 @@ class ElasticDocumentStore(FullTextStoreBase):
                         "similarity": "custom_bm25",
                     },
                     "metadata": {
-                        "type": "keyword",
+                        # Use object so metadata fields stay queryable for filters
+                        "type": "object",
+                        "dynamic": True,
                     },
                 }
             }
@@ -94,6 +100,35 @@ class ElasticDocumentStore(FullTextStoreBase):
             )
         self._executor = executor or ThreadPoolExecutor()
 
+    def is_support_full_text_search(self) -> bool:
+        # Override to avoid the inherited default implementation
+        """Support full text search.
+
+        Elasticsearch supports full text search.
+
+        Return:
+            bool: True if full text search is supported.
+        """
+        return True  # Elasticsearch supports full text search
+
+    def full_text_search(
+        self, text: str, topk: int, filters: Optional[MetadataFilters] = None
+    ) -> List[Chunk]:
+        # Reuse the existing similar_search_with_scores method for full text search
+        """Full text search in Elasticsearch.
+
+        Args:
+            text (str): The query text.
+            topk (int): Number of results to return. Default is 10.
+
+        Returns:
+            List[Chunk]: Search results as chunks.
+        """
+        score_threshold = 0.0
+        return self.similar_search_with_scores(
+            text=text, top_k=topk, score_threshold=score_threshold, filters=filters
+        )
+
     def get_config(self) -> IndexStoreConfig:
         """Get the es store config."""
         return self._es_config
@@ -114,7 +149,7 @@ class ElasticDocumentStore(FullTextStoreBase):
         es_requests = []
         ids = []
         contents = [chunk.content for chunk in chunks]
-        metadatas = [json.dumps(chunk.metadata) for chunk in chunks]
+        metadatas = [self._normalize_metadata(chunk.metadata) for chunk in chunks]
         chunk_ids = [chunk.chunk_id for chunk in chunks]
         for i, content in enumerate(contents):
             es_request = {
@@ -143,19 +178,22 @@ class ElasticDocumentStore(FullTextStoreBase):
 
         Return:
             List[Chunk]: similar text.
         """
-        es_query = {"query": {"match": {"content": text}}}
-        res = self._es_client.search(index=self._index_name, body=es_query)
+        es_query = self._build_query(text, filters)
+        res = self._es_client.search(
+            index=self._index_name, body=es_query, size=topk, track_total_hits=False
+        )
         chunks = []
         for r in res["hits"]["hits"]:
+            metadata = self._normalize_metadata(r["_source"].get("metadata"))
             chunks.append(
                 Chunk(
                     chunk_id=r["_id"],
                     content=r["_source"]["content"],
-                    metadata=json.loads(r["_source"]["metadata"]),
+                    metadata=metadata,
                 )
             )
-        return chunks[:topk]
+        return chunks
 
     def similar_search_with_scores(
         self,
@@ -175,17 +213,20 @@ class ElasticDocumentStore(FullTextStoreBase):
 
         Return:
             List[Tuple[str, float]]: similar text with scores.
         """
-        es_query = {"query": {"match": {"content": text}}}
-        res = self._es_client.search(index=self._index_name, body=es_query)
+        es_query = self._build_query(text, filters)
+        res = self._es_client.search(
+            index=self._index_name, body=es_query, size=top_k, track_total_hits=False
+        )
         chunks_with_scores = []
         for r in res["hits"]["hits"]:
             if r["_score"] >= score_threshold:
+                metadata = self._normalize_metadata(r["_source"].get("metadata"))
                 chunks_with_scores.append(
                     Chunk(
                         chunk_id=r["_id"],
                         content=r["_source"]["content"],
-                        metadata=json.loads(r["_source"]["metadata"]),
+                        metadata=metadata,
                         score=r["_score"],
                     )
                 )
@@ -196,7 +237,9 @@ class ElasticDocumentStore(FullTextStoreBase):
             )
         return chunks_with_scores[:top_k]
 
-    async def aload_document(self, chunks: List[Chunk]) -> List[str]:
+    async def aload_document(
+        self, chunks: List[Chunk], file_id: Optional[str] = None
+    ) -> List[str]:
         """Async load document in elasticsearch.
 
         Args:
@@ -204,6 +247,13 @@ class ElasticDocumentStore(FullTextStoreBase):
         Return:
             List[str]: chunk ids.
         """
+        # New: inject file_id into each chunk's metadata
+        if file_id:
+            # Ensure metadata exists, then add or update file_id
+            for chunk in chunks:
+                if not hasattr(chunk, "metadata"):
+                    chunk.metadata = {}
+                chunk.metadata["file_id"] = file_id
         return await blocking_func_to_async(self._executor, self.load_document, chunks)
 
     def delete_by_ids(self, ids: str) -> List[str]:
@@ -229,3 +279,53 @@ class ElasticDocumentStore(FullTextStoreBase):
             index_name(str): The name of index to delete.
         """
         self._es_client.indices.delete(index=self._index_name)
+
+    def _build_query(self, text: str, filters: Optional[MetadataFilters]):
+        must_clauses = [{"match": {"content": text}}]
+        filter_clause = self._build_metadata_filter(filters)
+        if filter_clause:
+            must_clauses.append(filter_clause)
+        return {"query": {"bool": {"must": must_clauses}}}
+
+    def _build_metadata_filter(self, filters: Optional[MetadataFilters]):
+        """Translate MetadataFilters to elasticsearch bool clause."""
+        if not filters or not filters.filters:
+            return None
+
+        clauses = []
+        for f in filters.filters:
+            field_name = f"metadata.{f.key}"
+            if f.operator == FilterOperator.EQ:
+                clauses.append({"term": {field_name: f.value}})
+            elif f.operator == FilterOperator.IN:
+                values = f.value if isinstance(f.value, list) else [f.value]
+                clauses.append({"terms": {field_name: values}})
+            elif f.operator == FilterOperator.NE:
+                clauses.append({"bool": {"must_not": {"term": {field_name: f.value}}}})
+            else:
+                logger.warning(
+                    "Unsupported filter operator %s for elastic full text search",
+                    f.operator,
+                )
+        if not clauses:
+            return None
+        if filters.condition == FilterCondition.OR:
+            return {"bool": {"should": clauses, "minimum_should_match": 1}}
+        return {"bool": {"must": clauses}}
+
+    def _normalize_metadata(self, metadata):
+        """Ensure metadata is stored as a dict for downstream consumers."""
+        if metadata is None:
+            return {}
+        if isinstance(metadata, dict):
+            return metadata
+        if isinstance(metadata, str):
+            try:
+                return json.loads(metadata)
+            except Exception:
+                # Fallback to wrapping the raw string to avoid breaking callers
+                return {"value": metadata}
+        try:
+            return dict(metadata)
+        except Exception:
+            return {"value": metadata}
diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/full_text/opensearch.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/full_text/opensearch.py
index 0645d351b..409ee5917 100644
--- a/packages/dbgpt-ext/src/dbgpt_ext/storage/full_text/opensearch.py
+++ b/packages/dbgpt-ext/src/dbgpt_ext/storage/full_text/opensearch.py
@@ -20,7 +20,9 @@ class OpenSearch(FullTextStoreBase):
         """
         pass
 
-    def aload_document(self, chunks: List[Chunk]) -> List[str]:
+    def aload_document(
+        self, chunks: List[Chunk], file_id: Optional[str] = None
+    ) -> List[str]:
         """Async load document in index database.
 
         Args: