python运维工具之SFTP上传服务器

一、需求说明

1.设计桌面程序,封装好链接,通过SFTP协议上传本地文件至服务器

2.检测本地文件的格式

3.获取服务器文件列表,下载至本地

二、总体思路整理

1.使用pyqt库设计,使用QT designer (.ui文件)设计程序元素

2.py引入ui文件,获取各个控件,绑定事件(获取文件地址、检测文件格式、上传服务器、获取服务器文件列表、下载文件),所有绑定事件内需要写入日志

2.1程序ui

ui设计

先小后大:先设计相关控件位置,再添加父容器,确定之间的布局关系,使用layoutStretch确定大小。

见名知意:控件设置名字

2.2 py程序

2.2.1 引入模块

import os.path # 获取临时文件夹的路径
import sys    #pyqt5的QApplication 需要参数
import chardet #检测txt编码(utf-8,GB2312,...)
import paramiko #SFTP协议的封装上传
from PyQt5 import uic, QtWidgets #解析QT Desinger 的ui文件
from PyQt5.QtCore import QStringListModel # QStringListModel 用于处理字符串列表的数据模型,它可以作为 QListView 的数据模型,在界面上显示和编辑字符串列表。
from PyQt5.QtWidgets import QMainWindow, QApplication, QFileDialog, QPushButton, QLineEdit, QStatusBar, QLabel, QScrollArea, QWidget, QPlainTextEdit, QRadioButton, QListView, QMessageBox #相关控件
import socket # 获取本机ip

2.2.2 全局变量

BASE_DIR = os.path.dirname(os.path.abspath(__file__))  #获取临时文件夹的路径
#远程服务器链接信息
host = 'xxxxxx'
port = 22
username = 'xxxx'
password = 'xxxx'
remote = '/tmp/'
ip = socket.gethostbyname(socket.gethostname())#获取运行电脑ip地址

2.2.3 引入ui文件

app = QApplication(sys.argv)
ui = uic.loadUi(os.path.join(BASE_DIR, 'untitled5.ui'))

2.2.4 获取控件

select_button: QPushButton = ui.find_address_button  #文件选择按钮
file_address_input: QLineEdit = ui.file_address     # 显示文件地址输入框
code_label = QLabel()  #状态栏显示编码,新建一个label
dfgzRadio: QRadioButton = ui.dfgz      #单选按钮,代发工资
szrmbRadio: QRadioButton = ui.szrmb  #单选按钮,数字人民币
check_button: QPushButton = ui.check_Button #检测按钮
my_download_remote_button: QPushButton = ui.download_remote_button  #下载远程文件按钮
my_fetch_remote_lists_button: QPushButton = ui.fetch_remote_lists_button #获取远程文件列表按钮
my_remote_list: QListView = ui.remote_lists  #远程文件列表显示控制
log_content_area: QPlainTextEdit = ui.log_content #日志显示区域控件
myupload_Button: QPushButton = ui.upload_Button # 文件上传按钮
myStatusBar: QStatusBar = ui.statusbar                #状态栏
#----状态栏显示ip开始----
ip_label = QLabel()
ip_info = '当前登录ip:' + ip
ip_label.setText(ip_info)
myStatusBar.addPermanentWidget(ip_label)
#----状态栏显示ip结束----
log_content_area.appendPlainText(f'{ip_label.text()}')  #日志区域添加当前ip信息

2.2.5 获取文件地址,绑定 文件选择按钮(点击),回显至 文件地址输入框,并检测文件编码

2.2.5.1 检测文件编码
def getTxtCode(txt_path, ):
    try:
        detector = chardet.UniversalDetector()
        with open(txt_path, 'rb') as f:
            for line in f:
                detector.feed(line)
                if detector.done:
                    break
            detector.close()
        f_charInfo=detector.result
        txt_code='utf-8'
        if f_charInfo['encoding'] =='GB2312':
            txt_code='GB18030'
        if f_charInfo['encoding'] is not None and f_charInfo['encoding'] != 'GB2312':
            txt_code=f_charInfo['encoding']
        if f_charInfo['encoding'] is  None :
            txt_code = '未知'
        return txt_code
        # return f_charInfo['encoding'] if f_charInfo['encoding'] is not None else '未知'
    except:
        if f:
            f.close()
            print('err')
            return '未知'
