一、需求
1.通过SSH或者Telnet登录交换机
2.生成日志,保留交换机支持的登录接口
3.可以传入自己配置的交换机地址
二、总体思路整理
2.1 主要思路
1.解析待登录的交换机配置文件,生成初始状态字典
2.批量测试端口可用性,更新状态字典可用性字段,并写入日志文件
3.使用SSH或者Telnet登录交换机,获取配置(尝试登录【登录协议判断、尝试账号密码】、执行命令),更新状态字典
4.根据执行结果与状态字典,写入日志
2.2 解析配置文件
2.2.1引入模块
import os #获取路径
import sys #获取打包后exe文件所在文件夹路径
import re #正则匹配连接后回显内容,用于判断输入用户名还是密码
import shutil #下载待处理的交换机列表文件
import socket #判断端口是否开放
import time
from datetime import date
import argparse #获取并解析cmd窗口传入的参数
import logging #打印出netmiko框架运行日志
import threading # 多线程
import telnetlib #telnet连接交换机
import paramiko #ssh连接交换机paramiko的再次封装
from netmiko import ConnectHandler #netmiko 通过ssh连接交换机,是paramiko的再次封装
2.2.2 全局参数
lock = threading.Lock() #线程锁
# 获取当前路径父路径(所在文件夹)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
#获取打包后exe文件路径(所在文件夹)
BASE_EXE_DIR = os.path.dirname(sys.executable)
2.2.3 读取cmd运行传参并解析
def parseArgs():
# 创建一个ArgumentParser对象
parser = argparse.ArgumentParser(description='这是一个命令行运行autoBackupSwitch程序的说明')
# 添加一个位置参数
parser.add_argument('--ips', '-i',type=str, default=os.path.join(BASE_DIR,'网络设备ip端口3.txt'),help='输入本地配置好网络设备ip端口文件')
# 添加一个可选参数
parser.add_argument('--download', '-d',action='store_true', help='下载默认的交换机列表文件模板')
# 解析命令行输入
#args = parser.parse_args(['-i','aaaa','-t','bbbbb'])
args = parser.parse_args()
if args.download:
shutil.copy(os.path.join(BASE_DIR,'网络设备ip端口3.txt'), BASE_EXE_DIR)
return args
parseArgs()
2.2.4 读取交换机地址端口文件
def parse_port_file( file_path=parseArgs().ips):
print("---开始解析端口文件---")
# 获取当日日期
today = date.today().strftime('%Y%m%d')
print(today)
# 数字格式[{'ip':ip,"协议":协议,"端口号":端口号,"位置":位置},....]
ip_port = []
if os.path.exists(file_path):
print(f"文件 {file_path} 存在")
with open(file_path, 'r', encoding='utf-8') as file:
lines = file.readlines()[1:]
for line in lines:
line = line.strip()
ip=line.split('\t')[0]
protocol=line.split('\t')[3]
port=line.split('\t')[2]
address=line.split('\t')[1]
isable=False
password_authentication_status=False
local_username=''
local_password=''
ip_port.append({'ip':ip,'protocol':protocol,'port':port,'address':address,'isable':isable,'password_authentication_status':password_authentication_status,'local_password':local_password,"local_username":local_username})
else:
print(f"文件 {file_path} 不存在")
# 创建一个名为example.txt的文件
print(ip_port)
return ip_port
2.3批量测试端口可用性
2.3.1 单个端口测试
def check_port(ip, port,ip_info):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
try:
result = sock.connect_ex((ip, port))
if result == 0:
ip_info['isable']=True #网络正常,更新 交换机状态字典的端口状态字段
return True
else:
print(f'The port {port} on {ip} is closed.')
ip_info['isable'] = False #网络不通,更新 交换机状态字典的端口状态字段
return False
except Exception as e:
print(f"错误: {str(e)}")
ip_info['isable'] = False #出错,更新交换机状态字典的端口状态字段
return False
finally:
sock.close()
2.3.2 多线程多个端口批量测试
def bat_test_port(ip_port):
threads = []
for ip_info in ip_port:
ip=ip_info['ip']
port=ip_info['port']
thread = threading.Thread(target=check_port, args=(ip,int(port), ip_info))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
return ip_port
2.4 使用SSH或者Telnet登录交换机
2.4.1 netmiko 使用SSH协议登录交换机
def try_ssh_connection_with_passwords(hostname, port, username, passwords, ip_info,timeout=60,compress=True):
"""
尝试使用密码列表连接到 SSH 服务器。
:param hostname: SSH 服务器的主机名或IP地址
:param port: SSH 服务器的端口号
:param username: 登录用户名
:param passwords: 包含密码的列表
:param timeout: 连接超时时间(秒)
:return: 成功连接后返回SSHClient对象,否则返回None
"""
# client = paramiko.SSHClient()
# client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
passwords_num=len(passwords)
for password in passwords:
try:
print(f"尝试使用密码连接(密码 {password})...")
# 设备连接信息 开启fast_cli快速模式,全局global_delay_factor延迟因子调小(默认值为1)
dev = {'device_type': 'terminal_server',
'host': hostname,
'username': username,
'password': password,
'port': port,
'timeout': 180,
'conn_timeout':180,##必须长,否则没有连接到就直接失败
'auth_timeout':180,
'fast_cli': True,
'global_delay_factor': 0.01,
'session_log': 'session.log',
}
client = ConnectHandler(**dev)
print("连接成功!")
ip_info['local_username'] = username
ip_info['local_password'] = password
ip_info['password_authentication_status'] = True
return client
except paramiko.ssh_exception.AuthenticationException:
print("密码错误,尝试下一个密码...")
except paramiko.ssh_exception.SSHException as e:
print(f"SSH 错误: {e}")
return None # 也可以选择继续尝试其他密码,但这里我们假设 SSH 错误是致命的
except Exception as e:
print(f"发生其他错误: {e}")
return None # 同样,这里可以处理异常或继续尝试
print("所有密码尝试均失败。")
return None
2.4.2 使用SSH协议登录交换机进行查询配置
def connet_port(ip_info,command_lists):
ip=ip_info['ip']
print(ip)
port=ip_info['port']
print(port)
passwords = ['xxxxx','*****']
isable=ip_info['isable']
password = '******'
username = '*****'
# username = 'root'
logging.basicConfig(level=logging.DEBUG)
if isable and port=='22':
print('开始使用SSH协议连接服务器')
try:
with try_ssh_connection_with_passwords(ip,port,username,passwords,ip_info) as ssh:
print(ssh)
print('SSH协议连接服务器成功')
time.sleep(40) ##必须休眠,有些交换机登录之后,提醒更改密码,休眠40s跳过选择
'''
Q:如何交互,显示回显内容,来确定选择Y/N
'''
'''
command_lists= ['screen-length 0 temporary','screen-length disable' ,'terminal length 0','more off','dis cu', 'show running-config']
'''
print('去除分屏状态')
for command in command_lists:
print('开始执行命令')
try:
output = ssh.send_command(command_string=command,cmd_verify=True,strip_command=True)
print(output)
bulid_result_file(ip_info, output) ##将结果写入单独文件
except Exception as e:
print(f'出错,{command}执行结束,原因{e}')
print('命令执行完毕')
except Exception:
print(f"{ip},连接失败")
return 0
# 关闭连接
2.4.3 写入结果文件
def bulid_result_file(ip,resp):
print("---开始生成交换机执行命令结果存在---")
result_name=ip['ip']+'_'+ip['address']+'_'+ip['protocol']+'_result.txt'
with lock:
with open(os.path.join(BASE_EXE_DIR, result_name), 'a+', encoding='utf-8') as file:
file.write(resp + "\n")
file.flush() # 确保数据被立即写入磁盘
2.5 telnet 登录并执行命令
if isable and port=='23':
with telnetlib.Telnet(ip, port) as tn:
tn.set_debuglevel(3)
login_putout = ''
while 1:
time.sleep(3)
first_login=tn.read_very_eager().decode('ascii')
login_putout+=first_login
# time.sleep(1)
if re.search(r'password:',login_putout.lower()):
tn.write(password.encode() + b"\n")
break
if re.search(r'username:|login:',login_putout.lower()):
tn.write( username.encode() + b"\n")
time.sleep(1)
continue
index,obj,output=tn.expect([b'>',b'#'],timeout=20)
if index==1:
ip_info['local_username'] = username
ip_info['local_password'] = password
ip_info['password_authentication_status'] = True
time.sleep(1)
tn.write( b"\n") #单独回车,获取提示符例如
prompt_pattern =tn.read_until(b'#', timeout=10).decode('ascii') #获取提示符
tn.write(b'screen-length 0 temporary' + b"\n")
tn.read_until(b'#', timeout=10)
tn.write(b'screen-length disable' + b"\n")
tn.read_until(b'#', timeout=10)
tn.write(b'terminal length 0' + b"\n")
tn.read_until(b'#', timeout=10)
tn.write(b'more off' + b"\n")
tn.read_until(b'#', timeout=10)
for command in command_lists:
tn.write(command.encode() + b"\n")
output = tn.read_until(prompt_pattern.encode('ascii')).decode('ascii')
print(output)
bulid_result_file(ip_info, output)
if index == 0:
ip_info['local_username'] = username
ip_info['local_password'] = password
ip_info['password_authentication_status'] = True
print('111111111111111')
# tn.write(b'ls -l' + b"\n")
# time.sleep(1)
# output=tn.read_very_eager().decode()
# # print('ooooooooooooooooooooooooooooooooo')
# print(output)
time.sleep(1)
tn.write( b"\n")
prompt_pattern =tn.read_until(b'>', timeout=10).decode('ascii')
tn.write(b'screen-length 0 temporary' + b"\n")
tn.read_until(b'>', timeout=10)
tn.write(b'screen-length disable' + b"\n")
tn.read_until(b'>', timeout=10)
tn.write(b'terminal length 0' + b"\n")
tn.read_until(b'>', timeout=10)
tn.write(b'more off' + b"\n")
tn.read_until(b'>', timeout=10)
for command in command_lists:
tn.write(command.encode() + b"\n")
output = tn.read_until(prompt_pattern.encode('ascii')).decode('ascii')
print(output)
bulid_result_file(ip_info, output)
if index == -1:
return None
2.6多线程执行获取配置结果
def bat_check_port():
ip_port=parse_port_file()
bat_test_port(ip_port)
print(ip_port)
time.sleep(10) ####必须休眠,端口测试进程没有结束,直接连接会出意外结果
# command_lists=['ls -a','show']
command_lists= ['screen-length 0 temporary','screen-length disable' ,'terminal length 0','more off','dis cu', 'show running-config']
threads = []
for ip_info in ip_port:
thread = threading.Thread(target=connet_port, args=(ip_info,command_lists))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
build_log_file(ip_port)
2.7将结果写入文件
def build_log_file(ip_info):
print("---开始检测总日志是否存在---")
# 获取当日日期
today = date.today().strftime('%Y%m%d')
today_log_name = today + '_log.txt'
success_name = today + '_connect_success.txt'
fail_name = today + '_connect_fail.txt'
for ip in ip_info:
with lock:
with open(os.path.join(BASE_EXE_DIR, today_log_name), 'a+', encoding='utf-8') as file:
single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + 'password_authentication_status:' + str(ip['password_authentication_status']) + "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username']
file.write(single_log + "\n")
file.flush() # 确保数据被立即写入磁盘
if ip['isable']:
with lock:
with open(os.path.join(BASE_EXE_DIR, success_name), 'a+', encoding='utf-8') as file:
single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] +"\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + 'password_authentication_status:' + str(ip['password_authentication_status']) + "\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username']
file.write(single_log + "\n")
file.flush() # 确保数据被立即写入磁盘
if not ip['isable']:
with lock:
with open(os.path.join(BASE_EXE_DIR, fail_name), 'a+', encoding='utf-8') as file:
single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + "\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + 'password_authentication_status:' + str(ip['password_authentication_status']) +"\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username']
file.write(single_log + "\n")
file.flush() # 确保数据被立即写入磁盘
三、 全部代码
import argparse
import logging
import os
import re
import shutil
import socket
import sys
import telnetlib
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import date, datetime, timedelta
import paramiko
from netmiko import ConnectHandler
from pip._internal.utils import datetime
lock = threading.Lock()
# 获取当前路径父路径(所在文件夹)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
#获取打包后exe文件路径(所在文件夹)
BASE_EXE_DIR = os.path.dirname(sys.executable)
# BASE_EXE_DIR = os.path.dirname(os.path.abspath(__file__))
def parseArgs():
# 创建一个ArgumentParser对象
parser = argparse.ArgumentParser(description='这是一个命令行运行bat_check_port程序的说明')
# 添加一个位置参数
parser.add_argument('--ips', '-i',type=str, default=os.path.join(BASE_DIR,'网络设备ip端口3.txt'),help='输入本地配置好网络设备ip端口文件')
# 添加一个可选参数
parser.add_argument('--download', '-d',action='store_true', help='下载ip地址文件、发送手机号文件模板')
# 解析命令行输入
#args = parser.parse_args(['-i','aaaa','-t','bbbbb'])
args = parser.parse_args()
if args.download:
shutil.copy(os.path.join(BASE_DIR,'网络设备ip端口3.txt'), BASE_EXE_DIR)
return args
parseArgs()
def try_ssh_connection_with_passwords(hostname, port, username, passwords, ip_info,timeout=60,compress=True):
"""
尝试使用密码列表连接到 SSH 服务器。
:param hostname: SSH 服务器的主机名或IP地址
:param port: SSH 服务器的端口号
:param username: 登录用户名
:param passwords: 包含密码的列表
:param timeout: 连接超时时间(秒)
:return: 成功连接后返回SSHClient对象,否则返回None
"""
# client = paramiko.SSHClient()
# client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
passwords_num=len(passwords)
for password in passwords:
try:
print(f"尝试使用密码连接(密码 {password})...")
# 设备连接信息 开启fast_cli快速模式,全局global_delay_factor延迟因子调小(默认值为1)
dev = {'device_type': 'terminal_server',
'host': hostname,
'username': username,
'password': password,
'port': port,
'timeout': 180,
'conn_timeout':180,
'auth_timeout':180,
'fast_cli': True,
'global_delay_factor': 0.01,
'session_log': 'session.log',
}
client = ConnectHandler(**dev)
print("连接成功!")
ip_info['local_username'] = username
ip_info['local_password'] = password
ip_info['password_authentication_status'] = True
return client
except paramiko.ssh_exception.AuthenticationException:
print("密码错误,尝试下一个密码...")
except paramiko.ssh_exception.SSHException as e:
print(f"SSH 错误: {e}")
return None # 也可以选择继续尝试其他密码,但这里我们假设 SSH 错误是致命的
except Exception as e:
print(f"发生其他错误: {e}")
return None # 同样,这里可以处理异常或继续尝试
print("所有密码尝试均失败。")
return None
def parse_port_file( file_path=parseArgs().ips):
print("---开始解析端口文件---")
# 获取当日日期
today = date.today().strftime('%Y%m%d')
print(today)
# 数字格式[{'ip':ip,"协议":协议,"端口号":端口号,"位置":位置},....]
ip_port = []
if os.path.exists(file_path):
print(f"文件 {file_path} 存在")
with open(file_path, 'r', encoding='utf-8') as file:
lines = file.readlines()[1:]
for line in lines:
line = line.strip()
ip=line.split('\t')[0]
protocol=line.split('\t')[3]
port=line.split('\t')[2]
address=line.split('\t')[1]
isable=False
password_authentication_status=False
local_username=''
local_password=''
ip_port.append({'ip':ip,'protocol':protocol,'port':port,'address':address,'isable':isable,'password_authentication_status':password_authentication_status,'local_password':local_password,"local_username":local_username})
else:
print(f"文件 {file_path} 不存在")
# 创建一个名为example.txt的文件
print(ip_port)
return ip_port
def build_log_file(ip_info):
print("---开始检测总日志是否存在---")
# 获取当日日期
today = date.today().strftime('%Y%m%d')
today_log_name = today + '_log.txt'
success_name = today + '_connect_success.txt'
fail_name = today + '_connect_fail.txt'
for ip in ip_info:
with lock:
with open(os.path.join(BASE_EXE_DIR, today_log_name), 'a+', encoding='utf-8') as file:
single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + \
"\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + \
'password_authentication_status:' + str(ip['password_authentication_status']) + \
"\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username']
file.write(single_log + "\n")
file.flush() # 确保数据被立即写入磁盘
if ip['isable']:
with lock:
with open(os.path.join(BASE_EXE_DIR, success_name), 'a+', encoding='utf-8') as file:
single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + \
"\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + \
'password_authentication_status:' + str(ip['password_authentication_status']) + \
"\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username']
file.write(single_log + "\n")
file.flush() # 确保数据被立即写入磁盘
if not ip['isable']:
with lock:
with open(os.path.join(BASE_EXE_DIR, fail_name), 'a+', encoding='utf-8') as file:
single_log = 'ip:' + ip['ip'] + "\t" + 'protocol:' + ip['protocol'] + "\t" + 'port:' + ip['port'] + \
"\t" + 'address:' + ip['address'] + "\t" + 'isable:' + str(ip['isable']) + "\t" + \
'password_authentication_status:' + str(ip['password_authentication_status']) + \
"\t" + 'local_password:' + ip['local_password'] + "\t" + 'local_username:' + ip['local_username']
file.write(single_log + "\n")
file.flush() # 确保数据被立即写入磁盘
def bulid_result_file(ip,resp):
print("---开始生成交换机执行命令结果存在---")
result_name=ip['ip']+'_'+ip['address']+'_'+ip['protocol']+'_result.txt'
with lock:
with open(os.path.join(BASE_EXE_DIR, result_name), 'a+', encoding='utf-8') as file:
file.write(resp + "\n")
file.flush() # 确保数据被立即写入磁盘
def check_port(ip, port,ip_info):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
try:
result = sock.connect_ex((ip, port))
if result == 0:
ip_info['isable']=True
return True
else:
print(f'The port {port} on {ip} is closed.')
ip_info['isable'] = False
return False
except Exception as e:
print(f"错误: {str(e)}")
ip_info['isable'] = False
return False
finally:
sock.close()
def connet_port(ip_info,command_lists):
ip=ip_info['ip']
print(ip)
port=ip_info['port']
print(port)
passwords = ['*****','*****']
isable=ip_info['isable']
password = 'xxxx'
username = '*****'
# username = 'root'
logging.basicConfig(level=logging.DEBUG)
if isable and port=='22':
print('开始使用SSH协议连接服务器')
try:
with try_ssh_connection_with_passwords(ip,port,username,passwords,ip_info) as ssh:
print(ssh)
print('SSH协议连接服务器成功')
time.sleep(40)
# 执行命令
# ssh.exec_command('screen-length 0 temporary',get_pty=True)
# # 执行命令
# ssh.exec_command('screen-length disable',get_pty=True)
# # 执行命令
# ssh.exec_command('terminal length 0',get_pty=True)
# # 执行命令
# ssh.exec_command('more off',get_pty=True)
print('去除分屏状态')
for command in command_lists:
print('开始执行命令')
try:
output = ssh.send_command(command_string=command,cmd_verify=True,strip_command=True)
print(output)
bulid_result_file(ip_info, output)
except Exception as e:
print(f'出错,{command}执行结束,原因{e}')
print('命令执行完毕')
except Exception:
print(f"{ip},连接失败")
return 0
# 关闭连接
if isable and port=='23':
print('iiiiiiiiiiiiiiiiii')
with telnetlib.Telnet(ip, port) as tn:
print(tn)
tn.set_debuglevel(0)
# print(tn.read_all().decode())
# tn.read_until(b'continue...\n')
# tn.write( b"\n")
login_putout = ''
while 1:
time.sleep(3)
first_login=tn.read_very_eager().decode('ascii')
login_putout+=first_login
# time.sleep(1)
if re.search(r'password:',login_putout.lower()):
tn.write(password.encode() + b"\n")
# tn.read_until(b"\n")
# tn.write(b"\n")
break
if re.search(r'username:|login:',login_putout.lower()):
tn.write( username.encode() + b"\n")
time.sleep(1)
# print(tn.read_very_eager())
continue
#
# # tn.read_until(b'login:',timeout=10)
# tn.write(b'root'+ b"\n")
# tn.read_until(b'Password:',timeout=10)
# tn.write(b'123456' + b"\n")
index,obj,output=tn.expect([b'>',b'#'],timeout=20)
# print(f'1111111----{index}')
# print(index)
if index==1:
ip_info['local_username'] = username
ip_info['local_password'] = password
ip_info['password_authentication_status'] = True
print('111111111111111')
# tn.write(b'ls -l' + b"\n")
# time.sleep(1)
# output=tn.read_very_eager().decode()
# # print('ooooooooooooooooooooooooooooooooo')
# print(output)
time.sleep(1)
tn.write( b"\n")
prompt_pattern =tn.read_until(b'#', timeout=10).decode('ascii')
tn.write(b'screen-length 0 temporary' + b"\n")
tn.read_until(b'#', timeout=10)
tn.write(b'screen-length disable' + b"\n")
tn.read_until(b'#', timeout=10)
tn.write(b'terminal length 0' + b"\n")
tn.read_until(b'#', timeout=10)
tn.write(b'more off' + b"\n")
tn.read_until(b'#', timeout=10)
for command in command_lists:
tn.write(command.encode() + b"\n")
output = tn.read_until(prompt_pattern.encode('ascii')).decode('ascii')
print(output)
bulid_result_file(ip_info, output)
if index == 0:
ip_info['local_username'] = username
ip_info['local_password'] = password
ip_info['password_authentication_status'] = True
print('111111111111111')
# tn.write(b'ls -l' + b"\n")
# time.sleep(1)
# output=tn.read_very_eager().decode()
# # print('ooooooooooooooooooooooooooooooooo')
# print(output)
time.sleep(1)
tn.write( b"\n")
prompt_pattern =tn.read_until(b'>', timeout=10).decode('ascii')
tn.write(b'screen-length 0 temporary' + b"\n")
tn.read_until(b'>', timeout=10)
tn.write(b'screen-length disable' + b"\n")
tn.read_until(b'>', timeout=10)
tn.write(b'terminal length 0' + b"\n")
tn.read_until(b'>', timeout=10)
tn.write(b'more off' + b"\n")
tn.read_until(b'>', timeout=10)
for command in command_lists:
tn.write(command.encode() + b"\n")
output = tn.read_until(prompt_pattern.encode('ascii')).decode('ascii')
print(output)
bulid_result_file(ip_info, output)
if index == -1:
return None
#
#
# # 从这里开始,tn可以用来发送命令和接收响应
#
# tn.write(b"pwd \n")
# print('8888888888888')
#
# print(tn.read_all().decode())
# print('qqqqqqqqq')
# tn.close()
def bat_test_port(ip_port):
threads = []
for ip_info in ip_port:
ip=ip_info['ip']
port=ip_info['port']
thread = threading.Thread(target=check_port, args=(ip,int(port), ip_info))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
return ip_port
def bat_check_port():
ip_port=parse_port_file()
bat_test_port(ip_port)
print(ip_port)
time.sleep(10) ####必须休眠,端口测试进程没有结束,直接连接会出意外结果
# command_lists=['ls -a','show']
command_lists= ['screen-length 0 temporary','screen-length disable' ,'terminal length 0','more off','dis cu', 'show running-config']
threads = []
for ip_info in ip_port:
thread = threading.Thread(target=connet_port, args=(ip_info,command_lists))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
build_log_file(ip_port)
if __name__ == '__main__':
bat_check_port()
四、打包
4.1 前置说明
4.1.1文件列表
- autoBackupSwicthesV2.py
- 网络设备ip端口3.txt
4.1.2 安装pyinstaller
pip install pyinstaller
4.2 打包生成.spec文件
pyinstaller -F autoBackupSwicthesV2.py
说明:1.生成autoBackupSwicthesV2.spec文件
4.3 使用.spec文件打包
# -*- mode: python ; coding: utf-8 -*-
a = Analysis(
['autoBackupSwicthesV2.py'],
pathex=[],
binaries=[],
datas=[("网络设备ip端口3.txt",".")],
hiddenimports=[],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
noarchive=False,
optimize=0,
)
pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.datas,
[],
name='autoBackupSwicthesV2.5',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)
执行pyinstaller .\autoBackupSwitchesV2.spec
五、其他说明
5.1 总结
socket编程
5.2 易错点/踩坑点
1.socket编程,一定要注意关闭连接; 本例中先测试端口联通性,再登录端口,有两次连接过程,如果没有断开,会报错,可以使用sleep休眠一段时间,再进行登录。
2.关闭本地防火墙,关闭本地防火墙,关闭本地防火墙。
3.多线程进行操作,会出各种问题,注意线程抢占资源问题,不可同时写入文件,会出现写入不全问题。
5.3 疑问
- 登录交换机之后出现 提醒更改密码,如何有效获取提示并输入命令?
拓展,如何交互式进行读取回显与写入命令,根据回显内容决定下一步命令? - 单行过长,会出现乱码,什么原因?
- 如何有效获取登录 输入命令前的提示符?