手拉手springboot整合kafka发送消息

2024-05-31 20:12

本文主要是介绍手拉手springboot整合kafka发送消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

环境介绍
技术栈springboot+mybatis-plus+mysql+rocketmq
软件版本
mysql8
IDEAIntelliJ IDEA 2022.2.1
JDK17
Spring Boot3.1.7
kafka2.13-3.7.0

创建topic时,若不指定topic的分区(Partition主题分区数)数量使,则默认为1个分区(partition)

springboot加入依赖kafka

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

加入spring-kafka依赖后,springboot自动装配好kafkaTemplate的Bean

application.yml配置连接kafka

spring:
kafka:
bootstrap-servers: 192.168.68.133:9092

生产者

发送消息

@Resource
private KafkaTemplate<String,String> kafkaTemplate;@Test
void kafkaSendTest(){
kafkaTemplate.send("kafkamsg01","hello kafka");
}

消费者

接收消息

@Component
public class KafkaConsumer {@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
public void consume(String message){
System.out.println("接收到消息:"+message);
}}

若没有配置groupid

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.

@Component
public class KafkaConsumer {@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")
public void consume(String message){
System.out.println("接收到消息:"+message);
}}

想从第一条消息开始读取(若同组的消费者已经消费过该主题,并且kafka已经保存了该消费者组的偏移量,则设置auto.offset.reset设置为earliest不生效,需要手动修改偏移量或使用新的消费者组)

application.yml需要将auto.offset.reset设置为earliest

spring:
kafka:
bootstrap-servers: 192.168.68.133:9092
consumer:
auto-offset-reset: earliest

Earliest:将偏移量重置为最早的偏移量

Latest: 将偏移量重置为最新的偏移量

None: 没有为消费者组找到以前的偏移量,向消费者抛出异常

Exception: 向消费者抛出异常

重置消费者组偏移量

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group 123 --topic kafkamsg01 --reset-offsets --to-earliest –execute

重置完成

Spring-kafka生产者发送消息

.send与sendDefault()方法都返回CompletableFuture<String<k,v>>;

CompletableFuture类用于异步编程,表示异步计算结果。该特征使得调用者不必等待操作完成就可以继续执行其他任务,从而提高引用的响应速度和吞吐量

@Resource
private KafkaTemplate<String,String> kafkaTemplate;@Test
void kafkaSendTest(){
kafkaTemplate.send("kafkamsg01","hello kafka");
}

发送Message

@Test
void kafkaSendMessageTest1(){
//通过构建器模式创建Message
Message<String> message = MessageBuilder.withPayload("hello kafka send message")
.setHeader(KafkaHeaders.TOPIC,"kafkamsg01")
.build();
kafkaTemplate.send(message);
}

SendProducerRecord

String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers

@Test
void kafkaSendProducerRecordTest1() {
//参数 String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers
Headers headers = new RecordHeaders();
headers.add("msg","123".getBytes(StandardCharsets.UTF_8));
ProducerRecord<String,String> record = new ProducerRecord(
"kafkaTopic01",
0,
System.currentTimeMillis(),
"key",
"hello kafka send message");
kafkaTemplate.send(record);
}

默认主题发送消息

yml配置默认主题

template:
default-topic: default-topic

@Test
void kafkaSendDefaultTest01(){
kafkaTemplate.sendDefault(0,System.currentTimeMillis(),"key01","hello ");
}

发送Object消息

序列化默认为String

@Resource
private KafkaTemplate<String,Object> kafkaTemplate1;
@Test
void kafkaSendObject(){
MessageM messageM =MessageM.builder().userID(123).sn("xo1111").desc("测试").build();
//分区是null,kafka自行决定消息发送到哪个分区
kafkaTemplate1.sendDefault(null,System.currentTimeMillis(),"key01",messageM);
}

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

这篇关于手拉手springboot整合kafka发送消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1018936

相关文章

关于跨域无效的问题及解决(java后端方案)

《关于跨域无效的问题及解决(java后端方案)》:本文主要介绍关于跨域无效的问题及解决(java后端方案),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录通用后端跨域方法1、@CrossOrigin 注解2、springboot2.0 实现WebMvcConfig

Java SWT库详解与安装指南(最新推荐)

《JavaSWT库详解与安装指南(最新推荐)》:本文主要介绍JavaSWT库详解与安装指南,在本章中,我们介绍了如何下载、安装SWTJAR包,并详述了在Eclipse以及命令行环境中配置Java... 目录1. Java SWT类库概述2. SWT与AWT和Swing的区别2.1 历史背景与设计理念2.1.

使用SpringBoot整合Sharding Sphere实现数据脱敏的示例

《使用SpringBoot整合ShardingSphere实现数据脱敏的示例》ApacheShardingSphere数据脱敏模块,通过SQL拦截与改写实现敏感信息加密存储,解决手动处理繁琐及系统改... 目录痛点一:痛点二:脱敏配置Quick Start——Spring 显示配置:1.引入依赖2.创建脱敏

SpringBoot 中 CommandLineRunner的作用示例详解

《SpringBoot中CommandLineRunner的作用示例详解》SpringBoot提供的一种简单的实现方案就是添加一个model并实现CommandLineRunner接口,实现功能的... 目录1、CommandLineRunnerSpringBoot中CommandLineRunner的作用

Python使用smtplib库开发一个邮件自动发送工具

《Python使用smtplib库开发一个邮件自动发送工具》在现代软件开发中,自动化邮件发送是一个非常实用的功能,无论是系统通知、营销邮件、还是日常工作报告,Python的smtplib库都能帮助我们... 目录代码实现与知识点解析1. 导入必要的库2. 配置邮件服务器参数3. 创建邮件发送类4. 实现邮件

Java死锁问题解决方案及示例详解

《Java死锁问题解决方案及示例详解》死锁是指两个或多个线程因争夺资源而相互等待,导致所有线程都无法继续执行的一种状态,本文给大家详细介绍了Java死锁问题解决方案详解及实践样例,需要的朋友可以参考下... 目录1、简述死锁的四个必要条件:2、死锁示例代码3、如何检测死锁?3.1 使用 jstack3.2

Java日期类详解(最新推荐)

《Java日期类详解(最新推荐)》早期版本主要使用java.util.Date、java.util.Calendar等类,Java8及以后引入了新的日期和时间API(JSR310),包含在ja... 目录旧的日期时间API新的日期时间 API(Java 8+)获取时间戳时间计算与其他日期时间类型的转换Dur

java对接海康摄像头的完整步骤记录

《java对接海康摄像头的完整步骤记录》在Java中调用海康威视摄像头通常需要使用海康威视提供的SDK,下面这篇文章主要给大家介绍了关于java对接海康摄像头的完整步骤,文中通过代码介绍的非常详细,需... 目录一、开发环境准备二、实现Java调用设备接口(一)加载动态链接库(二)结构体、接口重定义1.类型

SpringBoot读取ZooKeeper(ZK)属性的方法实现

《SpringBoot读取ZooKeeper(ZK)属性的方法实现》本文主要介绍了SpringBoot读取ZooKeeper(ZK)属性的方法实现,强调使用@ConfigurationProperti... 目录1. 在配置文件中定义 ZK 属性application.propertiesapplicati

Java Multimap实现类与操作的具体示例

《JavaMultimap实现类与操作的具体示例》Multimap出现在Google的Guava库中,它为Java提供了更加灵活的集合操作,:本文主要介绍JavaMultimap实现类与操作的... 目录一、Multimap 概述Multimap 主要特点:二、Multimap 实现类1. ListMult