This commit is contained in:
JohnSaxon 2025-07-07 10:48:43 +08:00 committed by GitHub
commit c934a5512f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 80 additions and 6 deletions

View File

@ -77,7 +77,11 @@ storage_chromadb = [
"onnxruntime>=1.14.1,<=1.18.1", "onnxruntime>=1.14.1,<=1.18.1",
"chromadb>=0.4.22" "chromadb>=0.4.22"
] ]
storage_elasticsearch = ["elasticsearch"] storage_elasticsearch = [
"elasticsearch==8.17.1",
"langchain==0.3.19",
"langchain-community==0.3.18",
]
storage_obvector = ["pyobvector"] storage_obvector = ["pyobvector"]
file_oss = [ file_oss = [

View File

@ -65,6 +65,10 @@ class ElasticDocumentStore(FullTextStoreBase):
hosts=[f"http://{self._es_url}:{self._es_port}"], hosts=[f"http://{self._es_url}:{self._es_port}"],
) )
self._es_index_settings = { self._es_index_settings = {
"number_of_shards": 1,
# Zero replicas: avoids yellow cluster status on a standalone ES node.
# TODO: make the shard/replica settings configurable in the config toml
"number_of_replicas": 0,
"analysis": {"analyzer": {"default": {"type": "standard"}}}, "analysis": {"analyzer": {"default": {"type": "standard"}}},
"similarity": { "similarity": {
"custom_bm25": { "custom_bm25": {

View File

@ -4,6 +4,7 @@ from __future__ import annotations
import logging import logging
import os import os
import traceback
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any, List, Optional from typing import Any, List, Optional
@ -236,12 +237,9 @@ class ElasticStore(VectorStoreBase):
f"http://{self.uri}:{self.port}", f"http://{self.uri}:{self.port}",
basic_auth=(self.username, self.password), basic_auth=(self.username, self.password),
) )
# create es index
self.create_collection(collection_name=self.index_name)
else: else:
logger.warning("ElasticSearch not set username and password") logger.warning("ElasticSearch not set username and password")
self.es_client_python = Elasticsearch(f"http://{self.uri}:{self.port}") self.es_client_python = Elasticsearch(f"http://{self.uri}:{self.port}")
self.create_collection(collection_name=self.index_name)
except ConnectionError: except ConnectionError:
logger.error("ElasticSearch connection failed") logger.error("ElasticSearch connection failed")
except Exception as e: except Exception as e:
@ -298,6 +296,12 @@ class ElasticStore(VectorStoreBase):
"Please install it with `pip install langchain` and " "Please install it with `pip install langchain` and "
"`pip install elasticsearch`." "`pip install elasticsearch`."
) )
try:
# create es index
self.create_collection(collection_name=self.index_name)
except Exception as e:
logger.error(f"Try create es index failed : {e}", exc_info=True)
logger.error(traceback.format_exc())
try: try:
texts = [chunk.content for chunk in chunks] texts = [chunk.content for chunk in chunks]
metadatas = [chunk.metadata for chunk in chunks] metadatas = [chunk.metadata for chunk in chunks]
@ -346,6 +350,7 @@ class ElasticStore(VectorStoreBase):
logger.error(f"ElasticSearch connect failed {ce}") logger.error(f"ElasticSearch connect failed {ce}")
except Exception as e: except Exception as e:
logger.error(f"ElasticSearch load_document failed : {e}") logger.error(f"ElasticSearch load_document failed : {e}")
logger.error(traceback.format_exc())
return [] return []
def delete_by_ids(self, ids): def delete_by_ids(self, ids):
@ -365,7 +370,7 @@ class ElasticStore(VectorStoreBase):
filters: Optional[MetadataFilters] = None, filters: Optional[MetadataFilters] = None,
) -> List[Chunk]: ) -> List[Chunk]:
"""Perform a search on a query string and return results.""" """Perform a search on a query string and return results."""
info_docs = self._search(query=text, topk=topk, filters=filters) info_docs = self._vector_search(query=text, topk=topk, filters=filters)
return info_docs return info_docs
def similar_search_with_scores( def similar_search_with_scores(
@ -385,7 +390,7 @@ class ElasticStore(VectorStoreBase):
List[Chunk]: Result doc and score. List[Chunk]: Result doc and score.
""" """
query = text query = text
info_docs = self._search(query=query, topk=topk, filters=filters) info_docs = self._vector_search(query=query, topk=topk, filters=filters)
docs_and_scores = [ docs_and_scores = [
chunk for chunk in info_docs if chunk.score >= score_threshold chunk for chunk in info_docs if chunk.score >= score_threshold
] ]
@ -439,6 +444,67 @@ class ElasticStore(VectorStoreBase):
info_docs.append(doc_with_score) info_docs.append(doc_with_score)
return info_docs return info_docs
def _vector_search(
    self, query: str, topk: int, filters: Optional[MetadataFilters] = None, **kwargs
) -> List[Chunk]:
    """Run a cosine-similarity search over the index's dense vectors.

    The query text is embedded with ``self.embedding`` and documents are
    scored by ``(cosineSimilarity + 1.0) / 2.0`` against the
    ``dense_vector`` field, which rescales cosine's [-1, 1] range to [0, 1].

    Args:
        query: query text to embed and search for.
        topk: number of hits requested from ElasticSearch (passed as ``size``).
        filters: optional metadata filters. When truthy they are converted
            with ``convert_metadata_filters`` and applied as a ``terms``
            filter next to the scoring query.
        **kwargs: accepted but not used by this method.

    Return:
        List[Chunk]: one chunk per hit carrying content, metadata, score and
            chunk id; an empty list when the search returns no hits.
    """
    # Embed the query text first; the vector is passed to the script as a param.
    embedded_query = self.embedding.embed_query(query)

    similarity_clause = {
        "script_score": {
            "query": {"match_all": {}},
            "script": {
                "source": "(cosineSimilarity(params.query_vector, 'dense_vector')"
                " + 1.0)/2.0",
                "params": {"query_vector": embedded_query},
            },
        }
    }

    # Without filters the scoring clause is the whole query; with filters it
    # becomes the ``must`` part of a bool query restricted by the filter.
    es_query = similarity_clause
    if filters:
        es_query = {
            "bool": {
                "filter": [{"terms": self.convert_metadata_filters(filters)}],
                "must": [similarity_clause],
            }
        }

    response = self.es_client_python.search(
        index=self.index_name, body={"query": es_query}, size=topk
    )
    hits = response["hits"]["hits"]
    if not hits:
        logger.warning("""No ElasticSearch results found.""")
        return []

    # Each hit's ``_source`` holds the stored text under "context" and its
    # metadata under "metadata"; the ES document id doubles as the chunk id.
    return [
        Chunk(
            chunk_id=hit["_id"],
            content=hit["_source"]["context"],
            metadata=hit["_source"]["metadata"],
            score=hit["_score"],
        )
        for hit in hits
    ]
def vector_name_exists(self): def vector_name_exists(self):
"""Whether vector name exists.""" """Whether vector name exists."""
return self.es_client_python.indices.exists(index=self.index_name) return self.es_client_python.indices.exists(index=self.index_name)