diff --git a/packages/dbgpt-ext/pyproject.toml b/packages/dbgpt-ext/pyproject.toml
index 37a775b40..891f4d157 100644
--- a/packages/dbgpt-ext/pyproject.toml
+++ b/packages/dbgpt-ext/pyproject.toml
@@ -77,7 +77,11 @@ storage_chromadb = [
     "onnxruntime>=1.14.1,<=1.18.1",
     "chromadb>=0.4.22"
 ]
-storage_elasticsearch = ["elasticsearch"]
+storage_elasticsearch = [
+    "elasticsearch==8.17.1",
+    "langchain==0.3.19",
+    "langchain-community==0.3.18",
+]
 storage_obvector = ["pyobvector"]
 
 file_oss = [
diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/full_text/elasticsearch.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/full_text/elasticsearch.py
index b2c26a625..2e0c3cdc0 100644
--- a/packages/dbgpt-ext/src/dbgpt_ext/storage/full_text/elasticsearch.py
+++ b/packages/dbgpt-ext/src/dbgpt_ext/storage/full_text/elasticsearch.py
@@ -65,6 +65,10 @@ class ElasticDocumentStore(FullTextStoreBase):
             hosts=[f"http://{self._es_url}:{self._es_port}"],
         )
         self._es_index_settings = {
+            "number_of_shards": 1,
+            # Replica count 0 avoids yellow cluster status on a standalone ES node.
+            # TODO: make this configurable in the config toml
+            "number_of_replicas": 0,
             "analysis": {"analyzer": {"default": {"type": "standard"}}},
             "similarity": {
                 "custom_bm25": {
diff --git a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/elastic_store.py b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/elastic_store.py
index 5be306e85..994d80012 100644
--- a/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/elastic_store.py
+++ b/packages/dbgpt-ext/src/dbgpt_ext/storage/vector_store/elastic_store.py
@@ -4,6 +4,7 @@ from __future__ import annotations
 
 import logging
 import os
+import traceback
 from dataclasses import dataclass, field
 from typing import Any, List, Optional
 
@@ -236,12 +237,9 @@ class ElasticStore(VectorStoreBase):
                     f"http://{self.uri}:{self.port}",
                     basic_auth=(self.username, self.password),
                 )
-                # create es index
-                self.create_collection(collection_name=self.index_name)
             else:
                 logger.warning("ElasticSearch not set username and password")
                 self.es_client_python = Elasticsearch(f"http://{self.uri}:{self.port}")
-                self.create_collection(collection_name=self.index_name)
         except ConnectionError:
             logger.error("ElasticSearch connection failed")
         except Exception as e:
@@ -298,6 +296,12 @@ class ElasticStore(VectorStoreBase):
                 "Please install it with `pip install langchain` and "
                 "`pip install elasticsearch`."
             )
+        try:
+            # Create the ES index lazily, on first write rather than in __init__
+            self.create_collection(collection_name=self.index_name)
+        except Exception as e:
+            logger.error(f"Failed to create ES index: {e}", exc_info=True)
+            logger.error(traceback.format_exc())
         try:
             texts = [chunk.content for chunk in chunks]
             metadatas = [chunk.metadata for chunk in chunks]
@@ -346,6 +350,7 @@ class ElasticStore(VectorStoreBase):
             logger.error(f"ElasticSearch connect failed {ce}")
         except Exception as e:
             logger.error(f"ElasticSearch load_document failed : {e}")
+            logger.error(traceback.format_exc())
         return []
 
     def delete_by_ids(self, ids):
@@ -365,7 +370,7 @@ class ElasticStore(VectorStoreBase):
         filters: Optional[MetadataFilters] = None,
     ) -> List[Chunk]:
         """Perform a search on a query string and return results."""
-        info_docs = self._search(query=text, topk=topk, filters=filters)
+        info_docs = self._vector_search(query=text, topk=topk, filters=filters)
         return info_docs
 
     def similar_search_with_scores(
@@ -385,7 +390,7 @@ class ElasticStore(VectorStoreBase):
             List[Chunk]: Result doc and score.
         """
         query = text
-        info_docs = self._search(query=query, topk=topk, filters=filters)
+        info_docs = self._vector_search(query=query, topk=topk, filters=filters)
         docs_and_scores = [
             chunk for chunk in info_docs if chunk.score >= score_threshold
         ]
@@ -439,6 +444,67 @@ class ElasticStore(VectorStoreBase):
             info_docs.append(doc_with_score)
         return info_docs
 
+    def _vector_search(
+        self, query: str, topk: int, filters: Optional[MetadataFilters] = None, **kwargs
+    ) -> List[Chunk]:
+        """Search for documents similar to the query text.
+
+        Args:
+            query: query text
+            topk: number of documents to return. Defaults to 4.
+            filters: metadata filters.
+        Return:
+            List[Chunk]: list of chunks
+        """
+        # Convert the query text to a vector using the embedding function
+        query_vector = self.embedding.embed_query(query)
+
+        # Script score query: cosine similarity rescaled from [-1, 1] to [0, 1]
+        script_score_query = {
+            "script_score": {
+                "query": {"match_all": {}},
+                "script": {
+                    "source": "(cosineSimilarity(params.query_vector, 'dense_vector')"
+                    " + 1.0)/2.0",
+                    "params": {"query_vector": query_vector},
+                },
+            }
+        }
+
+        # Prepare the body for the search query
+        body = {"query": script_score_query}
+
+        # Wrap the script score in a bool query when metadata filters are given
+        if filters:
+            where_filters = self.convert_metadata_filters(filters)
+            body["query"] = {
+                "bool": {
+                    "filter": [{"terms": where_filters}],
+                    "must": [script_score_query],
+                }
+            }
+
+        search_results = self.es_client_python.search(
+            index=self.index_name, body=body, size=topk
+        )
+        search_results = search_results["hits"]["hits"]
+
+        if not search_results:
+            logger.warning("No Elasticsearch results found.")
+            return []
+        info_docs = []
+        for result in search_results:
+            doc_id = result["_id"]
+            source = result["_source"]
+            context = source["context"]
+            metadata = source["metadata"]
+            score = result["_score"]
+            doc_with_score = Chunk(
+                content=context, metadata=metadata, score=score, chunk_id=doc_id
+            )
+            info_docs.append(doc_with_score)
+        return info_docs
+
     def vector_name_exists(self):
         """Whether vector name exists."""
         return self.es_client_python.indices.exists(index=self.index_name)