Zabbix监控之从zookeeper中获取Kafka消费进度和lag

2024-06-22 19:32

本文主要是介绍Zabbix监控之从zookeeper中获取Kafka消费进度和lag,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Kafka在0.9之前,消费进度是存放在zookeeper中,在0.9及之后的版本,kafka自身提供了存放消费进度的功能。从kafka中获取消费进度请查看我另一片文章 传送门

这篇文章是转载自http://club.oneapm.com/t/zabbix-kafka/854
原文代码在调试时跑不通,上pykafka官网上看了下,貌似是有些方法过时了,没法使用,下面是加了备注和稍作修改后的代码。

在执行脚本前,需要修改host文件,将kafka服务器名和ip做下映射,不然会出现连接不上的情况。我的kafka服务器名叫做kafka
vi /etc/hosts 添加映射 10.12.11.131 kafka

#!/usr/bin/env python
#coding=utf-8import os, sys, time, json, yaml ,pdb
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kafka import KafkaClient
from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka import SimpleClient
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
from kafka.common import OffsetRequestPayloadclass spoorerClient(object):def __init__(self, zookeeper_hosts, kafka_hosts, zookeeper_url='/', timeout=3, log_dir='/tmp/spoorer'):self.zookeeper_hosts = zookeeper_hostsself.kafka_hosts = kafka_hostsself.timeout = timeoutself.log_dir = log_dirself.log_file = log_dir + '/' + 'spoorer.log'self.kafka_logsize = {}self.result = []self.log_day_file = log_dir + '/' + 'spoorer_day.log.' + str(time.strftime("%Y-%m-%d", time.localtime()))self.log_keep_day = 1#将spoorer.yaml中的配置读取出来try:f = file(os.path.dirname(os.path.abspath(__file__)) + '/' + 'spoorer.yaml')self.white_topic_group = yaml.load(f)except IOError as e:print 'Error, spoorer.yaml is not found'sys.exit(1)else:f.close()if self.white_topic_group is None:self.white_topic_group = {}if not os.path.exists(self.log_dir):     os.mkdir(self.log_dir)#获取到的消费进度会写入到self.log_file,self.log_day_file这两个文件中,self.log_day_file用于存放历史消费进度,self.log_file存放当前最新获取到的消费进度#self.log_day_file该文件的创建时间和当前系统时间相隔一天,则删除for logfile in [x for x in os.listdir(self.log_dir) if x.split('.')[-1] != 'log' and x.split('.')[-1] != 'swp']:if int(time.mktime(time.strptime(logfile.split('.')[-1], '%Y-%m-%d'))) < int(time.time()) - self.log_keep_day * 86400:os.remove(self.log_dir + '/' + logfile)if zookeeper_url == '/':self.zookeeper_url = zookeeper_urlelse:self.zookeeper_url = zookeeper_url + '/'def spoorer(self):#连接kafka,获取topicstry:kafka_client = KafkaClient(self.kafka_hosts, timeout=self.timeout)except Exception as e:print "Error, cannot connect kafka broker."sys.exit(1)else:kafka_topics = kafka_client.topicsfinally:kafka_client.close()#连接zk,获取当前消费进度current offsettry:zookeeper_client = KazooClient(hosts=self.zookeeper_hosts, read_only=True, timeout=self.timeout)zookeeper_client.start()except Exception as e:print "Error, cannot connect zookeeper server."sys.exit(1)try:groups = map(str,zookeeper_client.get_children(self.zookeeper_url + 'consumers'))except NoNodeError as e:print "Error, invalid zookeeper url."zookeeper_client.stop()sys.exit(2)else:for group in groups:if 'offsets' not in zookeeper_client.get_children(self.zookeeper_url + 'consumers/%s' % group): continuetopic_path = 'consumers/%s/offsets' % (group)topics = map(str,zookeeper_client.get_children(self.zookeeper_url + topic_path))if len(topics) == 0: continuefor topic in topics:if topic not in self.white_topic_group.keys():continue elif group not in self.white_topic_group[topic].replace(' ','').split(','):continuepartition_path = 'consumers/%s/offsets/%s' % (group,topic)partitions = map(int,zookeeper_client.get_children(self.zookeeper_url + partition_path))for partition in partitions:base_path = 'consumers/%s/%s/%s/%s' % (group, '%s', topic, partition)owner_path, offset_path = base_path % 'owners', base_path % 'offsets'offset = zookeeper_client.get(self.zookeeper_url + offset_path)[0]try:owner = zookeeper_client.get(self.zookeeper_url + owner_path)[0]except NoNodeError as e:owner = 'null'#消费进度放在字典metric中metric = {'datetime':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 'topic':topic, 'group':group, 'partition':int(partition), 'logsize':None, 'offset':int(offset), 'lag':None, 'owner':owner}self.result.append(metric)finally:zookeeper_client.stop()#获取每个分片的logsize(此处和原文不一样,做了修改)try:client = SimpleClient(self.kafka_hosts)except Exception as e:print "Error, cannot connect kafka broker."sys.exit(1)else:for kafka_topic in kafka_topics:self.kafka_logsize[kafka_topic] = {}partitions = client.topic_partitions[kafka_topic]offset_requests = [OffsetRequestPayload(kafka_topic, p, -1, 1) for p in partitions.keys()]offsets_responses = client.send_offset_request(offset_requests)for r in offsets_responses:self.kafka_logsize[kafka_topic][r.partition] = r.offsets[0]#logsize减去current offset等于lagwith open(self.log_file,'w') as f1, open(self.log_day_file,'a') as f2:for metric in self.result:logsize = self.kafka_logsize[metric['topic']][metric['partition']]metric['logsize'] = int(logsize)metric['lag'] = int(logsize) - int(metric['offset'])f1.write(json.dumps(metric,sort_keys=True) + '\n')f1.flush()f2.write(json.dumps(metric,sort_keys=True) + '\n')f2.flush()finally:client.close()return ''if __name__ == '__main__':check = spoorerClient(zookeeper_hosts='10.12.11.131:2181', zookeeper_url='/', kafka_hosts='10.12.11.131:9092', log_dir='/data/python-scripts/inspector/AccountInspector/otherInspector/spoorer', timeout=3)print check.spoorer()

