Kafka-Spark Streaming 异常: dead for group td_topic_advert_impress_blacklist

2024-05-03 06:18

本文主要是介绍Kafka-Spark Streaming 异常: dead for group td_topic_advert_impress_blacklist,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 

最近在编写Spark Streaming 作业的时候,遇到了一个比较奇怪的问题,表现如下:

 

在本地连接Kafka 集群执行作业:

18/10/31 17:42:58 INFO AbstractCoordinator: Discovered coordinator kafka1:9092 (id: 2147483574 rack: null) for group td_topic_advert_impress_blacklist.
18/10/31 17:43:00 INFO AbstractCoordinator: Marking the coordinator kafka1:9092 (id: 2147483574 rack: null) dead for group td_topic_advert_impress_blacklist

18/10/31 17:42:58 INFO AbstractCoordinator: Discovered coordinator kafka1:9092 (id: 2147483574 rack: null) for group td_topic_advert_impress_blacklist.
18/10/31 17:43:00 INFO AbstractCoordinator: Marking the coordinator kafka1:9092 (id: 2147483574 rack: null) dead for group td_topic_advert_impress_blacklist

 

仅从日志我们极难定位问题的原因。经过了一大堆查找,baidu, github..

我将spark-streaming 的日志级别 从 INFO 换成了 DEBUG 终于找到了问题的原因 ,

 

设置日志级别的代码如下

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);javaSparkContext.setLogLevel("DEBUG");

 

出现的问题:

18/10/31 17:40:08 DEBUG KafkaConsumer: Starting the Kafka consumer
18/10/31 17:40:08 DEBUG Metadata: Updated cluster metadata version 1 to Cluster(id = null, nodes = [10.170.0.8:9092 (id: -3 rack: null), 10.170.0.7:9092 (id: -2 rack: null), 10.170.0.6:9092 (id: -1 rack: null)], partitions = [])
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name fetch-throttle-time
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name connections-closed:
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name connections-created:
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name bytes-sent-received:
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name bytes-sent:
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name bytes-received:
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name select-time:
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name io-time:
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name heartbeat-latency
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name join-latency
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name sync-latency
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name commit-latency
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name bytes-fetched
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name records-fetched
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name fetch-latency
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name records-lag
18/10/31 17:40:08 INFO AppInfoParser: Kafka version : 0.11.0.0
18/10/31 17:40:08 INFO AppInfoParser: Kafka commitId : cb8625948210849f
18/10/31 17:40:08 DEBUG KafkaConsumer: Kafka consumer created
18/10/31 17:40:08 DEBUG KafkaConsumer: Subscribed to topic(s): topic_advert_impress
18/10/31 17:40:08 DEBUG AbstractCoordinator: Sending GroupCoordinator request for group td_topic_advert_impress_blacklist to broker 10.170.0.6:9092 (id: -1 rack: null)
18/10/31 17:40:08 DEBUG NetworkClient: Initiating connection to node -1 at 10.170.0.6:9092.
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name node--1.bytes-sent
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name node--1.bytes-received
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name node--1.latency
18/10/31 17:40:08 DEBUG Selector: Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
18/10/31 17:40:08 DEBUG NetworkClient: Completed connection to node -1.  Fetching API versions.
18/10/31 17:40:08 DEBUG NetworkClient: Initiating API versions fetch from node -1.
18/10/31 17:40:08 DEBUG NetworkClient: Initialize connection to node -3 for sending metadata request
18/10/31 17:40:08 DEBUG NetworkClient: Initiating connection to node -3 at 10.170.0.8:9092.
18/10/31 17:40:08 DEBUG NetworkClient: Recorded API versions for node -1: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
18/10/31 17:40:08 DEBUG NetworkClient: Sending metadata request (type=MetadataRequest, topics=topic_advert_impress) to node -1
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name node--3.bytes-sent
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name node--3.bytes-received
18/10/31 17:40:08 DEBUG Metrics: Added sensor with name node--3.latency
18/10/31 17:40:08 DEBUG Selector: Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -3
18/10/31 17:40:08 DEBUG NetworkClient: Completed connection to node -3.  Fetching API versions.
18/10/31 17:40:08 DEBUG NetworkClient: Initiating API versions fetch from node -3.
18/10/31 17:40:08 DEBUG Metadata: Updated cluster metadata version 2 to Cluster(id = -dsyi_nCTMGfFcaeYx-9Wg, nodes = [kafka3:9092 (id: 72 rack: null), kafka2:9092 (id: 71 rack: null), kafka1:9092 (id: 73 rack: null)], partitions = [Partition(topic = topic_advert_impress, partition = 3, leader = 72, replicas = [72,71,73], isr = [71,72,73]), Partition(topic = topic_advert_impress, partition = 2, leader = 71, replicas = [71,72,73], isr = [71,72,73]), Partition(topic = topic_advert_impress, partition = 5, leader = 71, replicas = [71,73,72], isr = [71,72,73]), Partition(topic = topic_advert_impress, partition = 4, leader = 73, replicas = [73,72,71], isr = [71,72,73]), Partition(topic = topic_advert_impress, partition = 1, leader = 73, replicas = [73,71,72], isr = [71,72,73]), Partition(topic = topic_advert_impress, partition = 0, leader = 72, replicas = [72,73,71], isr = [71,72,73]), Partition(topic = topic_advert_impress, partition = 7, leader = 73, replicas = [73,71,72], isr = [71,72,73]), Partition(topic = topic_advert_impress, partition = 6, leader = 72, replicas = [72,73,71], isr = [71,72,73]), Partition(topic = topic_advert_impress, partition = 8, leader = 71, replicas = [71,72,73], isr = [71,72,73])])
18/10/31 17:40:08 DEBUG NetworkClient: Recorded API versions for node -3: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
18/10/31 17:40:08 DEBUG AbstractCoordinator: Received GroupCoordinator response ClientResponse(receivedTimeMs=1540978808909, latencyMs=125, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=0,client_id=consumer-1}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=kafka1:9092 (id: 73 rack: null))) for group td_topic_advert_impress_blacklist
18/10/31 17:40:08 INFO AbstractCoordinator: Discovered coordinator kafka1:9092 (id: 2147483574 rack: null) for group td_topic_advert_impress_blacklist.
18/10/31 17:40:08 DEBUG NetworkClient: Initiating connection to node 2147483574 at kafka1:9092.
18/10/31 17:40:20 DEBUG NetworkClient: Error connecting to node 2147483574 at kafka1:9092:
java.io.IOException: Can't resolve address: kafka1:9092at org.apache.kafka.common.network.Selector.connect(Selector.java:195)at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:762)at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:224)at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:462)at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:598)at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:579)at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:214)at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200)at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:165)at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.nio.channels.UnresolvedAddressExceptionat sun.nio.ch.Net.checkAddress(Net.java:123)at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)at org.apache.kafka.common.network.Selector.connect(Selector.java:192)... 37 more
18/10/31 17:40:20 INFO AbstractCoordinator: Marking the coordinator kafka1:9092 (id: 2147483574 rack: null) dead for group td_topic_advert_impress_blacklist
18/10/31 17:40:20 DEBUG AbstractCoordinator: Sending GroupCoordinator request for group td_topic_advert_impress_blacklist to broker kafka3:9092 (id: 72 rack: null)

 