2.2.5.2 选择文件
def selectFile(file_address_input, log_content_area, code_label, myStatusBar):
    fd = QFileDialog()
    fd.setFileMode(QFileDialog.FileMode.ExistingFile)
    fd.setDirectory('C:\\')
    fd.setNameFilter('*.txt')
    if fd.exec():
        print(fd.selectedFiles())
        file_address_input.setText(fd.selectedFiles()[0])
        log_content_area.appendPlainText('---读取待上传的文件地址成功---')
        log_content_area.appendPlainText(fd.selectedFiles()[0])
        suffix = fd.selectedFiles()[0].split('/')[-1].split('.')[1]
        log_content_area.appendPlainText('---读取待上传的文件后缀---')
        suffix_content = '后缀为:' + suffix
        log_content_area.appendPlainText(suffix_content)
        code = getTxtCode(fd.selectedFiles()[0])    #调用文件编码检测函数
        log_content_area.appendPlainText('---读取待上传的文件编码---')
        code_content = '编码为:' + code
        log_content_area.appendPlainText(code_content)

        if code == 'ascii' or code == 'gb2312' or code == 'gbk' or code == 'GB2312' or code == 'utf-8':
            log_content_area.appendPlainText('文件编码符合规范')
        elif code == 'UTF-16':
            log_content_area.appendPlainText('文件编码不符合规范,请检查文件编码')
        else:
            log_content_area.appendPlainText('文件编码未知')
        code_label.setText(code)
        myStatusBar.addPermanentWidget(code_label)
2.2.5.3 文件选择按钮的绑定文件选择函数
select_button.clicked.connect(lambda: selectFile(file_address_input, log_content_area, code_label, myStatusBar))

2.2.6 检测文件格式

2.2.6.1 检测txt文件内容格式
def check_txt_content(dfgzRadio,pldbRadio,szrmbRadio,plxxwhRadio,code,file_address, log_content_area):
    if not dfgzRadio.isChecked() and not pldbRadio.isChecked() and not szrmbRadio.isChecked() and not plxxwhRadio.isChecked() :
        msgBox = QMessageBox()
        msgBox.setText("请先选择检测类型!!")
        msgBox.exec_()
        log_content_area.appendPlainText(f'请先选择检测类型!!')
        return
    if not os.path.isfile(file_address):
        msgBox = QMessageBox()
        msgBox.setText("选择的文件不存在,请检测文件地址")
        msgBox.exec_()
        log_content_area.appendPlainText(f'选择的文件不存在,请检测文件地址')
        return
    #代发工资检测---格式  1|6230662035xxxxxxxx|姓名|100.00
    ##常见错误2   包含其他字符 如字符串符号、空格
    ###常见错误2  行长度不足,使用|分割之后存在空白或者分割后的部分不足
    ###常见错误3 金额未填写,或者未保留2位数字
    ##常见错误4
    if dfgzRadio.isChecked():
        with open(file_address, mode='r',encoding=code) as f:
            log_content_area.appendPlainText('代发工资开始检测')
            fail_num=0
            for i, line in enumerate(f):
                if "'" in line or '"' in line or " " in line or "\t" in line or " " in line:
                    log_content_area.appendPlainText(f'第{i+1}行包含特殊字符:{line}')
                    fail_num=fail_num+1
                if len(line.split('|'))<4 or len(line.split('|'))>4:
                    log_content_area.appendPlainText(f'第{i + 1}行格式错误:{line}')
                    fail_num = fail_num + 1

                if len(line.replace('\n','').split('|')[-1].split('.')[-1]) <2 or len(line.replace('\n','').split('|')[-1].split('.')[-1]) >2:
                    log_content_area.appendPlainText(f'第{i + 1}行金额错误:{line}')
                    fail_num = fail_num + 1
            log_content_area.appendPlainText(f'代发工资文件检测结束,共发现{fail_num}处错误')

    # 数字人民币代发检测 格式:1|009102xxxxxx|100.00|TR05|C213|TF99|姓名|钱包名|1
    if szrmbRadio.isChecked():
        with open(file_address, mode='r',encoding=code) as f:
            fail_num_szrmb = 0
            for i, line in enumerate(f):
                if "'" in line or '"' in line or " " in line or "\t" in line or " " in line:
                    log_content_area.appendPlainText(f'第{i + 1}行包含特殊字符:{line}')
                    fail_num_szrmb=fail_num_szrmb+1
                    continue
                if len(line.split('|')) < 12 or len(line.split('|')) > 12:
                    print(len(line.split('|')))
                    log_content_area.appendPlainText(f'第{i + 1}行格式错误:{line}')
                    fail_num_szrmb = fail_num_szrmb + 1
                    continue
                if len(line.replace('\n', '').split('|')[2].split('.')[-1]) < 2 or len(
                        line.replace('\n', '').split('|')[2].split('.')[-1]) > 2:
                    log_content_area.appendPlainText(f'第{i + 1}行金额错误:{line}')
                    fail_num_szrmb = fail_num_szrmb + 1
        log_content_area.appendPlainText(f'数字人民币代发文件检测结束,共发现{fail_num_szrmb}处错误')
