Compare commits

...

3 Commits

Author SHA1 Message Date
xinwen
bc7dfc3dc7 fix: 完善消息队列 redis 连接未关闭 2022-02-10 14:44:29 +08:00
xinwen
5cfd5afc7c fix: 消息订阅redis连接未关闭 2022-02-08 14:22:18 +08:00
fit2bot
ed077910eb fix: 将 es 的 doc_type 默认值改为 _doc (#6628)
* fix: 修复 es 命令存储过滤不准确

* fix: 无效的 es 报 500

* fix: 修复索引不存在时报错

* fix: 将 es 的 doc_type 默认值改为 _doc

Co-authored-by: xinwen <coderWen@126.com>
2021-08-12 15:36:49 +08:00
3 changed files with 40 additions and 7 deletions

View File

@@ -4,7 +4,6 @@ import json
from channels.generic.websocket import JsonWebsocketConsumer from channels.generic.websocket import JsonWebsocketConsumer
from common.utils import get_logger from common.utils import get_logger
from .models import SiteMessage
from .site_msg import SiteMessageUtil from .site_msg import SiteMessageUtil
from .signals_handler import new_site_msg_chan from .signals_handler import new_site_msg_chan
@@ -14,6 +13,7 @@ logger = get_logger(__name__)
class SiteMsgWebsocket(JsonWebsocketConsumer): class SiteMsgWebsocket(JsonWebsocketConsumer):
disconnected = False disconnected = False
refresh_every_seconds = 10 refresh_every_seconds = 10
subscribe = None
def connect(self): def connect(self):
user = self.scope["user"] user = self.scope["user"]
@@ -50,6 +50,7 @@ class SiteMsgWebsocket(JsonWebsocketConsumer):
while not self.disconnected: while not self.disconnected:
subscribe = new_site_msg_chan.subscribe() subscribe = new_site_msg_chan.subscribe()
self.subscribe = subscribe
for message in subscribe.listen(): for message in subscribe.listen():
if message['type'] != 'message': if message['type'] != 'message':
continue continue
@@ -68,3 +69,5 @@ class SiteMsgWebsocket(JsonWebsocketConsumer):
def disconnect(self, close_code): def disconnect(self, close_code):
self.disconnected = True self.disconnected = True
self.close() self.close()
if self.subscribe:
self.subscribe.close()

View File

@@ -11,7 +11,7 @@ from django.utils.translation import gettext_lazy as _
from django.db.models import QuerySet as DJQuerySet from django.db.models import QuerySet as DJQuerySet
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk from elasticsearch.helpers import bulk
from elasticsearch.exceptions import RequestError from elasticsearch.exceptions import RequestError, NotFoundError
from common.utils.common import lazyproperty from common.utils.common import lazyproperty
from common.utils import get_logger from common.utils import get_logger
@@ -33,12 +33,43 @@ class CommandStore():
kwargs = config.get("OTHER", {}) kwargs = config.get("OTHER", {})
self.index = config.get("INDEX") or 'jumpserver' self.index = config.get("INDEX") or 'jumpserver'
self.doc_type = config.get("DOC_TYPE") or 'command_store' self.doc_type = config.get("DOC_TYPE") or 'command_store'
self.exact_fields = {}
self.match_fields = {}
ignore_verify_certs = kwargs.pop('IGNORE_VERIFY_CERTS', False) ignore_verify_certs = kwargs.pop('IGNORE_VERIFY_CERTS', False)
if ignore_verify_certs: if ignore_verify_certs:
kwargs['verify_certs'] = None kwargs['verify_certs'] = None
self.es = Elasticsearch(hosts=hosts, max_retries=0, **kwargs) self.es = Elasticsearch(hosts=hosts, max_retries=0, **kwargs)
self.exact_fields = set()
self.match_fields = {'input', 'risk_level', 'user', 'asset', 'system_user'}
may_exact_fields = {'session', 'org_id'}
if self.is_new_index_type():
self.exact_fields.update(may_exact_fields)
self.doc_type = '_doc'
else:
self.match_fields.update(may_exact_fields)
def is_new_index_type(self):
if not self.ping(timeout=3):
return False
try:
# 获取索引信息,如果没有定义,直接返回
data = self.es.indices.get_mapping(self.index)
except NotFoundError:
return False
try:
# 检测索引是不是新的类型
properties = data[self.index]['mappings']['properties']
if properties['session']['type'] == 'keyword' \
and properties['org_id']['type'] == 'keyword':
return True
except KeyError:
return False
def pre_use_check(self): def pre_use_check(self):
if not self.ping(timeout=3): if not self.ping(timeout=3):
raise InvalidElasticsearch raise InvalidElasticsearch
@@ -110,15 +141,14 @@ class CommandStore():
except Exception: except Exception:
return False return False
@staticmethod def get_query_body(self, **kwargs):
def get_query_body(**kwargs):
new_kwargs = {} new_kwargs = {}
for k, v in kwargs.items(): for k, v in kwargs.items():
new_kwargs[k] = str(v) if isinstance(v, UUID) else v new_kwargs[k] = str(v) if isinstance(v, UUID) else v
kwargs = new_kwargs kwargs = new_kwargs
exact_fields = {} exact_fields = self.exact_fields
match_fields = {'session', 'input', 'org_id', 'risk_level', 'user', 'asset', 'system_user'} match_fields = self.match_fields
match = {} match = {}
exact = {} exact = {}

View File

@@ -191,7 +191,7 @@ class CommandStorageTypeESSerializer(serializers.Serializer):
INDEX = serializers.CharField( INDEX = serializers.CharField(
max_length=1024, default='jumpserver', label=_('Index'), allow_null=True max_length=1024, default='jumpserver', label=_('Index'), allow_null=True
) )
DOC_TYPE = ReadableHiddenField(default='command', label=_('Doc type'), allow_null=True) DOC_TYPE = ReadableHiddenField(default='_doc', label=_('Doc type'), allow_null=True)
IGNORE_VERIFY_CERTS = serializers.BooleanField( IGNORE_VERIFY_CERTS = serializers.BooleanField(
default=False, label=_('Ignore Certificate Verification'), default=False, label=_('Ignore Certificate Verification'),
source='OTHER.IGNORE_VERIFY_CERTS', allow_null=True, source='OTHER.IGNORE_VERIFY_CERTS', allow_null=True,