spring boot学习第八篇:kafka监听消费

2024-01-19 13:36

本文主要是介绍spring boot学习第八篇:kafka监听消费,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

为了实现监听器功能

pom.xml文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.hmblogs</groupId><artifactId>hmblogs</artifactId><version>0.0.1-SNAPSHOT</version><name>hmblogs</name><description>hmblogs</description><properties><java.version>8</java.version><druid.version>1.2.8</druid.version><log4jdbc.version>1.16</log4jdbc.version></properties><dependencies><!-- druid数据源驱动 --><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>${druid.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- mybatis --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3.1</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!--Mysql依赖包--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><!--lombok插件--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!--监控sql日志--><dependency><groupId>org.bgee.log4jdbc-log4j2</groupId><artifactId>log4jdbc-log4j2-jdbc4.1</artifactId><version>${log4jdbc.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.9</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.7.2</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

 application.yml文件内容如下:

server:port: 8081servlet.context-path: /#配置数据源
spring:datasource:druid:db-type: com.alibaba.druid.pool.DruidDataSourcedriverClassName: net.sf.log4jdbc.sql.jdbcapi.DriverSpyurl: jdbc:log4jdbc:mysql://${DB_HOST:localhost}:${DB_PORT:3306}/${DB_NAME:eladmin}?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=falseusername: ${DB_USER:root}password: ${DB_PWD:123456}redis:host: localhostport: 6379password: hemingdatabase: 10

logback.xml文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="10 seconds"><!-- 日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出 --><!-- scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true --><!-- scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒。当scan为true时,此属性生效。默认的时间间隔为1分钟。 --><!-- debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。 --><contextName>logback</contextName><property name="log.path" value="logs"></property><property name="Console_Pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%logger{50}] - %msg%n"/><appender name="Console" class="ch.qos.logback.core.ConsoleAppender"><encoder><Pattern>${Console_Pattern}</Pattern><!-- 设置字符集 --><charset>UTF-8</charset></encoder></appender><!-- 时间滚动输出 level为 INFO 日志 --><appender name="RollingFileBackend" class="ch.qos.logback.core.rolling.RollingFileAppender"><file>${log.path}/hmblogs.log</file><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level [%logger{50}] - %msg%n</pattern><charset>UTF-8</charset></encoder><!-- 日志记录器的滚动策略,按日期,按大小记录 --><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><!-- 每天日志归档路径以及格式 --><fileNamePattern>${log.path}/hmblogs/log-hmblogs-%d{yyyy-MM-dd}.%i.log</fileNamePattern><timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"><maxFileSize>100MB</maxFileSize></timeBasedFileNamingAndTriggeringPolicy><!--日志文件保留天数--><maxHistory>15</maxHistory></rollingPolicy><!-- 此日志文件只记录info级别的 --><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>Info</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter></appender><!--additivity:是否继承root节点,默认是true继承。默认情况下子Logger会继承父Logger的appender,也就是说子Logger会在父Logger的appender里输出。若是additivity设为false,则子Logger只会在自己的appender里输出,而不会在父Logger的appender里输出。--><logger name="org.springframework" level="INFO" additivity="false"><appender-ref ref="Console"/><appender-ref ref="RollingFileBackend"/></logger><logger name="org.mybatis" level="INFO"></logger><logger name="org.hibernate.SQL" level="DEBUG"  additivity="false"><appender-ref ref="Console"/><appender-ref ref="RollingFileBackend"/></logger><Logger name="org.apache.catalina" level="info"/><Logger name="org.apache.tomcat.util" level="info"/><!-- 从低到高为:All < Trace < Debug < Info < Warn < Error < Fatal < OFF--><root level="Info"><appender-ref ref="Console"/><appender-ref ref="RollingFileBackend"/></root></configuration>

BackendApplication.java文件内容如下:

package com.hmblogs.backend;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class BackendApplication {public static void main(String[] args) {SpringApplication.run(BackendApplication.class, args);}}

然后添加了kafkaConsumerListenerExample.java文件

package com.hmblogs.backend.util;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Optional;/**** @description:  kafka 消费者* @copyright: @Copyright (c) 2022* @company: hmblogs* @author: heming* @version: 1.0.0* @createTime: 2024-01-18 8:31*/
@Component
@Slf4j
public class kafkaConsumerListenerExample {@KafkaListener(topics = "test", groupId = "0")public void consume(ConsumerRecord<?, ?> record) {Optional<?> value = Optional.ofNullable(record.value());// 进行消息处理逻辑log.info("print message: " + value);}
}

发到服务器上,启动hmblogs报错,截图如下:

Caused by: java.lang.TypeNotPresentException: Type org.springframework.kafka.listener.CommonErrorHandler not present

java.lang.ClassNotFoundException: org.springframework.kafka.listener.CommonErrorHandler

网上搜索资料,大部分讲的都是包冲突,在本地启动也是报这样的错,如下所示:

 

这篇关于spring boot学习第八篇:kafka监听消费的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/622583

相关文章

Go语言使用select监听多个channel的示例详解

《Go语言使用select监听多个channel的示例详解》本文将聚焦Go并发中的一个强力工具,select,这篇文章将通过实际案例学习如何优雅地监听多个Channel,实现多任务处理、超时控制和非阻... 目录一、前言:为什么要使用select二、实战目标三、案例代码:监听两个任务结果和超时四、运行示例五

分布式锁在Spring Boot应用中的实现过程

《分布式锁在SpringBoot应用中的实现过程》文章介绍在SpringBoot中通过自定义Lock注解、LockAspect切面和RedisLockUtils工具类实现分布式锁,确保多实例并发操作... 目录Lock注解LockASPect切面RedisLockUtils工具类总结在现代微服务架构中,分布

Java使用Thumbnailator库实现图片处理与压缩功能

《Java使用Thumbnailator库实现图片处理与压缩功能》Thumbnailator是高性能Java图像处理库,支持缩放、旋转、水印添加、裁剪及格式转换,提供易用API和性能优化,适合Web应... 目录1. 图片处理库Thumbnailator介绍2. 基本和指定大小图片缩放功能2.1 图片缩放的

Spring Boot集成/输出/日志级别控制/持久化开发实践

《SpringBoot集成/输出/日志级别控制/持久化开发实践》SpringBoot默认集成Logback,支持灵活日志级别配置(INFO/DEBUG等),输出包含时间戳、级别、类名等信息,并可通过... 目录一、日志概述1.1、Spring Boot日志简介1.2、日志框架与默认配置1.3、日志的核心作用

破茧 JDBC:MyBatis 在 Spring Boot 中的轻量实践指南

《破茧JDBC:MyBatis在SpringBoot中的轻量实践指南》MyBatis是持久层框架,简化JDBC开发,通过接口+XML/注解实现数据访问,动态代理生成实现类,支持增删改查及参数... 目录一、什么是 MyBATis二、 MyBatis 入门2.1、创建项目2.2、配置数据库连接字符串2.3、入

Springboot项目启动失败提示找不到dao类的解决

《Springboot项目启动失败提示找不到dao类的解决》SpringBoot启动失败,因ProductServiceImpl未正确注入ProductDao,原因:Dao未注册为Bean,解决:在启... 目录错误描述原因解决方法总结***************************APPLICA编

深度解析Spring Security 中的 SecurityFilterChain核心功能

《深度解析SpringSecurity中的SecurityFilterChain核心功能》SecurityFilterChain通过组件化配置、类型安全路径匹配、多链协同三大特性,重构了Spri... 目录Spring Security 中的SecurityFilterChain深度解析一、Security

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

Apache Ignite 与 Spring Boot 集成详细指南

《ApacheIgnite与SpringBoot集成详细指南》ApacheIgnite官方指南详解如何通过SpringBootStarter扩展实现自动配置,支持厚/轻客户端模式,简化Ign... 目录 一、背景:为什么需要这个集成? 二、两种集成方式(对应两种客户端模型) 三、方式一:自动配置 Thick

Spring WebClient从入门到精通

《SpringWebClient从入门到精通》本文详解SpringWebClient非阻塞响应式特性及优势,涵盖核心API、实战应用与性能优化,对比RestTemplate,为微服务通信提供高效解决... 目录一、WebClient 概述1.1 为什么选择 WebClient?1.2 WebClient 与