2.2.6.1 检测按钮绑定内容格式函数
check_button.clicked.connect(lambda: check_txt_content(dfgzRadio,pldbRadio,szrmbRadio,plxxwhRadio,code_label.text(),file_address_input.text(),log_content_area))

2.2.7 上传文件

2.2.7.1 SFTP连接函数
def connect_to_sftp(hostname, port, username, password,log_content_area):
    # 创建SSHClient对象
    try:
        ssh = paramiko.SSHClient()
        # 设置自动添加主机密钥
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # 连接到服务器
        ssh.connect(hostname=hostname, port=port, username=username, password=password)
        # 创建SFTP对象
        sftp = ssh.open_sftp()
        # 返回SFTP对象
        return sftp
    except Exception as e:
        print('upload exception:', e)
        log_content_area.appendPlainText('连接服务器失败,连接尝试失败')
        msgBox = QMessageBox()
        msgBox.setText("连接服务器失败,连接尝试失败。!!")
        msgBox.exec_()
2.2.7.2 SFTP上传函数
def sftp_upload(host, port, username, password, local, remote, log_content_area):
    local_name = local.split('/')[-1]  # 获取文件名
    try:
        remote_file_path = remote +  local_name
        # remote_file_path = os.path.join(remote , local_name)
        print(remote_file_path)
        sftp=connect_to_sftp(host, port, username, password,log_content_area) #SFTP连接远程服务器
        # 获取本地文件大小
        local_file_size = os.stat(local).st_size
        print(local_file_size)
        print("local_file_size:{}".format(local_file_size))
        # 打印上传进度
        with open(local, 'rb') as f:
            def callback(transferred, local_file_size):
                percent = float(transferred) * 100 / local_file_size
                print("上传了 %.2f%% " % percent)
                log_content_area.appendPlainText("上传了 %.2f%% " % percent)

            # transfer 32768 bytes as SSH slice, put local file to remote
            sftp.putfo(f, remote_file_path, local_file_size, callback=callback)
        sftp.close()
        print('上传成功')
        log_content_area.appendPlainText('---文件上传成功---')
        log_content_area.appendPlainText('---文件上传至' + remote_file_path + '----')
    except Exception as e:
        print('upload exception:', e)
        fail_content = 'upload exception:' + str(e)
        log_content_area.appendPlainText(fail_content)
2.2.7.3 上传按钮绑定SFTP上传函数
myupload_Button.clicked.connect(
    lambda: sftp_upload(host, port, username, password, file_address_input.text(), remote, log_content_area))

2.2.8 获取服务器文件列表

