kerberos kafka 问题解决

2024-08-28 14:58
文章标签 问题 解决 kafka kerberos

本文主要是介绍kerberos kafka 问题解决,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

修改 kafka界面 security.inter.broker.protocol SASL_PLAINTEXT
在这里插入图片描述

登录添加用户生成keytab

kadmin.local
add_principal flink/master@AQ.COM
add_principal flink/worker-1@AQ.COM
add_principal flink/worker-2@AQ.COM
生成文件
xst -k /usr/local/keytab/flink/flink-master.keytab flink/master@AQ.COM
xst -k /usr/local/keytab/flink/flink-worker-1.keytab flink/worker-1@AQ.COM
xst -k /usr/local/keytab/flink/flink-worker-2.keytab flink/worker-2@AQ.COM
或者
kadmin.local -q "xst -k /usr/local/keytab/flink/flink-master.keytab flink/master@AQ.COM"

keytab合并

ktutil
rkt /usr/local/keytab/flink/flink-master.keytab
rkt /usr/local/keytab/flink/flink-worker-1.keytab
rkt /usr/local/keytab/flink/flink-worker-2.keytab
wkt /usr/local/keytab/flink/flink.keytab

查看keytab

klist -ket user.keytab

kinit登录与kadmin.local不能同时存在

kinit -kt /usr/local/keytab/flink/flink-master.keytab flink/master@AQ.COM

删除认证缓存

kdestroy

kafka生产消费

1.客户端操作kafka生产消费需要jaas.conf以及各自的生产消费的配置文件
2.代码需要设置两个变量

jaas.conf

KafkaClient{com.sun.security.auth.module.Krb5LoginModule requiredkeyTab="/usr/local/keytab/flink/flink.keytab"principal="flink/master@AQ.COM"useKeyTab=trueuseTicketCache=true;
};
Client{com.sun.security.auth.module.Krb5LoginModule requiredkeyTab="/usr/local/keytab/flink/flink.keytab"principal="flink/master@AQ.COM"useKeyTab=trueuseTicketCache=true;
};
KafkaServer{com.sun.security.auth.module.Krb5LoginModule requiredkeyTab="/usr/local/keytab/flink/flink.keytab"principal="flink/master@AQ.COM"useKeyTab=trueuseTicketCache=true;
};

producer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka

consumer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
group.id=testClient

客户端消费命令

export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/keytab/kafka/jaas.conf"
kafka-console-consumer --bootstrap-server master:9092 --topic tes  --consumer.config /usr/local/keytab/kafka/consumer.properties
kafka-console-producer --broker-list master:9092 --topic tes --producer.config /usr/local/keytab/kafka/producer.properties

springboot-kafka配置问题
yml配置

  kafka:bootstrap-servers: master:9092,worker-2:9092producer:#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。#可以设置的值为:all, -1, 0, 1acks: 1key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer#=============== provider  =======================# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。retries: 0consumer:group-id: mysqltestUserBehaviorGroupenable-auto-commit: trueauto-offset-reset: latestkey-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerjaas:enabled: trueproperties:security.protocol: SASL_PLAINTEXTsasl.kerberos.service.name: kafka

