Revert "perf: to tcpdump"

This reverts commit 1413d57bfd.
This commit is contained in:
feng
2026-06-17 16:27:54 +08:00
committed by feng626
parent b6bbf237e2
commit a5021a3058
3 changed files with 75 additions and 244 deletions

View File

@@ -36,7 +36,6 @@ ARG TOOLS=" \
postgresql-client \
openssh-client \
sshpass \
tcpdump \
bubblewrap"
ARG APT_MIRROR=http://deb.debian.org

View File

@@ -1,213 +1,78 @@
import asyncio
import contextlib
import shutil
import netifaces
import socket
import struct
import time
import netifaces
from common.utils.timezone import local_now_display
from settings.utils import generate_ips, generate_ports
_ETHERTYPE_IPV4 = 0x0800
_PACKET_ALL = 0x0003
_READ_TIMEOUT = 0.25
_STOP_TIMEOUT = 1
async def once_tcpdump(
interface, src_ips, src_ports, dest_ips, dest_ports, display, stop_event
):
loop = asyncio.get_event_loop()
s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
s.bind((interface, 0))
s.setblocking(False)
while not stop_event.is_set():
try:
packet = await loop.sock_recv(s, 65535)
except BlockingIOError:
await asyncio.sleep(0.1)
# 解析IP数据包
ip_header = packet[14:34]
ip_hdr = struct.unpack('!BBHHHBBH4s4s', ip_header)
# 判断是否为TCP数据包
protocol = ip_hdr[6]
if protocol != 6:
continue
# 解析TCP数据包
tcp_header = packet[34:54]
tcp_hdr = struct.unpack('!HHLLBBHHH', tcp_header)
# 获取源地址、源端口号、目标地址、目标端口等信息
src_ip, dest_ip = map(lambda x: socket.inet_ntoa(x), ip_hdr[8:10])
src_port, dest_port = tcp_hdr[0], tcp_hdr[1]
# 获取数据包类型和长度
packet_type = socket.htons(ip_hdr[6])
packet_len = len(packet)
# 获取TCP标志位、序号、确认号、部分数据等信息
seq, ack, flags = tcp_hdr[2], tcp_hdr[3], tcp_hdr[5]
data = packet[54:]
# 如果过滤的参数[源地址、源端口等]为空,则不过滤
# 各个过滤参数之间为 `且` 的关系
green_light = True
if src_ips and src_ip not in src_ips:
green_light = False
if src_ports and src_port not in src_ports:
green_light = False
if dest_ips and dest_ip not in dest_ips:
green_light = False
if dest_ports and dest_port not in dest_ports:
green_light = False
if not green_light:
continue
results = [
f'[{interface}][{local_now_display()}] {src_ip}:{src_port} -> '
f'{dest_ip}:{dest_port} ({packet_type}, {packet_len} bytes)',
f'\tFlags: {flags} Seq: {seq}, Ack: {ack}', f'\tData: {data}'
]
for r in results:
await display(r)
def list_show(items, default='all'):
return ','.join(map(str, items)) or default
def _build_or_filter(template, values):
if not values:
return ''
clauses = [template.format(value=value) for value in values]
if len(clauses) == 1:
return clauses[0]
return f"({' or '.join(clauses)})"
def build_tcpdump_filter(src_ips, src_ports, dest_ips, dest_ports):
filters = [
_build_or_filter('src host {value}', src_ips),
_build_or_filter('src port {value}', src_ports),
_build_or_filter('dst host {value}', dest_ips),
_build_or_filter('dst port {value}', dest_ports),
]
return ' and '.join(filter(None, filters))
def _format_endpoint(ip, port):
try:
service = socket.getservbyport(port, 'tcp')
except OSError:
service = str(port)
return f'{ip}.{service}'
def _format_tcp_flags(flags):
mapping = (
(0x01, 'F'),
(0x02, 'S'),
(0x04, 'R'),
(0x08, 'P'),
(0x10, '.'),
(0x20, 'U'),
(0x40, 'E'),
(0x80, 'W'),
)
text = ''.join(symbol for bit, symbol in mapping if flags & bit)
return text or 'none'
def _format_emulated_line(src_ip, src_port, dest_ip, dest_port, seq, ack, flags, win, payload_len):
now = time.time()
timestamp = time.strftime('%H:%M:%S', time.localtime(now))
micros = int((now % 1) * 1_000_000)
details = [f'Flags [{_format_tcp_flags(flags)}]']
if payload_len:
details.append(f'seq {seq}:{seq + payload_len}')
elif flags & 0x07:
details.append(f'seq {seq}')
if flags & 0x10:
details.append(f'ack {ack}')
details.append(f'win {win}')
details.append(f'length {payload_len}')
return (
f'{timestamp}.{micros:06d} IP '
f'{_format_endpoint(src_ip, src_port)} > {_format_endpoint(dest_ip, dest_port)}: '
f"{', '.join(details)}"
)
async def once_tcpdump_emulated(
interface, src_ips, src_ports, dest_ips, dest_ports, display, stop_event
):
loop = asyncio.get_running_loop()
sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(_PACKET_ALL))
sock.bind((interface, 0))
sock.setblocking(False)
try:
while not stop_event.is_set():
try:
packet = await asyncio.wait_for(loop.sock_recv(sock, 65535), timeout=_READ_TIMEOUT)
except asyncio.TimeoutError:
continue
if len(packet) < 54:
continue
ether_type = struct.unpack('!H', packet[12:14])[0]
if ether_type != _ETHERTYPE_IPV4:
continue
version_ihl = packet[14]
if version_ihl >> 4 != 4:
continue
ip_header_len = (version_ihl & 0x0F) * 4
tcp_offset = 14 + ip_header_len
if len(packet) < tcp_offset + 20:
continue
ip_header = packet[14:14 + ip_header_len]
ip_hdr = struct.unpack('!BBHHHBBH4s4s', ip_header[:20])
if ip_hdr[6] != socket.IPPROTO_TCP:
continue
tcp_header = packet[tcp_offset:tcp_offset + 20]
tcp_hdr = struct.unpack('!HHLLBBHHH', tcp_header)
src_ip, dest_ip = map(socket.inet_ntoa, ip_hdr[8:10])
src_port, dest_port = tcp_hdr[0], tcp_hdr[1]
if src_ips and src_ip not in src_ips:
continue
if src_ports and src_port not in src_ports:
continue
if dest_ips and dest_ip not in dest_ips:
continue
if dest_ports and dest_port not in dest_ports:
continue
tcp_header_len = (tcp_hdr[4] >> 4) * 4
total_length = ip_hdr[2]
payload_len = max(total_length - ip_header_len - tcp_header_len, 0)
await display(
_format_emulated_line(
src_ip, src_port, dest_ip, dest_port,
tcp_hdr[2], tcp_hdr[3], tcp_hdr[5], tcp_hdr[6], payload_len
)
)
finally:
sock.close()
async def once_tcpdump(
interface, src_ips, src_ports, dest_ips, dest_ports, display, stop_event, tcpdump_path
):
command = [tcpdump_path, '-l', '-i', interface]
capture_filter = build_tcpdump_filter(src_ips, src_ports, dest_ips, dest_ports)
if capture_filter:
command.append(capture_filter)
process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
wait_task = asyncio.create_task(process.wait())
try:
while not stop_event.is_set():
try:
line = await asyncio.wait_for(process.stdout.readline(), timeout=_READ_TIMEOUT)
except asyncio.TimeoutError:
if wait_task.done():
break
continue
if not line:
if wait_task.done():
break
continue
message = line.decode(errors='replace').rstrip()
if message:
await display(message)
finally:
if process.returncode is None:
process.terminate()
try:
await asyncio.wait_for(wait_task, timeout=_STOP_TIMEOUT)
except asyncio.TimeoutError:
process.kill()
await wait_task
else:
await wait_task
if process.returncode and not stop_event.is_set():
await display(f'Error: tcpdump exited with status {process.returncode}')
def _get_valid_interfaces(interfaces):
all_interfaces = netifaces.interfaces()
if not interfaces:
return all_interfaces
requested = set(interfaces)
return [name for name in all_interfaces if name in requested]
async def verbose_tcpdump(
interfaces, src_ips, src_ports, dest_ips, dest_ports, display=None, stop_event=None
):
async def verbose_tcpdump(interfaces, src_ips, src_ports, dest_ips, dest_ports, display=None):
if not display:
return
stop_event = stop_event or asyncio.Event()
valid_interfaces = _get_valid_interfaces(interfaces)
if interfaces and not valid_interfaces:
await display('Error: no valid network interface was selected')
return
stop_event = asyncio.Event()
valid_interface = netifaces.interfaces()
if interfaces:
valid_interface = set(netifaces.interfaces()) & set(interfaces)
src_ips = generate_ips(src_ips)
src_ports = generate_ports(src_ports)
@@ -215,48 +80,19 @@ async def verbose_tcpdump(
dest_ports = generate_ports(dest_ports)
summary = [
'[Summary] Tcpdump filter info: ',
f'Interface: [{list_show(valid_interfaces)}]',
f'Source address: [{list_show(src_ips)}]',
f'source port: [{list_show(src_ports)}]',
f'Destination address: [{list_show(dest_ips)}]',
f'Destination port: [{list_show(dest_ports)}]',
f"[Summary] Tcpdump filter info: ",
f"Interface: [{list_show(valid_interface)}]",
f"Source address: [{list_show(src_ips)}]",
f"source port: [{list_show(src_ports)}]",
f"Destination address: [{list_show(dest_ips)}]",
f"Destination port: [{list_show(dest_ports)}]",
]
for line in summary:
await display(line)
for s in summary:
await display(s)
tcpdump_path = shutil.which('tcpdump')
if not tcpdump_path:
await display('[Warning] tcpdump command not found, using limited built-in capture mode')
tasks = [
asyncio.create_task(
once_tcpdump_emulated(
interface, src_ips, src_ports, dest_ips, dest_ports, display, stop_event
)
)
for interface in valid_interfaces
]
else:
tasks = [
asyncio.create_task(
once_tcpdump(
interface, src_ips, src_ports, dest_ips, dest_ports,
display, stop_event, tcpdump_path
)
)
for interface in valid_interfaces
]
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
stop_event.set()
raise
finally:
stop_event.set()
for task in tasks:
if not task.done():
task.cancel()
for task in tasks:
with contextlib.suppress(asyncio.CancelledError):
await task
params = [src_ips, src_ports, dest_ips, dest_ports, display, stop_event]
tasks = [
asyncio.create_task(once_tcpdump(i, *params)) for i in valid_interface
]
await asyncio.gather(*tasks)
stop_event.set()

View File

@@ -41,7 +41,6 @@ TASK_STATUS_IS_OVER = 'OVER'
class ToolsWebsocket(AsyncJsonWebsocketConsumer, OrgMixin):
is_closed: bool = False
tcpdump_stop_event = None
@staticmethod
@sync_to_async
@@ -99,8 +98,7 @@ class ToolsWebsocket(AsyncJsonWebsocketConsumer, OrgMixin):
'dest_ips': dest_ips, 'dest_ports': dest_ports
}
logger.info(f'Receive request tcpdump: {params}')
self.tcpdump_stop_event = asyncio.Event()
await verbose_tcpdump(display=self.send_msg, stop_event=self.tcpdump_stop_event, **params)
await verbose_tcpdump(display=self.send_msg, **params)
async def imitate_traceroute(self, dest_ips):
params = {'dest_ips': dest_ips}
@@ -120,8 +118,6 @@ class ToolsWebsocket(AsyncJsonWebsocketConsumer, OrgMixin):
async def close(self, code=None):
if self.is_closed:
return
if self.tcpdump_stop_event and not self.tcpdump_stop_event.is_set():
self.tcpdump_stop_event.set()
await super().close(code)
self.is_closed = True