如何管理Spark Streaming消费Kafka的偏移量(三)

2024-05-15 03:08

本文主要是介绍如何管理Spark Streaming消费Kafka的偏移量(三),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前面的文章已经介绍了在spark streaming集成kafka时,如何处理其偏移量的问题,由于spark streaming自带的checkpoint弊端非常明显,所以一些对数据一致性要求比较高的项目里面,不建议采用其自带的checkpoint来做故障恢复。

在spark streaming1.3之后的版本支持direct kafka stream,这种策略更加完善,放弃了原来使用Kafka的高级API自动保存数据的偏移量,之后的版本采用Simple API也就是更加偏底层的api,我们既可以用checkpoint来容灾,也可以通过低级api来获取偏移量自己管理偏移量,这样以来无论是程序升级,还是故障重启,在框架端都可以做到Exact One准确一次的语义。

本篇文章,会再介绍下,如何手动管理kafka的offset,并给出具体的代码加以分析:

版本:

apache spark streaming2.1

apache kafka 0.9.0.0

手动管理offset的注意点:

(1)第一次项目启动的时候,因为zk里面没有偏移量,所以使用KafkaUtils直接创建InputStream,默认是从最新的偏移量开始消费,这一点可以控制。

(2)如果非第一次启动,zk里面已经存在偏移量,所以我们读取zk的偏移量,并把它传入到KafkaUtils中,从上次结束时的偏移量开始消费处理。

(3)在foreachRDD里面,对每一个批次的数据处理之后,再次更新存在zk里面的偏移量

注意上面的3个步骤,1和2只会加载一次,第3个步骤是每个批次里面都会执行一次。

下面看第一和第二个步骤的核心代码:

/****
    *
    * @param ssc  StreamingContext
    * @param kafkaParams  配置kafka的参数
    * @param zkClient  zk连接的client
    * @param zkOffsetPath zk里面偏移量的路径
    * @param topics     需要处理的topic
    * @return   InputDStream[(String, String)] 返回输入流
    */def createKafkaStream(ssc: StreamingContext,kafkaParams: Map[String, String],zkClient: ZkClient,zkOffsetPath: String,topics: Set[String]): InputDStream[(String, String)]={//目前仅支持一个topic的偏移量处理,读取zk里面偏移量字符串val zkOffsetData=KafkaOffsetManager.readOffsets(zkClient,zkOffsetPath,topics.last)val kafkaStream = zkOffsetData match {case None =>  //如果从zk里面没有读到偏移量,就说明是系统第一次启动log.info("系统第一次启动,没有读取到偏移量,默认就最新的offset开始消费")//使用最新的偏移量创建DirectStreamKafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)case Some(lastStopOffset) =>log.info("从zk中读取到偏移量,从上次的偏移量开始消费数据......")val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)//使用上次停止时候的偏移量创建DirectStreamKafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, lastStopOffset, messageHandler)}kafkaStream//返回创建的kafkaStream}

主要是针对第一次启动,和非首次启动做了不同的处理。

然后看下第三个步骤的代码:

/***** 保存每个批次的rdd的offset到zk中* @param zkClient zk连接的client* @param zkOffsetPath   偏移量路径* @param rdd     每个批次的rdd*/def saveOffsets(zkClient: ZkClient, zkOffsetPath: String, rdd: RDD[_]): Unit = {//转换rdd为Array[OffsetRange]val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges//转换每个OffsetRange为存储到zk时的字符串格式 :  分区序号1:偏移量1,分区序号2:偏移量2,......val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.untilOffset}").mkString(",")log.debug(" 保存的偏移量:  "+offsetsRangesStr)//将最终的字符串结果保存到zk里面ZkUtils.updatePersistentPath(zkClient, zkOffsetPath, offsetsRangesStr)}

主要是更新每个批次的偏移量到zk中。

例子已经上传到github中,有兴趣的同学可以参考这个链接:

https://github.com/qindongliang/streaming-offset-to-zk

后续文章会聊一下为了升级应用如何优雅的关闭的流程序,以及在kafka扩展分区时,上面的程序如何自动兼容。

这篇关于如何管理Spark Streaming消费Kafka的偏移量(三)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/990637

相关文章

Linux创建服务使用systemctl管理详解

《Linux创建服务使用systemctl管理详解》文章指导在Linux中创建systemd服务,设置文件权限为所有者读写、其他只读,重新加载配置,启动服务并检查状态,确保服务正常运行,关键步骤包括权... 目录创建服务 /usr/lib/systemd/system/设置服务文件权限:所有者读写js,其他

在Node.js中使用.env文件管理环境变量的全过程

《在Node.js中使用.env文件管理环境变量的全过程》Node.js应用程序通常依赖于环境变量来管理敏感信息或配置设置,.env文件已经成为一种流行的本地管理这些变量的方法,本文将探讨.env文件... 目录引言为什么使php用 .env 文件 ?如何在 Node.js 中使用 .env 文件最佳实践引

python库pydantic数据验证和设置管理库的用途

《python库pydantic数据验证和设置管理库的用途》pydantic是一个用于数据验证和设置管理的Python库,它主要利用Python类型注解来定义数据模型的结构和验证规则,本文给大家介绍p... 目录主要特点和用途:Field数值验证参数总结pydantic 是一个让你能够 confidentl

SpringBoot 多环境开发实战(从配置、管理与控制)

《SpringBoot多环境开发实战(从配置、管理与控制)》本文详解SpringBoot多环境配置,涵盖单文件YAML、多文件模式、MavenProfile分组及激活策略,通过优先级控制灵活切换环境... 目录一、多环境开发基础(单文件 YAML 版)(一)配置原理与优势(二)实操示例二、多环境开发多文件版

Redis实现高效内存管理的示例代码

《Redis实现高效内存管理的示例代码》Redis内存管理是其核心功能之一,为了高效地利用内存,Redis采用了多种技术和策略,如优化的数据结构、内存分配策略、内存回收、数据压缩等,下面就来详细的介绍... 目录1. 内存分配策略jemalloc 的使用2. 数据压缩和编码ziplist示例代码3. 优化的

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

SpringBoot集成XXL-JOB实现任务管理全流程

《SpringBoot集成XXL-JOB实现任务管理全流程》XXL-JOB是一款轻量级分布式任务调度平台,功能丰富、界面简洁、易于扩展,本文介绍如何通过SpringBoot项目,使用RestTempl... 目录一、前言二、项目结构简述三、Maven 依赖四、Controller 代码详解五、Service

深入解析C++ 中std::map内存管理

《深入解析C++中std::map内存管理》文章详解C++std::map内存管理,指出clear()仅删除元素可能不释放底层内存,建议用swap()与空map交换以彻底释放,针对指针类型需手动de... 目录1️、基本清空std::map2️、使用 swap 彻底释放内存3️、map 中存储指针类型的对象

Linux系统管理与进程任务管理方式

《Linux系统管理与进程任务管理方式》本文系统讲解Linux管理核心技能,涵盖引导流程、服务控制(Systemd与GRUB2)、进程管理(前台/后台运行、工具使用)、计划任务(at/cron)及常用... 目录引言一、linux系统引导过程与服务控制1.1 系统引导的五个关键阶段1.2 GRUB2的进化优

Python利用PySpark和Kafka实现流处理引擎构建指南

《Python利用PySpark和Kafka实现流处理引擎构建指南》本文将深入解剖基于Python的实时处理黄金组合:Kafka(分布式消息队列)与PySpark(分布式计算引擎)的化学反应,并构建一... 目录引言:数据洪流时代的生存法则第一章 Kafka:数据世界的中央神经系统消息引擎核心设计哲学高吞吐