mirror of
https://github.com/jumpserver/jumpserver.git
synced 2026-07-02 07:01:30 +00:00
* perf: Update Dockerfile with new base image tag * perf: update uv lock * perf: update nmap * perf: remove nmap * perf: remove nmap * perf: remove unused tools * perf: update uv lock --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: ibuler <ibuler@qq.com> Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: ibuler <17382885+ibuler@users.noreply.github.com>
130 lines
4.3 KiB
Python
130 lines
4.3 KiB
Python
import argparse
|
|
import asyncio
|
|
import socket
|
|
import time
|
|
|
|
from common.utils.timezone import local_now_display
|
|
from settings.utils import generate_ips
|
|
|
|
# Version label embedded in the scan summary banner.
_SCANNER_VERSION = '1.0'

# Port -> service-name fallback table, consulted when socket.getservbyport()
# cannot resolve a port (e.g. no services database entry is available).
_KNOWN_SERVICES = {
    21: 'ftp',
    22: 'ssh',
    23: 'telnet',
    25: 'smtp',
    53: 'domain',
    80: 'http',
    110: 'pop3',
    135: 'msrpc',
    139: 'netbios-ssn',
    143: 'imap',
    443: 'https',
    445: 'microsoft-ds',
    587: 'submission',
    993: 'imaps',
    995: 'pop3s',
    1433: 'ms-sql-s',
    1521: 'oracle',
    3306: 'mysql',
    3389: 'ms-wbt-server',
    5432: 'postgresql',
    5900: 'vnc',
    6379: 'redis',
    8080: 'http-proxy',
    8443: 'https-alt',
    27017: 'mongodb',
}
|
|
|
|
|
|
def _parse_ports(ports_str):
    """Parse a port specification into a sorted list of unique ints.

    Accepts comma-separated single ports and inclusive ranges, in any mix,
    e.g. ``'22,80,443'``, ``'22-100'`` or ``'22, 80-90'``.  Empty items
    (``'22,,80'`` or a trailing comma) are ignored.

    An empty / ``None`` spec selects ports 1-1024, a reasonable approximation
    of nmap's "most common ports" default.

    Raises:
        ValueError: if an item is not numeric, a range is reversed, a port is
            outside 0-65535, or the spec contains no ports at all.
    """
    if not ports_str:
        return list(range(1, 1025))

    ports = set()
    for part in ports_str.split(','):
        part = part.strip()
        if not part:
            # Tolerate stray separators instead of failing on int('').
            continue
        start, dash, end = part.partition('-')
        first = int(start)
        last = int(end) if dash else first
        if first > last:
            raise ValueError(f'Invalid port range (start > end): {part!r}')
        # Validate before expanding so an absurd range such as '1-100000000'
        # is rejected up front rather than materialised in memory.
        if first < 0 or last > 65535:
            raise ValueError(f'Port out of range (0-65535): {part!r}')
        ports.update(range(first, last + 1))

    if not ports:
        raise ValueError(f'No ports specified in {ports_str!r}')
    return sorted(ports)
|
|
|
|
|
|
def _service_name(port: int, proto: str = 'tcp') -> str:
    """Return the service name for *port*, or ``'unknown'``.

    The system services database is asked first; when that lookup raises
    ``OSError`` the built-in ``_KNOWN_SERVICES`` table is used instead.
    """
    try:
        name = socket.getservbyport(port, proto)
    except OSError:
        name = _KNOWN_SERVICES.get(port, 'unknown')
    return name
|
|
|
|
|
|
async def _scan_tcp_port(ip: str, port: int, timeout: float) -> str:
    """Attempt a TCP connect to ``ip:port`` and report ``'open'`` or ``'closed'``.

    A timeout or any ``OSError`` during the connect (refused, unreachable,
    ...) is reported as ``'closed'``.
    """
    attempt = asyncio.open_connection(ip, port)
    try:
        _reader, stream = await asyncio.wait_for(attempt, timeout=timeout)
        stream.close()
        try:
            await stream.wait_closed()
        except Exception:
            # Best effort: the connect already succeeded, so a failure while
            # tearing the connection down does not change the verdict.
            pass
    except (asyncio.TimeoutError, OSError):
        # ConnectionRefusedError is an OSError subclass, so it is covered here.
        return 'closed'
    return 'open'