这样我们就定位到了最终的问题:

java.io.IOException: Can't resolve address: kafka1:9092at org.apache.kafka.common.network.Selector.connect(Selector.java:195)at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:762)at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:224)at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(Co

 

可以看到就是域名不解析的问题:

我们只需要修改 windows 下的  host 文件即可

 

这篇关于Kafka-Spark Streaming 异常: dead for group td_topic_advert_impress_blacklist的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/956012

相关文章

Python异常处理之避免try-except滥用的3个核心原则

《Python异常处理之避免try-except滥用的3个核心原则》在Python开发中,异常处理是保证程序健壮性的关键机制,本文结合真实案例与Python核心机制,提炼出避免异常滥用的三大原则,有需... 目录一、精准打击:只捕获可预见的异常类型1.1 通用异常捕获的陷阱1.2 精准捕获的实践方案1.3

Python函数的基本用法、返回值特性、全局变量修改及异常处理技巧

《Python函数的基本用法、返回值特性、全局变量修改及异常处理技巧》本文将通过实际代码示例,深入讲解Python函数的基本用法、返回值特性、全局变量修改以及异常处理技巧,感兴趣的朋友跟随小编一起看看... 目录一、python函数定义与调用1.1 基本函数定义1.2 函数调用二、函数返回值详解2.1 有返

java.sql.SQLTransientConnectionException连接超时异常原因及解决方案

《java.sql.SQLTransientConnectionException连接超时异常原因及解决方案》:本文主要介绍java.sql.SQLTransientConnectionExcep... 目录一、引言二、异常信息分析三、可能的原因3.1 连接池配置不合理3.2 数据库负载过高3.3 连接泄漏

Python中 try / except / else / finally 异常处理方法详解

《Python中try/except/else/finally异常处理方法详解》:本文主要介绍Python中try/except/else/finally异常处理方法的相关资料,涵... 目录1. 基本结构2. 各部分的作用tryexceptelsefinally3. 执行流程总结4. 常见用法(1)多个e

Debian 13升级后网络转发等功能异常怎么办? 并非错误而是管理机制变更

《Debian13升级后网络转发等功能异常怎么办?并非错误而是管理机制变更》很多朋友反馈,更新到Debian13后网络转发等功能异常,这并非BUG而是Debian13Trixie调整... 日前 Debian 13 Trixie 发布后已经有众多网友升级到新版本,只不过升级后发现某些功能存在异常,例如网络转

C#文件复制异常:"未能找到文件"的解决方案与预防措施

《C#文件复制异常:未能找到文件的解决方案与预防措施》在C#开发中,文件操作是基础中的基础,但有时最基础的File.Copy()方法也会抛出令人困惑的异常,当targetFilePath设置为D:2... 目录一个看似简单的文件操作问题问题重现与错误分析错误代码示例错误信息根本原因分析全面解决方案1. 确保

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

Java利用@SneakyThrows注解提升异常处理效率详解

《Java利用@SneakyThrows注解提升异常处理效率详解》这篇文章将深度剖析@SneakyThrows的原理,用法,适用场景以及隐藏的陷阱,看看它如何让Java异常处理效率飙升50%,感兴趣的... 目录前言一、检查型异常的“诅咒”:为什么Java开发者讨厌它1.1 检查型异常的痛点1.2 为什么说

Python利用PySpark和Kafka实现流处理引擎构建指南

《Python利用PySpark和Kafka实现流处理引擎构建指南》本文将深入解剖基于Python的实时处理黄金组合:Kafka(分布式消息队列)与PySpark(分布式计算引擎)的化学反应,并构建一... 目录引言:数据洪流时代的生存法则第一章 Kafka:数据世界的中央神经系统消息引擎核心设计哲学高吞吐

Java异常捕获及处理方式详解

《Java异常捕获及处理方式详解》异常处理是Java编程中非常重要的一部分,它允许我们在程序运行时捕获并处理错误或不预期的行为,而不是让程序直接崩溃,本文将介绍Java中如何捕获异常,以及常用的异常处... 目录前言什么是异常?Java异常的基本语法解释:1. 捕获异常并处理示例1:捕获并处理单个异常解释: