Zabbix监控之从zookeeper中获取Kafka消费进度和lag

2024-06-22 19:32

本文主要是介绍Zabbix监控之从zookeeper中获取Kafka消费进度和lag,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Kafka在0.9之前,消费进度是存放在zookeeper中,在0.9及之后的版本,kafka自身提供了存放消费进度的功能。从kafka中获取消费进度请查看我另一片文章 传送门

这篇文章是转载自http://club.oneapm.com/t/zabbix-kafka/854
原文代码在调试时跑不通,上pykafka官网上看了下,貌似是有些方法过时了,没法使用,下面是加了备注和稍作修改后的代码。

在执行脚本前,需要修改host文件,将kafka服务器名和ip做下映射,不然会出现连接不上的情况。我的kafka服务器名叫做kafka
vi /etc/hosts 添加映射 10.12.11.131 kafka

#!/usr/bin/env python
#coding=utf-8import os, sys, time, json, yaml ,pdb
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kafka import KafkaClient
from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka import SimpleClient
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
from kafka.common import OffsetRequestPayloadclass spoorerClient(object):def __init__(self, zookeeper_hosts, kafka_hosts, zookeeper_url='/', timeout=3, log_dir='/tmp/spoorer'):self.zookeeper_hosts = zookeeper_hostsself.kafka_hosts = kafka_hostsself.timeout = timeoutself.log_dir = log_dirself.log_file = log_dir + '/' + 'spoorer.log'self.kafka_logsize = {}self.result = []self.log_day_file = log_dir + '/' + 'spoorer_day.log.' + str(time.strftime("%Y-%m-%d", time.localtime()))self.log_keep_day = 1#将spoorer.yaml中的配置读取出来try:f = file(os.path.dirname(os.path.abspath(__file__)) + '/' + 'spoorer.yaml')self.white_topic_group = yaml.load(f)except IOError as e:print 'Error, spoorer.yaml is not found'sys.exit(1)else:f.close()if self.white_topic_group is None:self.white_topic_group = {}if not os.path.exists(self.log_dir):     os.mkdir(self.log_dir)#获取到的消费进度会写入到self.log_file,self.log_day_file这两个文件中,self.log_day_file用于存放历史消费进度,self.log_file存放当前最新获取到的消费进度#self.log_day_file该文件的创建时间和当前系统时间相隔一天,则删除for logfile in [x for x in os.listdir(self.log_dir) if x.split('.')[-1] != 'log' and x.split('.')[-1] != 'swp']:if int(time.mktime(time.strptime(logfile.split('.')[-1], '%Y-%m-%d'))) < int(time.time()) - self.log_keep_day * 86400:os.remove(self.log_dir + '/' + logfile)if zookeeper_url == '/':self.zookeeper_url = zookeeper_urlelse:self.zookeeper_url = zookeeper_url + '/'def spoorer(self):#连接kafka,获取topicstry:kafka_client = KafkaClient(self.kafka_hosts, timeout=self.timeout)except Exception as e:print "Error, cannot connect kafka broker."sys.exit(1)else:kafka_topics = kafka_client.topicsfinally:kafka_client.close()#连接zk,获取当前消费进度current offsettry:zookeeper_client = KazooClient(hosts=self.zookeeper_hosts, read_only=True, timeout=self.timeout)zookeeper_client.start()except Exception as e:print "Error, cannot connect zookeeper server."sys.exit(1)try:groups = map(str,zookeeper_client.get_children(self.zookeeper_url + 'consumers'))except NoNodeError as e:print "Error, invalid zookeeper url."zookeeper_client.stop()sys.exit(2)else:for group in groups:if 'offsets' not in zookeeper_client.get_children(self.zookeeper_url + 'consumers/%s' % group): continuetopic_path = 'consumers/%s/offsets' % (group)topics = map(str,zookeeper_client.get_children(self.zookeeper_url + topic_path))if len(topics) == 0: continuefor topic in topics:if topic not in self.white_topic_group.keys():continue elif group not in self.white_topic_group[topic].replace(' ','').split(','):continuepartition_path = 'consumers/%s/offsets/%s' % (group,topic)partitions = map(int,zookeeper_client.get_children(self.zookeeper_url + partition_path))for partition in partitions:base_path = 'consumers/%s/%s/%s/%s' % (group, '%s', topic, partition)owner_path, offset_path = base_path % 'owners', base_path % 'offsets'offset = zookeeper_client.get(self.zookeeper_url + offset_path)[0]try:owner = zookeeper_client.get(self.zookeeper_url + owner_path)[0]except NoNodeError as e:owner = 'null'#消费进度放在字典metric中metric = {'datetime':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 'topic':topic, 'group':group, 'partition':int(partition), 'logsize':None, 'offset':int(offset), 'lag':None, 'owner':owner}self.result.append(metric)finally:zookeeper_client.stop()#获取每个分片的logsize(此处和原文不一样,做了修改)try:client = SimpleClient(self.kafka_hosts)except Exception as e:print "Error, cannot connect kafka broker."sys.exit(1)else:for kafka_topic in kafka_topics:self.kafka_logsize[kafka_topic] = {}partitions = client.topic_partitions[kafka_topic]offset_requests = [OffsetRequestPayload(kafka_topic, p, -1, 1) for p in partitions.keys()]offsets_responses = client.send_offset_request(offset_requests)for r in offsets_responses:self.kafka_logsize[kafka_topic][r.partition] = r.offsets[0]#logsize减去current offset等于lagwith open(self.log_file,'w') as f1, open(self.log_day_file,'a') as f2:for metric in self.result:logsize = self.kafka_logsize[metric['topic']][metric['partition']]metric['logsize'] = int(logsize)metric['lag'] = int(logsize) - int(metric['offset'])f1.write(json.dumps(metric,sort_keys=True) + '\n')f1.flush()f2.write(json.dumps(metric,sort_keys=True) + '\n')f2.flush()finally:client.close()return ''if __name__ == '__main__':check = spoorerClient(zookeeper_hosts='10.12.11.131:2181', zookeeper_url='/', kafka_hosts='10.12.11.131:9092', log_dir='/data/python-scripts/inspector/AccountInspector/otherInspector/spoorer', timeout=3)print check.spoorer()

