解决KafkaConsumer多线程接入不安全问题(spark streaming 消费kafka)

本文主要是介绍解决KafkaConsumer多线程接入不安全问题(spark streaming 消费kafka),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使用场景:

设置并行度参数spark.streaming.concurrentJobs >1 时候,使用spark streaming消费kafka

异常信息:

There may be two or more tasks in one executor will use the same kafka consumer at the same time, then it will throw an exception: "KafkaConsumer is not safe for multi-threaded access"

JIRA-SPARK中已经提出的问题

https://issues.apache.org/jira/browse/SPARK-22606?jql=text ~ "spark.streaming.concurrentJobs"

解决办法:

第一种方案

PR地址:https://github.com/apache/spark/pull/19819

spark streaming消费kafka时候,默认开启了对kafkaconsumer进行缓存,通过存放到HashMap中实现,因此就需要有相应的key,才能找到具体到kafkaconsumer。

//原生的代码中是没有threadId变量的,通过加入线程id ,使得不同的线程不能同时使用同一个kafkaconsumerprivate case class CacheKey(groupId: String, topic: String, partition: Int, threadId: Long)private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = nullCachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, threadId, kafkaParams)

这个办法其实就是为缓存在map中的CachedKafkaConsumer对应的key增加了一个参数是线程id,使得不让多个线程使用同一个consumer,但是这种情况每一个task都需要去创建一个consumer,是消耗资源的。
PR中这样一句评论:
It will create a new consumer for each thread. This could be quite resource consuming when several topics shared with thread pools.

第二种方案

对spark-streaming-kafka中的CacheKafkaConsumer进行了重构,首先介绍几个类

//接口
KafkaDataConsumer//KafkaDataConsumer的实现类private case class CachedKafkaDataConsumer[K, V](internalConsumer: InternalKafkaConsumer[K, V])extends KafkaDataConsumer[K, V] {assert(internalConsumer.inUse)override def release(): Unit = KafkaDataConsumer.release(internalConsumer)}private case class NonCachedKafkaDataConsumer[K, V](internalConsumer: InternalKafkaConsumer[K, V])extends KafkaDataConsumer[K, V] {override def release(): Unit = internalConsumer.close()}//那么InternalKafkaConsumer是什么?其实对KafkaConsumer进行了封装而已,持有KafkaConsumer对象
private[kafka010] class InternalKafkaConsumer[K, V](val topicPartition: TopicPartition,val kafkaParams: ju.Map[String, Object])private[kafka010] case class CacheKey(groupId: String, topicPartition: TopicPartition)private[kafka010] var cache: ju.Map[CacheKey, InternalKafkaConsumer[_, _]] = null

那么为了防止一个executor中多个task同时使用同一个KafkaConsumer,如何解决呢?通过看如何获取的consumer即可看到解决方案!

def acquire[K, V](topicPartition: TopicPartition,kafkaParams: ju.Map[String, Object],context: TaskContext,useCache: Boolean): KafkaDataConsumer[K, V] = synchronized {val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]//根据groupId以及topicPartition创建相应的keyval key = new CacheKey(groupId, topicPartition)//根据key获得缓存的InternalKafkaConsumer对象,其实可以理解为KafkaConsumer对象,就是多了一层封装val existingInternalConsumer = cache.get(key)lazy val newInternalConsumer = new InternalKafkaConsumer[K, V](topicPartition, kafkaParams)//如果TaskContext不为null,同时task尝试次数大于等于1 if (context != null && context.attemptNumber >= 1) {logDebug(s"Reattempt detected, invalidating cached consumer $existingInternalConsumer")//如果缓存中存在该key的InternalKafkaConsumer对象if (existingInternalConsumer != null) {// 如果缓存中存在并且是使用状态,设置markedForClose=true,意思是说下一次release时候会将其关闭//如果缓存了并且非使用状态,那么直接关闭,并从缓存移除 if (existingInternalConsumer.inUse) {existingInternalConsumer.markedForClose = true} else {existingInternalConsumer.close()cache.remove(key)}}logDebug("Reattempt detected, new non-cached consumer will be allocated " +s"$newInternalConsumer")//这个最外层if分支创建新的consumer , 最后返回  NonCachedKafkaDataConsumerNonCachedKafkaDataConsumer(newInternalConsumer)} else if (!useCache) {//如果task重试次数小于1    或者 taskcontext不存在,并且没有使用缓存,直接创建NonCachedKafkaDataConsumer对象logDebug("Cache usage turned off, new non-cached consumer will be allocated " +s"$newInternalConsumer")NonCachedKafkaDataConsumer(newInternalConsumer)} else if (existingInternalConsumer == null) {//使用缓存了,但是缓存中不存在,直接创建CachedKafkaDataConsumerlogDebug("No cached consumer, new cached consumer will be allocated " +s"$newInternalConsumer")cache.put(key, newInternalConsumer)CachedKafkaDataConsumer(newInternalConsumer)} else if (existingInternalConsumer.inUse) {// 缓存中存在并且当前是在使用,那么创建一个新的InternalConsmer然后封装到NonCachedKafkaDataConsumer中返回logDebug("Used cached consumer found, new non-cached consumer will be allocated " +s"$newInternalConsumer")NonCachedKafkaDataConsumer(newInternalConsumer)} else {//缓存中存在并且没有被使用,直接设置为使用状态,然后封装到CachedKafkaDataConsumer中返回logDebug(s"Not used cached consumer found, re-using it $existingInternalConsumer")existingInternalConsumer.inUse = true// Any given TopicPartition should have a consistent key and value typeCachedKafkaDataConsumer(existingInternalConsumer.asInstanceOf[InternalKafkaConsumer[K, V]])}}