2.2.8.1 获取远程文件列表按钮
def fecth_remote_list(host, port, username, password,remote,my_remote_list,log_content_area):
    log_content_area.appendPlainText('开始获取远程报表服务器下txt文件')
    try:
        sftp = connect_to_sftp(host, port, username, password, log_content_area)
        files=[x.filename for x in sorted(sftp.listdir_attr(remote), key=lambda f: f.st_mtime)]# 按修改时间排序
    except Exception as e:
        print('upload exception:', e)
        log_content_area.appendPlainText('获取远程文件列表失败')
    file_txts = []
    for file in files:
        if file.endswith('.txt'):
         file_txts.append(file)
    slm = QStringListModel()  # 创建model
    slm.setStringList(file_txts)  # 将数据设置到model
    my_remote_list.setModel(slm)
2.2.8.1 获取远程文件列表按钮 绑定 获取远程文件列表按钮
my_fetch_remote_lists_button.clicked.connect(lambda: fecth_remote_list(host, port, username, password,remote,my_remote_list,log_content_area))

2.2.9 下载远程文件

2.2.9.1 下载文件函数
def download_remote_list(host, port, username, password,remote,my_remote_list,log_content_area):
    selected = my_remote_list.currentIndex().data()  #获取选中的远程服务器文件名称
    if selected is  None:
        msgBox = QMessageBox()
        msgBox.setText("请先获取远程文件列表,并选择其中一项!!")
        msgBox.exec_()
        log_content_area.appendPlainText(f'未选择远程报表服务器上任何文件,请先选择')
    else:
        log_content_area.appendPlainText(f'选择远程报表服务器文件:{selected}')
        selected_file_path=os.path.join(remote, selected)
        sftp = connect_to_sftp(host, port, username, password,log_content_area)
        directory = QFileDialog.getExistingDirectory(None, "选取文件夹", "C:/")  # 起始路径
        local_path=os.path.join(directory,selected)
        log_content_area.appendPlainText(f'选择本地保持路径:{directory},开始下载{selected}')
        remote_file_size = sftp.stat(selected_file_path).st_size      # 获得远程文件大小
        print("remote_file_size:{}".format(remote_file_size))
        print("local_path:{}".format(local_path))
        # 打印下载进程
        with open(local_path, 'wb') as f:
            def callback(transferred, remote_file_size):
                print('local_path:--------')
                percent = float(transferred) * 100 / remote_file_size
                print("下载了 %.2f%% " % percent)
                log_content_area.appendPlainText("下载了 %.2f%% " % percent)
            # transfer 32768 bytes as SSH slice, get remote file to local
            sftp.getfo(selected_file_path, f, callback=callback)
2.2.9.1 下载文件按钮 绑定 下载文件函数
my_download_remote_button.clicked.connect(lambda: download_remote_list(host, port, username, password,remote,my_remote_list,log_content_area))

三、全部代码

import os.path
import sys

import chardet
import paramiko
from PyQt5 import uic
from PyQt5.QtCore import QStringListModel
from PyQt5.QtWidgets import  QApplication, QFileDialog, QPushButton, QLineEdit, QStatusBar, QLabel, QPlainTextEdit, QRadioButton, QListView, QMessageBox
import socket
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
host = 'xxxxxx'
port = 22
username = 'xxxx'
password = 'xxxx'
remote = '/tmp/'
# host='127.0.0.1'
# port = 26
# username = '0841'
# password = 'WW'
# local = ''
# remote ='/ping/'
def getIp():
    # 函数 gethostname() 返回当前正在执行 Python 的系统主机名
    res = socket.gethostbyname(socket.gethostname())
    print(res)
    return res
def getTxtCode(txt_path, ):
    try:
        detector = chardet.UniversalDetector()
        with open(txt_path, 'rb') as f:
            for line in f:
                detector.feed(line)
                if detector.done:
                    break
            detector.close()
        f_charInfo=detector.result
        txt_code='utf-8'
        if f_charInfo['encoding'] =='GB2312':
            txt_code='GB18030'
        if f_charInfo['encoding'] is not None and f_charInfo['encoding'] != 'GB2312':
            txt_code=f_charInfo['encoding']
        if f_charInfo['encoding'] is  None :
            txt_code = '未知'
        return txt_code
        # return f_charInfo['encoding'] if f_charInfo['encoding'] is not None else '未知'
    except:
        if f:
            f.close()
            print('err')
            return '未知'
