feat: telnet、ping支持批量测试

This commit is contained in:
jiangweidong
2023-08-29 17:02:51 +08:00
parent 904406c5c1
commit 1f2a4b0fb5
4 changed files with 76 additions and 71 deletions

View File

@@ -1,57 +1,53 @@
# -*- coding: utf-8 -*-
#
import asyncio
import socket
import telnetlib
from common.utils import lookup_domain
from settings.utils import generate_ips
PROMPT_REGEX = r'[\<|\[](.*)[\>|\]]'
async def telnet(dest_addr, port_number=23, timeout=10):
loop = asyncio.get_running_loop()
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(dest_addr, port_number), timeout
)
connection = await loop.run_in_executor(None, telnetlib.Telnet, dest_addr, port_number, timeout)
except asyncio.TimeoutError:
return False, 'Timeout'
except (ConnectionRefusedError, OSError) as e:
except (ConnectionRefusedError, socket.timeout, socket.gaierror) as e:
return False, str(e)
try:
# 发送命令
writer.write(b"command\r\n")
await writer.drain()
# 读取响应
response = await reader.readuntil()
except asyncio.TimeoutError:
writer.close()
await writer.wait_closed()
return False, 'Timeout'
writer.close()
await writer.wait_closed()
return True, response.decode('utf-8', 'ignore')
expected_regexes = [bytes(PROMPT_REGEX, encoding='ascii')]
__, __, output = connection.expect(expected_regexes, timeout=3)
return True, output.decode('utf-8', 'ignore')
async def verbose_telnet(dest_ip, dest_port=23, timeout=10, display=None):
async def verbose_telnet(dest_ips, dest_port=23, timeout=10, display=None):
if not display:
return
ip, err = lookup_domain(dest_ip)
if not ip:
await display(err)
return
result = {}
ips = generate_ips(dest_ips)
await display(f'Total valid address: {len(ips)}\r\n')
for dest_ip in ips:
await display(f'Trying ({dest_ip}:{dest_port})')
try:
is_connective, resp = await telnet(dest_ip, dest_port, timeout)
if is_connective:
result[dest_ip] = 'ok'
msg = f'Connected to {dest_ip} {dest_port} {resp}.\r\n' \
f'Connection closed by foreign host.'
else:
result[dest_ip] = 'failed'
msg = f'Unable to connect to remote host\r\n' \
f'Reason: {resp}'
except Exception as e:
msg = 'Error: %s' % e
await display(f'{msg}\r\n')
await display(f'Trying {dest_ip} ({ip}:{dest_port})')
try:
is_connective, resp = await telnet(dest_ip, dest_port, timeout)
if is_connective:
msg = f'Connected to {dest_ip} {dest_port} {resp}.\r\n' \
f'Connection closed by foreign host.'
else:
msg = f'Unable to connect to remote host\r\n' \
f'Reason: {resp}'
except Exception as e:
msg = 'Error: %s' % e
await display(msg)
await display(f'----- Telnet statistics -----')
for k, v in result.items():
await display(f'{k}: {v}')
if __name__ == "__main__":