将InternalConsumer的markedForClose字段设置为true,意味着这个对象的kafkaconsumer对象要关闭

//KafkaRDD中增加了一个task完成监听器,如果任务完成调用closeIfNeeded方法
context.addTaskCompletionListener[Unit](_ => closeIfNeeded())def closeIfNeeded(): Unit = {if (consumer != null) {consumer.release()}}//上面的consumer是KafkaDataConsumer的子类的对象,其两个子类如下://   1:    internalConsumer会缓存private case class CachedKafkaDataConsumer[K, V](internalConsumer: InternalKafkaConsumer[K, V])extends KafkaDataConsumer[K, V] {assert(internalConsumer.inUse)//直接调用父类的release方法override def release(): Unit = KafkaDataConsumer.release(internalConsumer)}//看其父类的release方法private def release(internalConsumer: InternalKafkaConsumer[_, _]): Unit = synchronized {//获取internalConsumer的groupid topicpartition然后组成key,根据key去缓存查找相应的internalConsumer对象val key = new CacheKey(internalConsumer.groupId, internalConsumer.topicPartition)val cachedInternalConsumer = cache.get(key)//如果要释放的internalConsumer是缓存中存放的if (internalConsumer.eq(cachedInternalConsumer)) {// 标记为ture那么调用其close方法,然后从缓存移除if (internalConsumer.markedForClose) {internalConsumer.close()cache.remove(key)} else {//如果没有标记为true,意味着继续在缓存,不会移除,只是将其使用状态改为falseinternalConsumer.inUse = false}} else {// 这个对象没有被缓存过,或者 不等于缓存中的,直接关闭internalConsumer.close()logInfo(s"Released a supposedly cached consumer that was not found in the cache " +s"$internalConsumer")}}
}// 2 : internalConsumer不会缓存private case class NonCachedKafkaDataConsumer[K, V](internalConsumer: InternalKafkaConsumer[K, V])extends KafkaDataConsumer[K, V] {//直接调用其持有对象internalConsumer的close方法override def release(): Unit = internalConsumer.close()}//internalConsumer的close方法其实就是调用KafkaConsumer的close方法
def close(): Unit = consumer.close()//此处consumer是什么?
private val consumer = createConsumerprivate def createConsumer: KafkaConsumer[K, V] = {val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap).setAuthenticationConfigIfNeeded().build()val c = new KafkaConsumer[K, V](updatedKafkaParams)val topics = ju.Arrays.asList(topicPartition)c.assign(topics)c}

通过这个方案,当我们没有使用缓存时候直接创建NonCachedKafkaDataConsumer对象,NonCachedKafkaDataConsumer对象封装了InternalConsumer, InternalConsumer对象中持有KafkaConsumer对象,InternalConsumer不会被缓存放到Map中。

当使用缓存时候,首先从根据groupid topicpartiiton组成的key,得到缓存的InternalConsumer对象,不存在就是null。

如果缓存不存在那么直接创建CachedKafkaDataConsumer对象,然后将这个对象引用的InternalConsumer对象缓存到Map中;

