Zabbix监控之从zookeeper中获取Kafka消费进度和lag

2024-06-22 19:32

本文主要是介绍Zabbix监控之从zookeeper中获取Kafka消费进度和lag,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Kafka在0.9之前,消费进度是存放在zookeeper中,在0.9及之后的版本,kafka自身提供了存放消费进度的功能。从kafka中获取消费进度请查看我另一片文章 传送门

这篇文章是转载自http://club.oneapm.com/t/zabbix-kafka/854
原文代码在调试时跑不通,上pykafka官网上看了下,貌似是有些方法过时了,没法使用,下面是加了备注和稍作修改后的代码。

在执行脚本前,需要修改host文件,将kafka服务器名和ip做下映射,不然会出现连接不上的情况。我的kafka服务器名叫做kafka
vi /etc/hosts 添加映射 10.12.11.131 kafka

#!/usr/bin/env python
#coding=utf-8import os, sys, time, json, yaml ,pdb
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kafka import KafkaClient
from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka import SimpleClient
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
from kafka.common import OffsetRequestPayloadclass spoorerClient(object):def __init__(self, zookeeper_hosts, kafka_hosts, zookeeper_url='/', timeout=3, log_dir='/tmp/spoorer'):self.zookeeper_hosts = zookeeper_hostsself.kafka_hosts = kafka_hostsself.timeout = timeoutself.log_dir = log_dirself.log_file = log_dir + '/' + 'spoorer.log'self.kafka_logsize = {}self.result = []self.log_day_file = log_dir + '/' + 'spoorer_day.log.' + str(time.strftime("%Y-%m-%d", time.localtime()))self.log_keep_day = 1#将spoorer.yaml中的配置读取出来try:f = file(os.path.dirname(os.path.abspath(__file__)) + '/' + 'spoorer.yaml')self.white_topic_group = yaml.load(f)except IOError as e:print 'Error, spoorer.yaml is not found'sys.exit(1)else:f.close()if self.white_topic_group is None:self.white_topic_group = {}if not os.path.exists(self.log_dir):     os.mkdir(self.log_dir)#获取到的消费进度会写入到self.log_file,self.log_day_file这两个文件中,self.log_day_file用于存放历史消费进度,self.log_file存放当前最新获取到的消费进度#self.log_day_file该文件的创建时间和当前系统时间相隔一天,则删除for logfile in [x for x in os.listdir(self.log_dir) if x.split('.')[-1] != 'log' and x.split('.')[-1] != 'swp']:if int(time.mktime(time.strptime(logfile.split('.')[-1], '%Y-%m-%d'))) < int(time.time()) - self.log_keep_day * 86400:os.remove(self.log_dir + '/' + logfile)if zookeeper_url == '/':self.zookeeper_url = zookeeper_urlelse:self.zookeeper_url = zookeeper_url + '/'def spoorer(self):#连接kafka,获取topicstry:kafka_client = KafkaClient(self.kafka_hosts, timeout=self.timeout)except Exception as e:print "Error, cannot connect kafka broker."sys.exit(1)else:kafka_topics = kafka_client.topicsfinally:kafka_client.close()#连接zk,获取当前消费进度current offsettry:zookeeper_client = KazooClient(hosts=self.zookeeper_hosts, read_only=True, timeout=self.timeout)zookeeper_client.start()except Exception as e:print "Error, cannot connect zookeeper server."sys.exit(1)try:groups = map(str,zookeeper_client.get_children(self.zookeeper_url + 'consumers'))except NoNodeError as e:print "Error, invalid zookeeper url."zookeeper_client.stop()sys.exit(2)else:for group in groups:if 'offsets' not in zookeeper_client.get_children(self.zookeeper_url + 'consumers/%s' % group): continuetopic_path = 'consumers/%s/offsets' % (group)topics = map(str,zookeeper_client.get_children(self.zookeeper_url + topic_path))if len(topics) == 0: continuefor topic in topics:if topic not in self.white_topic_group.keys():continue elif group not in self.white_topic_group[topic].replace(' ','').split(','):continuepartition_path = 'consumers/%s/offsets/%s' % (group,topic)partitions = map(int,zookeeper_client.get_children(self.zookeeper_url + partition_path))for partition in partitions:base_path = 'consumers/%s/%s/%s/%s' % (group, '%s', topic, partition)owner_path, offset_path = base_path % 'owners', base_path % 'offsets'offset = zookeeper_client.get(self.zookeeper_url + offset_path)[0]try:owner = zookeeper_client.get(self.zookeeper_url + owner_path)[0]except NoNodeError as e:owner = 'null'#消费进度放在字典metric中metric = {'datetime':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 'topic':topic, 'group':group, 'partition':int(partition), 'logsize':None, 'offset':int(offset), 'lag':None, 'owner':owner}self.result.append(metric)finally:zookeeper_client.stop()#获取每个分片的logsize(此处和原文不一样,做了修改)try:client = SimpleClient(self.kafka_hosts)except Exception as e:print "Error, cannot connect kafka broker."sys.exit(1)else:for kafka_topic in kafka_topics:self.kafka_logsize[kafka_topic] = {}partitions = client.topic_partitions[kafka_topic]offset_requests = [OffsetRequestPayload(kafka_topic, p, -1, 1) for p in partitions.keys()]offsets_responses = client.send_offset_request(offset_requests)for r in offsets_responses:self.kafka_logsize[kafka_topic][r.partition] = r.offsets[0]#logsize减去current offset等于lagwith open(self.log_file,'w') as f1, open(self.log_day_file,'a') as f2:for metric in self.result:logsize = self.kafka_logsize[metric['topic']][metric['partition']]metric['logsize'] = int(logsize)metric['lag'] = int(logsize) - int(metric['offset'])f1.write(json.dumps(metric,sort_keys=True) + '\n')f1.flush()f2.write(json.dumps(metric,sort_keys=True) + '\n')f2.flush()finally:client.close()return ''if __name__ == '__main__':check = spoorerClient(zookeeper_hosts='10.12.11.131:2181', zookeeper_url='/', kafka_hosts='10.12.11.131:9092', log_dir='/data/python-scripts/inspector/AccountInspector/otherInspector/spoorer', timeout=3)print check.spoorer()

