Kafka 实战演练:创建、配置与测试 Kafka全面教程

2024-09-07 12:36

本文主要是介绍Kafka 实战演练:创建、配置与测试 Kafka全面教程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 1.配置文件
  • 2.消费者
    • 1.注解方式
    • 2.KafkaConsumer
  • 3.依赖
    • 1.注解依赖
    • 2.KafkaConsumer依赖

本文档只是为了留档方便以后工作运维,或者给同事分享文档内容比较简陋命令也不是特别全,不适合小白观看,如有不懂可以私信,上班期间都是在得

1.配置文件

  1. Yml配置
spring:kafka:bootstrap-servers: consumer:group-id: iot-testaaaaaaaaaa11aaaaaaaaaauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:security:protocol: SASL_SSLsasl:mechanism: PLAINjaas:config: org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";ssl:truststore:type: JKSlocation: src/main/resources/client.truststore.jkspassword: endpoint.identification.algorithm:

yaml配置

2.消费者

1.注解方式

    @KafkaListener(topics = {"abcd"})public void listen(ConsumerRecord<?, ?> record){Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();System.out.println("---->"+record);System.out.println("---->"+message);}}

注解方式

2.KafkaConsumer

/*** @author XHao*/
public class MqsConsumer {public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";private KafkaConsumer<Object, Object> consumer;MqsConsumer(String path) {Properties props = new Properties();try {InputStream in = new BufferedInputStream(new FileInputStream(path));props.load(in);} catch (IOException e) {e.printStackTrace();return;}consumer = new KafkaConsumer<Object, Object>(props);}public MqsConsumer() {Properties props = new Properties();try {props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);} catch (IOException e) {e.printStackTrace();return;}consumer = new KafkaConsumer<Object, Object>(props);}public void consume(List topics) {consumer.subscribe(topics);}public ConsumerRecords<Object, Object> poll(long timeout) {return consumer.poll(timeout);}public void close() {consumer.close();}/*** get classloader from thread context if no classloader found in thread* context return the classloader which has loaded this class** @return classloader*/public static ClassLoader getCurrentClassLoader() {ClassLoader classLoader = Thread.currentThread().getContextClassLoader();if (classLoader == null) {classLoader = MqsConsumer.class.getClassLoader();}return classLoader;}/*** 从classpath 加载配置信息** @param configFileName 配置文件名称* @return 配置信息* @throws IOException*/public static Properties loadFromClasspath(String configFileName) throws IOException {ClassLoader classLoader = getCurrentClassLoader();Properties config = new Properties();List<URL> properties = new ArrayList<URL>();Enumeration<URL> propertyResources = classLoader.getResources(configFileName);while (propertyResources.hasMoreElements()) {properties.add(propertyResources.nextElement());}for (URL url : properties) {InputStream is = null;try {is = url.openStream();config.load(is);} finally {if (is != null) {is.close();is = null;}}}return config;}
}

在这里插入图片描述

3.依赖

1.注解依赖

      <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2.KafkaConsumer依赖

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.1.0</version></dependency>

可视化大屏项目经常用到消息转换,实时状态等 记录一下吧

如果点赞多,评论多会更新详细教程,待补充。

这篇关于Kafka 实战演练:创建、配置与测试 Kafka全面教程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1145071

相关文章

SpringBoot3.4配置校验新特性的用法详解

《SpringBoot3.4配置校验新特性的用法详解》SpringBoot3.4对配置校验支持进行了全面升级,这篇文章为大家详细介绍了一下它们的具体使用,文中的示例代码讲解详细,感兴趣的小伙伴可以参考... 目录基本用法示例定义配置类配置 application.yml注入使用嵌套对象与集合元素深度校验开发

使用Python和Pyecharts创建交互式地图

《使用Python和Pyecharts创建交互式地图》在数据可视化领域,创建交互式地图是一种强大的方式,可以使受众能够以引人入胜且信息丰富的方式探索地理数据,下面我们看看如何使用Python和Pyec... 目录简介Pyecharts 简介创建上海地图代码说明运行结果总结简介在数据可视化领域,创建交互式地

springboot使用Scheduling实现动态增删启停定时任务教程

《springboot使用Scheduling实现动态增删启停定时任务教程》:本文主要介绍springboot使用Scheduling实现动态增删启停定时任务教程,具有很好的参考价值,希望对大家有... 目录1、配置定时任务需要的线程池2、创建ScheduledFuture的包装类3、注册定时任务,增加、删

IntelliJ IDEA 中配置 Spring MVC 环境的详细步骤及问题解决

《IntelliJIDEA中配置SpringMVC环境的详细步骤及问题解决》:本文主要介绍IntelliJIDEA中配置SpringMVC环境的详细步骤及问题解决,本文分步骤结合实例给大... 目录步骤 1:创建 Maven Web 项目步骤 2:添加 Spring MVC 依赖1、保存后执行2、将新的依赖

SpringBoot基于配置实现短信服务策略的动态切换

《SpringBoot基于配置实现短信服务策略的动态切换》这篇文章主要为大家详细介绍了SpringBoot在接入多个短信服务商(如阿里云、腾讯云、华为云)后,如何根据配置或环境切换使用不同的服务商,需... 目录目标功能示例配置(application.yml)配置类绑定短信发送策略接口示例:阿里云 & 腾

如何为Yarn配置国内源的详细教程

《如何为Yarn配置国内源的详细教程》在使用Yarn进行项目开发时,由于网络原因,直接使用官方源可能会导致下载速度慢或连接失败,配置国内源可以显著提高包的下载速度和稳定性,本文将详细介绍如何为Yarn... 目录一、查询当前使用的镜像源二、设置国内源1. 设置为淘宝镜像源2. 设置为其他国内源三、还原为官方

CentOS7更改默认SSH端口与配置指南

《CentOS7更改默认SSH端口与配置指南》SSH是Linux服务器远程管理的核心工具,其默认监听端口为22,由于端口22众所周知,这也使得服务器容易受到自动化扫描和暴力破解攻击,本文将系统性地介绍... 目录引言为什么要更改 SSH 默认端口?步骤详解:如何更改 Centos 7 的 SSH 默认端口1

Maven的使用和配置国内源的保姆级教程

《Maven的使用和配置国内源的保姆级教程》Maven是⼀个项目管理工具,基于POM(ProjectObjectModel,项目对象模型)的概念,Maven可以通过一小段描述信息来管理项目的构建,报告... 目录1. 什么是Maven?2.创建⼀个Maven项目3.Maven 核心功能4.使用Maven H

SpringBoot多数据源配置完整指南

《SpringBoot多数据源配置完整指南》在复杂的企业应用中,经常需要连接多个数据库,SpringBoot提供了灵活的多数据源配置方式,以下是详细的实现方案,需要的朋友可以参考下... 目录一、基础多数据源配置1. 添加依赖2. 配置多个数据源3. 配置数据源Bean二、JPA多数据源配置1. 配置主数据

Spring 基于XML配置 bean管理 Bean-IOC的方法

《Spring基于XML配置bean管理Bean-IOC的方法》:本文主要介绍Spring基于XML配置bean管理Bean-IOC的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一... 目录一. spring学习的核心内容二. 基于 XML 配置 bean1. 通过类型来获取 bean2. 通过