SpringBoot集成kafka-生产者发送消息

2024-08-23 02:44

本文主要是介绍SpringBoot集成kafka-生产者发送消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

springboot集成kafka发送消息

  • 1、kafkaTemplate.send()方法
    • 1.1、springboot集成kafka发送消息Message对象消息
    • 1.2、springboot集成kafka发送ProducerRecord对象消息
    • 1.3、springboot集成kafka发送指定分区消息
  • 2、kafkaTemplate.sendDefault()方法
  • 3、kafkaTemplate.send(...)和kafkaTemplate.sendDefault(...)的区别

在这里插入图片描述在这里插入图片描述

1、kafkaTemplate.send()方法

1.1、springboot集成kafka发送消息Message对象消息

生产者代码

package com.power.producer;import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendMessage(){//通过构建器模式创建Message对象Message message = MessageBuilder.withPayload("hello-message").setHeader(KafkaHeaders.TOPIC,"test-topic-02")//在header中放置topic的名字.build();kafkaTemplate.send(message);}
}

2、测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid testMessage(){eventProducer.sendMessage();}@Testvoid testMessage(){eventProducer.sendMessage();}
}

1.2、springboot集成kafka发送ProducerRecord对象消息

生产者

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendMessage(){//通过构建器模式创建Message对象Message message = MessageBuilder.withPayload("hello-message").setHeader(KafkaHeaders.TOPIC,"test-topic-02")//在header中放置topic的名字.build();kafkaTemplate.send(message);}public void sendProducerRecord(){//Headers里面放一些信息(信息是key-value键值对),到时候消费组接收到消息后,可以拿到这个Headers里面放的信息Headers headers = new RecordHeaders();headers.add("phone","17676767676".getBytes(StandardCharsets.UTF_8));headers.add("orderId","OD1456467576467".getBytes(StandardCharsets.UTF_8));//String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headersProducerRecord<String,String> record =new ProducerRecord("test-topic-02", 0, System.currentTimeMillis(), "key1", "value", headers);kafkaTemplate.send(record);}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid testProducerRecord(){eventProducer.sendProducerRecord();}}

1.3、springboot集成kafka发送指定分区消息

生产者:(方法sendEvent4)

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendEvent4(){//String topic, Integer partition, Long timestamp, K key, V datakafkaTemplate.send("test-topic-02",0,System.currentTimeMillis(),"k2","hello-kafka");}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendEvent4(){eventProducer.sendEvent4();}
}

2、kafkaTemplate.sendDefault()方法

生产者:

package com.power.producer;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendDefault(){//Integer partition, Long timestamp, K key, V datakafkaTemplate.sendDefault(0,System.currentTimeMillis(),"k3","hello-kafka");}
}

测试类:

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendDefault(){eventProducer.sendDefault();}
}

配置文件:设置默认topic
不设置topic运行生产者会报找不到topic的错。
在这里插入图片描述

3、kafkaTemplate.send(…)和kafkaTemplate.sendDefault(…)的区别

在这里插入图片描述

这篇关于SpringBoot集成kafka-生产者发送消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1098128

相关文章

Java中JSON格式反序列化为Map且保证存取顺序一致的问题

《Java中JSON格式反序列化为Map且保证存取顺序一致的问题》:本文主要介绍Java中JSON格式反序列化为Map且保证存取顺序一致的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未... 目录背景问题解决方法总结背景做项目涉及两个微服务之间传数据时,需要提供方将Map类型的数据序列化为co

Java Lambda表达式的使用详解

《JavaLambda表达式的使用详解》:本文主要介绍JavaLambda表达式的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、前言二、Lambda表达式概述1. 什么是Lambda表达式?三、Lambda表达式的语法规则1. 无参数的Lambda表

java中Optional的核心用法和最佳实践

《java中Optional的核心用法和最佳实践》Java8中Optional用于处理可能为null的值,减少空指针异常,:本文主要介绍java中Optional核心用法和最佳实践的相关资料,文中... 目录前言1. 创建 Optional 对象1.1 常规创建方式2. 访问 Optional 中的值2.1

Spring Boot 整合 Apache Flink 的详细过程

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri... 目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添

Spring组件实例化扩展点之InstantiationAwareBeanPostProcessor使用场景解析

《Spring组件实例化扩展点之InstantiationAwareBeanPostProcessor使用场景解析》InstantiationAwareBeanPostProcessor是Spring... 目录一、什么是InstantiationAwareBeanPostProcessor?二、核心方法解

深入解析 Java Future 类及代码示例

《深入解析JavaFuture类及代码示例》JavaFuture是java.util.concurrent包中用于表示异步计算结果的核心接口,下面给大家介绍JavaFuture类及实例代码,感兴... 目录一、Future 类概述二、核心工作机制代码示例执行流程2. 状态机模型3. 核心方法解析行为总结:三

Spring @RequestMapping 注解及使用技巧详解

《Spring@RequestMapping注解及使用技巧详解》@RequestMapping是SpringMVC中定义请求映射规则的核心注解,用于将HTTP请求映射到Controller处理方法... 目录一、核心作用二、关键参数说明三、快捷组合注解四、动态路径参数(@PathVariable)五、匹配请

Java -jar命令如何运行外部依赖JAR包

《Java-jar命令如何运行外部依赖JAR包》在Java应用部署中,java-jar命令是启动可执行JAR包的标准方式,但当应用需要依赖外部JAR文件时,直接使用java-jar会面临类加载困... 目录引言:外部依赖JAR的必要性一、问题本质:类加载机制的限制1. Java -jar的默认行为2. 类加

Java进程CPU使用率过高排查步骤详细讲解

《Java进程CPU使用率过高排查步骤详细讲解》:本文主要介绍Java进程CPU使用率过高排查的相关资料,针对Java进程CPU使用率高的问题,我们可以遵循以下步骤进行排查和优化,文中通过代码介绍... 目录前言一、初步定位问题1.1 确认进程状态1.2 确定Java进程ID1.3 快速生成线程堆栈二、分析

Swagger在java中的运用及常见问题解决

《Swagger在java中的运用及常见问题解决》Swagger插件是一款深受Java开发者喜爱的工具,它在前后端分离的开发模式下发挥着重要作用,:本文主要介绍Swagger在java中的运用及常... 目录前言1. Swagger 的主要功能1.1 交互式 API 文档1.2 客户端 SDK 生成1.3