|
|
|
|
|
|
async def get_nmap_result(ip: str, ports_str, timeout, max_concurrency: int = 256) -> list[str]:
    """Scan *ip* and return formatted result lines (PORT / STATE / SERVICE).

    Args:
        ip: Target address to probe.
        ports_str: Port specification understood by ``_parse_ports``
            (falsy selects its default range).
        timeout: Per-port connect timeout in seconds; falsy means 1.0.
        max_concurrency: Upper bound on simultaneous connection attempts.

    Returns:
        A header line followed by one tab-separated line per open port.

    Raises:
        ValueError: propagated from ``_parse_ports`` for a malformed spec.
    """
    timeout = float(timeout) if timeout else 1.0
    ports = _parse_ports(ports_str)

    # Cap in-flight connects.  Launching every probe at once can exhaust the
    # process's file descriptors; the resulting OSError is indistinguishable
    # from a refused connection and would mark open ports as closed.
    gate = asyncio.Semaphore(max(1, int(max_concurrency)))

    async def _probe(port: int) -> str:
        # The per-port timeout only starts once a slot has been acquired.
        async with gate:
            return await _scan_tcp_port(ip, port, timeout)

    states = await asyncio.gather(*(_probe(p) for p in ports))

    lines = ['PORT\tSTATE\tSERVICE']
    lines.extend(
        f'{port}/tcp\t{state}\t{_service_name(port)}'
        for port, state in zip(ports, states)
        if state == 'open'
    )
    return lines
|
|
|
|
|
|
async def once_nmap(ip: str, ports_str, timeout, display) -> bool:
    """Scan one host, stream the report through *display*, and report success.

    Returns ``True`` when the report holds more than its header line (i.e. at
    least one open port was found).  Any exception raised while scanning or
    displaying is shown as ``Error: ...`` and yields ``False``.
    """
    await display(f'Starting Nmap at {local_now_display()} for {ip}')
    try:
        report = await get_nmap_result(ip, ports_str, timeout)
        for row in report:
            await display(row)
    except Exception as exc:
        await display(f'Error: {exc}')
        return False
    # The first row is always the header, so more than one row means a hit.
    return len(report) > 1
|
|
|
|
|
|
async def verbose_nmap(dest_ips, dest_ports=None, timeout=None, display=None):
    """Scan every address in *dest_ips* and stream progress via *display*.

    Args:
        dest_ips: Target specification handed to ``generate_ips``.
        dest_ports: Ports to scan: either a ready-made spec string such as
            ``'22,80-90'`` or an iterable of items (strings or ints).  Falsy
            lets the scanner use its default port range.
        timeout: Per-port connect timeout in seconds, forwarded unchanged.
        display: Async callable receiving each output line; it is also
            awaited with no arguments after each host.  When falsy the
            function returns immediately without scanning.
    """
    if not display:
        return

    ips = generate_ips(dest_ips)
    if not dest_ports:
        dest_port = None
    elif isinstance(dest_ports, str):
        # Joining a bare string would split it into single characters.
        dest_port = dest_ports
    else:
        # str() each item so integer ports do not break the join.
        dest_port = ','.join(str(p) for p in dest_ports)

    success_num, start_time = 0, time.time()
    await display(f'[Summary] Nmap (v{_SCANNER_VERSION}): {len(ips)} addresses were scanned')
    for ip in ips:
        ok = await once_nmap(str(ip), dest_port, timeout, display)
        if ok:
            success_num += 1
        await display()
    await display(
        f'[Done] Nmap: {len(ips)} IP addresses ({success_num} hosts up) '
        f'scanned in {round(time.time() - start_time, 2)} seconds'
    )
|
|
|
|
|
|
async def _main():
    """Command-line entry point: parse arguments and print the scan to stdout."""
    cli = argparse.ArgumentParser(description='Pure-Python TCP port scanner')
    cli.add_argument(
        'targets', nargs='+',
        help='IP / CIDR, e.g. 192.168.1.1 or 10.0.0.0/24',
    )
    cli.add_argument(
        '-p', '--ports', default=None,
        help='Ports to scan, e.g. 22,80,443 or 22-1024 (default: 1-1024)',
    )
    cli.add_argument(
        '--timeout', type=float, default=1.0,
        help='Per-port connect timeout in seconds (default: 1.0)',
    )
    opts = cli.parse_args()

    async def display(msg=''):
        # verbose_nmap awaits its display callback, so wrap print in a coroutine.
        print(msg)

    if opts.ports:
        port_items = opts.ports.split(',')
    else:
        port_items = None
    await verbose_nmap(opts.targets, dest_ports=port_items, timeout=opts.timeout, display=display)
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    asyncio.run(_main())
|