mirror of
https://github.com/jumpserver/jumpserver.git
synced 2026-07-01 22:49:06 +00:00
perf: to tcpdump
This commit is contained in:
@@ -36,6 +36,7 @@ ARG TOOLS=" \
|
||||
postgresql-client \
|
||||
openssh-client \
|
||||
sshpass \
|
||||
tcpdump \
|
||||
bubblewrap"
|
||||
|
||||
ARG APT_MIRROR=http://deb.debian.org
|
||||
|
||||
@@ -1,78 +1,213 @@
|
||||
import asyncio
|
||||
import netifaces
|
||||
import contextlib
|
||||
import shutil
|
||||
import socket
|
||||
import struct
|
||||
import time
|
||||
|
||||
import netifaces
|
||||
|
||||
from common.utils.timezone import local_now_display
|
||||
from settings.utils import generate_ips, generate_ports
|
||||
|
||||
|
||||
async def once_tcpdump(
|
||||
interface, src_ips, src_ports, dest_ips, dest_ports, display, stop_event
|
||||
):
|
||||
loop = asyncio.get_event_loop()
|
||||
s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
|
||||
s.bind((interface, 0))
|
||||
s.setblocking(False)
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
packet = await loop.sock_recv(s, 65535)
|
||||
except BlockingIOError:
|
||||
await asyncio.sleep(0.1)
|
||||
# 解析IP数据包
|
||||
ip_header = packet[14:34]
|
||||
ip_hdr = struct.unpack('!BBHHHBBH4s4s', ip_header)
|
||||
# 判断是否为TCP数据包
|
||||
protocol = ip_hdr[6]
|
||||
if protocol != 6:
|
||||
continue
|
||||
# 解析TCP数据包
|
||||
tcp_header = packet[34:54]
|
||||
tcp_hdr = struct.unpack('!HHLLBBHHH', tcp_header)
|
||||
# 获取源地址、源端口号、目标地址、目标端口等信息
|
||||
src_ip, dest_ip = map(lambda x: socket.inet_ntoa(x), ip_hdr[8:10])
|
||||
src_port, dest_port = tcp_hdr[0], tcp_hdr[1]
|
||||
# 获取数据包类型和长度
|
||||
packet_type = socket.htons(ip_hdr[6])
|
||||
packet_len = len(packet)
|
||||
# 获取TCP标志位、序号、确认号、部分数据等信息
|
||||
seq, ack, flags = tcp_hdr[2], tcp_hdr[3], tcp_hdr[5]
|
||||
data = packet[54:]
|
||||
# 如果过滤的参数[源地址、源端口等]为空,则不过滤
|
||||
# 各个过滤参数之间为 `且` 的关系
|
||||
green_light = True
|
||||
if src_ips and src_ip not in src_ips:
|
||||
green_light = False
|
||||
if src_ports and src_port not in src_ports:
|
||||
green_light = False
|
||||
if dest_ips and dest_ip not in dest_ips:
|
||||
green_light = False
|
||||
if dest_ports and dest_port not in dest_ports:
|
||||
green_light = False
|
||||
if not green_light:
|
||||
continue
|
||||
|
||||
results = [
|
||||
f'[{interface}][{local_now_display()}] {src_ip}:{src_port} -> '
|
||||
f'{dest_ip}:{dest_port} ({packet_type}, {packet_len} bytes)',
|
||||
f'\tFlags: {flags} Seq: {seq}, Ack: {ack}', f'\tData: {data}'
|
||||
]
|
||||
for r in results:
|
||||
await display(r)
|
||||
_ETHERTYPE_IPV4 = 0x0800
|
||||
_PACKET_ALL = 0x0003
|
||||
_READ_TIMEOUT = 0.25
|
||||
_STOP_TIMEOUT = 1
|
||||
|
||||
|
||||
def list_show(items, default='all'):
|
||||
return ','.join(map(str, items)) or default
|
||||
|
||||
|
||||
async def verbose_tcpdump(interfaces, src_ips, src_ports, dest_ips, dest_ports, display=None):
|
||||
def _build_or_filter(template, values):
|
||||
if not values:
|
||||
return ''
|
||||
clauses = [template.format(value=value) for value in values]
|
||||
if len(clauses) == 1:
|
||||
return clauses[0]
|
||||
return f"({' or '.join(clauses)})"
|
||||
|
||||
|
||||
def build_tcpdump_filter(src_ips, src_ports, dest_ips, dest_ports):
|
||||
filters = [
|
||||
_build_or_filter('src host {value}', src_ips),
|
||||
_build_or_filter('src port {value}', src_ports),
|
||||
_build_or_filter('dst host {value}', dest_ips),
|
||||
_build_or_filter('dst port {value}', dest_ports),
|
||||
]
|
||||
return ' and '.join(filter(None, filters))
|
||||
|
||||
|
||||
def _format_endpoint(ip, port):
|
||||
try:
|
||||
service = socket.getservbyport(port, 'tcp')
|
||||
except OSError:
|
||||
service = str(port)
|
||||
return f'{ip}.{service}'
|
||||
|
||||
|
||||
def _format_tcp_flags(flags):
|
||||
mapping = (
|
||||
(0x01, 'F'),
|
||||
(0x02, 'S'),
|
||||
(0x04, 'R'),
|
||||
(0x08, 'P'),
|
||||
(0x10, '.'),
|
||||
(0x20, 'U'),
|
||||
(0x40, 'E'),
|
||||
(0x80, 'W'),
|
||||
)
|
||||
text = ''.join(symbol for bit, symbol in mapping if flags & bit)
|
||||
return text or 'none'
|
||||
|
||||
|
||||
def _format_emulated_line(src_ip, src_port, dest_ip, dest_port, seq, ack, flags, win, payload_len):
|
||||
now = time.time()
|
||||
timestamp = time.strftime('%H:%M:%S', time.localtime(now))
|
||||
micros = int((now % 1) * 1_000_000)
|
||||
details = [f'Flags [{_format_tcp_flags(flags)}]']
|
||||
if payload_len:
|
||||
details.append(f'seq {seq}:{seq + payload_len}')
|
||||
elif flags & 0x07:
|
||||
details.append(f'seq {seq}')
|
||||
if flags & 0x10:
|
||||
details.append(f'ack {ack}')
|
||||
details.append(f'win {win}')
|
||||
details.append(f'length {payload_len}')
|
||||
return (
|
||||
f'{timestamp}.{micros:06d} IP '
|
||||
f'{_format_endpoint(src_ip, src_port)} > {_format_endpoint(dest_ip, dest_port)}: '
|
||||
f"{', '.join(details)}"
|
||||
)
|
||||
|
||||
|
||||
async def once_tcpdump_emulated(
|
||||
interface, src_ips, src_ports, dest_ips, dest_ports, display, stop_event
|
||||
):
|
||||
loop = asyncio.get_running_loop()
|
||||
sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(_PACKET_ALL))
|
||||
sock.bind((interface, 0))
|
||||
sock.setblocking(False)
|
||||
try:
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
packet = await asyncio.wait_for(loop.sock_recv(sock, 65535), timeout=_READ_TIMEOUT)
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
|
||||
if len(packet) < 54:
|
||||
continue
|
||||
|
||||
ether_type = struct.unpack('!H', packet[12:14])[0]
|
||||
if ether_type != _ETHERTYPE_IPV4:
|
||||
continue
|
||||
|
||||
version_ihl = packet[14]
|
||||
if version_ihl >> 4 != 4:
|
||||
continue
|
||||
ip_header_len = (version_ihl & 0x0F) * 4
|
||||
tcp_offset = 14 + ip_header_len
|
||||
if len(packet) < tcp_offset + 20:
|
||||
continue
|
||||
|
||||
ip_header = packet[14:14 + ip_header_len]
|
||||
ip_hdr = struct.unpack('!BBHHHBBH4s4s', ip_header[:20])
|
||||
if ip_hdr[6] != socket.IPPROTO_TCP:
|
||||
continue
|
||||
|
||||
tcp_header = packet[tcp_offset:tcp_offset + 20]
|
||||
tcp_hdr = struct.unpack('!HHLLBBHHH', tcp_header)
|
||||
src_ip, dest_ip = map(socket.inet_ntoa, ip_hdr[8:10])
|
||||
src_port, dest_port = tcp_hdr[0], tcp_hdr[1]
|
||||
|
||||
if src_ips and src_ip not in src_ips:
|
||||
continue
|
||||
if src_ports and src_port not in src_ports:
|
||||
continue
|
||||
if dest_ips and dest_ip not in dest_ips:
|
||||
continue
|
||||
if dest_ports and dest_port not in dest_ports:
|
||||
continue
|
||||
|
||||
tcp_header_len = (tcp_hdr[4] >> 4) * 4
|
||||
total_length = ip_hdr[2]
|
||||
payload_len = max(total_length - ip_header_len - tcp_header_len, 0)
|
||||
await display(
|
||||
_format_emulated_line(
|
||||
src_ip, src_port, dest_ip, dest_port,
|
||||
tcp_hdr[2], tcp_hdr[3], tcp_hdr[5], tcp_hdr[6], payload_len
|
||||
)
|
||||
)
|
||||
finally:
|
||||
sock.close()
|
||||
|
||||
|
||||
async def once_tcpdump(
|
||||
interface, src_ips, src_ports, dest_ips, dest_ports, display, stop_event, tcpdump_path
|
||||
):
|
||||
command = [tcpdump_path, '-l', '-i', interface]
|
||||
capture_filter = build_tcpdump_filter(src_ips, src_ports, dest_ips, dest_ports)
|
||||
if capture_filter:
|
||||
command.append(capture_filter)
|
||||
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*command,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.STDOUT,
|
||||
)
|
||||
wait_task = asyncio.create_task(process.wait())
|
||||
|
||||
try:
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
line = await asyncio.wait_for(process.stdout.readline(), timeout=_READ_TIMEOUT)
|
||||
except asyncio.TimeoutError:
|
||||
if wait_task.done():
|
||||
break
|
||||
continue
|
||||
|
||||
if not line:
|
||||
if wait_task.done():
|
||||
break
|
||||
continue
|
||||
|
||||
message = line.decode(errors='replace').rstrip()
|
||||
if message:
|
||||
await display(message)
|
||||
finally:
|
||||
if process.returncode is None:
|
||||
process.terminate()
|
||||
try:
|
||||
await asyncio.wait_for(wait_task, timeout=_STOP_TIMEOUT)
|
||||
except asyncio.TimeoutError:
|
||||
process.kill()
|
||||
await wait_task
|
||||
else:
|
||||
await wait_task
|
||||
|
||||
if process.returncode and not stop_event.is_set():
|
||||
await display(f'Error: tcpdump exited with status {process.returncode}')
|
||||
|
||||
|
||||
def _get_valid_interfaces(interfaces):
|
||||
all_interfaces = netifaces.interfaces()
|
||||
if not interfaces:
|
||||
return all_interfaces
|
||||
requested = set(interfaces)
|
||||
return [name for name in all_interfaces if name in requested]
|
||||
|
||||
|
||||
async def verbose_tcpdump(
|
||||
interfaces, src_ips, src_ports, dest_ips, dest_ports, display=None, stop_event=None
|
||||
):
|
||||
if not display:
|
||||
return
|
||||
|
||||
stop_event = asyncio.Event()
|
||||
valid_interface = netifaces.interfaces()
|
||||
if interfaces:
|
||||
valid_interface = set(netifaces.interfaces()) & set(interfaces)
|
||||
stop_event = stop_event or asyncio.Event()
|
||||
valid_interfaces = _get_valid_interfaces(interfaces)
|
||||
if interfaces and not valid_interfaces:
|
||||
await display('Error: no valid network interface was selected')
|
||||
return
|
||||
|
||||
src_ips = generate_ips(src_ips)
|
||||
src_ports = generate_ports(src_ports)
|
||||
@@ -80,19 +215,48 @@ async def verbose_tcpdump(interfaces, src_ips, src_ports, dest_ips, dest_ports,
|
||||
dest_ports = generate_ports(dest_ports)
|
||||
|
||||
summary = [
|
||||
f"[Summary] Tcpdump filter info: ",
|
||||
f"Interface: [{list_show(valid_interface)}]",
|
||||
f"Source address: [{list_show(src_ips)}]",
|
||||
f"source port: [{list_show(src_ports)}]",
|
||||
f"Destination address: [{list_show(dest_ips)}]",
|
||||
f"Destination port: [{list_show(dest_ports)}]",
|
||||
'[Summary] Tcpdump filter info: ',
|
||||
f'Interface: [{list_show(valid_interfaces)}]',
|
||||
f'Source address: [{list_show(src_ips)}]',
|
||||
f'source port: [{list_show(src_ports)}]',
|
||||
f'Destination address: [{list_show(dest_ips)}]',
|
||||
f'Destination port: [{list_show(dest_ports)}]',
|
||||
]
|
||||
for s in summary:
|
||||
await display(s)
|
||||
for line in summary:
|
||||
await display(line)
|
||||
|
||||
params = [src_ips, src_ports, dest_ips, dest_ports, display, stop_event]
|
||||
tasks = [
|
||||
asyncio.create_task(once_tcpdump(i, *params)) for i in valid_interface
|
||||
]
|
||||
await asyncio.gather(*tasks)
|
||||
stop_event.set()
|
||||
tcpdump_path = shutil.which('tcpdump')
|
||||
if not tcpdump_path:
|
||||
await display('[Warning] tcpdump command not found, using limited built-in capture mode')
|
||||
tasks = [
|
||||
asyncio.create_task(
|
||||
once_tcpdump_emulated(
|
||||
interface, src_ips, src_ports, dest_ips, dest_ports, display, stop_event
|
||||
)
|
||||
)
|
||||
for interface in valid_interfaces
|
||||
]
|
||||
else:
|
||||
tasks = [
|
||||
asyncio.create_task(
|
||||
once_tcpdump(
|
||||
interface, src_ips, src_ports, dest_ips, dest_ports,
|
||||
display, stop_event, tcpdump_path
|
||||
)
|
||||
)
|
||||
for interface in valid_interfaces
|
||||
]
|
||||
|
||||
try:
|
||||
await asyncio.gather(*tasks)
|
||||
except asyncio.CancelledError:
|
||||
stop_event.set()
|
||||
raise
|
||||
finally:
|
||||
stop_event.set()
|
||||
for task in tasks:
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
for task in tasks:
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await task
|
||||
|
||||
@@ -41,6 +41,7 @@ TASK_STATUS_IS_OVER = 'OVER'
|
||||
|
||||
class ToolsWebsocket(AsyncJsonWebsocketConsumer, OrgMixin):
|
||||
is_closed: bool = False
|
||||
tcpdump_stop_event = None
|
||||
|
||||
@staticmethod
|
||||
@sync_to_async
|
||||
@@ -98,7 +99,8 @@ class ToolsWebsocket(AsyncJsonWebsocketConsumer, OrgMixin):
|
||||
'dest_ips': dest_ips, 'dest_ports': dest_ports
|
||||
}
|
||||
logger.info(f'Receive request tcpdump: {params}')
|
||||
await verbose_tcpdump(display=self.send_msg, **params)
|
||||
self.tcpdump_stop_event = asyncio.Event()
|
||||
await verbose_tcpdump(display=self.send_msg, stop_event=self.tcpdump_stop_event, **params)
|
||||
|
||||
async def imitate_traceroute(self, dest_ips):
|
||||
params = {'dest_ips': dest_ips}
|
||||
@@ -118,6 +120,8 @@ class ToolsWebsocket(AsyncJsonWebsocketConsumer, OrgMixin):
|
||||
async def close(self, code=None):
|
||||
if self.is_closed:
|
||||
return
|
||||
if self.tcpdump_stop_event and not self.tcpdump_stop_event.is_set():
|
||||
self.tcpdump_stop_event.set()
|
||||
await super().close(code)
|
||||
self.is_closed = True
|
||||
|
||||
|
||||
Reference in New Issue
Block a user