使用Python实现IP地址和端口状态检测与监控

2025-04-30 17:50

本文主要是介绍使用Python实现IP地址和端口状态检测与监控,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《使用Python实现IP地址和端口状态检测与监控》在网络运维和服务器管理中,IP地址和端口的可用性监控是保障业务连续性的基础需求,本文将带你用Python从零打造一个高可用IP监控系统,感兴趣的小伙...

概述:为什么需要IP监控系统

网络运维和服务器管理中,IP地址和端口的可用性监控是保障业务连续性的基础需求。传统的人工巡检方式效率低下,而商业监控工具又往往价格昂贵。本文将带你用python从零打造一个高可用IP监控系统,具备以下核心功能:

  • 多目标监控:同时监测多个IP+端口组合状态
  • 智能告警:异常状态自动触发邮件通知(支持多收件人)
  • 可视化界面:基于Tkinter的现代化UI,操作直观
  • 配置持久化:所有设置自动保存,重启不丢失
  • 断线重试机制:避免网络抖动导致的误报

技术栈:Python 3 + Tkinter + smtplib + socket + 多线程

使用步骤说明

1. 环境准备

# 所需库(Python内置,无需额外安装)
import tkinter
import threading
import smtplib
import socket

2. 系统部署

下载完整代码(文末提供)

配置config.json(首次运行会自动生成)

运行主程序:

 python ip_monitor.py

3. 核心功能配置

目标配置

通过表格形式管理监控目标

复选框控制是否启用监测

支持双击编辑现有条目

邮件设置

服务商SMTP地址端口授权码获取方式
网易163smtp.163.com465/994邮箱设置→POP3/SMTP服务
QQ邮箱smtp.qq.com465设置→账户→POP3服务
Gmailsmtp.gmail.com587Google账号→应用密码

# 配置示例(支持SSL/TLS)
SMTP服务器: smtp.163.com:465
邮箱账户: yourname@163.com
授权码: xxxxxx  # 需在邮箱设置中获取
接收邮箱: admin@company.com,backup@company.com

注意事项:

必须开启SMTP服务

部分邮箱需要使用授权码而非密码

Gmail需开启"低安全性应用访问"

监控参数

检测模式说明

纯IP检测模式

  • 留空端口字段
  • 使用ICMP协议Ping检测
  • 适用场景:网络设备监控

组合检测模式

# 同时验证IP可达性和端口开放状态
if ping_success and port_open:
    return ONLINE
参数名默认值说明
监测间隔10秒两次检测的时间间隔
超时时间2秒判定离线的超时阈值
重试次数3次连续失败次数触发告警
重试间隔5秒失败后的快速重试间隔

系统效果展示

实时监控面板

使用Python实现IP地址和端口状态检测与监控

使用Python实现IP地址和端口状态检测与监控

邮件告警示例

主题:[告警] IP状态变更: 192.168.1.1:80 - 离线

IP地址: 192.168.1.1
端口: 80
备注: 主数据库服务器
状态变更为: 离线
检测时间: 2023-08-20 14:30:45

日志记录

[2023-08-20 14:30:45] 检测到 192.168.1.1:80 状态变更为离线
[2023-08-20 14:31:00] 已发送告警邮件给3个收件人
[2023-08-20 15:00:00] 已发送每日状态报告

核心源码解析

1. 状态检测引擎

def check_target(self, ip, port=None, timeout=2):
    """智能检测IP/端口状态"""
    try:
        if port:  # 端口检测模式
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(timeout)
                s.connect((ip, port))
                return IPStatus.ONLINE
        else:     # 纯Ping模式
            # 使用ICMP协议实现Ping
            sock = socket.socket(socket.AF_INET, 
                               socket.SOCK_RAW, 
                               socket.IPPROTO_ICMP)
            sock.settimeout(timeout)
            sock.connect((ip, 1))
            return IPStatus.ONLINE
    except socket.timeout:
        return IPStatus.OFFLINE
    except Exception:
        return IPStatus.UNKNOWN

2. 多线程监控架构

class MonitorThread(threading.Thread):
    def __init__(self, app):
        super().__init__(daemon=True)
        self.app = app
        self.running = True
        
    def run(self):
        while self.running:
            # 1. 执行所有目标的检测
            # 2. 触发状态变更通知
            # 3. 智能休眠控制CPU占用
            time.sleep(self.calculate_sleep_time())
            
    def stop(self):
        self.running = False

3. 配置持久化实现