这篇关于Zabbix监控之从zookeeper中获取Kafka消费进度和lag的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1085207

相关文章

Python版本信息获取方法详解与实战

《Python版本信息获取方法详解与实战》在Python开发中,获取Python版本号是调试、兼容性检查和版本控制的重要基础操作,本文详细介绍了如何使用sys和platform模块获取Python的主... 目录1. python版本号获取基础2. 使用sys模块获取版本信息2.1 sys模块概述2.1.1

Java发送SNMP至交换机获取交换机状态实现方式

《Java发送SNMP至交换机获取交换机状态实现方式》文章介绍使用SNMP4J库(2.7.0)通过RCF1213-MIB协议获取交换机单/多路状态,需开启SNMP支持,重点对比SNMPv1、v2c、v... 目录交换机协议SNMP库获取交换机单路状态获取交换机多路状态总结交换机协议这里使用的交换机协议为常

MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决

《MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决》MyBatis默认开启一级缓存,同一事务中循环调用查询方法时会重复使用缓存数据,导致获取的序列主键值均为1,... 目录问题原因解决办法如果是存储过程总结问题myBATis有如下代码获取序列作为主键IdMappe

Java中的分布式系统开发基于 Zookeeper 与 Dubbo 的应用案例解析

《Java中的分布式系统开发基于Zookeeper与Dubbo的应用案例解析》本文将通过实际案例,带你走进基于Zookeeper与Dubbo的分布式系统开发,本文通过实例代码给大家介绍的非常详... 目录Java 中的分布式系统开发基于 Zookeeper 与 Dubbo 的应用案例一、分布式系统中的挑战二

C#使用iText获取PDF的trailer数据的代码示例

《C#使用iText获取PDF的trailer数据的代码示例》开发程序debug的时候,看到了PDF有个trailer数据,挺有意思,于是考虑用代码把它读出来,那么就用到我们常用的iText框架了,所... 目录引言iText 核心概念C# 代码示例步骤 1: 确保已安装 iText步骤 2: C# 代码程

Spring Boot中获取IOC容器的多种方式

《SpringBoot中获取IOC容器的多种方式》本文主要介绍了SpringBoot中获取IOC容器的多种方式,包括直接注入、实现ApplicationContextAware接口、通过Spring... 目录1. 直接注入ApplicationContext2. 实现ApplicationContextA

python获取指定名字的程序的文件路径的两种方法

《python获取指定名字的程序的文件路径的两种方法》本文主要介绍了python获取指定名字的程序的文件路径的两种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要... 最近在做项目,需要用到给定一个程序名字就可以自动获取到这个程序在Windows系统下的绝对路径,以下

SpringBoot 获取请求参数的常用注解及用法

《SpringBoot获取请求参数的常用注解及用法》SpringBoot通过@RequestParam、@PathVariable等注解支持从HTTP请求中获取参数,涵盖查询、路径、请求体、头、C... 目录SpringBoot 提供了多种注解来方便地从 HTTP 请求中获取参数以下是主要的注解及其用法:1

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

springboot2.1.3 hystrix集成及hystrix-dashboard监控详解

《springboot2.1.3hystrix集成及hystrix-dashboard监控详解》Hystrix是Netflix开源的微服务容错工具,通过线程池隔离和熔断机制防止服务崩溃,支持降级、监... 目录Hystrix是Netflix开源技术www.chinasem.cn栈中的又一员猛将Hystrix熔