java flink 消费时候getKerberosJaas就好


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication(scanBasePackages = "org.fuwushe")
public class KafkaApplication {public static void main(String []args){getKerberosJaas();SpringApplication.run(KafkaApplication.class,args);}public static void getKerberosJaas(){System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");//加载本地jass.conf文件System.setProperty("java.security.auth.login.config", "/usr/local/keytab/kafka/jaas.conf");//System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");//System.setProperty("sun.security.krb5.debug","true");//System.setProperty("java.security.auth.login.config", "/load/data/flink.keytab");}}import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.fuwushe.da.kafka.ConsumerListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String servers;@Value("${spring.kafka.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.getContainerProperties().setPollTimeout(1500);return factory;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();try {String addr = InetAddress.getLocalHost().getHostAddress().replaceAll("\\.", "");//获得本机IPpropsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + addr);} catch (UnknownHostException e) {e.printStackTrace();}//        propsMap.put("security.protocol", "SASL_PLAINTEXT");
//        propsMap.put("sasl.kerberos.service.name", "kafka");
//        propsMap.put("java.security.auth.login.config","/load/data/jaas.conf");
//        propsMap.put("java.security.krb5.conf","/load/data/krb5.conf");propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);return propsMap;}/*** kafka监听* @return*/@Beanpublic ConsumerListener listener() {return new ConsumerListener();}}

这篇关于kerberos kafka 问题解决的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1115091

相关文章

解决mysql插入数据锁等待超时报错:Lock wait timeout exceeded;try restarting transaction

《解决mysql插入数据锁等待超时报错:Lockwaittimeoutexceeded;tryrestartingtransaction》:本文主要介绍解决mysql插入数据锁等待超时报... 目录报错信息解决办法1、数据库中执行如下sql2、再到 INNODB_TRX 事务表中查看总结报错信息Lock

MySQL启动报错:InnoDB表空间丢失问题及解决方法

《MySQL启动报错:InnoDB表空间丢失问题及解决方法》在启动MySQL时,遇到了InnoDB:Tablespace5975wasnotfound,该错误表明MySQL在启动过程中无法找到指定的s... 目录mysql 启动报错:InnoDB 表空间丢失问题及解决方法错误分析解决方案1. 启用 inno

Java使用MethodHandle来替代反射,提高性能问题

《Java使用MethodHandle来替代反射,提高性能问题》:本文主要介绍Java使用MethodHandle来替代反射,提高性能问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑... 目录一、认识MethodHandle1、简介2、使用方式3、与反射的区别二、示例1、基本使用2、(重要)

电脑蓝牙连不上怎么办? 5 招教你轻松修复Mac蓝牙连接问题的技巧

《电脑蓝牙连不上怎么办?5招教你轻松修复Mac蓝牙连接问题的技巧》蓝牙连接问题是一些Mac用户经常遇到的常见问题之一,在本文章中,我们将提供一些有用的提示和技巧,帮助您解决可能出现的蓝牙连接问... 蓝牙作为一种流行的无线技术,已经成为我们连接各种设备的重要工具。在 MAC 上,你可以根据自己的需求,轻松地

Java 中的跨域问题解决方法

《Java中的跨域问题解决方法》跨域问题本质上是浏览器的一种安全机制,与Java本身无关,但Java后端开发者需要理解其来源以便正确解决,下面给大家介绍Java中的跨域问题解决方法,感兴趣的朋友一起... 目录1、Java 中跨域问题的来源1.1. 浏览器同源策略(Same-Origin Policy)1.

如何清理MySQL中的binlog问题

《如何清理MySQL中的binlog问题》:本文主要介绍清理MySQL中的binlog问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目http://www.chinasem.cn录清理mysql中的binlog1.查看binlog过期时间2. 修改binlog过期

如何解决yum无法安装epel-release的问题

《如何解决yum无法安装epel-release的问题》:本文主要介绍如何解决yum无法安装epel-release的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不... 目录yum无法安装epel-release尝试了第一种方法第二种方法(我就是用这种方法解决的)总结yum

SpringBoot实现Kafka动态反序列化的完整代码

《SpringBoot实现Kafka动态反序列化的完整代码》在分布式系统中,Kafka作为高吞吐量的消息队列,常常需要处理来自不同主题(Topic)的异构数据,不同的业务场景可能要求对同一消费者组内的... 目录引言一、问题背景1.1 动态反序列化的需求1.2 常见问题二、动态反序列化的核心方案2.1 ht

python3 pip终端出现错误解决的方法详解

《python3pip终端出现错误解决的方法详解》这篇文章主要为大家详细介绍了python3pip如果在终端出现错误该如何解决,文中的示例方法讲解详细,感兴趣的小伙伴可以跟随小编一起了解一下... 目录前言一、查看是否已安装pip二、查看是否添加至环境变量1.查看环境变量是http://www.cppcns

IDEA下"File is read-only"可能原因分析及"找不到或无法加载主类"的问题

《IDEA下Fileisread-only可能原因分析及找不到或无法加载主类的问题》:本文主要介绍IDEA下Fileisread-only可能原因分析及找不到或无法加载主类的问题,具有很好的参... 目录1.File is read-only”可能原因2.“找不到或无法加载主类”问题的解决总结1.File