mirror of
https://github.com/jumpserver/jumpserver.git
synced 2025-09-06 09:51:00 +00:00
feat: 新增ping、telnet系统工具 (#8666)
* feat: 新增ping、telnet系统工具 * perf: 消息返回 Co-authored-by: halo <wuyihuangw@gmail.com>
This commit is contained in:
77
apps/settings/ws.py
Normal file
77
apps/settings/ws.py
Normal file
@@ -0,0 +1,77 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
|
||||
import json
|
||||
|
||||
from channels.generic.websocket import JsonWebsocketConsumer
|
||||
|
||||
from common.db.utils import close_old_connections
|
||||
from common.utils import get_logger
|
||||
from .utils import ping, telnet
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class ToolsWebsocket(JsonWebsocketConsumer):
|
||||
|
||||
def connect(self):
|
||||
user = self.scope["user"]
|
||||
if user.is_authenticated:
|
||||
self.accept()
|
||||
else:
|
||||
self.close()
|
||||
|
||||
def imitate_ping(self, dest_addr, timeout=3, count=5, psize=64):
|
||||
"""
|
||||
Send `count' ping with `psize' size to `dest_addr' with
|
||||
the given `timeout' and display the result.
|
||||
"""
|
||||
logger.info('receive request ping {}'.format(dest_addr))
|
||||
self.send_json({'msg': 'Trying {0}...\r\n'.format(dest_addr)})
|
||||
for i in range(count):
|
||||
msg = 'ping {0} with ...{1}\r\n'
|
||||
try:
|
||||
delay = ping(dest_addr, timeout, psize)
|
||||
except Exception as e:
|
||||
msg = msg.format(dest_addr, 'failed. (socket error: {})'.format(str(e)))
|
||||
logger.error(msg)
|
||||
self.send_json({'msg': msg})
|
||||
break
|
||||
if delay is None:
|
||||
msg = msg.format(dest_addr, 'failed. (timeout within {}sec.)'.format(timeout))
|
||||
else:
|
||||
delay = delay * 1000
|
||||
msg = msg.format(dest_addr, 'get ping in %0.4fms' % delay)
|
||||
self.send_json({'msg': msg})
|
||||
|
||||
def imitate_telnet(self, dest_addr, port_num=23, timeout=10):
|
||||
logger.info('receive request telnet {}'.format(dest_addr))
|
||||
self.send_json({'msg': 'Trying {0} {1}...\r\n'.format(dest_addr, port_num)})
|
||||
msg = 'Telnet: {}'
|
||||
try:
|
||||
is_connective, resp = telnet(dest_addr, port_num, timeout)
|
||||
if is_connective:
|
||||
msg = msg.format('Connected to {0} {1}\r\n{2}'.format(dest_addr, port_num, resp))
|
||||
else:
|
||||
msg = msg.format('Connect to {0} {1} {2}\r\nTelnet: Unable to connect to remote host'
|
||||
.format(dest_addr, port_num, resp))
|
||||
except Exception as e:
|
||||
logger.error(msg)
|
||||
msg = msg.format(str(e))
|
||||
finally:
|
||||
self.send_json({'msg': msg})
|
||||
|
||||
def receive(self, text_data=None, bytes_data=None, **kwargs):
|
||||
data = json.loads(text_data)
|
||||
tool_type = data.get('tool_type', 'Ping')
|
||||
dest_addr = data.get('dest_addr')
|
||||
if tool_type == 'Ping':
|
||||
self.imitate_ping(dest_addr)
|
||||
else:
|
||||
port_num = data.get('port_num')
|
||||
self.imitate_telnet(dest_addr, port_num)
|
||||
self.close()
|
||||
|
||||
def disconnect(self, code):
|
||||
self.close()
|
||||
close_old_connections()
|
Reference in New Issue
Block a user