From a5021a30585bb912481f31557a34bc827853a43d Mon Sep 17 00:00:00 2001 From: feng <1304903146@qq.com> Date: Wed, 17 Jun 2026 16:27:54 +0800 Subject: [PATCH] Revert "perf: to tcpdump" This reverts commit 1413d57bfd96c3f42cb09e6fda6cfb4dc02802b5. --- Dockerfile | 1 - apps/settings/tools/tcpdump.py | 312 ++++++++------------------------- apps/settings/ws.py | 6 +- 3 files changed, 75 insertions(+), 244 deletions(-) diff --git a/Dockerfile b/Dockerfile index 78f532f28..08f43dd09 100644 --- a/Dockerfile +++ b/Dockerfile @@ -36,7 +36,6 @@ ARG TOOLS=" \ postgresql-client \ openssh-client \ sshpass \ - tcpdump \ bubblewrap" ARG APT_MIRROR=http://deb.debian.org diff --git a/apps/settings/tools/tcpdump.py b/apps/settings/tools/tcpdump.py index c9f6cb09a..2b5412c15 100644 --- a/apps/settings/tools/tcpdump.py +++ b/apps/settings/tools/tcpdump.py @@ -1,213 +1,78 @@ import asyncio -import contextlib -import shutil +import netifaces import socket import struct -import time - -import netifaces +from common.utils.timezone import local_now_display from settings.utils import generate_ips, generate_ports -_ETHERTYPE_IPV4 = 0x0800 -_PACKET_ALL = 0x0003 -_READ_TIMEOUT = 0.25 -_STOP_TIMEOUT = 1 + +async def once_tcpdump( + interface, src_ips, src_ports, dest_ips, dest_ports, display, stop_event +): + loop = asyncio.get_event_loop() + s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003)) + s.bind((interface, 0)) + s.setblocking(False) + while not stop_event.is_set(): + try: + packet = await loop.sock_recv(s, 65535) + except BlockingIOError: + await asyncio.sleep(0.1) + # 解析IP数据包 + ip_header = packet[14:34] + ip_hdr = struct.unpack('!BBHHHBBH4s4s', ip_header) + # 判断是否为TCP数据包 + protocol = ip_hdr[6] + if protocol != 6: + continue + # 解析TCP数据包 + tcp_header = packet[34:54] + tcp_hdr = struct.unpack('!HHLLBBHHH', tcp_header) + # 获取源地址、源端口号、目标地址、目标端口等信息 + src_ip, dest_ip = map(lambda x: socket.inet_ntoa(x), ip_hdr[8:10]) + src_port, dest_port = tcp_hdr[0], tcp_hdr[1] + # 获取数据包类型和长度 + packet_type = socket.htons(ip_hdr[6]) + packet_len = len(packet) + # 获取TCP标志位、序号、确认号、部分数据等信息 + seq, ack, flags = tcp_hdr[2], tcp_hdr[3], tcp_hdr[5] + data = packet[54:] + # 如果过滤的参数[源地址、源端口等]为空,则不过滤 + # 各个过滤参数之间为 `且` 的关系 + green_light = True + if src_ips and src_ip not in src_ips: + green_light = False + if src_ports and src_port not in src_ports: + green_light = False + if dest_ips and dest_ip not in dest_ips: + green_light = False + if dest_ports and dest_port not in dest_ports: + green_light = False + if not green_light: + continue + + results = [ + f'[{interface}][{local_now_display()}] {src_ip}:{src_port} -> ' + f'{dest_ip}:{dest_port} ({packet_type}, {packet_len} bytes)', + f'\tFlags: {flags} Seq: {seq}, Ack: {ack}', f'\tData: {data}' + ] + for r in results: + await display(r) def list_show(items, default='all'): return ','.join(map(str, items)) or default -def _build_or_filter(template, values): - if not values: - return '' - clauses = [template.format(value=value) for value in values] - if len(clauses) == 1: - return clauses[0] - return f"({' or '.join(clauses)})" - - -def build_tcpdump_filter(src_ips, src_ports, dest_ips, dest_ports): - filters = [ - _build_or_filter('src host {value}', src_ips), - _build_or_filter('src port {value}', src_ports), - _build_or_filter('dst host {value}', dest_ips), - _build_or_filter('dst port {value}', dest_ports), - ] - return ' and '.join(filter(None, filters)) - - -def _format_endpoint(ip, port): - try: - service = socket.getservbyport(port, 'tcp') - except OSError: - service = str(port) - return f'{ip}.{service}' - - -def _format_tcp_flags(flags): - mapping = ( - (0x01, 'F'), - (0x02, 'S'), - (0x04, 'R'), - (0x08, 'P'), - (0x10, '.'), - (0x20, 'U'), - (0x40, 'E'), - (0x80, 'W'), - ) - text = ''.join(symbol for bit, symbol in mapping if flags & bit) - return text or 'none' - - -def _format_emulated_line(src_ip, src_port, dest_ip, dest_port, seq, ack, flags, win, payload_len): - now = time.time() - timestamp = time.strftime('%H:%M:%S', time.localtime(now)) - micros = int((now % 1) * 1_000_000) - details = [f'Flags [{_format_tcp_flags(flags)}]'] - if payload_len: - details.append(f'seq {seq}:{seq + payload_len}') - elif flags & 0x07: - details.append(f'seq {seq}') - if flags & 0x10: - details.append(f'ack {ack}') - details.append(f'win {win}') - details.append(f'length {payload_len}') - return ( - f'{timestamp}.{micros:06d} IP ' - f'{_format_endpoint(src_ip, src_port)} > {_format_endpoint(dest_ip, dest_port)}: ' - f"{', '.join(details)}" - ) - - -async def once_tcpdump_emulated( - interface, src_ips, src_ports, dest_ips, dest_ports, display, stop_event -): - loop = asyncio.get_running_loop() - sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(_PACKET_ALL)) - sock.bind((interface, 0)) - sock.setblocking(False) - try: - while not stop_event.is_set(): - try: - packet = await asyncio.wait_for(loop.sock_recv(sock, 65535), timeout=_READ_TIMEOUT) - except asyncio.TimeoutError: - continue - - if len(packet) < 54: - continue - - ether_type = struct.unpack('!H', packet[12:14])[0] - if ether_type != _ETHERTYPE_IPV4: - continue - - version_ihl = packet[14] - if version_ihl >> 4 != 4: - continue - ip_header_len = (version_ihl & 0x0F) * 4 - tcp_offset = 14 + ip_header_len - if len(packet) < tcp_offset + 20: - continue - - ip_header = packet[14:14 + ip_header_len] - ip_hdr = struct.unpack('!BBHHHBBH4s4s', ip_header[:20]) - if ip_hdr[6] != socket.IPPROTO_TCP: - continue - - tcp_header = packet[tcp_offset:tcp_offset + 20] - tcp_hdr = struct.unpack('!HHLLBBHHH', tcp_header) - src_ip, dest_ip = map(socket.inet_ntoa, ip_hdr[8:10]) - src_port, dest_port = tcp_hdr[0], tcp_hdr[1] - - if src_ips and src_ip not in src_ips: - continue - if src_ports and src_port not in src_ports: - continue - if dest_ips and dest_ip not in dest_ips: - continue - if dest_ports and dest_port not in dest_ports: - continue - - tcp_header_len = (tcp_hdr[4] >> 4) * 4 - total_length = ip_hdr[2] - payload_len = max(total_length - ip_header_len - tcp_header_len, 0) - await display( - _format_emulated_line( - src_ip, src_port, dest_ip, dest_port, - tcp_hdr[2], tcp_hdr[3], tcp_hdr[5], tcp_hdr[6], payload_len - ) - ) - finally: - sock.close() - - -async def once_tcpdump( - interface, src_ips, src_ports, dest_ips, dest_ports, display, stop_event, tcpdump_path -): - command = [tcpdump_path, '-l', '-i', interface] - capture_filter = build_tcpdump_filter(src_ips, src_ports, dest_ips, dest_ports) - if capture_filter: - command.append(capture_filter) - - process = await asyncio.create_subprocess_exec( - *command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - ) - wait_task = asyncio.create_task(process.wait()) - - try: - while not stop_event.is_set(): - try: - line = await asyncio.wait_for(process.stdout.readline(), timeout=_READ_TIMEOUT) - except asyncio.TimeoutError: - if wait_task.done(): - break - continue - - if not line: - if wait_task.done(): - break - continue - - message = line.decode(errors='replace').rstrip() - if message: - await display(message) - finally: - if process.returncode is None: - process.terminate() - try: - await asyncio.wait_for(wait_task, timeout=_STOP_TIMEOUT) - except asyncio.TimeoutError: - process.kill() - await wait_task - else: - await wait_task - - if process.returncode and not stop_event.is_set(): - await display(f'Error: tcpdump exited with status {process.returncode}') - - -def _get_valid_interfaces(interfaces): - all_interfaces = netifaces.interfaces() - if not interfaces: - return all_interfaces - requested = set(interfaces) - return [name for name in all_interfaces if name in requested] - - -async def verbose_tcpdump( - interfaces, src_ips, src_ports, dest_ips, dest_ports, display=None, stop_event=None -): +async def verbose_tcpdump(interfaces, src_ips, src_ports, dest_ips, dest_ports, display=None): if not display: return - stop_event = stop_event or asyncio.Event() - valid_interfaces = _get_valid_interfaces(interfaces) - if interfaces and not valid_interfaces: - await display('Error: no valid network interface was selected') - return + stop_event = asyncio.Event() + valid_interface = netifaces.interfaces() + if interfaces: + valid_interface = set(netifaces.interfaces()) & set(interfaces) src_ips = generate_ips(src_ips) src_ports = generate_ports(src_ports) @@ -215,48 +80,19 @@ async def verbose_tcpdump( dest_ports = generate_ports(dest_ports) summary = [ - '[Summary] Tcpdump filter info: ', - f'Interface: [{list_show(valid_interfaces)}]', - f'Source address: [{list_show(src_ips)}]', - f'source port: [{list_show(src_ports)}]', - f'Destination address: [{list_show(dest_ips)}]', - f'Destination port: [{list_show(dest_ports)}]', + f"[Summary] Tcpdump filter info: ", + f"Interface: [{list_show(valid_interface)}]", + f"Source address: [{list_show(src_ips)}]", + f"source port: [{list_show(src_ports)}]", + f"Destination address: [{list_show(dest_ips)}]", + f"Destination port: [{list_show(dest_ports)}]", ] - for line in summary: - await display(line) + for s in summary: + await display(s) - tcpdump_path = shutil.which('tcpdump') - if not tcpdump_path: - await display('[Warning] tcpdump command not found, using limited built-in capture mode') - tasks = [ - asyncio.create_task( - once_tcpdump_emulated( - interface, src_ips, src_ports, dest_ips, dest_ports, display, stop_event - ) - ) - for interface in valid_interfaces - ] - else: - tasks = [ - asyncio.create_task( - once_tcpdump( - interface, src_ips, src_ports, dest_ips, dest_ports, - display, stop_event, tcpdump_path - ) - ) - for interface in valid_interfaces - ] - - try: - await asyncio.gather(*tasks) - except asyncio.CancelledError: - stop_event.set() - raise - finally: - stop_event.set() - for task in tasks: - if not task.done(): - task.cancel() - for task in tasks: - with contextlib.suppress(asyncio.CancelledError): - await task + params = [src_ips, src_ports, dest_ips, dest_ports, display, stop_event] + tasks = [ + asyncio.create_task(once_tcpdump(i, *params)) for i in valid_interface + ] + await asyncio.gather(*tasks) + stop_event.set() diff --git a/apps/settings/ws.py b/apps/settings/ws.py index 1cf45b2de..d59635b0f 100644 --- a/apps/settings/ws.py +++ b/apps/settings/ws.py @@ -41,7 +41,6 @@ TASK_STATUS_IS_OVER = 'OVER' class ToolsWebsocket(AsyncJsonWebsocketConsumer, OrgMixin): is_closed: bool = False - tcpdump_stop_event = None @staticmethod @sync_to_async @@ -99,8 +98,7 @@ class ToolsWebsocket(AsyncJsonWebsocketConsumer, OrgMixin): 'dest_ips': dest_ips, 'dest_ports': dest_ports } logger.info(f'Receive request tcpdump: {params}') - self.tcpdump_stop_event = asyncio.Event() - await verbose_tcpdump(display=self.send_msg, stop_event=self.tcpdump_stop_event, **params) + await verbose_tcpdump(display=self.send_msg, **params) async def imitate_traceroute(self, dest_ips): params = {'dest_ips': dest_ips} @@ -120,8 +118,6 @@ class ToolsWebsocket(AsyncJsonWebsocketConsumer, OrgMixin): async def close(self, code=None): if self.is_closed: return - if self.tcpdump_stop_event and not self.tcpdump_stop_event.is_set(): - self.tcpdump_stop_event.set() await super().close(code) self.is_closed = True