实现功能 巡检网络设备,给出网络设备的IP,名称,CPU使用率,内存使用率,风扇总数,再用风扇,正常状态风扇,电源总数,再用电源,正常状态电源,温度状态。测试结果如下:
# -*- coding: utf-8 -*-
# 导入所需的模块和库
from netmiko.huawei.huawei import HuaweiSSH
from netmiko import NetMikoTimeoutException, NetMikoAuthenticationException
from getpass import getpass
import re
import io
import xlwt
# HuaweiSSH是用于与华为设备建立SSH连接的类,来自netmiko.huawei.huawei模块
# NetMikoTimeoutException和NetMikoAuthenticationException是用于处理SSH连接超时和身份验证错误的异常类
# getpass模块用于安全地获取用户输入的密码
# re模块用于进行正则表达式匹配
# io模块用于处理文件IO操作
# xlwt模块用于生成Excel文件
def extract_fans(cmd_result):
"""
提取风扇信息
"""
# 初始化计数器
fan_total = 0 # 风扇总数
fan_in_use = 0 # 在使用中的风扇数量
fan_normal = 0 # 正常状态的风扇数量
# 将命令结果按行分割,并去除首尾空格
fan_lines = cmd_result.strip().split('\n')
# 遍历每一行风扇信息
for line in fan_lines:
# 如果行以N/A开头,则停止计算
if line.startswith('N/A'):
break
# 使用正则表达式匹配风扇编号(例如FAN1)
fan_info = re.search(r'FAN\d+', line)
# 如果匹配到风扇编号
if fan_info:
# 增加风扇总数计数
fan_total += 1
# 使用正则表达式匹配风扇速度(例如40%)
fan_speed = re.search(r'\d+%', line)
# 使用正则表达式匹配风扇状态(例如Normal)
fan_status = re.search(r'Normal', line)
# 如果同时匹配到风扇速度和风扇状态
if fan_speed and fan_status:
# 增加在使用中的风扇数量计数
fan_in_use += 1
# 增加正常状态的风扇数量计数
fan_normal += 1
# 返回提取的风扇信息计数结果
return fan_total, fan_in_use, fan_normal
def extract_power_supplies(cmd_result):
lines = cmd_result.strip().split('\n')
power_total = 0
power_in_use = 0
power_normal = 0
for line in lines:
if line.startswith("N/A"):
break
matches_pwr = re.findall(r"PWR\d+", line)
power_total += len(matches_pwr)
match_yes = re.search(r"\bYES\b", line)
if match_yes:
power_in_use += len(matches_pwr)
match_supply = re.search(r"\bSupply\b", line)
if match_supply:
power_normal += len(matches_pwr)
return power_total, power_in_use, power_normal
def main():
"""
主函数
"""
# 让用户输入SSH用户名和密码
username = input('请输入SSH用户名:')
password = getpass('请输入SSH密码:')
# 让用户输入SSH端口号
port = input('请输入SSH端口号:')
# 打开ip_list.txt文件获取IP列表
ip_list = open('ip_list.txt', 'r')
ip_addr = ip_list.readlines()
ip_list.close()
# 定义需要执行的命令列表
cmd_line = ['display version', 'display cpu', 'display memory', 'display device temperature all',
'display device fan', 'display device power']
# 创建一个workbook 设置编码为utf-8
workbook = xlwt.Workbook(encoding='utf-8')
# 创建一个worksheet,命名为"My Worksheet"
worksheet = workbook.add_sheet('My Worksheet')
# 初始化表格的标题行
headers = ['交换机IP', '交换机名称', '软件版本', 'CPU利用率', '内存利用率', '风扇总数', '再用风扇', '正常状态风扇',
'电源总数', '再用电源', '正常状态电源', '温度状态']
# 在worksheet的第一行写入标题
for col, header in enumerate(headers):
worksheet.write(0, col, label=header)
# 遍历ip列表用来生成迭代器
for row, ip in enumerate(ip_addr, start=1):
print(' ')
print('本次巡检的设备IP:' + ip)
try:
# 构造设备连接参数
CE = {
'device_type': 'huawei',
'ip': ip.strip(),
'username': username,
'password': password,
'conn_timeout': 15, # 连接超时,一些设备连接时间会超过默认的5秒
'port': port # SSH端口号
}
# 实例化HuaweiSSH
net_connect = HuaweiSSH(**CE)
print("恭喜,成功登录")
print("设备名:" + str(net_connect.find_prompt().strip('<>')))
# 写入交换机IP和名称
worksheet.write(row, 0, label=ip.strip())
worksheet.write(row, 1, label=net_connect.find_prompt().strip('<>'))
for cmd in cmd_line:
cmd_result = net_connect.send_command(cmd) # 执行命令并获取命令结果
if 'display version' in cmd: # 如果命令是"display version"
version = re.search(r'\(\w*\d\d.*\)', cmd_result) # 使用正则表达式匹配版本信息
worksheet.write(row, 2, label=version.group().strip('()') if version else "Unknown") # 将版本信息写入表格
elif 'display cpu' in cmd: # 如果命令是"display cpu"
cpu_usage = re.search(r'\d*.\d*.\%', cmd_result) # 使用正则表达式匹配CPU利用率
worksheet.write(row, 3, label=cpu_usage.group().strip(' ') if cpu_usage else "N/A") # 将CPU利用率写入表格
elif 'display memory' in cmd: # 如果命令是"display memory"
memory = re.search(r'\d*.\d*.\%', cmd_result) # 使用正则表达式匹配内存利用率
worksheet.write(row, 4, label=memory.group().strip(' ') if memory else "N/A") # 将内存利用率写入表格
elif 'display device temperature all' in cmd: # 如果命令是"display device temperature all"
table_lines = re.findall(r"\s*\d+\s+--.*", cmd_result) # 使用正则表达式匹配温度信息的表格行
temperature_data = []
for line in table_lines:
line_data = line.strip().split() # 将行数据按空格分割成列表
if len(line_data) == 8:
temperature_data.append(line_data) # 将符合条件的行数据加入温度数据列表
temperature_status = "Normal"
for data in temperature_data:
if data[3] != "Normal":
temperature_status = "Abnormal" # 如果有任何温度状态不是"Normal",将温度状态设为"Abnormal"
break
worksheet.write(row, 11, label=temperature_status) # 将温度状态写入表格
elif 'display device fan' in cmd: # 如果命令是"display device fan"
fan_total, fan_in_use, fan_normal = extract_fans(cmd_result) # 调用extract_fans函数提取风扇信息
worksheet.write(row, 5, label=fan_total) # 将风扇总数写入表格
worksheet.write(row, 6, label=fan_in_use) # 将再用风扇数写入表格
worksheet.write(row, 7, label=fan_normal) # 将正常状态风扇数写入表格
elif 'display device power' in cmd: # 如果命令是"display device power"
power_total, power_in_use, power_normal = extract_power_supplies(cmd_result) # 调用extract_power_supplies函数提取电源信息
worksheet.write(row, 8, label=power_total) # 将电源总数写入表格
worksheet.write(row, 9, label=power_in_use) # 将再用电源数写入表格
worksheet.write(row, 10, label=power_normal) # 将正常状态电源数写入表格
net_connect.disconnect() # 断开与设备的连接
except (EOFError, NetMikoTimeoutException):
print('无法连接设备') # 捕获连接超时异常,并打印无法连接设备的提示信息
worksheet.write(row, 0, label=f"无法连接设备: {ip.strip()}") # 将无法连接设备的信息写入表格
except (EOFError, NetMikoAuthenticationException):
print('用户名密码错误!') # 捕获用户名密码错误异常,并打印用户名密码错误的提示信息
worksheet.write(row, 0, label=f"用户名密码错误: {ip.strip()}") # 将用户名密码错误的信息写入表格
except Exception as e:
print(f"发生错误: {str(e)}") # 捕获其他异常,并打印错误信息
worksheet.write(row, 0, label=f"发生错误: {str(e)}") # 将发生错误的信息写入表格
workbook.save('网络设备巡检结果.xls') # 保存结果到Excel文件
if __name__ == '__main__':
main() # 如果当前模块是主程序,则调用main()函数