def check_txt_content(dfgzRadio,pldbRadio,szrmbRadio,plxxwhRadio,code,file_address,log_content_area):
    if not dfgzRadio.isChecked() and not pldbRadio.isChecked() and not szrmbRadio.isChecked() and not plxxwhRadio.isChecked() :
        msgBox = QMessageBox()
        msgBox.setText("请先选择检测类型!!")
        msgBox.exec_()
        log_content_area.appendPlainText(f'请先选择检测类型!!')
        return
    if not os.path.isfile(file_address):
        msgBox = QMessageBox()
        msgBox.setText("选择的文件不存在,请检测文件地址")
        msgBox.exec_()
        log_content_area.appendPlainText(f'选择的文件不存在,请检测文件地址')
        return
    #代发工资检测---格式  1|6230662035xxxxxxxx|姓名|100.00
    ##常见错误2   包含其他字符 如字符串符号、空格
    ###常见错误2  行长度不足,使用|分割之后存在空白或者分割后的部分不足
    ###常见错误3 金额未填写,或者未保留2位数字
    ##常见错误4
    print('开始检测文件内容')
    if dfgzRadio.isChecked():
        print('代发工资开始检测')
        with open(file_address, mode='r',encoding=code) as f:

            log_content_area.appendPlainText('代发工资开始检测')
            fail_num=0
            for i, line in enumerate(f):
                if "'" in line or '"' in line or " " in line or "\t" in line or " " in line:
                    print(f'第{i+1}行包含特殊字符:{line}')
                    log_content_area.appendPlainText(f'第{i+1}行包含特殊字符:{line}')
                    fail_num=fail_num+1
                if len(line.split('|'))<4 or len(line.split('|'))>4:
                    print(f'第{i + 1}行格式错误:{line}')
                    log_content_area.appendPlainText(f'第{i + 1}行格式错误:{line}')
                    fail_num = fail_num + 1

                if len(line.replace('\n','').split('|')[-1].split('.')[-1]) <2 or len(line.replace('\n','').split('|')[-1].split('.')[-1]) >2:
                    print(f'第{i + 1}行金额错误:{line}')
                    log_content_area.appendPlainText(f'第{i + 1}行金额错误:{line}')
                    fail_num = fail_num + 1
            print('代发工资文件检测结束')
            log_content_area.appendPlainText(f'代发工资文件检测结束,共发现{fail_num}处错误')
    #----------批量调拨检测-----------
    if pldbRadio.isChecked():
        with open(file_address, mode='r',encoding='GB18030') as f:
                pass
    # 数字人民币代发检测 格式:1|009102xxxxxx|100.00|TR05|C213|TF99|姓名|钱包名|1
    if szrmbRadio.isChecked():
        print(f'数字人民币代发检测')
        with open(file_address, mode='r',encoding=code) as f:
            fail_num_szrmb = 0
            for i, line in enumerate(f):
                if "'" in line or '"' in line or " " in line or "\t" in line or " " in line:
                    print(f'第{i + 1}行包含特殊字符:{line}')
                    log_content_area.appendPlainText(f'第{i + 1}行包含特殊字符:{line}')
                    fail_num_szrmb=fail_num_szrmb+1
                    continue
                if len(line.split('|')) < 12 or len(line.split('|')) > 12:
                    print(len(line.split('|')))
                    print(f'第{i + 1}行格式错误:{line}')
                    log_content_area.appendPlainText(f'第{i + 1}行格式错误:{line}')
                    fail_num_szrmb = fail_num_szrmb + 1
                    continue

                if len(line.replace('\n', '').split('|')[2].split('.')[-1]) < 2 or len(
                        line.replace('\n', '').split('|')[2].split('.')[-1]) > 2:
                    print(f'第{i + 1}行金额错误:{line}')
                    log_content_area.appendPlainText(f'第{i + 1}行金额错误:{line}')
                    fail_num_szrmb = fail_num_szrmb + 1
        print('数字人民币代发文件检测结束')
        log_content_area.appendPlainText(f'数字人民币代发文件检测结束,共发现{fail_num_szrmb}处错误')

    #批量信息维护检测
    if plxxwhRadio.isChecked():
        with open(file_address, mode='r',encoding=code) as f:
            pass