这篇关于Zabbix监控之从zookeeper中获取Kafka消费进度和lag的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1085207

相关文章

C++中RAII资源获取即初始化

《C++中RAII资源获取即初始化》RAII通过构造/析构自动管理资源生命周期,确保安全释放,本文就来介绍一下C++中的RAII技术及其应用,具有一定的参考价值,感兴趣的可以了解一下... 目录一、核心原理与机制二、标准库中的RAII实现三、自定义RAII类设计原则四、常见应用场景1. 内存管理2. 文件操

SpringBoot服务获取Pod当前IP的两种方案

《SpringBoot服务获取Pod当前IP的两种方案》在Kubernetes集群中,SpringBoot服务获取Pod当前IP的方案主要有两种,通过环境变量注入或通过Java代码动态获取网络接口IP... 目录方案一:通过 Kubernetes Downward API 注入环境变量原理步骤方案二:通过

SpringBoot读取ZooKeeper(ZK)属性的方法实现

《SpringBoot读取ZooKeeper(ZK)属性的方法实现》本文主要介绍了SpringBoot读取ZooKeeper(ZK)属性的方法实现,强调使用@ConfigurationProperti... 目录1. 在配置文件中定义 ZK 属性application.propertiesapplicati

使用Python实现获取屏幕像素颜色值

《使用Python实现获取屏幕像素颜色值》这篇文章主要为大家详细介绍了如何使用Python实现获取屏幕像素颜色值,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 一、一个小工具,按住F10键,颜色值会跟着显示。完整代码import tkinter as tkimport pyau

python获取cmd环境变量值的实现代码

《python获取cmd环境变量值的实现代码》:本文主要介绍在Python中获取命令行(cmd)环境变量的值,可以使用标准库中的os模块,需要的朋友可以参考下... 前言全局说明在执行py过程中,总要使用到系统环境变量一、说明1.1 环境:Windows 11 家庭版 24H2 26100.4061

JVisualVM之Java性能监控与调优利器详解

《JVisualVM之Java性能监控与调优利器详解》本文将详细介绍JVisualVM的使用方法,并结合实际案例展示如何利用它进行性能调优,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全... 目录1. JVisualVM简介2. JVisualVM的安装与启动2.1 启动JVisualVM2

使用Python获取JS加载的数据的多种实现方法

《使用Python获取JS加载的数据的多种实现方法》在当今的互联网时代,网页数据的动态加载已经成为一种常见的技术手段,许多现代网站通过JavaScript(JS)动态加载内容,这使得传统的静态网页爬取... 目录引言一、动态 网页与js加载数据的原理二、python爬取JS加载数据的方法(一)分析网络请求1

通过cmd获取网卡速率的代码

《通过cmd获取网卡速率的代码》今天从群里看到通过bat获取网卡速率两段代码,感觉还不错,学习bat的朋友可以参考一下... 1、本机有线网卡支持的最高速度:%v%@echo off & setlocal enabledelayedexpansionecho 代码开始echo 65001编码获取: >

SpringBoot实现Kafka动态反序列化的完整代码

《SpringBoot实现Kafka动态反序列化的完整代码》在分布式系统中,Kafka作为高吞吐量的消息队列,常常需要处理来自不同主题(Topic)的异构数据,不同的业务场景可能要求对同一消费者组内的... 目录引言一、问题背景1.1 动态反序列化的需求1.2 常见问题二、动态反序列化的核心方案2.1 ht

使用Python实现调用API获取图片存储到本地的方法

《使用Python实现调用API获取图片存储到本地的方法》开发一个自动化工具,用于从JSON数据源中提取图像ID,通过调用指定API获取未经压缩的原始图像文件,并确保下载结果与Postman等工具直接... 目录使用python实现调用API获取图片存储到本地1、项目概述2、核心功能3、环境准备4、代码实现