def save_config(self):
    """JSON格式保存所有python配置"""
    config = {
        'targets': [
            self.tree.item(item, 'values')
            for item in self.tree.get_children()
        ],
        'email_settings': {
            'smtp': self.smtp_entry.get(),
            'user': self.user_entry.get(),
            'pass': self.pass_entry.get(),
            'receivers': self.receiver_entry.get()
        }
    }
    with open('config.json', 'w') as f:
        json.dump(config, f, indent=2)

高级功能扩展建议

1. 微信/钉钉机器人告警

# 示例:企业微信机器人API
import requests
def send_wechat_alert(message):
    webhook = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx"
    requests.post(webhook, json={"text": {"content": message}})

2. 数据库存储历史记录

# 使用SQLite记录状态变化
import sqlite3
conn = sqlite3.connect('monitor.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS status_log
             (ip TEXT, port INT, status TEXT, check_time TIMESTAMP)''')

3. 可视化图表展示

# 使用Matplotlib绘制可用率曲线
plt.plot(dates, availability_rates)
plt.title('月度服务可用率')
plt.ylabel('百分比(%)')
plt.savefig('report.png')

总结与资源下载

项目亮点

  • 工业级可靠性:断线重试+智能休眠机制
  • 开箱即用:无需复杂配置,5分钟快速部署
  • 高度可扩展:代码结构清晰,便于二次开发

完整源码下载

import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
import threading
import time
import smtplib
import ipaddress
import re
import json
import socket
from email.message import EmailMessage
from enum import Enum


class IPStatus(Enum):
    ONLINE = 1
    OFFLINE = 2
    UNKNOWN = 3


class IPMonitorApp(tk.Tk):
    def __init__(self):
        super().__init__()
        self.title("IP状态监测系统")
        self.geometry("1300x850")
        
        try:
            self.iconbitmap('monitor.ico')
        except:
            pass

        self.style = ttk.Style()
        self.configure_style()
        
        self.stop_event = threading.Event()
        self.monitor_thread = None
        self.ip_status = {}
        self.next_report_time = self.calculate_next_report_time()
        
        self.create_widgets()
        self.load_config()

    def configure_style(self):
        self.style.theme_create('ipmonitor', parent='alt', settings={
            'TFrame': {'configure': {'background': '#f5f5f5'}},
            'TLabelFrame': {
                'configure': {
                    'background': '#f5f5f5',
                    'foreground': '#1e3d59',
                    'font': ('微软雅黑', 10, 'bold')
                }
            },
            'TLabel': {
                'configure': {
                    'background': '#f5f5f5',
                    'foreground': '#1e3d59',
                    'font': ('微软雅黑', 10)
                }
            },
            'TButton': {
                'configure': {
                    'background': '#1e3d59',
                    'foreground': 'white',
                    'font': ('微软雅黑', 10),
                    'padding': 5
                },
                'map': {
                    'background': [('active', '#3a6ea5')],
                    'foreground': [('disabled', '#888888')]
                }
            },
            'TCheckbutton': {
                'configure': {
                    'background': '#f5f5f5',
                    'font': ('微软雅黑', 10)
                }
            }
        })
        self.style.theme_use('ipmonitor')

    def create_widgets(self):
        # 主容器
        main_frame = ttk.Frame(self, padding=10)
        main_frame.grid(row=0, column=0, sticky="nsew")
        self.grid_columnconfigure(0, weight=1)
        self.grid_rowconfigure(0, weight=1)
        
        # 配置区域
        config_frame = ttk.Frame(main_frame)
        config_frame.grid(row=0, column=0, sticky="nsew")
        
        # IP配置区域
        ip_frame = ttk.LabelFrame(config_frame, text="监测目标配置", padding=10)
        ip_frame.grid(row=0, column=0, padx=5, pady=5, sticky="nsew")
        ip_frame.columnconfigure(0, weight=1)
        
        # IP列表Treeview
        self.ip_tree = ttk.Treeview(
            ip_frame, 
            columns=("check", "ip", "port", "remark"), 
            show="headings",
            height=8,
            selectmode="browse"
        )
        self.ip_tree.grid(row=0, column=0, sticky="nsew")
        
        # 配置列
        self.ip_tree.heading("check", text="检测")
        self.ip_tree.heading("ip", text="IP地址")
        self.ip_tree.heading("port", text="端口")
        self.ip_tree.heading("remark", text="备注")
        
        self.ip_tree.column("check", width=50, anchor="center")
        self.ip_tree.column("ip", width=150, anchor="w")
        self.ip_tree.column("port", width=80, anchor="center")
        self.ip_tree.column("remark", width=250, anchor="w")
        
        # 添加复选框
        self.ip_tree.tag_configure("checked", background="#e6f7ff")
        self.ip_tree.tag_configure("unchecked", background="#f5f5f5")
        
        # 编辑区域
        edit_frame = ttk.Frame(ip_frame)
        edit_frame.grid(row=1, column=0, sticky="ew", pady=(5, 0))
        
        ttk.Label(edit_frame, text="IP:").grid(row=0, column=0, sticky="e")
        self.ip_entry = ttk.Entry(edit_frame, width=18)
        self.ip_entry.grid(row=0, column=1, padx=2, sticky="w")
        
        ttk.Label(edit_frame, text="端口:").grid(row=0, column=2, sticky="e")
        self.port_entry = ttk.Entry(edit_frame, width=8)
        self.port_entry.grid(row=0, column=3, padx=2, sticky="w")
        
        ttk.Label(edit_frame, text="备注:").grid(row=0, column=4, sticky="e")
        self.remark_entry = ttk.Entry(edit_frame, width=20)
        self.remark_entry.grid(row=0, column=5, padx=2, sticky="w")
        
        # 操作按钮
        btn_frame = ttk.Frame(edit_frame)
        btn_frame.grid(row=0, column=6, padx=5)
        
        ttk.Button(btn_frame, text="添加", command=self.add_ip).grid(row=0, column=0, padx=2)
        ttk.Button(btn_frame, text="更新", command=self.update_ip).grid(row=0, column=1, padx=2)
        ttk.Button(btn_frame, text="删除", command=self.remove_ip).grid(row=0, column=2, padx=2)
        
        # 邮件配置区域
        mail_frame = ttk.LabelFrame(config_frame, text="邮件通知配置", padding=10)
        mail_frame.grid(row=0, column=1, padx=5, pady=5, sticky="nsew")
        
        # 邮件配置项
        mail_config = [
            ("SMTP服务器:端口", "smtp_server", "smtp.163.com:465"),
            ("邮箱账户", "email_user", "yourname@163.com"),
            ("邮箱授权码", "email_pass", ""),
            ("接收邮箱", "email_receiver", "多个邮箱用逗号分隔")
        ]
        
        for i, (label, attr, ph) in enumerate(mail_config):
            ttk.Label(mail_frame, text=label).grid(row=i, column=0, sticky="e", pady=3)
            entry = ttk.Entry(mail_frame, width=25)
            entry.grid(row=i, column=1, padx=5, pady=3, sticky="ew")
            setattr(self, attr+"_entry", entry)
            ttk.Label(mail_frame, text=ph, foreground="#888888").grid(row=i, column=2, sticky="w", padx=5)
        
        # 监控参数
        param_frame = ttk.LabelFrame(config_frame, text="监控参数", padding=10)
        param_frame.grid(row=1, column=0, columnspan=2, sticky="ew", pady=(5, 0))
        
        params = [
            ("监测间隔(秒):", "interval", "10"),
            ("超时(秒):", "timeout", "2"),
            ("重试次数:", "retry", "3"),
            ("重试间隔(秒):", "retry_interval", "5")
        ]
        
        for i, (label, attr, default) in enumerate(params):
            ttk.Label(param_frame, text=label).grid(row=0, column=i*2, sticky="e")
            entry = ttk.Entry(param_frame, width=8)
            entry.insert(0, default)
            entry.grid(row=0, column=i*2+1, padx=5, sticky="w")
            setattr(self, attr+"_entry", entry)
        
        # 控制按钮
        ctrl_frame = ttk.Frame(config_frame)
        ctrl_frame.grid(row=2, column=0, columnspan=2, pady=(10, 0), sticky="e")
        
        ttk.Button(ctrl_frame, text="开始监测", command=self.start_monitoring).grid(row=0, column=0, padx=5)
        ttk.Button(ctrl_frame, text="停止监测", command=self.stop_monitoring, state=tk.DISABLED).grid(row=0, column=1, padx=5)
        self.start_btn = ctrl_frame.grid_slaves(row=0, column=0)[0]
        self.stop_btn = ctrl_frame.grid_slaves(row=0, column=1)[0]
        
        ttk.Button(ctrl_frame, text="保存配置", command=self.save_config).grid(row=0, column=2, padx=5)
        ttk.Button(ctrl_frame, text="清空日志", command=self.clear_logs).grid(row=0, column=3, padx=5)
        
        # 状态监控区域
        status_frame = ttk.LabelFrame(main_frame, text="状态监控", padding=10)
        status_frame.grid(row=1, column=0, sticky="nsew", pady=(5, 0))
        
        self.status_tree = ttk.Treeview(
            status_frame, 
            columns=("ip", "port", "remark", "status", "last_check"), 
            show="headings",
            height=8
        )
        self.status_tree.grid(row=0, column=0, sticky="nsew")
        
        # 状态列配置
        for col, width in [("ip", 150), ("port", 80), ("remark", 200), ("status", 100), ("last_check", 180)]:
            self.status_tree.column(col, width=width, anchor="center")
            self.status_tree.heading(col, text=col)
        
        # 日志区域
        log_frame = ttk.LabelFrame(main_frame, text="系统日志", padding=10)
        log_frame.grid(row=2, column=0, sticky="nsew", pady=(5, 0))
        
        self.log_text = scrolledtext.ScrolledText(
            log_frame, 
            wrap=tk.WORD, 
            font=('微软雅黑', 9), 
            bg='white', 
            fg='#333333',
            height=10
        )
        self.log_text.pack(fill=tk.BOTH, expand=True)
        
        # 配置权重
        main_frame.columnconfigure(0, weight=1)
        main_frame.rowconfigure(1, weight=1)
        main_frame.rowconfigure(2, weight=1)
        
        config_frame.columnconfigure(1, weight=1)
        
        ip_frame.rowconfigure(0, weight=1)
        ip_frame.columnconfigure(0, weight=1)
        
        status_frame.rowconfigure(0, weight=1)
        status_frame.columnconfigure(0, weight=1)
        
        # 绑定事件
        self.ip_tree.bind("<Button-1>", self.on_tree_click)
        self.ip_tree.bind("<Double-1>", self.on_tree_double_click)

    def on_tree_click(self, event):
        """处理树状图点击事件"""
        region = self.ip_tree.identify("region", event.x, event.y)
        if region == "cell":
            column = self.ip_tree.identify_column(event.x)
            item = self.ip_tree.identify_row(event.y)
            if column == "#1":  # 复选框列
                current_val = self.ip_tree.item(item, "values")[0]
                new_val = "✓" if current_val == "" else ""
                values = list(self.ip_tree.item(item, "values"))
                values[0] = new_val
                self.ip_tree.item(item, values=values, tags=("checked" if new_val else "unchecked"))

    def on_tree_double_click(self, event):
        """处理树状图双击事件"""
        item = self.ip_tree.selection()
        if item:
            values = self.ip_tree.item(item, "values")
            self.ip_entry.delete(0, tk.END)
            self.ip_entry.insert(0, values[1])
            self.port_entry.delete(0, tk.END)
            self.port_entry.insert(0, values[2])
            self.remark_entry.delete(0, tk.END)
            self.remark_entry.insert(0, values[3])

    def add_ip(self):
        """添加IP到列表"""
        ip = self.ip_entry.get().strip()
        port = self.port_entry.get().strip()
        remark = self.remark_entry.get().strip()
        
        if not ip:
            messagebox.showwarning("错误", "IP地址不能为空")
            return
            
        if not self.is_valid_ip(ip):
            messagebox.showwarning("错误", "无效的IP地址格式")
            return
            
        if port:
            try:
                port = int(port)
                if not (1 <= port <= 65535):
                    messagebox.showwarning("错误", "端口必须在1-65535之间")
                    return
            except ValueError:
                messagebox.showwarning("错误", "端口必须是数字")
                return
        
        self.ip_tree.insert("", tk.END, values=("✓", ip, port, remark), tags=("checked",))
        
        # 清空输入框
        self.ip_entry.delete(0, tk.END)
        self.port_entry.delete(0, tk.END)
        self.remark_entry.delete(0, tk.END)

    def update_ip(self):
        """更新选中的IP"""
        item = self.ip_tree.selection()
        if not item:
            messagebox.showwarning("错误", "请先选择要更新的项")
            return
            
        ip = self.ip_entry.get().strip()
        port = self.port_entry.get().strip()
        remark = self.remark_entry.get().strip()
        
        if not ip:
            messagebox.showwarning("错误", "IP地址不能为空")
            return
            
        if not sejslf.is_valid_ip(ip):
            messagebox.showwarning("错误", "无效的IP地址格式")
            return
            
        if port:
            try:
                port = int(port)
                if not (1 <= port <= 65535):
                    messagebox.showwarning("错误", "端口必须在1-65535之间")
                    return
            except ValueError:
                messagebox.showwarning("错误", "端口必须是数字")
                return
        
        # 保留原来的复选框状态
        current_check = self.ip_tree.item(item, "values")[0]
        self.ip_tree.item(item, values=(current_check, ip, port, remark))

    def remove_ip(self):
        """删除选中的IP"""
        items = self.ip_tree.selection()
        if not items:
            messagebox.showwarning("错误", "请先选择要删除的项")
            return
            
        for item in items:
            self.ip_tree.delete(item)

    def is_valid_ip(self, ip):
        """验证IP地址格式"""
        try:
            ipaddress.ip_address(ip)
            return True
        except ValueError:
            return False

    def start_monitoring(self):
        """开始监控"""
        if not self.validate_inputs():
            return
            
        # 获取要监控的目标
        targets = []
        for item in self.ip_tree.get_children():
            check, ip, port, remark = self.ip_tree.item(item, "values")
            if check == "✓":  # 只监控选中的项
                targets.append((ip, int(port) if port else None, remark))
        
        if not targets:
            messagebox.showwarning("错误", "没有选中要监控的目标")
            return
            
        # 初始化状态字典
        self.ip_status = {}
        for ip, port, remark in targets:
            self.ip_status[(ip, port)] = {
                "status": IPStatus.UNKNOWN,
                "last_notified": None,
                "remark": remark,
                "retries": 0,
                "next_retry_time": 0,
                "last_check": "从未检测"
            }
        
        # 更新状态显示
        self.update_status_display()
        
        # 更新按钮状态
        self.start_btn.config(state=tk.DISABLED)
        self.stop_btn.config(state=tk.NORMAL)
        self.stop_event.clear()
        
        # 获取监控参数
        interval = int(self.interval_entry.get())
        timeout = int(self.timeout_entry.get())
        max_retries = int(self.retry_entry.get())
        retry_interval = int(self.retry_interval_entry.get())
        
        # 创建监控线程
        self.monitor_thread = threading.Thread(
            target=self.monitor_targets,
            args=(interval, timeout, max_retries, retry_interval),
            daemon=True
        )
        self.monitor_thread.start()
        
        self.log_message("监控已启动")

    def stop_monitoring(self):
        """停止监控"""
        self.stop_event.set()
        self.start_btn.config(state=tk.NORMAL)
        self.stop_btn.config(state=tk.DISABLED)
        self.log_message("监控已停止")

    def monitor_targets(self, interval, timeout, max_retries, retry_interval):
        """监控主循环"""
        while not self.stop_event.is_set():
            current_time = time.time()

            # 每日报告检查
            if current_time >= self.next_report_time:
                self.send_daily_report()
                self.next_report_time = self.calculate_next_report_time()

            # 检查所有目标
            for (ip, port), data in list(self.ip_status.items()):
                if self.stop_event.is_set():
                    break

                if current_time < data['next_retry_time']:
                    continue

                # 执行检测
                if port:  # 如果有端口则检测端口
                    current_status = self.check_port(ip, port, timeout)
                else:  # 否则只ping IP
                    current_status = self.ping_ip(ip, timeout)
                
                previous_status = data["status"]
                data["last_check"] = time.strftime("%Y-%m-%d %H:%M:%S")
                
                # 处理状态变化
                if current_status == IPStatus.OFFLINE:
                    if data['retries'] < max_retries:
                        data['retries'] += 1
                        data['next_retry_time'] = current_time + retry_interval
                        self.log_message(f"{ip}:{port or '无端口'} 检测失败,正在进行第 {data['retries']} 次重试...")
                    else:
                        if previous_status != current_status:
                            self.send_alert(ip, port, data['remark'], current_status)
                        data["status"] = current_status
                        data['retries'] = 0
                        data['next_retry_time'] = current_time + interval
                else:
                    if previous_status != current_status:
                        self.send_alert(ip, port, data['remark'], current_status)
                    data["status"] = current_status
                    data['retries'] = 0
                    data['next_retry_time'] = current_time + interval
                
                # 更新UI
                self.after(0, self.update_status_display)

            # 计算睡眠时间
            next_checks = [data['next_retry_time'] for data in self.ip_status.values()]
            next_check = min(next_checks) if next_checks else current_time + interval
            sleep_time = max(min(next_check - time.time(), interval), 0.1)
            time.sleep(sleep_time)

    def ping_ip(self, ip, timeout):
        """Ping IP地址"""
        try:
            # 使用socket创建ICMP ping
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
            sock.settimeout(timeout)
            sock.connect((ip, 1))  # 端口号不重要
            return IPStatus.ONLINE
        except socket.timeout:
            return IPStatus.OFFLINE
        except Exception:
            return IPStatus.UNKNOWN
        finally:
            try:
                sock.close()
            except:
                pass

    def check_port(self, ip, port, timeout):
        """检查端口"""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(timeout)
                s.connect((ip, port))
                return IPStatus.ONLINE
        except socket.timeout:
            return IPStatus.OFFLINE
        except ConnectionRefusedError:
            return IPStatus.OFFLINE
        except Exception as e:
            self.log_message(f"检测异常: {str(e)}")
            return IPStatus.UNKNOWN

    def update_status_display(self):
        """更新状态显示"""
        # 清空现有显示
        for item in self.status_tree.get_children():
            self.status_tree.delete(item)
        
        # 添加新状态
        for (ip, port), data in self.ip_status.items():
            status = data["status"]
            status_text = "在线" if status == IPStatus.ONLINE else "离线" if status == IPStatus.OFFLINE else "未知"
            status_color = "green" if status == IPStatus.ONLINE else "red" if status == IPStatus.OFFLINE else "orange"
            
          YnAexkGYwp  item = self.status_tree.insert("", tk.END, values=(
                ip, 
                port if port else "无", 
                data["remark"], 
                status_text, 
                data["last_check"]
            ))
            self.status_tree.tag_configure(status_color, foreground=status_color)
            self.status_tree.item(item, tags=(status_color,))

    def send_alert(self, ip, port, remark, status):
        """发送状态变更通知"""
        status_text = "在线" if status == IPStatus.ONLINE else "离线"
        subject = f"IP状态变更: {remark or ip}:{port if port else '无端口'} - {status_text}"
        
        content = "\n".join([
            f"IP地址: {ip}",
            f"端口: {port if port else '无'}",
            f"备注: {remark or '无备注信息'}",
            f"状态变更为: {status_text}",
            f"检测时间: {time.strftime('%Y-%m-%d %H:%M:%S')}"
        ])
        
        try:
            self.send_email(subject, content)
            self.log_message(f"已发送状态变更通知: {ip}:{port if port else '无'} -> {status_text}")
        except Exception as e:
            self.log_message(f"邮件发送失败: {str(e)}")

    def send_email(self, subject, content):
        """发送邮件"""
        smtp_server = self.smtp_server_entry.get().strip()
        user = self.email_user_entry.get().strip()
        password = self.email_pass_entry.get().strip()
        receiver_str = self.email_receiver_entry.get().strip()
        
        if not all([smtp_server, user, password, receiver_str]):
            raise Exception("邮件配置不完整")
        
        receivers = [addr.strip() for addr in receiver_str.split(',') if addr.strip()]
        
        msg = EmailMessage()
        msg['Subject'] = subject
        msg['From'] = user
        msg['To'] = receivers
        msg.set_content(content)
        
        server, port = smtp_server.split(":")
        port = int(port)
        
        try:
            if port == 465:
                with smtplib.SMTP_SSL(server, port) as smtp:
                    smtp.login(user, password)
                    smtp.send_message(msg)
            else:
                with smtplib.SMTP(server, port) as smtp:
                    smtp.starttls()
                    smtp.login(user, password)
                    smtp.send_message(msg)
        except Exception as e:
            raise Exception(f"SMTP错误: {str(e)}")

    def send_daily_report(self):
        """发送每日报告"""
        report_lines = []
        for (ip, port), data in self.ip_status.items():
            status = data["status"]
            status_text = "在线" if status == IPStatus.ONLINE else "离线" if status == IPStatus.OFFLINE else "未知"
            report_lines.append(f"IP: {ip}:{port if port else '无'} 备注: {data['remark']} 状态: {status_text}")
        
        current_time = time.strftime("%Y-%m-%d %H:%M:%S")
        content = "每日IP状态报告\n\n" + "\n".join(report_lines) + f"\n\n报告时间: {current_time}"
        subject = "每日IP状态报告"
        
        try:
            self.send_email(subject, content)
            self.log_message("已发送每日状态报告")
        except Exception as e:
            self.log_message(f"发送每日报告失败: {str(e)}")

    def calculate_next_report_time(self):
        """计算下次报告时间"""
        now = time.localtime()
        today_10am = time.mktime((now.tm_year, now.tm_mon, now.tm_mday, 10, 0, 0, 0, 0, -1))
        current_time = time.time()
        
        return today_10am + 86400 if current_time >= today_10am else today_10am

    def log_message(self, message):
        """记录日志"""
        timesta编程mp = time.strftime("%Y-%m-%d %H:%M:%S")
        log_line = f"[{timestamp}] {message}\n"
        
        self.log_text.insert(tk.END, log_line)
        self.log_text.see(tk.END)
        
        with open("monitor.log", "a", encandroidoding="utf-8") as f:
            f.write(log_line)

    def clear_logs(self):
        """清空日志"""
        self.log_text.delete(1.0, tk.END)

    def validate_inputs(self):
        """验证输入"""
        # 检查邮件配置
        smtp_server = self.smtp_server_entry.get().strip()
        user = self.email_user_entry.get().strip()
        password = self.email_pass_entry.get().strip()
        receiver_str = self.email_receiver_entry.get().strip()
        
        if not smtp_server:
            messagebox.showwarning("错误", "SMTP服务器不能为空")
            return False
            
        if ":" not in smtp_server:
            messagebox.showwarning("错误", "SMTP服务器格式应为 host:port")
            return False
            
        if not user:
            messagebox.showwarning("错误", "邮箱账户不能为空")
            return False
            
        if not password:
            messagebox.showwarning("错误", "邮箱授权码不能为空")
            return False
            
        if not receiver_str:
            messagebox.showwarning("错误", "接收邮箱不能为空")
            return False
            
        # 验证监控参数
        try:
            interval = int(self.interval_entry.get())
            timeout = int(self.timeout_entry.get())
            retries = int(self.retry_entry.get())
            retry_interval = int(self.retry_interval_entry.get())
            
            if interval < 5:
                messagebox.showwarning("错误", "监测间隔不能小于5秒")
                return False
                
            if timeout < 1:
                messagebox.showwarning("错误", "超时时间不能小于1秒")
                return False
                
            if retries < 0:
                messagebox.showwarning("错误", "重试次数不能为负数")
                return False
                
            if retry_interval < 1:
                messagebox.showwarning("错误", "重试间隔不能小于1秒")
                return False
                
        except ValueError:
            messagebox.showwarning("错误", "请输入有效的数字")
            return False
            
        return True

    def save_config(self):
        """保存配置"""
        config = {
            "targets": [
                self.ip_tree.item(item, "values")
                for item in self.ip_tree.get_children()
            ],
            "smtp_server": self.smtp_server_entry.get(),
            "email_user": self.email_user_entry.get(),
            "email_pass": self.email_pass_entry.get(),
            "email_receiver": self.email_receiver_entry.get(),
            "interval": self.interval_entry.get(),
            "timeout": self.timeout_entry.get(),
            "retry": self.retry_entry.get(),
            "retry_interval": self.retry_interval_entry.get()
        }
        
        try:
            with open("config.json", "w", encoding="utf-8") as f:
                json.dump(config, f, indent=2)
            self.log_message("配置已保存")
            messagebox.showinfo("成功", "配置已保存到config.json")
        except Exception as e:
            self.log_message(f"保存配置失败: {str(e)}")
            messagebox.showerror("错误", f"保存配置失败: {str(e)}")

    def load_config(self):
        """加载配置"""
        try:
            with open("config.json", encoding="utf-8") as f:
                config = json.load(f)
            
            # 加载目标
            for item in self.ip_tree.get_children():
                self.ip_tree.delete(item)
                
            for values in config.get("targets", []):
                self.ip_tree.insert("", tk.END, values=values, tags=("checked" if values[0] == "✓" else "unchecked"))
            
            # 加载邮件配置
            self.smtp_server_entry.delete(0, tk.END)
            self.smtp_server_entry.insert(0, config.get("smtp_server", ""))
            
            self.email_user_entry.delete(0, tk.END)
            self.email_user_entry.insert(0, config.get("email_user", ""))
            
            self.email_pass_entry.delete(0, tk.END)
            self.email_pass_entry.insert(0, config.get("email_pass", ""))
            
            self.email_receiver_entry.delete(0, tk.END)
            self.email_receiver_entry.insert(0, config.get("email_receiver", ""))
            
            # 加载监控参数
            self.interval_entry.delete(0, tk.END)
            self.interval_entry.insert(0, config.get("interval", "10"))
            
            self.timeout_entry.delete(0, tk.END)
            self.timeout_entry.insert(0, config.get("timeout", "2"))
            
            self.retry_entry.delete(0, tk.END)
            self.retry_entry.insert(0, config.get("retry", "3"))
            
            self.retry_interval_entry.delete(0, tk.END)
            self.retry_interval_entry.insert(0, config.get("retry_interval", "5"))
            
            self.log_message("配置已加载")
        except FileNotFoundError:
            self.log_message("未找到配置文件,使用默认配置")
        except Exception as e:
            self.log_message(f"加载配置失败: {str(e)}")


if __name__ == "__main__":
    app = IPMonitorApp()
    app.mainloop()

适合人群:网络管理员、运维工程师、Python中级开发者

常见问题解答

Q:如何修改每日报告发送时间?

A:修改calculate_next_report_time()方法中的小时数(默认10:00)

Q:支持监控IPv6地址吗?

A:当前版本需要稍作修改,建议使用ipaddress库进行验证

Q:最大支持监控多少个IP?

A:理论上无限制,但建议不超过100个以保证性能

到此这篇关于使用Python实现IP地址和端口状态检测与监控的文章就介绍到这了,更多相关Python IP地址检测与监控内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!

这篇关于使用Python实现IP地址和端口状态检测与监控的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1154444

相关文章

基于Python打造一个智能单词管理神器

《基于Python打造一个智能单词管理神器》这篇文章主要为大家详细介绍了如何使用Python打造一个智能单词管理神器,从查询到导出的一站式解决,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 项目概述:为什么需要这个工具2. 环境搭建与快速入门2.1 环境要求2.2 首次运行配置3. 核心功能使用指

Python实现微信自动锁定工具

《Python实现微信自动锁定工具》在数字化办公时代,微信已成为职场沟通的重要工具,但临时离开时忘记锁屏可能导致敏感信息泄露,下面我们就来看看如何使用Python打造一个微信自动锁定工具吧... 目录引言:当微信隐私遇到自动化守护效果展示核心功能全景图技术亮点深度解析1. 无操作检测引擎2. 微信路径智能获

使用Java将各种数据写入Excel表格的操作示例

《使用Java将各种数据写入Excel表格的操作示例》在数据处理与管理领域,Excel凭借其强大的功能和广泛的应用,成为了数据存储与展示的重要工具,在Java开发过程中,常常需要将不同类型的数据,本文... 目录前言安装免费Java库1. 写入文本、或数值到 Excel单元格2. 写入数组到 Excel表格

redis中使用lua脚本的原理与基本使用详解

《redis中使用lua脚本的原理与基本使用详解》在Redis中使用Lua脚本可以实现原子性操作、减少网络开销以及提高执行效率,下面小编就来和大家详细介绍一下在redis中使用lua脚本的原理... 目录Redis 执行 Lua 脚本的原理基本使用方法使用EVAL命令执行 Lua 脚本使用EVALSHA命令

Python中pywin32 常用窗口操作的实现

《Python中pywin32常用窗口操作的实现》本文主要介绍了Python中pywin32常用窗口操作的实现,pywin32主要的作用是供Python开发者快速调用WindowsAPI的一个... 目录获取窗口句柄获取最前端窗口句柄获取指定坐标处的窗口根据窗口的完整标题匹配获取句柄根据窗口的类别匹配获取句

利用Python打造一个Excel记账模板

《利用Python打造一个Excel记账模板》这篇文章主要为大家详细介绍了如何使用Python打造一个超实用的Excel记账模板,可以帮助大家高效管理财务,迈向财富自由之路,感兴趣的小伙伴快跟随小编一... 目录设置预算百分比超支标红预警记账模板功能介绍基础记账预算管理可视化分析摸鱼时间理财法碎片时间利用财

Java 中的 @SneakyThrows 注解使用方法(简化异常处理的利与弊)

《Java中的@SneakyThrows注解使用方法(简化异常处理的利与弊)》为了简化异常处理,Lombok提供了一个强大的注解@SneakyThrows,本文将详细介绍@SneakyThro... 目录1. @SneakyThrows 简介 1.1 什么是 Lombok?2. @SneakyThrows

在 Spring Boot 中实现异常处理最佳实践

《在SpringBoot中实现异常处理最佳实践》本文介绍如何在SpringBoot中实现异常处理,涵盖核心概念、实现方法、与先前查询的集成、性能分析、常见问题和最佳实践,感兴趣的朋友一起看看吧... 目录一、Spring Boot 异常处理的背景与核心概念1.1 为什么需要异常处理?1.2 Spring B

Python中的Walrus运算符分析示例详解

《Python中的Walrus运算符分析示例详解》Python中的Walrus运算符(:=)是Python3.8引入的一个新特性,允许在表达式中同时赋值和返回值,它的核心作用是减少重复计算,提升代码简... 目录1. 在循环中避免重复计算2. 在条件判断中同时赋值变量3. 在列表推导式或字典推导式中简化逻辑

python处理带有时区的日期和时间数据

《python处理带有时区的日期和时间数据》这篇文章主要为大家详细介绍了如何在Python中使用pytz库处理时区信息,包括获取当前UTC时间,转换为特定时区等,有需要的小伙伴可以参考一下... 目录时区基本信息python datetime使用timezonepandas处理时区数据知识延展时区基本信息