Spring Kafka 教程 – spring读取和发送kakfa消息

2023-10-21 03:38

本文主要是介绍Spring Kafka 教程 – spring读取和发送kakfa消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Apache Kafka, 分布式消息系统, 非常流行。Spring是非常流行的Java快速开发框架。将两者无缝平滑结合起来可以快速实现很多功能。本文件简要介绍Spring Kafka,如何使用 KafkaTemplate发送消息到kafka的broker上, 如何使用“listener container“接收Kafka消息。

1,Spring Kafka的组成
这一节我们首先介绍Spring Kafka的各个组成部分。
1.1 发送消息
与 JmsTemplate 或者JdbcTemplate类似,Spring Kafka提供了 KafkaTemplate. 该模板封装了Kafka消息生产者并提供各种消息发送方法。
消息发送的各种方法。

ListenableFuture<SendResult<K, V>> send(String topic, V data);ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data);ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data);ListenableFuture<SendResult<K, V>> send(Message<?> message);

1.2 接收消息
要接收消息,我们需要配置MessageListenerContainer并提供一个Message Listener,或者使用 @KafkaListener注解。

MessageListenserContainer
MessageListenserContainer 有以下两个实现类:

KafkaMessageListenerContainer
ConcurrentMessageListenerContainer

KafkaMessageListenerContainer可以让我们使用单线程消费Kafka topic的消息,而ConcurrentMessageListenerContainer 可以让我们多线程消费消息。

@KafkaListener 注解
Spring Kafka提供的 @KafkaListener注解,可以让我们监听某个topic或者topicPattern的消息。

监听符合topicPattern = “topic.*”的所有topic的消息

@Component
@Slf4j
public class CmdReceiver {@KafkaListener(topicPattern = "topic.*")public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();log.info("----------------- record =topic:"  + topic+ ", " + record);log.info("------------------ message =topic:" + topic+ ", " + message);}}
}

监听某个topic的消息    
public class Listener {@KafkaListener(id = "id01", topics = "Topic1")public void listen(String data) {}
}

2, Spring Kafka 例子

下面我们介绍一个具体的例子, 这个例会发送和接收指定topic的消息。
准备工作
kafka_2.11-1.1.0.tgz和zookeeper-3.4.10.tar.gz
JDK jdk-8u171-linux-x64.tar.gz
IDE (Eclipse or IntelliJ)
Build tool (Maven or Gradle)
本文不涉及安装Kafka的介绍,请自行搜索,或者看官方文档。

pom文件
也就是我们的依赖包. 这是笔者使用的依赖版本,仅供参考。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.yq</groupId><artifactId>kafkademo</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.12.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.1.8.RELEASE</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.1.1</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.7.0</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.7.0</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-spring-web</artifactId><version>2.7.0</version></dependency><!-- fastjson--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.33</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

*KafkaDemoApplication*
我们使用springboot的框架,这是我们程序的入口点。

@SpringBootApplication
public class KafkaDemoApplication {private static final Logger logger = LoggerFactory.getLogger(KafkaDemoApplication.class);public static void main(String[] args) {ConfigurableApplicationContext context = SpringApplication.run(KafkaDemoApplication.class, args);logger.info("Done start Spring boot");}
}

ProducerConfig
其实我们可以可以不用编写KafkaProducerConfig,直接使用KafkaTemplate(当然前提是我们要设置好producer需要的配置项,例如spring.kafka.bootstrap-servers, spring.kafka.producer.key-serializer, spring.kafka.producer.retries等等)

@Configuration
@EnableKafka
public class KafkaProducerConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092(根据实际情况修改)");props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<String, String>(producerFactory());}
}

KafkaConsumerConfig
同理,其实我们可以可以不用编写KafkaConsumerConfig,直接使用 @KafkaListener(当然前提是我们要设置好consumer需要的配置项,例如spring.kafka.bootstrap-servers, spring.kafka.consumer.key-deserializer, spring.kafka.consumer.group-id、spring.kafka.consumer.auto-offset-reset等等)

@Configuration
@EnableKafka
public class KafkaConsumerConfig {@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092(根据实际情况修改)");propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");return propsMap;}@Beanpublic MyListener listener() {return new MyListener();}
}

定义了ProducerConfig和ConsumerConfig后我们需要实现具体的生产者和消费者。

本文的KafkaListenerContainerFactory 中使用了ConcurrentKafkaListenerContainer, 我们将使用多线程消费消息。

注意消息代理的地址是localhost:9092, 需要根据实际情况修改。需要特别注意的是,我在windows运行程序,kafka在我的linux虚拟机, 我需要配置windows的hosts文件,配置虚拟机hostname和ip的映射,例如192.168.119.131 ubuntu01

开发Listener

我们来开发自己的Listener监听具体的topic, 这里例子中我们监听以topic开头的主题,不做其他业务,只是打印出来。

@Component
@Slf4j
public class MyListener{@KafkaListener(topicPattern = "topic.*")public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();log.info("------------------ message =topic:" + topic+ ", " + message);}}
}

开发producer
我在程序中增加了controller,这样我们可以通过controller给topic发消息。consumer一直在监听,只要有消息发送过去,就会打印出来。controller中调用了ProducerServiceImpl , 具体代码比较简单就不再罗列。

我们producerServiceImpl主要是有这句, 通过KafkaTemplate 发送消息。
@Autowired
private KafkaTemplate template;