# 连接到SFTP服务器
def connect_to_sftp(hostname, port, username, password,log_content_area):
    # 创建SSHClient对象
    try:
        ssh = paramiko.SSHClient()
        # 设置自动添加主机密钥
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # 连接到服务器
        ssh.connect(hostname=hostname, port=port, username=username, password=password)
        # 创建SFTP对象
        sftp = ssh.open_sftp()
        # 返回SFTP对象
        return sftp
    except Exception as e:
        print('upload exception:', e)
        log_content_area.appendPlainText('连接服务器失败,连接尝试失败')
        msgBox = QMessageBox()
        msgBox.setText("连接服务器失败,连接尝试失败。!!")
        msgBox.exec_()
def fecth_remote_list(host, port, username, password,remote,my_remote_list,log_content_area):
    print('------------')
    log_content_area.appendPlainText('开始获取远程报表服务器下txt文件')
    print('-=-=-=-=-=-=')
    try:
        sftp = connect_to_sftp(host, port, username, password, log_content_area)
        files=[x.filename for x in sorted(sftp.listdir_attr(remote), key=lambda f: f.st_mtime)]
    except Exception as e:
        print('upload exception:', e)
        log_content_area.appendPlainText('获取远程文件列表失败')
    print(files)
    file_txts = []
    for file in files:
        if file.endswith('.txt'):
         # filepath =os.path.join(remote, file)
         # file_txts.append(filepath)
         file_txts.append(file)
    slm = QStringListModel()  # 创建model
    slm.setStringList(file_txts)  # 将数据设置到model
    my_remote_list.setModel(slm)
def download_remote_list(host, port, username, password,remote,my_remote_list,log_content_area):
    selected = my_remote_list.currentIndex().data()
    print(selected)
    if selected is  None:
        msgBox = QMessageBox()
        msgBox.setText("请先获取远程文件列表,并选择其中一项!!")
        msgBox.exec_()
        log_content_area.appendPlainText(f'未选择远程报表服务器上任何文件,请先选择')
    else:
        log_content_area.appendPlainText(f'选择远程报表服务器文件:{selected}')
        selected_file_path=os.path.join(remote, selected)
        sftp = connect_to_sftp(host, port, username, password,log_content_area)
        # item = my_remote_list[selected]
        # print(selected)
        # file_name=selected.split('/')[-1]
        directory = QFileDialog.getExistingDirectory(None, "选取文件夹", "C:/")  # 起始路径
        local_path=os.path.join(directory,selected)
        log_content_area.appendPlainText(f'选择本地保持路径:{directory},开始下载{selected}')
        # get remote file size
        remote_file_size = sftp.stat(selected_file_path).st_size
        print("remote_file_size:{}".format(remote_file_size))
        print("local_path:{}".format(local_path))
        # print progress
        with open(local_path, 'wb') as f:
            def callback(transferred, remote_file_size):
                print('local_path:--------')
                percent = float(transferred) * 100 / remote_file_size
                print("Download %.2f%% of the file." % percent)
                log_content_area.appendPlainText("下载了 %.2f%% " % percent)

            # transfer 32768 bytes as SSH slice, get remote file to local
            sftp.getfo(selected_file_path, f, callback=callback)
