mirror of
https://github.com/jumpserver/jumpserver.git
synced 2025-09-08 18:59:47 +00:00
perf: 命令存储ES可根据日期动态建立索引 (#8180)
* perf: 命令存储ES可根据日期动态建立索引 * perf: 优化合并字段 * feat: 修改逻辑
This commit is contained in:
@@ -1,11 +1,12 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
import pytz
|
||||
import inspect
|
||||
|
||||
from datetime import datetime
|
||||
from functools import reduce, partial
|
||||
from itertools import groupby
|
||||
import pytz
|
||||
from uuid import UUID
|
||||
import inspect
|
||||
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django.db.models import QuerySet as DJQuerySet
|
||||
@@ -15,6 +16,7 @@ from elasticsearch.exceptions import RequestError, NotFoundError
|
||||
|
||||
from common.utils.common import lazyproperty
|
||||
from common.utils import get_logger
|
||||
from common.utils.timezone import local_now_date_display, utc_now
|
||||
from common.exceptions import JMSException
|
||||
from .models import AbstractSessionCommand
|
||||
|
||||
@@ -28,12 +30,13 @@ class InvalidElasticsearch(JMSException):
|
||||
|
||||
class CommandStore(object):
|
||||
def __init__(self, config):
|
||||
hosts = config.get("HOSTS")
|
||||
kwargs = config.get("OTHER", {})
|
||||
self.index = config.get("INDEX") or 'jumpserver'
|
||||
self.doc_type = config.get("DOC_TYPE") or '_doc'
|
||||
self.index_prefix = config.get('INDEX') or 'jumpserver'
|
||||
self.is_index_by_date = bool(config.get('INDEX_BY_DATE'))
|
||||
self.exact_fields = {}
|
||||
self.match_fields = {}
|
||||
hosts = config.get("HOSTS")
|
||||
kwargs = config.get("OTHER", {})
|
||||
|
||||
ignore_verify_certs = kwargs.pop('IGNORE_VERIFY_CERTS', False)
|
||||
if ignore_verify_certs:
|
||||
@@ -50,6 +53,17 @@ class CommandStore(object):
|
||||
else:
|
||||
self.match_fields.update(may_exact_fields)
|
||||
|
||||
self.init_index(config)
|
||||
|
||||
def init_index(self, config):
|
||||
if self.is_index_by_date:
|
||||
date = local_now_date_display()
|
||||
self.index = '%s-%s' % (self.index_prefix, date)
|
||||
self.query_index = '%s-alias' % self.index_prefix
|
||||
else:
|
||||
self.index = config.get("INDEX") or 'jumpserver'
|
||||
self.query_index = config.get("INDEX") or 'jumpserver'
|
||||
|
||||
def is_new_index_type(self):
|
||||
if not self.ping(timeout=3):
|
||||
return False
|
||||
@@ -101,11 +115,18 @@ class CommandStore(object):
|
||||
else:
|
||||
mappings = {'mappings': {'properties': properties}}
|
||||
|
||||
if self.is_index_by_date:
|
||||
mappings['aliases'] = {
|
||||
self.query_index: {}
|
||||
}
|
||||
try:
|
||||
self.es.indices.create(self.index, body=mappings)
|
||||
return
|
||||
except RequestError as e:
|
||||
logger.exception(e)
|
||||
if e.error == 'resource_already_exists_exception':
|
||||
logger.warning(e)
|
||||
else:
|
||||
logger.exception(e)
|
||||
|
||||
@staticmethod
|
||||
def make_data(command):
|
||||
@@ -141,7 +162,7 @@ class CommandStore(object):
|
||||
body = self.get_query_body(**query)
|
||||
|
||||
data = self.es.search(
|
||||
index=self.index, doc_type=self.doc_type, body=body, from_=from_, size=size,
|
||||
index=self.query_index, doc_type=self.doc_type, body=body, from_=from_, size=size,
|
||||
sort=sort
|
||||
)
|
||||
source_data = []
|
||||
@@ -154,7 +175,7 @@ class CommandStore(object):
|
||||
|
||||
def count(self, **query):
|
||||
body = self.get_query_body(**query)
|
||||
data = self.es.count(index=self.index, doc_type=self.doc_type, body=body)
|
||||
data = self.es.count(index=self.query_index, doc_type=self.doc_type, body=body)
|
||||
return data["count"]
|
||||
|
||||
def __getattr__(self, item):
|
||||
|
Reference in New Issue
Block a user