Zabbix监控之从Kafka中获取消费进度和lag

2024-06-22 19:32

本文主要是介绍Zabbix监控之从Kafka中获取消费进度和lag,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在0.9及之后的版本,kafka自身提供了存放消费进度的功能。本文讲解的是如何从kafka自身获取消费进度。从zookeeper中获取消费进度请阅读我的另一片文章传送门

https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
这是官网上的教程,提供了scala版本的获取消费状态和提交消费状态的代码。仅供参考。

http://pykafka.readthedocs.io/en/latest/api/broker.html
这是pykafka官网提供获取消费状态的API,试过不知道怎么用,网上也找不到相关代码。

http://kafka-python.readthedocs.io/en/latest/usage.html
这是python-kafka官网,找不到想要的API,没试过。

获取消费进度之前,一定要先弄明白kafka的存储结构以及消费进度是存放在zookeeper中还是kafka中,否则可能会发现到头来,自己都不知道自己在干什么。以上几种方式我都试过,但是都没成功,最后选择命令行的方式获取到消费状态,将消费状态写入文件中,再解析文件。
Kafka管理工具
https://www.iteblog.com/archives/1605.html
http://orchome.com/454

使用指令可以获取该组下每个consumer的消费进度

/data/kafka_2.11-0.10.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.12.11.131:9092 --group kafkaTestGroup --describe

然后再将其中的数据取出来,echo到文件中,可以使用crontab来执行指令,定时更新文件。

/data/kafka_2.11-0.10.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.1.8.74:9092 --group datasync.server.10.1.2.118 --describe |grep datasync.server.10.1.2.118 | awk '{print $1,$2,$3,$4,$5,$6,$7}'

将消费状态存放在kafka.log文件中,再解析文件,我这里监控阀值设置为1000,将lag值大于1000的数据取出来并输出。下面是解析文件的python脚本。

#!/usr/bin/env python
#coding=utf-8import os.path
import time
import pdb
from fileStatus import Fileif __name__=="__main__":filePath='/data/python-scripts/inspector/AccountInspector/otherInspector/kafka.log'f=open(filePath,"r")columnNameList=['GROUP','TOPIC','PARTITION','CURRENT-OFFSET','LOG-END-OFFSET','LAG','OWNER']result='no topicPartition lag is over allowedRange'resultDic={}overAllowedLagDic={}for line in f:#使用命令行处理时有时会得到Consumer group is not exists 或者Consumer group is rebalancing等不正常的结果,这种数据忽略不处理if 'Consumer group' not in line: line=line.strip('\n')lineSplit=line.split(' ')dicKey=lineSplit[0]+'_'+lineSplit[1]+'_'+lineSplit[2]dicValue={}for i in range(0,len(lineSplit),1):dicValue[columnNameList[i]]=lineSplit[i]#由于我设置的阀值时lag值为1000时就告警,此处LOG-END-OFFSET就是logsize,当logsize小于1000时可以忽略(因为lag总是小于logsize的)if dicValue['LOG-END-OFFSET']<='1000':dicValue['CURRENT-OFFSET']='0'dicValue['LAG']='0'resultDic[dicKey]=dicValue#使用命令行有缺陷,经常会出现取出来的值为unknown的情况,出现这种情况也当作告警处理if dicValue['LAG'] == 'unknown':overAllowedLagDic[dicKey]=dicValueelse:if int(dicValue['LAG'])>1000:overAllowedLagDic[dicKey]=dicValueif len(overAllowedLagDic)>0:result=''for key in overAllowedLagDic:dicValue=overAllowedLagDic[key]lag=dicValue['LAG']result=key+':'+lag+'; '+resultprint result

方式很low,而且还有漏洞,后面有时间研究下使用API的方式获取消费进度。

这篇关于Zabbix监控之从Kafka中获取消费进度和lag的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1085208

相关文章

使用Python实现IP地址和端口状态检测与监控

《使用Python实现IP地址和端口状态检测与监控》在网络运维和服务器管理中,IP地址和端口的可用性监控是保障业务连续性的基础需求,本文将带你用Python从零打造一个高可用IP监控系统,感兴趣的小伙... 目录概述:为什么需要IP监控系统使用步骤说明1. 环境准备2. 系统部署3. 核心功能配置系统效果展

SpringBoot整合mybatisPlus实现批量插入并获取ID详解

《SpringBoot整合mybatisPlus实现批量插入并获取ID详解》这篇文章主要为大家详细介绍了SpringBoot如何整合mybatisPlus实现批量插入并获取ID,文中的示例代码讲解详细... 目录【1】saveBATch(一万条数据总耗时:2478ms)【2】集合方式foreach(一万条数

python获取网页表格的多种方法汇总

《python获取网页表格的多种方法汇总》我们在网页上看到很多的表格,如果要获取里面的数据或者转化成其他格式,就需要将表格获取下来并进行整理,在Python中,获取网页表格的方法有多种,下面就跟随小编... 目录1. 使用Pandas的read_html2. 使用BeautifulSoup和pandas3.

SpringBoot UserAgentUtils获取用户浏览器的用法

《SpringBootUserAgentUtils获取用户浏览器的用法》UserAgentUtils是于处理用户代理(User-Agent)字符串的工具类,一般用于解析和处理浏览器、操作系统以及设备... 目录介绍效果图依赖封装客户端工具封装IP工具实体类获取设备信息入库介绍UserAgentUtils

C# foreach 循环中获取索引的实现方式

《C#foreach循环中获取索引的实现方式》:本文主要介绍C#foreach循环中获取索引的实现方式,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、手动维护索引变量二、LINQ Select + 元组解构三、扩展方法封装索引四、使用 for 循环替代

Linux下如何使用C++获取硬件信息

《Linux下如何使用C++获取硬件信息》这篇文章主要为大家详细介绍了如何使用C++实现获取CPU,主板,磁盘,BIOS信息等硬件信息,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录方法获取CPU信息:读取"/proc/cpuinfo"文件获取磁盘信息:读取"/proc/diskstats"文

Vue3组件中getCurrentInstance()获取App实例,但是返回null的解决方案

《Vue3组件中getCurrentInstance()获取App实例,但是返回null的解决方案》:本文主要介绍Vue3组件中getCurrentInstance()获取App实例,但是返回nu... 目录vue3组件中getCurrentInstajavascriptnce()获取App实例,但是返回n

SpringMVC获取请求参数的方法

《SpringMVC获取请求参数的方法》:本文主要介绍SpringMVC获取请求参数的方法,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下... 目录1、通过ServletAPI获取2、通过控制器方法的形参获取请求参数3、@RequestParam4、@

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

Python获取C++中返回的char*字段的两种思路

《Python获取C++中返回的char*字段的两种思路》有时候需要获取C++函数中返回来的不定长的char*字符串,本文小编为大家找到了两种解决问题的思路,感兴趣的小伙伴可以跟随小编一起学习一下... 有时候需要获取C++函数中返回来的不定长的char*字符串,目前我找到两种解决问题的思路,具体实现如下: