本文主要是介绍使用Python实现IP地址和端口状态检测与监控,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《使用Python实现IP地址和端口状态检测与监控》在网络运维和服务器管理中,IP地址和端口的可用性监控是保障业务连续性的基础需求,本文将带你用Python从零打造一个高可用IP监控系统,感兴趣的小伙...
概述:为什么需要IP监控系统
在网络运维和服务器管理中,IP地址和端口的可用性监控是保障业务连续性的基础需求。传统的人工巡检方式效率低下,而商业监控工具又往往价格昂贵。本文将带你用python从零打造一个高可用IP监控系统,具备以下核心功能:
- 多目标监控:同时监测多个IP+端口组合状态
- 智能告警:异常状态自动触发邮件通知(支持多收件人)
- 可视化界面:基于Tkinter的现代化UI,操作直观
- 配置持久化:所有设置自动保存,重启不丢失
- 断线重试机制:避免网络抖动导致的误报
技术栈:Python 3 + Tkinter + smtplib + socket + 多线程
使用步骤说明
1. 环境准备
# 所需库(Python内置,无需额外安装) import tkinter import threading import smtplib import socket
2. 系统部署
下载完整代码(文末提供)
配置config.json(首次运行会自动生成)
运行主程序:
python ip_monitor.py
3. 核心功能配置
目标配置
通过表格形式管理监控目标
复选框控制是否启用监测
支持双击编辑现有条目
邮件设置
服务商 | SMTP地址 | 端口 | 授权码获取方式 |
---|---|---|---|
网易163 | smtp.163.com | 465/994 | 邮箱设置→POP3/SMTP服务 |
QQ邮箱 | smtp.qq.com | 465 | 设置→账户→POP3服务 |
Gmail | smtp.gmail.com | 587 | Google账号→应用密码 |
# 配置示例(支持SSL/TLS)
SMTP服务器: smtp.163.com:465
邮箱账户: yourname@163.com
授权码: xxxxxx # 需在邮箱设置中获取
接收邮箱: admin@company.com,backup@company.com
注意事项:
必须开启SMTP服务
部分邮箱需要使用授权码而非密码
Gmail需开启"低安全性应用访问"
监控参数
检测模式说明
纯IP检测模式
- 留空端口字段
- 使用ICMP协议Ping检测
- 适用场景:网络设备监控
组合检测模式
# 同时验证IP可达性和端口开放状态 if ping_success and port_open: return ONLINE
参数名 | 默认值 | 说明 |
---|---|---|
监测间隔 | 10秒 | 两次检测的时间间隔 |
超时时间 | 2秒 | 判定离线的超时阈值 |
重试次数 | 3次 | 连续失败次数触发告警 |
重试间隔 | 5秒 | 失败后的快速重试间隔 |
系统效果展示
实时监控面板
邮件告警示例
主题:[告警] IP状态变更: 192.168.1.1:80 - 离线
IP地址: 192.168.1.1
端口: 80
备注: 主数据库服务器
状态变更为: 离线
检测时间: 2023-08-20 14:30:45
日志记录
[2023-08-20 14:30:45] 检测到 192.168.1.1:80 状态变更为离线
[2023-08-20 14:31:00] 已发送告警邮件给3个收件人
[2023-08-20 15:00:00] 已发送每日状态报告
核心源码解析
1. 状态检测引擎
def check_target(self, ip, port=None, timeout=2): """智能检测IP/端口状态""" try: if port: # 端口检测模式 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(timeout) s.connect((ip, port)) return IPStatus.ONLINE else: # 纯Ping模式 # 使用ICMP协议实现Ping sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) sock.settimeout(timeout) sock.connect((ip, 1)) return IPStatus.ONLINE except socket.timeout: return IPStatus.OFFLINE except Exception: return IPStatus.UNKNOWN
2. 多线程监控架构
class MonitorThread(threading.Thread): def __init__(self, app): super().__init__(daemon=True) self.app = app self.running = True def run(self): while self.running: # 1. 执行所有目标的检测 # 2. 触发状态变更通知 # 3. 智能休眠控制CPU占用 time.sleep(self.calculate_sleep_time()) def stop(self): self.running = False
3. 配置持久化实现
def save_config(self): """JSON格式保存所有python配置""" config = { 'targets': [ self.tree.item(item, 'values') for item in self.tree.get_children() ], 'email_settings': { 'smtp': self.smtp_entry.get(), 'user': self.user_entry.get(), 'pass': self.pass_entry.get(), 'receivers': self.receiver_entry.get() } } with open('config.json', 'w') as f: json.dump(config, f, indent=2)
高级功能扩展建议
1. 微信/钉钉机器人告警
# 示例:企业微信机器人API import requests def send_wechat_alert(message): webhook = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx" requests.post(webhook, json={"text": {"content": message}})
2. 数据库存储历史记录
# 使用SQLite记录状态变化 import sqlite3 conn = sqlite3.connect('monitor.db') c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS status_log (ip TEXT, port INT, status TEXT, check_time TIMESTAMP)''')
3. 可视化图表展示
# 使用Matplotlib绘制可用率曲线 plt.plot(dates, availability_rates) plt.title('月度服务可用率') plt.ylabel('百分比(%)') plt.savefig('report.png')
总结与资源下载
项目亮点
- 工业级可靠性:断线重试+智能休眠机制
- 开箱即用:无需复杂配置,5分钟快速部署
- 高度可扩展:代码结构清晰,便于二次开发
完整源码下载
import tkinter as tk from tkinter import ttk, messagebox, scrolledtext import threading import time import smtplib import ipaddress import re import json import socket from email.message import EmailMessage from enum import Enum class IPStatus(Enum): ONLINE = 1 OFFLINE = 2 UNKNOWN = 3 class IPMonitorApp(tk.Tk): def __init__(self): super().__init__() self.title("IP状态监测系统") self.geometry("1300x850") try: self.iconbitmap('monitor.ico') except: pass self.style = ttk.Style() self.configure_style() self.stop_event = threading.Event() self.monitor_thread = None self.ip_status = {} self.next_report_time = self.calculate_next_report_time() self.create_widgets() self.load_config() def configure_style(self): self.style.theme_create('ipmonitor', parent='alt', settings={ 'TFrame': {'configure': {'background': '#f5f5f5'}}, 'TLabelFrame': { 'configure': { 'background': '#f5f5f5', 'foreground': '#1e3d59', 'font': ('微软雅黑', 10, 'bold') } }, 'TLabel': { 'configure': { 'background': '#f5f5f5', 'foreground': '#1e3d59', 'font': ('微软雅黑', 10) } }, 'TButton': { 'configure': { 'background': '#1e3d59', 'foreground': 'white', 'font': ('微软雅黑', 10), 'padding': 5 }, 'map': { 'background': [('active', '#3a6ea5')], 'foreground': [('disabled', '#888888')] } }, 'TCheckbutton': { 'configure': { 'background': '#f5f5f5', 'font': ('微软雅黑', 10) } } }) self.style.theme_use('ipmonitor') def create_widgets(self): # 主容器 main_frame = ttk.Frame(self, padding=10) main_frame.grid(row=0, column=0, sticky="nsew") self.grid_columnconfigure(0, weight=1) self.grid_rowconfigure(0, weight=1) # 配置区域 config_frame = ttk.Frame(main_frame) config_frame.grid(row=0, column=0, sticky="nsew") # IP配置区域 ip_frame = ttk.LabelFrame(config_frame, text="监测目标配置", padding=10) ip_frame.grid(row=0, column=0, padx=5, pady=5, sticky="nsew") ip_frame.columnconfigure(0, weight=1) # IP列表Treeview self.ip_tree = ttk.Treeview( ip_frame, columns=("check", "ip", "port", "remark"), show="headings", height=8, selectmode="browse" ) self.ip_tree.grid(row=0, column=0, sticky="nsew") # 配置列 self.ip_tree.heading("check", text="检测") self.ip_tree.heading("ip", text="IP地址") self.ip_tree.heading("port", text="端口") self.ip_tree.heading("remark", text="备注") self.ip_tree.column("check", width=50, anchor="center") self.ip_tree.column("ip", width=150, anchor="w") self.ip_tree.column("port", width=80, anchor="center") self.ip_tree.column("remark", width=250, anchor="w") # 添加复选框 self.ip_tree.tag_configure("checked", background="#e6f7ff") self.ip_tree.tag_configure("unchecked", background="#f5f5f5") # 编辑区域 edit_frame = ttk.Frame(ip_frame) edit_frame.grid(row=1, column=0, sticky="ew", pady=(5, 0)) ttk.Label(edit_frame, text="IP:").grid(row=0, column=0, sticky="e") self.ip_entry = ttk.Entry(edit_frame, width=18) self.ip_entry.grid(row=0, column=1, padx=2, sticky="w") ttk.Label(edit_frame, text="端口:").grid(row=0, column=2, sticky="e") self.port_entry = ttk.Entry(edit_frame, width=8) self.port_entry.grid(row=0, column=3, padx=2, sticky="w") ttk.Label(edit_frame, text="备注:").grid(row=0, column=4, sticky="e") self.remark_entry = ttk.Entry(edit_frame, width=20) self.remark_entry.grid(row=0, column=5, padx=2, sticky="w") # 操作按钮 btn_frame = ttk.Frame(edit_frame) btn_frame.grid(row=0, column=6, padx=5) ttk.Button(btn_frame, text="添加", command=self.add_ip).grid(row=0, column=0, padx=2) ttk.Button(btn_frame, text="更新", command=self.update_ip).grid(row=0, column=1, padx=2) ttk.Button(btn_frame, text="删除", command=self.remove_ip).grid(row=0, column=2, padx=2) # 邮件配置区域 mail_frame = ttk.LabelFrame(config_frame, text="邮件通知配置", padding=10) mail_frame.grid(row=0, column=1, padx=5, pady=5, sticky="nsew") # 邮件配置项 mail_config = [ ("SMTP服务器:端口", "smtp_server", "smtp.163.com:465"), ("邮箱账户", "email_user", "yourname@163.com"), ("邮箱授权码", "email_pass", ""), ("接收邮箱", "email_receiver", "多个邮箱用逗号分隔") ] for i, (label, attr, ph) in enumerate(mail_config): ttk.Label(mail_frame, text=label).grid(row=i, column=0, sticky="e", pady=3) entry = ttk.Entry(mail_frame, width=25) entry.grid(row=i, column=1, padx=5, pady=3, sticky="ew") setattr(self, attr+"_entry", entry) ttk.Label(mail_frame, text=ph, foreground="#888888").grid(row=i, column=2, sticky="w", padx=5) # 监控参数 param_frame = ttk.LabelFrame(config_frame, text="监控参数", padding=10) param_frame.grid(row=1, column=0, columnspan=2, sticky="ew", pady=(5, 0)) params = [ ("监测间隔(秒):", "interval", "10"), ("超时(秒):", "timeout", "2"), ("重试次数:", "retry", "3"), ("重试间隔(秒):", "retry_interval", "5") ] for i, (label, attr, default) in enumerate(params): ttk.Label(param_frame, text=label).grid(row=0, column=i*2, sticky="e") entry = ttk.Entry(param_frame, width=8) entry.insert(0, default) entry.grid(row=0, column=i*2+1, padx=5, sticky="w") setattr(self, attr+"_entry", entry) # 控制按钮 ctrl_frame = ttk.Frame(config_frame) ctrl_frame.grid(row=2, column=0, columnspan=2, pady=(10, 0), sticky="e") ttk.Button(ctrl_frame, text="开始监测", command=self.start_monitoring).grid(row=0, column=0, padx=5) ttk.Button(ctrl_frame, text="停止监测", command=self.stop_monitoring, state=tk.DISABLED).grid(row=0, column=1, padx=5) self.start_btn = ctrl_frame.grid_slaves(row=0, column=0)[0] self.stop_btn = ctrl_frame.grid_slaves(row=0, column=1)[0] ttk.Button(ctrl_frame, text="保存配置", command=self.save_config).grid(row=0, column=2, padx=5) ttk.Button(ctrl_frame, text="清空日志", command=self.clear_logs).grid(row=0, column=3, padx=5) # 状态监控区域 status_frame = ttk.LabelFrame(main_frame, text="状态监控", padding=10) status_frame.grid(row=1, column=0, sticky="nsew", pady=(5, 0)) self.status_tree = ttk.Treeview( status_frame, columns=("ip", "port", "remark", "status", "last_check"), show="headings", height=8 ) self.status_tree.grid(row=0, column=0, sticky="nsew") # 状态列配置 for col, width in [("ip", 150), ("port", 80), ("remark", 200), ("status", 100), ("last_check", 180)]: self.status_tree.column(col, width=width, anchor="center") self.status_tree.heading(col, text=col) # 日志区域 log_frame = ttk.LabelFrame(main_frame, text="系统日志", padding=10) log_frame.grid(row=2, column=0, sticky="nsew", pady=(5, 0)) self.log_text = scrolledtext.ScrolledText( log_frame, wrap=tk.WORD, font=('微软雅黑', 9), bg='white', fg='#333333', height=10 ) self.log_text.pack(fill=tk.BOTH, expand=True) # 配置权重 main_frame.columnconfigure(0, weight=1) main_frame.rowconfigure(1, weight=1) main_frame.rowconfigure(2, weight=1) config_frame.columnconfigure(1, weight=1) ip_frame.rowconfigure(0, weight=1) ip_frame.columnconfigure(0, weight=1) status_frame.rowconfigure(0, weight=1) status_frame.columnconfigure(0, weight=1) # 绑定事件 self.ip_tree.bind("<Button-1>", self.on_tree_click) self.ip_tree.bind("<Double-1>", self.on_tree_double_click) def on_tree_click(self, event): """处理树状图点击事件""" region = self.ip_tree.identify("region", event.x, event.y) if region == "cell": column = self.ip_tree.identify_column(event.x) item = self.ip_tree.identify_row(event.y) if column == "#1": # 复选框列 current_val = self.ip_tree.item(item, "values")[0] new_val = "✓" if current_val == "" else "" values = list(self.ip_tree.item(item, "values")) values[0] = new_val self.ip_tree.item(item, values=values, tags=("checked" if new_val else "unchecked")) def on_tree_double_click(self, event): """处理树状图双击事件""" item = self.ip_tree.selection() if item: values = self.ip_tree.item(item, "values") self.ip_entry.delete(0, tk.END) self.ip_entry.insert(0, values[1]) self.port_entry.delete(0, tk.END) self.port_entry.insert(0, values[2]) self.remark_entry.delete(0, tk.END) self.remark_entry.insert(0, values[3]) def add_ip(self): """添加IP到列表""" ip = self.ip_entry.get().strip() port = self.port_entry.get().strip() remark = self.remark_entry.get().strip() if not ip: messagebox.showwarning("错误", "IP地址不能为空") return if not self.is_valid_ip(ip): messagebox.showwarning("错误", "无效的IP地址格式") return if port: try: port = int(port) if not (1 <= port <= 65535): messagebox.showwarning("错误", "端口必须在1-65535之间") return except ValueError: messagebox.showwarning("错误", "端口必须是数字") return self.ip_tree.insert("", tk.END, values=("✓", ip, port, remark), tags=("checked",)) # 清空输入框 self.ip_entry.delete(0, tk.END) self.port_entry.delete(0, tk.END) self.remark_entry.delete(0, tk.END) def update_ip(self): """更新选中的IP""" item = self.ip_tree.selection() if not item: messagebox.showwarning("错误", "请先选择要更新的项") return ip = self.ip_entry.get().strip() port = self.port_entry.get().strip() remark = self.remark_entry.get().strip() if not ip: messagebox.showwarning("错误", "IP地址不能为空") return if not sejslf.is_valid_ip(ip): messagebox.showwarning("错误", "无效的IP地址格式") return if port: try: port = int(port) if not (1 <= port <= 65535): messagebox.showwarning("错误", "端口必须在1-65535之间") return except ValueError: messagebox.showwarning("错误", "端口必须是数字") return # 保留原来的复选框状态 current_check = self.ip_tree.item(item, "values")[0] self.ip_tree.item(item, values=(current_check, ip, port, remark)) def remove_ip(self): """删除选中的IP""" items = self.ip_tree.selection() if not items: messagebox.showwarning("错误", "请先选择要删除的项") return for item in items: self.ip_tree.delete(item) def is_valid_ip(self, ip): """验证IP地址格式""" try: ipaddress.ip_address(ip) return True except ValueError: return False def start_monitoring(self): """开始监控""" if not self.validate_inputs(): return # 获取要监控的目标 targets = [] for item in self.ip_tree.get_children(): check, ip, port, remark = self.ip_tree.item(item, "values") if check == "✓": # 只监控选中的项 targets.append((ip, int(port) if port else None, remark)) if not targets: messagebox.showwarning("错误", "没有选中要监控的目标") return # 初始化状态字典 self.ip_status = {} for ip, port, remark in targets: self.ip_status[(ip, port)] = { "status": IPStatus.UNKNOWN, "last_notified": None, "remark": remark, "retries": 0, "next_retry_time": 0, "last_check": "从未检测" } # 更新状态显示 self.update_status_display() # 更新按钮状态 self.start_btn.config(state=tk.DISABLED) self.stop_btn.config(state=tk.NORMAL) self.stop_event.clear() # 获取监控参数 interval = int(self.interval_entry.get()) timeout = int(self.timeout_entry.get()) max_retries = int(self.retry_entry.get()) retry_interval = int(self.retry_interval_entry.get()) # 创建监控线程 self.monitor_thread = threading.Thread( target=self.monitor_targets, args=(interval, timeout, max_retries, retry_interval), daemon=True ) self.monitor_thread.start() self.log_message("监控已启动") def stop_monitoring(self): """停止监控""" self.stop_event.set() self.start_btn.config(state=tk.NORMAL) self.stop_btn.config(state=tk.DISABLED) self.log_message("监控已停止") def monitor_targets(self, interval, timeout, max_retries, retry_interval): """监控主循环""" while not self.stop_event.is_set(): current_time = time.time() # 每日报告检查 if current_time >= self.next_report_time: self.send_daily_report() self.next_report_time = self.calculate_next_report_time() # 检查所有目标 for (ip, port), data in list(self.ip_status.items()): if self.stop_event.is_set(): break if current_time < data['next_retry_time']: continue # 执行检测 if port: # 如果有端口则检测端口 current_status = self.check_port(ip, port, timeout) else: # 否则只ping IP current_status = self.ping_ip(ip, timeout) previous_status = data["status"] data["last_check"] = time.strftime("%Y-%m-%d %H:%M:%S") # 处理状态变化 if current_status == IPStatus.OFFLINE: if data['retries'] < max_retries: data['retries'] += 1 data['next_retry_time'] = current_time + retry_interval self.log_message(f"{ip}:{port or '无端口'} 检测失败,正在进行第 {data['retries']} 次重试...") else: if previous_status != current_status: self.send_alert(ip, port, data['remark'], current_status) data["status"] = current_status data['retries'] = 0 data['next_retry_time'] = current_time + interval else: if previous_status != current_status: self.send_alert(ip, port, data['remark'], current_status) data["status"] = current_status data['retries'] = 0 data['next_retry_time'] = current_time + interval # 更新UI self.after(0, self.update_status_display) # 计算睡眠时间 next_checks = [data['next_retry_time'] for data in self.ip_status.values()] next_check = min(next_checks) if next_checks else current_time + interval sleep_time = max(min(next_check - time.time(), interval), 0.1) time.sleep(sleep_time) def ping_ip(self, ip, timeout): """Ping IP地址""" try: # 使用socket创建ICMP ping sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) sock.settimeout(timeout) sock.connect((ip, 1)) # 端口号不重要 return IPStatus.ONLINE except socket.timeout: return IPStatus.OFFLINE except Exception: return IPStatus.UNKNOWN finally: try: sock.close() except: pass def check_port(self, ip, port, timeout): """检查端口""" try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(timeout) s.connect((ip, port)) return IPStatus.ONLINE except socket.timeout: return IPStatus.OFFLINE except ConnectionRefusedError: return IPStatus.OFFLINE except Exception as e: self.log_message(f"检测异常: {str(e)}") return IPStatus.UNKNOWN def update_status_display(self): """更新状态显示""" # 清空现有显示 for item in self.status_tree.get_children(): self.status_tree.delete(item) # 添加新状态 for (ip, port), data in self.ip_status.items(): status = data["status"] status_text = "在线" if status == IPStatus.ONLINE else "离线" if status == IPStatus.OFFLINE else "未知" status_color = "green" if status == IPStatus.ONLINE else "red" if status == IPStatus.OFFLINE else "orange" YnAexkGYwp item = self.status_tree.insert("", tk.END, values=( ip, port if port else "无", data["remark"], status_text, data["last_check"] )) self.status_tree.tag_configure(status_color, foreground=status_color) self.status_tree.item(item, tags=(status_color,)) def send_alert(self, ip, port, remark, status): """发送状态变更通知""" status_text = "在线" if status == IPStatus.ONLINE else "离线" subject = f"IP状态变更: {remark or ip}:{port if port else '无端口'} - {status_text}" content = "\n".join([ f"IP地址: {ip}", f"端口: {port if port else '无'}", f"备注: {remark or '无备注信息'}", f"状态变更为: {status_text}", f"检测时间: {time.strftime('%Y-%m-%d %H:%M:%S')}" ]) try: self.send_email(subject, content) self.log_message(f"已发送状态变更通知: {ip}:{port if port else '无'} -> {status_text}") except Exception as e: self.log_message(f"邮件发送失败: {str(e)}") def send_email(self, subject, content): """发送邮件""" smtp_server = self.smtp_server_entry.get().strip() user = self.email_user_entry.get().strip() password = self.email_pass_entry.get().strip() receiver_str = self.email_receiver_entry.get().strip() if not all([smtp_server, user, password, receiver_str]): raise Exception("邮件配置不完整") receivers = [addr.strip() for addr in receiver_str.split(',') if addr.strip()] msg = EmailMessage() msg['Subject'] = subject msg['From'] = user msg['To'] = receivers msg.set_content(content) server, port = smtp_server.split(":") port = int(port) try: if port == 465: with smtplib.SMTP_SSL(server, port) as smtp: smtp.login(user, password) smtp.send_message(msg) else: with smtplib.SMTP(server, port) as smtp: smtp.starttls() smtp.login(user, password) smtp.send_message(msg) except Exception as e: raise Exception(f"SMTP错误: {str(e)}") def send_daily_report(self): """发送每日报告""" report_lines = [] for (ip, port), data in self.ip_status.items(): status = data["status"] status_text = "在线" if status == IPStatus.ONLINE else "离线" if status == IPStatus.OFFLINE else "未知" report_lines.append(f"IP: {ip}:{port if port else '无'} 备注: {data['remark']} 状态: {status_text}") current_time = time.strftime("%Y-%m-%d %H:%M:%S") content = "每日IP状态报告\n\n" + "\n".join(report_lines) + f"\n\n报告时间: {current_time}" subject = "每日IP状态报告" try: self.send_email(subject, content) self.log_message("已发送每日状态报告") except Exception as e: self.log_message(f"发送每日报告失败: {str(e)}") def calculate_next_report_time(self): """计算下次报告时间""" now = time.localtime() today_10am = time.mktime((now.tm_year, now.tm_mon, now.tm_mday, 10, 0, 0, 0, 0, -1)) current_time = time.time() return today_10am + 86400 if current_time >= today_10am else today_10am def log_message(self, message): """记录日志""" timesta编程mp = time.strftime("%Y-%m-%d %H:%M:%S") log_line = f"[{timestamp}] {message}\n" self.log_text.insert(tk.END, log_line) self.log_text.see(tk.END) with open("monitor.log", "a", encandroidoding="utf-8") as f: f.write(log_line) def clear_logs(self): """清空日志""" self.log_text.delete(1.0, tk.END) def validate_inputs(self): """验证输入""" # 检查邮件配置 smtp_server = self.smtp_server_entry.get().strip() user = self.email_user_entry.get().strip() password = self.email_pass_entry.get().strip() receiver_str = self.email_receiver_entry.get().strip() if not smtp_server: messagebox.showwarning("错误", "SMTP服务器不能为空") return False if ":" not in smtp_server: messagebox.showwarning("错误", "SMTP服务器格式应为 host:port") return False if not user: messagebox.showwarning("错误", "邮箱账户不能为空") return False if not password: messagebox.showwarning("错误", "邮箱授权码不能为空") return False if not receiver_str: messagebox.showwarning("错误", "接收邮箱不能为空") return False # 验证监控参数 try: interval = int(self.interval_entry.get()) timeout = int(self.timeout_entry.get()) retries = int(self.retry_entry.get()) retry_interval = int(self.retry_interval_entry.get()) if interval < 5: messagebox.showwarning("错误", "监测间隔不能小于5秒") return False if timeout < 1: messagebox.showwarning("错误", "超时时间不能小于1秒") return False if retries < 0: messagebox.showwarning("错误", "重试次数不能为负数") return False if retry_interval < 1: messagebox.showwarning("错误", "重试间隔不能小于1秒") return False except ValueError: messagebox.showwarning("错误", "请输入有效的数字") return False return True def save_config(self): """保存配置""" config = { "targets": [ self.ip_tree.item(item, "values") for item in self.ip_tree.get_children() ], "smtp_server": self.smtp_server_entry.get(), "email_user": self.email_user_entry.get(), "email_pass": self.email_pass_entry.get(), "email_receiver": self.email_receiver_entry.get(), "interval": self.interval_entry.get(), "timeout": self.timeout_entry.get(), "retry": self.retry_entry.get(), "retry_interval": self.retry_interval_entry.get() } try: with open("config.json", "w", encoding="utf-8") as f: json.dump(config, f, indent=2) self.log_message("配置已保存") messagebox.showinfo("成功", "配置已保存到config.json") except Exception as e: self.log_message(f"保存配置失败: {str(e)}") messagebox.showerror("错误", f"保存配置失败: {str(e)}") def load_config(self): """加载配置""" try: with open("config.json", encoding="utf-8") as f: config = json.load(f) # 加载目标 for item in self.ip_tree.get_children(): self.ip_tree.delete(item) for values in config.get("targets", []): self.ip_tree.insert("", tk.END, values=values, tags=("checked" if values[0] == "✓" else "unchecked")) # 加载邮件配置 self.smtp_server_entry.delete(0, tk.END) self.smtp_server_entry.insert(0, config.get("smtp_server", "")) self.email_user_entry.delete(0, tk.END) self.email_user_entry.insert(0, config.get("email_user", "")) self.email_pass_entry.delete(0, tk.END) self.email_pass_entry.insert(0, config.get("email_pass", "")) self.email_receiver_entry.delete(0, tk.END) self.email_receiver_entry.insert(0, config.get("email_receiver", "")) # 加载监控参数 self.interval_entry.delete(0, tk.END) self.interval_entry.insert(0, config.get("interval", "10")) self.timeout_entry.delete(0, tk.END) self.timeout_entry.insert(0, config.get("timeout", "2")) self.retry_entry.delete(0, tk.END) self.retry_entry.insert(0, config.get("retry", "3")) self.retry_interval_entry.delete(0, tk.END) self.retry_interval_entry.insert(0, config.get("retry_interval", "5")) self.log_message("配置已加载") except FileNotFoundError: self.log_message("未找到配置文件,使用默认配置") except Exception as e: self.log_message(f"加载配置失败: {str(e)}") if __name__ == "__main__": app = IPMonitorApp() app.mainloop()
适合人群:网络管理员、运维工程师、Python中级开发者
常见问题解答
Q:如何修改每日报告发送时间?
A:修改calculate_next_report_time()方法中的小时数(默认10:00)
Q:支持监控IPv6地址吗?
A:当前版本需要稍作修改,建议使用ipaddress库进行验证
Q:最大支持监控多少个IP?
A:理论上无限制,但建议不超过100个以保证性能
到此这篇关于使用Python实现IP地址和端口状态检测与监控的文章就介绍到这了,更多相关Python IP地址检测与监控内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!
这篇关于使用Python实现IP地址和端口状态检测与监控的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!