@Service
public class ProducerServiceImpl implements ProducerService {private static final Logger logger = LoggerFactory.getLogger(ProducerServiceImpl.class);private Gson gson = new GsonBuilder().create();@Autowiredprivate KafkaTemplate template;//发送消息方法public void sendJson(String topic, String json) {JSONObject jsonObj = JSON.parseObject(json);jsonObj.put("topic", topic);jsonObj.put("ts", System.currentTimeMillis() + "");logger.info("json+++++++++++++++++++++  message = {}", jsonObj.toJSONString());ListenableFuture<SendResult<String, String>> future = template.send(topic, jsonObj.toJSONString());future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onSuccess(SendResult<String, String> result) {System.out.println("msg OK." + result.toString());}@Overridepublic void onFailure(Throwable ex) {System.out.println("msg send failed: ");}});}

运行程序

运行第一步,确保Kafka broker配置正确,笔者的程序在Windows10机器上,Kafka在虚拟机上,因为我的地址是192.168.119.129:9092, 而不是localhost:9092.

运行第二步骤,在IDEA中选中KafkaDemoApplication , 单击鼠标右键,选择 Run KafkaDemoApplication

效果图
这里写图片描述

kafka段命令行接收到的消息

这里写图片描述

3,总结
Spring Kafka提供了很好的集成,我们只需配置properties文件,就可以直接使用KafkaTemplate发送消息,使用@KafkaListener监听消息。

参考文档:
https://docs.spring.io/spring-kafka/docs/2.1.6.RELEASE/reference/html/_reference.html#kafka-template

这篇关于Spring Kafka 教程 – spring读取和发送kakfa消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/251666

相关文章

深入解析 Java Future 类及代码示例

《深入解析JavaFuture类及代码示例》JavaFuture是java.util.concurrent包中用于表示异步计算结果的核心接口,下面给大家介绍JavaFuture类及实例代码,感兴... 目录一、Future 类概述二、核心工作机制代码示例执行流程2. 状态机模型3. 核心方法解析行为总结:三

Spring @RequestMapping 注解及使用技巧详解

《Spring@RequestMapping注解及使用技巧详解》@RequestMapping是SpringMVC中定义请求映射规则的核心注解,用于将HTTP请求映射到Controller处理方法... 目录一、核心作用二、关键参数说明三、快捷组合注解四、动态路径参数(@PathVariable)五、匹配请

Java -jar命令如何运行外部依赖JAR包

《Java-jar命令如何运行外部依赖JAR包》在Java应用部署中,java-jar命令是启动可执行JAR包的标准方式,但当应用需要依赖外部JAR文件时,直接使用java-jar会面临类加载困... 目录引言:外部依赖JAR的必要性一、问题本质:类加载机制的限制1. Java -jar的默认行为2. 类加

Java进程CPU使用率过高排查步骤详细讲解

《Java进程CPU使用率过高排查步骤详细讲解》:本文主要介绍Java进程CPU使用率过高排查的相关资料,针对Java进程CPU使用率高的问题,我们可以遵循以下步骤进行排查和优化,文中通过代码介绍... 目录前言一、初步定位问题1.1 确认进程状态1.2 确定Java进程ID1.3 快速生成线程堆栈二、分析

Swagger在java中的运用及常见问题解决

《Swagger在java中的运用及常见问题解决》Swagger插件是一款深受Java开发者喜爱的工具,它在前后端分离的开发模式下发挥着重要作用,:本文主要介绍Swagger在java中的运用及常... 目录前言1. Swagger 的主要功能1.1 交互式 API 文档1.2 客户端 SDK 生成1.3

Java中的登录技术保姆级详细教程

《Java中的登录技术保姆级详细教程》:本文主要介绍Java中登录技术保姆级详细教程的相关资料,在Java中我们可以使用各种技术和框架来实现这些功能,文中通过代码介绍的非常详细,需要的朋友可以参考... 目录1.登录思路2.登录标记1.会话技术2.会话跟踪1.Cookie技术2.Session技术3.令牌技

Java 枚举的基本使用方法及实际使用场景

《Java枚举的基本使用方法及实际使用场景》枚举是Java中一种特殊的类,用于定义一组固定的常量,枚举类型提供了更好的类型安全性和可读性,适用于需要定义一组有限且固定的值的场景,本文给大家介绍Jav... 目录一、什么是枚举?二、枚举的基本使用方法定义枚举三、实际使用场景代替常量状态机四、更多用法1.实现接

java String.join()方法实例详解

《javaString.join()方法实例详解》String.join()是Java提供的一个实用方法,用于将多个字符串按照指定的分隔符连接成一个字符串,这一方法是Java8中引入的,极大地简化了... 目录bVARxMJava String.join() 方法详解1. 方法定义2. 基本用法2.1 拼接

java连接opcua的常见问题及解决方法

《java连接opcua的常见问题及解决方法》本文将使用EclipseMilo作为示例库,演示如何在Java中使用匿名、用户名密码以及证书加密三种方式连接到OPCUA服务器,若需要使用其他SDK,原理... 目录一、前言二、准备工作三、匿名方式连接3.1 匿名方式简介3.2 示例代码四、用户名密码方式连接4

springboot项目中使用JOSN解析库的方法

《springboot项目中使用JOSN解析库的方法》JSON,全程是JavaScriptObjectNotation,是一种轻量级的数据交换格式,本文给大家介绍springboot项目中使用JOSN... 目录一、jsON解析简介二、Spring Boot项目中使用JSON解析1、pom.XML文件引入依