这篇关于Zabbix监控之从zookeeper中获取Kafka消费进度和lag的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1085207

相关文章

使用Python实现IP地址和端口状态检测与监控

《使用Python实现IP地址和端口状态检测与监控》在网络运维和服务器管理中,IP地址和端口的可用性监控是保障业务连续性的基础需求,本文将带你用Python从零打造一个高可用IP监控系统,感兴趣的小伙... 目录概述:为什么需要IP监控系统使用步骤说明1. 环境准备2. 系统部署3. 核心功能配置系统效果展

SpringBoot整合mybatisPlus实现批量插入并获取ID详解

《SpringBoot整合mybatisPlus实现批量插入并获取ID详解》这篇文章主要为大家详细介绍了SpringBoot如何整合mybatisPlus实现批量插入并获取ID,文中的示例代码讲解详细... 目录【1】saveBATch(一万条数据总耗时:2478ms)【2】集合方式foreach(一万条数

python获取网页表格的多种方法汇总

《python获取网页表格的多种方法汇总》我们在网页上看到很多的表格,如果要获取里面的数据或者转化成其他格式,就需要将表格获取下来并进行整理,在Python中,获取网页表格的方法有多种,下面就跟随小编... 目录1. 使用Pandas的read_html2. 使用BeautifulSoup和pandas3.

SpringBoot UserAgentUtils获取用户浏览器的用法

《SpringBootUserAgentUtils获取用户浏览器的用法》UserAgentUtils是于处理用户代理(User-Agent)字符串的工具类,一般用于解析和处理浏览器、操作系统以及设备... 目录介绍效果图依赖封装客户端工具封装IP工具实体类获取设备信息入库介绍UserAgentUtils

C# foreach 循环中获取索引的实现方式

《C#foreach循环中获取索引的实现方式》:本文主要介绍C#foreach循环中获取索引的实现方式,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、手动维护索引变量二、LINQ Select + 元组解构三、扩展方法封装索引四、使用 for 循环替代

Linux下如何使用C++获取硬件信息

《Linux下如何使用C++获取硬件信息》这篇文章主要为大家详细介绍了如何使用C++实现获取CPU,主板,磁盘,BIOS信息等硬件信息,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录方法获取CPU信息:读取"/proc/cpuinfo"文件获取磁盘信息:读取"/proc/diskstats"文

Vue3组件中getCurrentInstance()获取App实例,但是返回null的解决方案

《Vue3组件中getCurrentInstance()获取App实例,但是返回null的解决方案》:本文主要介绍Vue3组件中getCurrentInstance()获取App实例,但是返回nu... 目录vue3组件中getCurrentInstajavascriptnce()获取App实例,但是返回n

SpringMVC获取请求参数的方法

《SpringMVC获取请求参数的方法》:本文主要介绍SpringMVC获取请求参数的方法,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下... 目录1、通过ServletAPI获取2、通过控制器方法的形参获取请求参数3、@RequestParam4、@

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

Python获取C++中返回的char*字段的两种思路

《Python获取C++中返回的char*字段的两种思路》有时候需要获取C++函数中返回来的不定长的char*字符串,本文小编为大家找到了两种解决问题的思路,感兴趣的小伙伴可以跟随小编一起学习一下... 有时候需要获取C++函数中返回来的不定长的char*字符串,目前我找到两种解决问题的思路,具体实现如下: