centos 7 kafka 2.10-0.10.2.0 集群

2023-10-14 14:08
文章标签 集群 2.10 centos kafka 2.0 0.10

本文主要是介绍centos 7 kafka 2.10-0.10.2.0 集群,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!


快速开始:
http://kafka.apache.org/quickstart

 关于kafka说明可以参考:
http://kafka.apache.org/documentation.html

   0) kafka集群的安装, 准备三台服务器

    server1:192.168.0.1
server2:192.168.0.2
server3:192.168.0.3
 

1) 下载kafka 2.10-0.10.2.0( http://kafka.apache.org/downloads.html)

2) 解压 
tar -zxvf kafka_ 2.10-0.10.2.0.tgz 
mv  kafka_ 2.10-0.10.2.0   /usr/local/kafka
          mkdir  /usr/local/kafka/logs

3) 配置
修改 /usr/local/kafka/config/server.properties, 其中broker.id, log.dirs, zookeeper.connect 必须根据实际情况进行修改,其他项根据需要自行斟酌。
如下:
#==========================================
     broker.id=1  
#port=9092   #默认
num.network.threads=3  
num.io.threads=8  
socket.send.buffer.bytes=1048576  
socket.receive.buffer.bytes=1048576  
socket.request.max.bytes=104857600  
    log.dir=/usr/local/kafka/logs  
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168   
log.segment.bytes=536870912 
log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181/ kafka
zookeeper.connection.timeout.ms=6000  
    #==========================================
   注:在zookeeper 创建一个   kafka  结点,这样好管理。


4) 启动kafka


  针对服务器server2,server3可以将server1复制到相应的目录:

     scp -r /usr/local/kafka root@192.168.0.2:/usr/local/
     scp -r /usr/local/kafka root@192.168.0.3:/usr/local/


cd /usr/local/kafka

修改三台机子: config/server.properties  
server1 :
broker.id=1
server2 :
broker.id=2
server3 :
broker.id=3
启动
    /usr/local/kafka/ bin/kafka-server-start.sh  /usr/local/kafka/config/server.properties  &  

  查看
     lsof -i:9092 

5) 创建Topic(包含一个分区,三个副本)

   /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper  192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181/kafka  --replication-factor 3 --partitions 1 --topic   mytopic  

  #删除
  #/usr/local/kafka/bin/kafka-topics.sh --delete  --zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181/kafka  --topic   mytopic


6) 查看topic情况

  /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181/kafka  


7) 创建发送者(发)
   ./bin/kafka-console-producer.sh --broker-list   192.168.0.1 : 9092,192.168.0.2:9092,192.168.0.3:9092   --topic mytopic  
my msg1
my msg2
^C

8)  创建消费者(收)
    /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper   192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181/kafka     --topic   mytopic   --from-beginning
my msg1
my msg2
^C

9)  查看描述
   /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper  192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181/kafka  --topic
mytopic

10)  杀掉server1上的broker

kill `lsof -i:9092 | sed -n '2p' | awk '{print $2}'` 

  kill `ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'`


=============================================================

常用问题:

=============================================================

1、 启动失败 正则解释出错

