perf: 命令存储支持ES8的版本

This commit is contained in:
wangruidong
2024-06-04 19:34:18 +08:00
committed by Bryan
parent 7ad4d9116a
commit 948e9ecb4b
3 changed files with 221 additions and 51 deletions

View File

@@ -14,9 +14,13 @@ from uuid import UUID
from django.utils.translation import gettext_lazy as _
from django.db.models import QuerySet as DJQuerySet
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.exceptions import RequestError, NotFoundError
from elasticsearch7 import Elasticsearch
from elasticsearch7.helpers import bulk
from elasticsearch7.exceptions import RequestError
from elasticsearch7.exceptions import NotFoundError as NotFoundError7
from elasticsearch8.exceptions import NotFoundError as NotFoundError8
from elasticsearch8.exceptions import BadRequestError
from common.utils.common import lazyproperty
from common.utils import get_logger
@@ -36,9 +40,71 @@ class NotSupportElasticsearch8(JMSException):
default_detail = _('Not Support Elasticsearch8')
class ES(object):
def __init__(self, config, properties, keyword_fields, exact_fields=None, match_fields=None):
class ESClient(object):
def __new__(cls, *args, **kwargs):
version = kwargs.pop('version')
if version == 6:
return ESClientV6(*args, **kwargs)
if version == 7:
return ESClientV7(*args, **kwargs)
elif version == 8:
return ESClientV8(*args, **kwargs)
raise ValueError('Unsupported ES_VERSION %r' % version)
class ESClientBase(object):
@classmethod
def get_properties(cls, data, index):
return data[index]['mappings']['properties']
@classmethod
def get_mapping(cls, properties):
return {'mappings': {'properties': properties}}
class ESClientV7(ESClientBase):
def __init__(self, *args, **kwargs):
from elasticsearch7 import Elasticsearch
self.es = Elasticsearch(*args, **kwargs)
@classmethod
def get_sort(cls, field, direction):
return f'{field}:{direction}'
class ESClientV6(ESClientV7):
@classmethod
def get_properties(cls, data, index):
return data[index]['mappings']['data']['properties']
@classmethod
def get_mapping(cls, properties):
return {'mappings': {'data': {'properties': properties}}}
class ESClientV8(ESClientBase):
def __init__(self, *args, **kwargs):
from elasticsearch8 import Elasticsearch
self.es = Elasticsearch(*args, **kwargs)
@classmethod
def get_sort(cls, field, direction):
return {field: {'order': direction}}
def get_es_client_version(**kwargs):
es = kwargs.get('es')
info = es.info()
version = int(info['version']['number'].split('.')[0])
return version
class ES(object):
def __init__(self, config, properties, keyword_fields, exact_fields=None, match_fields=None):
self.version = 7
self.config = config
hosts = self.config.get('HOSTS')
kwargs = self.config.get('OTHER', {})
@@ -47,6 +113,9 @@ class ES(object):
if ignore_verify_certs:
kwargs['verify_certs'] = None
self.es = Elasticsearch(hosts=hosts, max_retries=0, **kwargs)
self.version = get_es_client_version(es=self.es)
self.client = ESClient(version=self.version, hosts=hosts, max_retries=0, **kwargs)
self.es = self.client.es
self.index_prefix = self.config.get('INDEX') or 'jumpserver'
self.is_index_by_date = bool(self.config.get('INDEX_BY_DATE', False))
@@ -83,26 +152,14 @@ class ES(object):
if not self.ping(timeout=2):
return False
info = self.es.info()
version = info['version']['number'].split('.')[0]
if version == '8':
raise NotSupportElasticsearch8
try:
# 获取索引信息,如果没有定义,直接返回
data = self.es.indices.get_mapping(index=self.index)
except NotFoundError:
except (NotFoundError8, NotFoundError7):
return False
try:
if version == '6':
# 检测索引是不是新的类型 es6
properties = data[self.index]['mappings']['data']['properties']
else:
# 检测索引是不是新的类型 es7 default index type: _doc
properties = data[self.index]['mappings']['properties']
properties = self.client.get_properties(data=data, index=self.index)
for keyword in self.keyword_fields:
if not properties[keyword]['type'] == 'keyword':
break
@@ -118,12 +175,7 @@ class ES(object):
def _ensure_index_exists(self):
try:
info = self.es.info()
version = info['version']['number'].split('.')[0]
if version == '6':
mappings = {'mappings': {'data': {'properties': self.properties}}}
else:
mappings = {'mappings': {'properties': self.properties}}
mappings = self.client.get_mapping(self.properties)
if self.is_index_by_date:
mappings['aliases'] = {
@@ -132,7 +184,7 @@ class ES(object):
try:
self.es.indices.create(index=self.index, body=mappings)
except RequestError as e:
except (RequestError, BadRequestError) as e:
if e.error == 'resource_already_exists_exception':
logger.warning(e)
else:
@@ -367,7 +419,7 @@ class QuerySet(DJQuerySet):
else:
direction = 'asc'
field = field.lstrip('-+')
sort = f'{field}:{direction}'
sort = self._storage.client.get_sort(field, direction)
return sort
def __execute(self):