如果缓存已经存在了并且InternalConsumer当前是使用状态,那么直接创建NonCachedKafkaDataConsumer对象,这个对象持有的InternalConsumer对象是新建的,并不是缓存中的,虽然参数(topicpartition对象和kafkaParams)与缓存中的InternalConsumer是一样的;

如果缓存存在InternalConsumer并且不是使用状态,直接把缓存中的InternalConsumer设置为使用状态,然后封装到CachedKafkaDataConsumer中。

如果任务有重试,之前缓存的InternalConsumer如果是非使用状态,直接关闭并且缓存中移除;如果缓存的InternalConsumer是使用状态,将其标记为下一次release时候移除的状态,最后任务重试也需要相应的consumer,因此会返回一个NonCachedKafkaDataConsumer对象,并且里面的InternalConsumer对象是新建的,并没有使用缓存中的

这篇关于解决KafkaConsumer多线程接入不安全问题(spark streaming 消费kafka)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1064761

相关文章

Vue3绑定props默认值问题

《Vue3绑定props默认值问题》使用Vue3的defineProps配合TypeScript的interface定义props类型,并通过withDefaults设置默认值,使组件能安全访问传入的... 目录前言步骤步骤1:使用 defineProps 定义 Props步骤2:设置默认值总结前言使用T

504 Gateway Timeout网关超时的根源及完美解决方法

《504GatewayTimeout网关超时的根源及完美解决方法》在日常开发和运维过程中,504GatewayTimeout错误是常见的网络问题之一,尤其是在使用反向代理(如Nginx)或... 目录引言为什么会出现 504 错误?1. 探索 504 Gateway Timeout 错误的根源 1.1 后端

Web服务器-Nginx-高并发问题

《Web服务器-Nginx-高并发问题》Nginx通过事件驱动、I/O多路复用和异步非阻塞技术高效处理高并发,结合动静分离和限流策略,提升性能与稳定性... 目录前言一、架构1. 原生多进程架构2. 事件驱动模型3. IO多路复用4. 异步非阻塞 I/O5. Nginx高并发配置实战二、动静分离1. 职责2

解决升级JDK报错:module java.base does not“opens java.lang.reflect“to unnamed module问题

《解决升级JDK报错:modulejava.basedoesnot“opensjava.lang.reflect“tounnamedmodule问题》SpringBoot启动错误源于Jav... 目录问题描述原因分析解决方案总结问题描述启动sprintboot时报以下错误原因分析编程异js常是由Ja

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

深度剖析SpringBoot日志性能提升的原因与解决

《深度剖析SpringBoot日志性能提升的原因与解决》日志记录本该是辅助工具,却为何成了性能瓶颈,SpringBoot如何用代码彻底破解日志导致的高延迟问题,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言第一章:日志性能陷阱的底层原理1.1 日志级别的“双刃剑”效应1.2 同步日志的“吞吐量杀手”

MySQL 表空却 ibd 文件过大的问题及解决方法

《MySQL表空却ibd文件过大的问题及解决方法》本文给大家介绍MySQL表空却ibd文件过大的问题及解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录一、问题背景:表空却 “吃满” 磁盘的怪事二、问题复现:一步步编程还原异常场景1. 准备测试源表与数据

解决Nginx启动报错Job for nginx.service failed because the control process exited with error code问题

《解决Nginx启动报错Jobfornginx.servicefailedbecausethecontrolprocessexitedwitherrorcode问题》Nginx启... 目录一、报错如下二、解决原因三、解决方式总结一、报错如下Job for nginx.service failed bec

SysMain服务可以关吗? 解决SysMain服务导致的高CPU使用率问题

《SysMain服务可以关吗?解决SysMain服务导致的高CPU使用率问题》SysMain服务是超级预读取,该服务会记录您打开应用程序的模式,并预先将它们加载到内存中以节省时间,但它可能占用大量... 在使用电脑的过程中,CPU使用率居高不下是许多用户都遇到过的问题,其中名为SysMain的服务往往是罪魁

Python多线程实现大文件快速下载的代码实现

《Python多线程实现大文件快速下载的代码实现》在互联网时代,文件下载是日常操作之一,尤其是大文件,然而,网络条件不稳定或带宽有限时,下载速度会变得很慢,本文将介绍如何使用Python实现多线程下载... 目录引言一、多线程下载原理二、python实现多线程下载代码说明:三、实战案例四、注意事项五、总结引