[2017-04-29 19:25:54,810] FATAL  (kafka.Kafka$)
java.lang.VerifyError: Uninitialized object exists on backward branch 162
Exception Details:Location:scala/util/matching/Regex.unapplySeq(Lscala/util/matching/Regex$Match;)Lscala/Option; @216: gotoReason:Error exists in the bytecodeBytecode:0x0000000: 2bc6 000a 2bb6 00ef c700 07b2 0052 b02b0x0000010: b600 f2b6 00f3 2ab6 0054 4d59 c700 0b570x0000020: 2cc6 000d a700 c92c b600 f799 00c2 bb000x0000030: 6059 b200 65b2 006a 043e c700 0501 bf1d0x0000040: 2bb6 00f8 b600 74b6 0078 2bba 0100 00000x0000050: b200 93b6 0097 3a06 3a05 59c7 0005 01bf0x0000060: 3a04 1906 b200 93b6 009b a600 7619 04b20x0000070: 00a0 a600 09b2 00a0 a700 71bb 00a2 59190x0000080: 04b6 00a8 3a0b 2b19 0bb8 00fc b200 a0b70x0000090: 00ac 3a07 1907 3a08 1904 b600 afc0 00a40x00000a0: 3a09 1909 b200 a0a5 0034 bb00 a259 19090x00000b0: b600 a83a 0b2b 190b b800 fcb2 00a0 b7000x00000c0: ac3a 0a19 0819 0ab6 00b3 190a 3a08 19090x00000d0: b600 afc0 00a4 3a09 a7ff ca19 07a7 000c0x00000e0: 1904 1905 1906 b800 b9b7 00bc b02a 2bb60x00000f0: 00ef b601 02b0                         Stackmap Table:same_frame(@11)same_frame(@15)full_frame(@39,{Object[#2],Object[#34],Object[#86]},{Object[#86]})same_frame(@46)full_frame(@63,{Object[#2],Object[#34],Object[#86],Integer},{Uninitialized[#46],Uninitialized[#46],Object[#98]})full_frame(@96,{Object[#2],Object[#34],Object[#86],Integer,Top,Object[#206],Object[#208]},{Uninitialized[#46],Uninitialized[#46],Object[#164]})full_frame(@123,{Object[#2],Object[#34],Object[#86],Integer,Object[#164],Object[#206],Object[#208]},{Uninitialized[#46],Uninitialized[#46]})full_frame(@162,{Object[#2],Object[#34],Object[#86],Integer,Object[#164],Object[#206],Object[#208],Object[#162],Object[#162],Object[#164],Top,Object[#4]},{Uninitialized[#46],Uninitialized[#46]})full_frame(@219,{Object[#2],Object[#34],Object[#86],Integer,Object[#164],Object[#206],Object[#208],Object[#162],Object[#162],Object[#164],Top,Object[#4]},{Uninitialized[#46],Uninitialized[#46]})full_frame(@224,{Object[#2],Object[#34],Object[#86],Integer,Object[#164],Object[#206],Object[#208]},{Uninitialized[#46],Uninitialized[#46]})full_frame(@233,{Object[#2],Object[#34],Object[#86],Integer,Object[#164],Object[#206],Object[#208]},{Uninitialized[#46],Uninitialized[#46],Object[#4]})full_frame(@237,{Object[#2],Object[#34],Object[#86]},{})at scala.collection.immutable.StringLike.r(StringLike.scala:287)at scala.collection.immutable.StringLike.r$(StringLike.scala:287)at scala.collection.immutable.StringOps.r(StringOps.scala:29)at scala.collection.immutable.StringLike.r(StringLike.scala:276)at scala.collection.immutable.StringLike.r$(StringLike.scala:276)at scala.collection.immutable.StringOps.r(StringOps.scala:29)at kafka.cluster.EndPoint$.<init>(EndPoint.scala:29)at kafka.cluster.EndPoint$.<clinit>(EndPoint.scala)at kafka.server.Defaults$.<init>(KafkaConfig.scala:63)at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:616)at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)at kafka.Kafka$.main(Kafka.scala:58)at kafka.Kafka.main(Kafka.scala)



查了一天都无助了,最后查源代码,感觉好是jdk的问题,出错的jdk是:jdk-8u20-linux-x64.tar.gz ,换成 jdk-8u131-linux-x64.tar.gz 就能正常启动了。


2、doesn’t match stored brokerId 0 in meta.properties

错误的原因是log.dirs目录下的meta.properties中配置的broker.id和配置目录下的server.properties中的broker.id不一致了,解决问题的方法是将两者修改一致后再重启。

3、WARN Error while fetching metadata with correlation id  错误处理

[2016-10-14 06:36:18,401] WARN Error while fetching metadata with correlation id 0 : {test333=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-10-14 06:36:19,543] WARN Error while fetching metadata with correlation id 1 : {test333=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-10-14 06:36:19,680] WARN Error while fetching metadata with correlation id 2 : {test333=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-10-14 06:36:19,908] WARN Error while fetching metadata with correlation id 3 : {test333=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-10-14 06:36:20,116] WARN Error while fetching metadata with correlation id 4 : {test333=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-10-14 06:36:20,334] WARN Error while fetching metadata with correlation id 5 : {test333=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-10-14 06:36:20,505] WARN Error while fetching metadata with correlation id 6 : {test333=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2016-10-14 06:36:20,757] WARN Error while fetching metadata with correlation id 7 : {test333=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

解决:config/server.properties kafka必须配置hostname


4、java.io.IOException: Connection to 127.0.0.1:9092 (id: 0 rack: null) failed

java.io.IOException: Connection to 127.0.0.1:9092 (id: 0 rack: null) failedat kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:234)at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

未有停机的情况现出这个错误,解决: rm -rf /usr/local/kafka/logs/*



关于kafka说明可以参考: http://kafka.apache.org/documentation.html

这篇关于centos 7 kafka 2.10-0.10.2.0 集群的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/211013

相关文章

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

Redis分片集群的实现

《Redis分片集群的实现》Redis分片集群是一种将Redis数据库分散到多个节点上的方式,以提供更高的性能和可伸缩性,本文主要介绍了Redis分片集群的实现,具有一定的参考价值,感兴趣的可以了解一... 目录1. Redis Cluster的核心概念哈希槽(Hash Slots)主从复制与故障转移2.

CentOS 7部署主域名服务器 DNS的方法

《CentOS7部署主域名服务器DNS的方法》文章详细介绍了在CentOS7上部署主域名服务器DNS的步骤,包括安装BIND服务、配置DNS服务、添加域名区域、创建区域文件、配置反向解析、检查配置... 目录1. 安装 BIND 服务和工具2.  配置 BIND 服务3 . 添加你的域名区域配置4.创建区域

Centos环境下Tomcat虚拟主机配置详细教程

《Centos环境下Tomcat虚拟主机配置详细教程》这篇文章主要讲的是在CentOS系统上,如何一步步配置Tomcat的虚拟主机,内容很简单,从目录准备到配置文件修改,再到重启和测试,手把手带你搞定... 目录1. 准备虚拟主机的目录和内容创建目录添加测试文件2. 修改 Tomcat 的 server.X

一文详解kafka开启kerberos认证的完整步骤

《一文详解kafka开启kerberos认证的完整步骤》这篇文章主要为大家详细介绍了kafka开启kerberos认证的完整步骤,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、kerberos安装部署二、准备机器三、Kerberos Server 安装1、配置krb5.con

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

CentOS系统Maven安装教程分享

《CentOS系统Maven安装教程分享》本文介绍了如何在CentOS系统中安装Maven,并提供了一个简单的实际应用案例,安装Maven需要先安装Java和设置环境变量,Maven可以自动管理项目的... 目录准备工作下载并安装Maven常见问题及解决方法实际应用案例总结Maven是一个流行的项目管理工具

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

centos7基于keepalived+nginx部署k8s1.26.0高可用集群

《centos7基于keepalived+nginx部署k8s1.26.0高可用集群》Kubernetes是一个开源的容器编排平台,用于自动化地部署、扩展和管理容器化应用程序,在生产环境中,为了确保集... 目录一、初始化(所有节点都执行)二、安装containerd(所有节点都执行)三、安装docker-