Compare commits

...

3 Commits

Author SHA1 Message Date
xinwen
bc7dfc3dc7 fix: 完善消息队列 redis 连接未关闭 2022-02-10 14:44:29 +08:00
xinwen
5cfd5afc7c fix: 消息订阅redis连接未关闭 2022-02-08 14:22:18 +08:00
fit2bot
ed077910eb fix: 将 es 的 doc_type 默认值改为 _doc (#6628)
* fix: 修复 es 命令存储过滤不准确

* fix: 无效的 es 报 500

* fix: 修复索引不存在时报错

* fix: 将 es 的 doc_type 默认值改为 _doc

Co-authored-by: xinwen <coderWen@126.com>
2021-08-12 15:36:49 +08:00
3 changed files with 40 additions and 7 deletions

View File

@@ -4,7 +4,6 @@ import json
from channels.generic.websocket import JsonWebsocketConsumer
from common.utils import get_logger
from .models import SiteMessage
from .site_msg import SiteMessageUtil
from .signals_handler import new_site_msg_chan
@@ -14,6 +13,7 @@ logger = get_logger(__name__)
class SiteMsgWebsocket(JsonWebsocketConsumer):
disconnected = False
refresh_every_seconds = 10
subscribe = None
def connect(self):
user = self.scope["user"]
@@ -50,6 +50,7 @@ class SiteMsgWebsocket(JsonWebsocketConsumer):
while not self.disconnected:
subscribe = new_site_msg_chan.subscribe()
self.subscribe = subscribe
for message in subscribe.listen():
if message['type'] != 'message':
continue
@@ -68,3 +69,5 @@ class SiteMsgWebsocket(JsonWebsocketConsumer):
def disconnect(self, close_code):
self.disconnected = True
self.close()
if self.subscribe:
self.subscribe.close()

View File

@@ -11,7 +11,7 @@ from django.utils.translation import gettext_lazy as _
from django.db.models import QuerySet as DJQuerySet
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.exceptions import RequestError
from elasticsearch.exceptions import RequestError, NotFoundError
from common.utils.common import lazyproperty
from common.utils import get_logger
@@ -33,12 +33,43 @@ class CommandStore():
kwargs = config.get("OTHER", {})
self.index = config.get("INDEX") or 'jumpserver'
self.doc_type = config.get("DOC_TYPE") or 'command_store'
self.exact_fields = {}
self.match_fields = {}
ignore_verify_certs = kwargs.pop('IGNORE_VERIFY_CERTS', False)
if ignore_verify_certs:
kwargs['verify_certs'] = None
self.es = Elasticsearch(hosts=hosts, max_retries=0, **kwargs)
self.exact_fields = set()
self.match_fields = {'input', 'risk_level', 'user', 'asset', 'system_user'}
may_exact_fields = {'session', 'org_id'}
if self.is_new_index_type():
self.exact_fields.update(may_exact_fields)
self.doc_type = '_doc'
else:
self.match_fields.update(may_exact_fields)
def is_new_index_type(self):
if not self.ping(timeout=3):
return False
try:
# 获取索引信息,如果没有定义,直接返回
data = self.es.indices.get_mapping(self.index)
except NotFoundError:
return False
try:
# 检测索引是不是新的类型
properties = data[self.index]['mappings']['properties']
if properties['session']['type'] == 'keyword' \
and properties['org_id']['type'] == 'keyword':
return True
except KeyError:
return False
def pre_use_check(self):
if not self.ping(timeout=3):
raise InvalidElasticsearch
@@ -110,15 +141,14 @@ class CommandStore():
except Exception:
return False
@staticmethod
def get_query_body(**kwargs):
def get_query_body(self, **kwargs):
new_kwargs = {}
for k, v in kwargs.items():
new_kwargs[k] = str(v) if isinstance(v, UUID) else v
kwargs = new_kwargs
exact_fields = {}
match_fields = {'session', 'input', 'org_id', 'risk_level', 'user', 'asset', 'system_user'}
exact_fields = self.exact_fields
match_fields = self.match_fields
match = {}
exact = {}

View File

@@ -191,7 +191,7 @@ class CommandStorageTypeESSerializer(serializers.Serializer):
INDEX = serializers.CharField(
max_length=1024, default='jumpserver', label=_('Index'), allow_null=True
)
DOC_TYPE = ReadableHiddenField(default='command', label=_('Doc type'), allow_null=True)
DOC_TYPE = ReadableHiddenField(default='_doc', label=_('Doc type'), allow_null=True)
IGNORE_VERIFY_CERTS = serializers.BooleanField(
default=False, label=_('Ignore Certificate Verification'),
source='OTHER.IGNORE_VERIFY_CERTS', allow_null=True,