def sftp_upload(host, port, username, password, local, remote, log_content_area):
    local_name = local.split('/')[-1]
    try:
        remote_file_path = remote +  local_name
        # remote_file_path = os.path.join(remote , local_name)
        print(remote_file_path)
        sftp=connect_to_sftp(host, port, username, password,log_content_area)
        # get local file size
        local_file_size = os.stat(local).st_size
        print(local_file_size)
        print("local_file_size:{}".format(local_file_size))
        # print progress
        with open(local, 'rb') as f:
            def callback(transferred, local_file_size):
                percent = float(transferred) * 100 / local_file_size
                print("Upload %.2f%% of the file." % percent)
                log_content_area.appendPlainText("Upload %.2f%% of the file." % percent)
            # transfer 32768 bytes as SSH slice, put local file to remote
            sftp.putfo(f, remote_file_path, local_file_size, callback=callback)
        sftp.close()
        print('上传成功')
        log_content_area.appendPlainText('---文件上传成功---')
        log_content_area.appendPlainText('---文件上传至' + remote_file_path + '----')

    except Exception as e:
        print('upload exception:', e)
        fail_content = 'upload exception:' + str(e)
        log_content_area.appendPlainText(fail_content)

def romote_file_list(host, port, username, password, local, remote, log_content_area):
    local_name = local.split('/')[-1]
    try:
        sf = paramiko.Transport((host, port))
        sf.connect(username=username, password=password)
        sftp = paramiko.SFTPClient.from_transport(sf)

        sftp.put(local, os.path.join(remote, local_name))  # 上传文件
        print('上传成功')
        log_content_area.appendPlainText('---文件上传成功---')
    except Exception as e:
        print('upload exception:', e)
def selectFile(file_address_input, log_content_area, code_label, myStatusBar):
    fd = QFileDialog()
    fd.setFileMode(QFileDialog.FileMode.ExistingFile)
    fd.setDirectory('C:\\')
    fd.setNameFilter('*.txt')
    if fd.exec():
        print(fd.selectedFiles())
        file_address_input.setText(fd.selectedFiles()[0])
        log_content_area.appendPlainText('---读取待上传的文件地址成功---')
        log_content_area.appendPlainText(fd.selectedFiles()[0])
        suffix = fd.selectedFiles()[0].split('/')[-1].split('.')[1]
        log_content_area.appendPlainText('---读取待上传的文件后缀---')
        suffix_content = '后缀为:' + suffix
        log_content_area.appendPlainText(suffix_content)
        code = getTxtCode(fd.selectedFiles()[0])
        log_content_area.appendPlainText('---读取待上传的文件编码---')
        code_content = '编码为:' + code
        log_content_area.appendPlainText(code_content)

        if code == 'ascii' or code == 'gb2312' or code == 'gbk' or code == 'GB2312' or code == 'utf-8':
            log_content_area.appendPlainText('文件编码符合规范')
        elif code == 'UTF-16':
            log_content_area.appendPlainText('文件编码不符合规范,请检查文件编码')
        else:
            log_content_area.appendPlainText('文件编码未知')
        code_label.setText(code)
        myStatusBar.addPermanentWidget(code_label)
if __name__ == '__main__':
    app = QApplication(sys.argv)
    ui = uic.loadUi(os.path.join(BASE_DIR, 'untitled5.ui'))
    select_button: QPushButton = ui.find_address_button
    file_address_input: QLineEdit = ui.file_address
    myStatusBar: QStatusBar = ui.statusbar
    ip_label = QLabel()
    ip_info = '当前登录ip:' + getIp()
    ip_label.setText(ip_info)
    code_label = QLabel()
    dfgzRadio: QRadioButton = ui.dfgz
    pldbRadio: QRadioButton = ui.pldb
    plxxwhRadio: QRadioButton = ui.plxxwh
    szrmbRadio: QRadioButton = ui.szrmb
    pldbRadio.hide()
    plxxwhRadio.hide()
    check_button: QPushButton = ui.check_Button
    # my_download_checkfail_Button: QPushButton=ui.download_checkfail_Button
    # my_download_report_button: QPushButton = ui.download_report_button
    # my_download_checkfail_Button.hide()
    # my_download_report_button.hide()


    my_download_remote_button: QPushButton = ui.download_remote_button

    my_fetch_remote_lists_button: QPushButton = ui.fetch_remote_lists_button
    my_remote_list: QListView = ui.remote_lists
    myStatusBar.addPermanentWidget(ip_label)
    log_content_area: QPlainTextEdit = ui.log_content
    log_content_area.appendPlainText(f'{ip_label.text()}')
    myupload_Button: QPushButton = ui.upload_Button
    select_button.clicked.connect(lambda: selectFile(file_address_input, log_content_area, code_label, myStatusBar))
    my_fetch_remote_lists_button.clicked.connect(lambda: fecth_remote_list(host, port, username, password,remote,my_remote_list,log_content_area))
    myupload_Button.clicked.connect(
        lambda: sftp_upload(host, port, username, password, file_address_input.text(), remote, log_content_area))
    check_button.clicked.connect(lambda: check_txt_content(dfgzRadio,pldbRadio,szrmbRadio,plxxwhRadio,code_label.text(),file_address_input.text(),log_content_area))
    my_download_remote_button.clicked.connect(lambda: download_remote_list(host, port, username, password,remote,my_remote_list,log_content_area))

    ui.show()
    sys.exit(app.exec())

