一、需求
1.检测交换机网络状态,当离线、恢复、不稳定时可以发送短信
2.生成日志,保留每次检测结果
3.可以传入自己配置的交换机地址、发送短信的手机号
二、具体开发思路
2.1 主要思路
1.读取文件获取ip地址池与手机号列表
2.初始化所有交换机ip地址池网络状态
3.遍历交换机ip地址池,进行ping操作,每个地址分配一个线程,根据ping结果更新ip地址池状态
4.遍历ip地址池,满足条件的ip地址向用户发送短信
5.休眠40s进行下一次的3,4步骤
2.2 读取文件获取ip地址池与手机号列表
2.2.1引入模块
import argparse # 接受cmd参数,解析文件地址,获取交换机地址发送短信的手机号
import shutil #shutil.copy()获取默认的配置信息(交换机地址、手机号)
import copy # 初始化网络结果,进行深拷贝
import os # 获取运行时临时文件夹路径,打包后exe文件路径
import sys # 获取打包后exe文件路径
import threading # 多线程进行ping操作
import time
from datetime import date, datetime, timedelta
import requests # 发送短信,通过接口发送post请求
import json # 短信接口 传参需要json类型
from ping3 import ping # ping测试网络状态
2. 2.2全局路径
# 获取当前路径父路径(所在文件夹)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
#获取打包后exe文件路径(所在文件夹)
BASE_EXE_DIR = os.path.dirname(sys.executable)
2.2.3 读取cmd运行传参并解析
def parseArgs():
# 创建一个ArgumentParser对象
parser = argparse.ArgumentParser(description='这是一个命令行运行bat_ping程序的说明')
# 添加一个位置参数
parser.add_argument('--ips', '-i',type=str, default=os.path.join(BASE_DIR,'上联地址.txt'),help='输入本地配置好等待ping的ip地址文件 地址')
# 添加一个可选参数
parser.add_argument('--tels', '-t', type=str, default=os.path.join(BASE_DIR,'电话号码.txt'), help='输入配置好的电话和用户名文件 地址')
parser.add_argument('--download', '-d',action='store_true', help='下载ip地址文件、发送手机号文件模板')
# 解析命令行输入
#args = parser.parse_args(['-i','aaaa','-t','bbbbb'])
args = parser.parse_args()
if args.download:
shutil.copy(os.path.join(BASE_DIR,'ips_template.txt'), BASE_EXE_DIR)
shutil.copy(os.path.join(BASE_DIR, 'tels_template.txt'), BASE_EXE_DIR)
shutil.copy(os.path.join(BASE_DIR,'上联地址.txt'), BASE_EXE_DIR)
shutil.copy(os.path.join(BASE_DIR, '电话号码.txt'), BASE_EXE_DIR)
return args
2.2.4 读取交换机地址池文件
交换机地址池文件格式
ip地址<空格>所属机构
127.0.0.1 本机
def loadIP_txt(self,ip_path=parseArgs().ips):
with open(ip_path, 'r',encoding='utf-8') as file:
lines = file.readlines()
del_key=''
all_ip={}
for line in lines:
all_ip[line.replace('\n','').split(' ')[0]]=line.replace('\n','').split(' ')[1]
for k,v in all_ip.items():
if k[0]>'9' or k[0]<'0':
del_key=k
del all_ip[del_key]
print(all_ip)
return all_ip
2.2.5 读取手机号
手机号<空格>姓名
138xxxxx0841 ww
def parse_phone_file(self,file_path):
print("---开始解析电话号码配置文件---")
#获取当日日期
today=date.today().strftime('%Y%m%d')
print(today)
iphone_users=[]
if os.path.exists(file_path):
print(f"文件 {file_path} 存在")
with open(file_path, 'r', encoding='utf-8') as file:
lines = file.readlines()
for line in lines:
iphone_users.append(line.replace('\n', '').split(' ')[0])
else:
print(f"文件 {file_path} 不存在")
# 创建一个名为example.txt的文件
return iphone_users
2.3 初始化所有交换机ip地址池网络状态
#------------1.获取待测试的ip地址池---------------
ip_pools=self.loadIP_txt()
print('开始批量检测网络是否通畅')
#----------------2. 初始化ip地址状态------------
init_result={
"当前时间":'',
"位置":'',
"网络状态":'',
"连续失败次数":0,
"本日累计失败次数": 0,
"最后在线时间":''
}
#初始化连续网络测试结果{'127.0.0.1': 0, '185.199.109.153': 0, '32.107.255.230': 0, '32.107.255.226': 0}
#sendMsg={ip:init_result for ip in ip_pools.keys()} #这种初始化导致都是最后一个ip地址的状态
sendMsg = {ip: copy.deepcopy(init_result) for ip in ip_pools.keys()} #必须深拷贝
2.4 批量进行ping操作
2.4.1 ip地址单个ping操作
def ping_ip(self,ip,address,sendMsg):
result=ping(ip,timeout=5)
sendMsg[ip]["位置"] =address
if not sendMsg[ip]["当前时间"][0:10].startswith(str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))[0:10]):
sendMsg[ip]["本日累计失败次数"] = 0
sendMsg[ip]["连续失败次数"] = 0
sendMsg[ip]["当前时间"] = str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
if result is None or result is False:
sendMsg[ip]["连续失败次数"]=sendMsg[ip]["连续失败次数"]+1
sendMsg[ip]["本日累计失败次数"]=sendMsg[ip]["本日累计失败次数"]+1
sendMsg[ip]["网络状态"]='网络故障'
if(sendMsg[ip]["连续失败次数"]<=3 and sendMsg[ip]["本日累计失败次数"]==10 ):
sendMsg[ip]["网络状态"] = '网络不稳定'
sys.stdout.write("ip地址"+ip+"地址"+address+"网络故障\n")
else:
if sendMsg[ip]["连续失败次数"]>=4:
sendMsg[ip]["网络状态"] = '网络断网恢复成功'
sendMsg[ip]["连续失败次数"]=0
else:
sendMsg[ip]["网络状态"]='网络正常'
sendMsg[ip]["连续失败次数"] = 0
sendMsg[ip]["最后在线时间"]=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
sys.stdout.write("ip地址" + ip + "地址" + address + "网络正常\n")
self.build_log_txt(ip,sendMsg[ip]) #将当前状态字典写入日志文件中
2.4.2 多线程进行ping
while 1:
# 3.多线程处理ping操作
threads=[]
for ip,address in ip_pools.items():
thread=threading.Thread(target=self.ping_ip,args=(ip,address,sendMsg))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
2.5 发送短信
2.5.1 发送单个ip状态短信,并写入日志文件
def sendCode(self,ip,sendMsg):
url='http://xxxxxxx/smsCode/xxxxx'
tels=parseArgs().tels
tels_lists=self.parse_phone_file(tels) # 获取手机号列表
for tel in tels_lists:
data={
"phone":tel,
"content":sendMsg
}
requests.post(url,headers={'Content-Type':'application/json'},data=json.dumps(data))
print("发送成功")
today = date.today().strftime('%Y%m%d')
today_log_name = today + '_log.txt'
#写入日志文件
with open(os.path.join(BASE_EXE_DIR,today_log_name), 'a+', encoding='utf-8') as file:
single_log='----开始发送短信---'
single_log=single_log+sendMsg
file.write(single_log+"\n")
2.5.2 遍历ip地址池,满足条件的ip地址向用户发送短信
#4.发送通知短信(断网、断网恢复、网络不稳定)
for ip, address in ip_pools.items():
if (sendMsg[ip]["连续失败次数"]>=4 and sendMsg[ip]["连续失败次数"]<=5) or sendMsg[ip]["网络状态"]=='网络断网恢复成功'or sendMsg[ip]["网络状态"]=='网络不稳定' or (sendMsg[ip]["连续失败次数"]>=240 and sendMsg[ip]["连续失败次数"]%240==0):
print(sendMsg[ip]["连续失败次数"]%240)
print(ip,sendMsg[ip])
sendsingleMsg = sendMsg[ip]
single_log = sendMsg[ip]["网络状态"]+"通知\n当前时间:" + sendsingleMsg['当前时间'] + "\n当前ip地址:" + ip + "\n位置:" + sendsingleMsg['位置'] + "\n网络状态:" + sendsingleMsg['网络状态'] + '\n连续失败次数:' + str(
sendsingleMsg['连续失败次数']) + "\n本日累计失败次数:" + str(sendsingleMsg['本日累计失败次数']) + "\n最后在线时间:" + sendsingleMsg['最后在线时间']
self.sendCode(ip,single_log)
2.5.3 休眠40s后进入下次批量处理
#4.休眠40s后进入下次批量处理
time.sleep(40)
2.6 生成、写入并定时清理日志文件
def build_log_txt(self,ip,log_content):
print("---开始检测本日日志是否存在---")
#获取当日日期
today=date.today().strftime('%Y%m%d')
print(today)
ten_day=(datetime.now()- timedelta(days=10)).strftime('%Y%m%d')
print(ten_day)
ten_day__log_name=ten_day+ '_log.txt'
today_log_name=today + '_log.txt'
if os.path.exists(os.path.join(BASE_EXE_DIR,today_log_name)):
print(f"文件 {today_log_name} 存在")
else:
print(f"文件 {today_log_name} 不存在")
# 创建一个名为example.txt的文件
with open(os.path.join(BASE_EXE_DIR,today_log_name), 'a+',encoding='utf-8') as file:
file.write(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())+"-----创建本日日志文件成功----\n")
if os.path.exists(os.path.join(BASE_EXE_DIR,ten_day__log_name)):
print(f"10天前日志文件 {ten_day__log_name} 存在,开始删除。")
os.remove(os.path.join(BASE_EXE_DIR,ten_day__log_name))
else:
print(f"10天前日志文件 {ten_day__log_name} 不存在")
with (open(os.path.join(BASE_EXE_DIR,today_log_name), 'a+', encoding='utf-8') as file):
single_log="当前时间"+log_content['当前时间']+",当前ip地址:"+ip+",位置:"+log_content['位置'] \
+",网络状态:"+log_content['网络状态']+',连续失败次数:'+str(log_content['连续失败次数'])\
+",本日累计失败次数:"+str(log_content['本日累计失败次数'])\
+",最后在线时间:"+log_content['最后在线时间']
file.write(single_log+"\n")
三、 全部代码
import argparse
import copy
import json
import optparse
import os
import shutil
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import date, datetime, timedelta
import requests
from ping3 import ping
####模块
### 日志模块
##格式
### 当前日期时间{},ip地址{},位置{},网络状态{},连续失败次数{},本日失败次数{},最后在线时间{}
### pi
###文件读取写入模块
#
### 短信发送模块
## 格式: 当前时间:{},ip地址:{},位置:{} 已经断网{时常}。
### 多线程模块
### ping操作耗费时间,单线程运行很慢,为每个ip地址的ping操作分配一个线程
# Q&A :多线程进行写入日志文件,线程不安全,出现多个线程同时打开同一资源,访问拒绝错误
# :多线程进行cmd窗口打印,输出顺序错乱
###
#cmd单独配置ip地址
# 获取当前路径父路径(所在文件夹)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
#获取打包后exe文件路径(所在文件夹)
BASE_EXE_DIR = os.path.dirname(sys.executable)
#获取cmd
user_perfer=sys.argv
def parseArgs():
# 创建一个ArgumentParser对象
parser = argparse.ArgumentParser(description='这是一个命令行运行bat_ping程序的说明')
# 添加一个位置参数
parser.add_argument('--ips', '-i',type=str, default=os.path.join(BASE_DIR,'上联地址.txt'),help='输入本地配置好等待ping的ip地址文件 地址')
# 添加一个可选参数
parser.add_argument('--tels', '-t', type=str, default=os.path.join(BASE_DIR,'电话号码.txt'), help='输入配置好的电话和用户名文件 地址')
parser.add_argument('--download', '-d',action='store_true', help='下载ip地址文件、发送手机号文件模板')
# 解析命令行输入
#args = parser.parse_args(['-i','aaaa','-t','bbbbb'])
args = parser.parse_args()
if args.download:
shutil.copy(os.path.join(BASE_DIR,'ips_template.txt'), BASE_EXE_DIR)
shutil.copy(os.path.join(BASE_DIR, 'tels_template.txt'), BASE_EXE_DIR)
shutil.copy(os.path.join(BASE_DIR,'上联地址.txt'), BASE_EXE_DIR)
shutil.copy(os.path.join(BASE_DIR, '电话号码.txt'), BASE_EXE_DIR)
return args
parseArgs()
class batch_ping:
# 读取ip地址配置文件,生成字典,格式{ip地址:位置}
# 去除不正确的ip地址
def logger(self,func):
def inner(*args, **kwargs):
self.build_log_txt(kwargs['log'])
ret=func(*args, **kwargs)
return ret
return inner
def loadIP_txt(self,ip_path=parseArgs().ips):
with open(ip_path, 'r',encoding='utf-8') as file:
lines = file.readlines()
del_key=''
all_ip={}
for line in lines:
# print(line.split(' '))
# ip_pools=line.split(' ')[0]
# actress_pools=line.split(' ')[1]
# print(ip_pools)
# print(actress_pools)
all_ip[line.replace('\n','').split(' ')[0]]=line.replace('\n','').split(' ')[1]
for k,v in all_ip.items():
if k[0]>'9' or k[0]<'0':
del_key=k
del all_ip[del_key]
print(all_ip)
return all_ip
def build_log_txt(self,ip,log_content):
print("---开始检测本日日志是否存在---")
#获取当日日期
today=date.today().strftime('%Y%m%d')
print(today)
ten_day=(datetime.now()- timedelta(days=10)).strftime('%Y%m%d')
print(ten_day)
ten_day__log_name=ten_day+ '_log.txt'
today_log_name=today + '_log.txt'
if os.path.exists(os.path.join(BASE_EXE_DIR,today_log_name)):
print(f"文件 {today_log_name} 存在")
else:
print(f"文件 {today_log_name} 不存在")
# 创建一个名为example.txt的文件
with open(os.path.join(BASE_EXE_DIR,today_log_name), 'a+',encoding='utf-8') as file:
file.write(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())+"-----创建本日日志文件成功----\n")
if os.path.exists(os.path.join(BASE_EXE_DIR,ten_day__log_name)):
print(f"10天前日志文件 {ten_day__log_name} 存在,开始删除。")
os.remove(os.path.join(BASE_EXE_DIR,ten_day__log_name))
else:
print(f"10天前日志文件 {ten_day__log_name} 不存在")
with (open(os.path.join(BASE_EXE_DIR,today_log_name), 'a+', encoding='utf-8') as file):
single_log="当前时间"+log_content['当前时间']+",当前ip地址:"+ip+",位置:"+log_content['位置'] \
+",网络状态:"+log_content['网络状态']+',连续失败次数:'+str(log_content['连续失败次数'])\
+",本日累计失败次数:"+str(log_content['本日累计失败次数'])\
+",最后在线时间:"+log_content['最后在线时间']
file.write(single_log+"\n")
def parse_phone_file(self,file_path):
print("---开始解析电话号码配置文件---")
#获取当日日期
today=date.today().strftime('%Y%m%d')
print(today)
iphone_users=[]
if os.path.exists(file_path):
print(f"文件 {file_path} 存在")
with open(file_path, 'r', encoding='utf-8') as file:
lines = file.readlines()
for line in lines:
iphone_users.append(line.replace('\n', '').split(' ')[0])
else:
print(f"文件 {file_path} 不存在")
# 创建一个名为example.txt的文件
return iphone_users
def ping_ip(self,ip,address,sendMsg):
result=ping(ip,timeout=5)
print(ip)
sendMsg[ip]["位置"] =address
if not sendMsg[ip]["当前时间"][0:10].startswith(str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))[0:10]):
print("第二天了")
sendMsg[ip]["本日累计失败次数"] = 0
sendMsg[ip]["连续失败次数"] = 0
sendMsg[ip]["当前时间"] = str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
if result is None or result is False:
sendMsg[ip]["连续失败次数"]=sendMsg[ip]["连续失败次数"]+1
sendMsg[ip]["本日累计失败次数"]=sendMsg[ip]["本日累计失败次数"]+1
sendMsg[ip]["网络状态"]='网络故障'
if(sendMsg[ip]["连续失败次数"]<=3 and sendMsg[ip]["本日累计失败次数"]==10 ):
sendMsg[ip]["网络状态"] = '网络不稳定'
sys.stdout.write("ip地址"+ip+"地址"+address+"网络故障\n")
else:
if sendMsg[ip]["连续失败次数"]>=4:
sendMsg[ip]["网络状态"] = '网络断网恢复成功'
sendMsg[ip]["连续失败次数"]=0
else:
sendMsg[ip]["网络状态"]='网络正常'
sendMsg[ip]["连续失败次数"] = 0
sendMsg[ip]["最后在线时间"]=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
sys.stdout.write("ip地址" + ip + "地址" + address + "网络正常\n")
self.build_log_txt(ip,sendMsg[ip])
def sendCode(self,ip,sendMsg):
url='http://xxxxxxxxxx/xxxxxx/xxxxxxx'
tels=parseArgs().tels
tels_lists=self.parse_phone_file(tels)
for tel in tels_lists:
data={
"phone":tel,
"content":sendMsg
}
print(data)
print(data)
requests.post(url,headers={'Content-Type':'application/json'},data=json.dumps(data))
print("发送成功")
today = date.today().strftime('%Y%m%d')
print(today)
today_log_name = today + '_log.txt'
with open(os.path.join(BASE_EXE_DIR,today_log_name), 'a+', encoding='utf-8') as file:
single_log='----开始发送短信---'
single_log=single_log+sendMsg
file.write(single_log+"\n")
def batch_ping(self):
#------------1.获取待测试的ip地址池---------------
ip_pools=self.loadIP_txt()
print('开始批量检测网络是否通畅')
#----------------2. 初始化ip地址状态------------
init_result={
"当前时间":'',
"位置":'',
"网络状态":'',
"连续失败次数":0,
"本日累计失败次数": 0,
"最后在线时间":''
}
#初始化连续网络测试结果{'127.0.0.1': { "当前时间":'',"位置":'',"网络状态":'',"连续失败次数":0, "本日累计失败次数": 0,"最后在线时间":''}, '185.199.109.153':{ "当前时间":'',"位置":'',"网络状态":'',"连续失败次数":0, "本日累计失败次数": 0,"最后在线时间":''}}
#sendMsg={ip:init_result for ip in ip_pools.keys()} #这种初始化导致都是最后一个ip地址的状态
sendMsg = {ip: copy.deepcopy(init_result) for ip in ip_pools.keys()}
# for ip,address in ip_pools.items():
# result_fail[ip]=0
#------------
while 1:
# 3.多线程处理ping操作
threads=[]
for ip,address in ip_pools.items():
thread=threading.Thread(target=self.ping_ip,args=(ip,address,sendMsg))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
#result=ping(ip)
#4.发送通知短信(断网、断网恢复、网络不稳定)
for ip, address in ip_pools.items():
if (sendMsg[ip]["连续失败次数"]>=4 and sendMsg[ip]["连续失败次数"]<=5) or sendMsg[ip]["网络状态"]=='网络断网恢复成功'or sendMsg[ip]["网络状态"]=='网络不稳定' or (sendMsg[ip]["连续失败次数"]>=240 and sendMsg[ip]["连续失败次数"]%240==0):
print(sendMsg[ip]["连续失败次数"]%240)
print(ip,sendMsg[ip])
sendsingleMsg = sendMsg[ip]
single_log = sendMsg[ip]["网络状态"]+"通知\n当前时间:" + sendsingleMsg['当前时间'] + "\n当前ip地址:" + ip + "\n位置:" + \
sendsingleMsg['位置'] \
+ "\n网络状态:" + sendsingleMsg['网络状态'] + '\n连续失败次数:' + str(
sendsingleMsg['连续失败次数']) + "\n本日累计失败次数:" + str(sendsingleMsg['本日累计失败次数']) \
+ "\n最后在线时间:" + sendsingleMsg['最后在线时间']
self.sendCode(ip,single_log)
print(sendMsg)
#4.休眠40s后进入下次批量处理
time.sleep(40)
'''
Q&A:待优化的点
多线程导致写入日志文件线程不安全,导致print乱序 ---------使用第三方日志包
日志分类:失败日志、网络恢复日志、发送短信日志
加入cmd加入参数运行(如配置ip地址文件、发送手机号文件配置等)----argparse
加入ip地址池文件里面ip地址的校验、手机号码校验。 ------正则表达式校验
异常处理模块加入 --------
延伸思考:
spec文件打包详解
'''
if __name__ == '__main__':
bat=batch_ping()
bat.batch_ping()
四、打包
4.1 前置说明
4.1.1文件列表
- batch_pingsv3.py
- 上联地址.txt
- 电话号码.txt
4.1.2 安装pyinstaller
pip install pyinstaller
4.2 打包生成.spec文件
pyinstaller -F batch_pingsv3.py
说明:1.生成batch_pingsv3.spec文件
2.pyinstaller常用打包参数
参数 | 解释 |
---|---|
-D, –onedir | 打包成一个文件夹 |
-F, –onefile | 打包成一个可执行文件(exe) |
4.3 使用.spec文件打包
a = Analysis(
['batch_pingsv3.py'],
pathex=[],
binaries=[],
datas=[('上联地址.txt','.'),('电话号码.txt','.'),('ips_template.txt','.'),('tels_template.txt','.')],
hiddenimports=[],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
noarchive=False,
optimize=0,
)
pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.datas,
[],
name='batch_pingsv3',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
console=True,
upx_exclude=[],
runtime_tmpdir=None,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon='C:\\Users\\ww\\Desktop\\ping\\pythonProject2\\PING.ico'
)
主要修改说明:
修改datas=[('上联地址.txt','.'),('电话号码.txt','.'),('ips_template.txt','.'),('tels_template.txt','.')],
注意:存放位置为 ’ . ’ 时,表示为程序运行目录(windows:C:\Users\Administrator\AppData\Local\Temp_MEI190322,单文件模式运行时会自动解压到该目录)
console=True,
cmd窗口是否显示
runtime_tmpdir=None,
单文件模式
icon='C:\\Users\\ww\\Desktop\\ping\\pythonProject2\\PING.ico'
打包exe文件的图标地址
PyInstaller 完美打包 Python 脚本,输出结构清晰、便于二次编辑的打包程序 - 淳帅二代 - 博客园 (cnblogs.com)