四、打包

4.1 前置说明

4.1.1文件列表

  • hx_fenlei2.py
  • untitled5.ui

4.1.2 安装pyinstaller

pip install pyinstaller

4.2 打包生成.spec文件

pyinstaller -F hx_fenlei2.py

说明:1.生成hx_fenlei2.spec文件

2.pyinstaller常用打包参数

参数 解释
-D, –onedir 打包成一个文件夹
-F, –onefile 打包成一个可执行文件(exe)

4.3 使用.spec文件打包

a = Analysis(
    ['hx_fenlei2.py'],
    pathex=[],
    binaries=[],
    datas=[('untitled5.ui','.')],
    hiddenimports=[],
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=[],
    noarchive=False,
    optimize=0,
)
pyz = PYZ(a.pure)

exe = EXE(
    pyz,
    a.scripts,
    a.binaries,
    a.datas,
    [],

    name='hx_fenlei2',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=True,
    console=False,
    upx_exclude=[],
    runtime_tmpdir=None,
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
    icon='C:\\Users\\ww\\Desktop\\upload_df\\pythonProject4\\hx.ico'
)

主要修改说明:

修改datas=[('上联地址.txt','.'),('电话号码.txt','.'),('ips_template.txt','.'),('tels_template.txt','.')],

注意:存放位置为 ’ . ’ 时,表示为程序运行目录(windows:C:\Users\Administrator\AppData\Local\Temp_MEI190322,单文件模式运行时会自动解压到该目录)

console=False, cmd窗口不显示

runtime_tmpdir=None, 单文件模式

icon='C:\\Users\\ww\\Desktop\\ping\\pythonProject2\\PING.ico' 打包exe文件的图标地址

五、总结

1.pyqt设计代码框架

import os.path
from PyQt5 import uic, QtWidgets
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
if __name__ == '__main__':
    app = QApplication(sys.argv)
    ui = uic.loadUi(os.path.join(BASE_DIR, 'untitled5.ui'))
    '''
    获取ui控件,设置事件

    '''
    ui.show()
    sys.exit(app.exec())

2.兼容性问题

python 3.8.10之后不支持win7

PyQt5 5.15.9

pyqt5-tools 5.15.9.3.3

Python 3.8.8 win32

pyinstaller 6.9.0

3.参考文献


   转载规则


《python运维工具之SFTP上传服务器》 WangWei 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
python运维工具之批量备份交换机配置 python运维工具之批量备份交换机配置
一、需求1.通过SSH或者Telnet登录交换机 2.生成日志,保留交换机支持的登录接口 3.可以传入自己配置的交换机地址 二、总体思路整理2.1 主要思路1.解析待登录的交换机配置文件,生成初始状态字典 2.批量测试端口可用性,更新状态字
下一篇 
python运维工具之交换机网络状态 python运维工具之交换机网络状态
一、需求1.检测交换机网络状态,当离线、恢复、不稳定时可以发送短信 2.生成日志,保留每次检测结果 3.可以传入自己配置的交换机地址、发送短信的手机号 二、具体开发思路2.1 主要思路1.读取文件获取ip地址池与手机号列表 2.初始化所有交
  目录