【spring boot结合rabbit mq 到点执行,可精确到秒】

2024-02-28 12:04

本文主要是介绍【spring boot结合rabbit mq 到点执行,可精确到秒】,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【spring boot结合rabbit mq 到点执行,可精确到秒】

  • 创建队列枚举
  • 创建自定义的队列消息pojo
  • 创建队列和延迟队列
  • 发送mq 消息
  • 接收mq 消息
  • DateTimeUtil
  • 测试
  • 注意点

创建队列枚举

public enum QueueEnum {/*** 各种异步消息频道*/TEST(1,"test","队列频道"),DELAY_TEST(2,"delay_test","延迟延迟频道"),;private Integer code;private String channel;private String desc;QueueEnum(Integer code, String channel, String desc) {this.code = code;this.channel = channel;this.desc = desc;}public Integer getCode() {return code;}public void setCode(Integer code) {this.code = code;}public String getChannel() {return channel;}public void setChannel(String channel) {this.channel = channel;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}public static String findChannelByCode(Integer code) {QueueEnum[] queueEnums = QueueEnum.values();for (QueueEnum queueEnum : queueEnums) {if (code == queueEnum.getCode()) {return queueEnum.getChannel();}}return "";}
}

创建自定义的队列消息pojo


import java.io.Serializable;
import java.time.LocalDate;/**** 队列消息** 注意:涉及序列化问题,请勿将此类移动与修改* @author linjianhui*/
public class QueueMessage implements Serializable {private static final long serialVersionUID = 1L;//自定义的队列枚举private QueueEnum queueEnum;private String activityId;/*** 任务日期- yyyy-MM-dd* 任务日期- yyyy-MM-dd HH:mm:ss*/private String taskDate;private String msgId;public String getActivityId() {return activityId;}public String getTaskDate() {return taskDate==null? LocalDate.now().toString():taskDate;}public void setQueueEnum(QueueEnum queueEnum) {this.queueEnum = queueEnum;}public void setActivityId(String activityId) {this.activityId = activityId;}public void setTaskDate(String taskDate) {this.taskDate = taskDate;}public String getMsgId() {return msgId;}public void setMsgId(String msgId) {this.msgId = msgId;}public QueueEnum getQueueEnum() {return queueEnum;}public QueueMessage() {}public QueueMessage(QueueEnum queueEnum, String activityId) {this.queueEnum = queueEnum;this.activityId = activityId;}public QueueMessage(QueueEnum queueEnum, String activityId,String msgId) {this.queueEnum = queueEnum;this.activityId = activityId;this.msgId=msgId;}@Overridepublic String toString() {final StringBuilder sb = new StringBuilder("QueueMessage{");sb.append("queueEnum=").append(queueEnum);sb.append(", activityId='").append(activityId).append('\'');sb.append(", taskDate='").append(taskDate).append('\'');sb.append(", mgsId='").append(msgId).append('\'');sb.append('}');return sb.toString();}

创建队列和延迟队列

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;import java.util.HashMap;@Configuration
//保证队列的创建优先于监听队列
@Order(1)
public class TestRabbitConfig {@Bean("testQueue")public Queue testQueue() {return new Queue(QueueEnum.TEST.getChannel());}@Bean("testExchange")public DirectExchange testExchange() {return new DirectExchange(QueueEnum.TEST.getChannel());}/*** 将队列绑定到exchange,使用指定的路由key* @return*/@BeanBinding bindingtestQueueToExchange(@Qualifier("testQueue") Queue testQueue, @Qualifier("testExchange")DirectExchange testExchange) {return BindingBuilder.bind(testQueue).to(testExchange).with(QueueEnum.TEST.getChannel());}/*** 描述:定义延迟更新队列【死信队列】*  当队列到期后就会通过死信交换机和路由key,路由到指定队列* x-message-ttl 消息定时时间* x-max-length  队列最大长度* x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange* x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送* @param* @return*/@Bean("delayTestQueue")public Queue delayTestQueue() {HashMap<String, Object> arguments = new HashMap<>(4);//设置延15天// arguments.put("x-message-ttl", 15*24*6*10*60*1000);//需要时可以打开// x-message-ttl这个设置对队列中所有的消息有效【属于队列级别】//如果你想要【为每个消息动态设置过期时间】,你需要在【消息级别】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点://在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间// arguments.put("x-message-ttl", 10*60*1000);//10分钟arguments.put("x-max-length", 500000);arguments.put("x-dead-letter-exchange", QueueEnum.TEST.getChannel());arguments.put("x-dead-letter-routing-key", QueueEnum.TEST.getChannel());return new Queue(QueueEnum.DELAY_TEST.getChannel(), true, false, false, arguments);}/*** 描述:定义延迟更新队列交换机* @param* @return*/@Bean("delayTestExchange")public DirectExchange delayTestExchange() {return new DirectExchange(QueueEnum.DELAY_TEST.getChannel());}/*** 描述:绑定延迟更新队列到exchange* @param* @return*/@BeanBinding bindingDelayTestQueueToExchange(@Qualifier("delayTestQueue")Queue delayTestQueue, @Qualifier("delayTestExchange")DirectExchange delayTestExchange) {return BindingBuilder.bind(delayTestQueue).to(delayTestExchange).with(QueueEnum.DELAY_TEST.getChannel());}

发送mq 消息


import com.alibaba.fastjson.JSON;
import com.project.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.time.Duration;
import java.time.LocalDateTime;/*** 描述:发送消息*/
@Component
@Slf4j(topic = "sendMqTask")
public class SendMqMessage {@AutowiredRabbitTemplate rabbitTemplate;public void sendTestMessage(QueueMessage queueMessage) {String messageId = StringUtil.getUniqueId("mq-");queueMessage.setMsgId(messageId);rabbitTemplate.convertAndSend(queueMessage.getQueueEnum().getChannel(), queueMessage.getQueueEnum().getChannel(), queueMessage, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 计算时间差long delayInMs = Duration.between(LocalDateTime.now(), DateTimeUtil.fromString2LocalDateTime(queueMessage.getTaskDate())).toMillis();//如果你想要为每个消息动态设置过期时间,你需要在【消息级别:更加细粒度控制】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点://在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间//这里,expiration属性的值是以毫秒为单位的过期时间戳。当这个时间戳过去后,消息就会变为死信//这样每条消息都有自己的过期时间,不用受死信队列的x-message-ttl的影响,死信队列的x-message-ttl这个设置对队列中所有的消息有效【队列级别】//在RabbitMQ中,如果同时在队列级别和消息级别设置了TTL(x-message-ttl 和 expiration 属性),那么将会遵循以下原则:// 1. 消息级别的TTL(expiration)优先:如果消息自身携带了TTL属性,那么即使队列设置了x-message-ttl,也会以消息本身的TTL为准。消息过期后,会被当作死信处理。// 2. 队列级别的TTL(x-message-ttl)作为默认值:只有当消息没有携带TTL属性时,才会使用队列级别的x-message-ttl作为消息的过期时间。// 因此,在你的场景中,如果同时设置了队列级别的x-message-ttl和消息级别的message.getMessageProperties().setExpiration(delayInMs+""),那么将会以消息级别的TTL为准。//设置消息多长时间后过期message.getMessageProperties().setExpiration(delayInMs+"");return message;}});}
}    

接收mq 消息

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.exceptions.PersistenceException;
import org.mybatis.spring.MyBatisSystemException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;/*** 描述:消息消费监听*/
@Component
@Order(2)
@Slf4j(topic = "receiveMqTask")
public class ReceiveMqMessage {// private static final Logger MQ_LOG = LoggerFactory.getLogger("mqTask");@Value("${spring.profiles.active}")private String active;/*** 判断是否是正式环境** @return*/private boolean isProdEnv() {return "prod".equals(active);}/*** 判断是否是测试环境** @return*/private boolean isTestEnv() {return "test".equals(active);}/*** 监听消息队列* @param queueMessage* @param message : org.springframework.amqp.core.Message* @param channel : com.rabbitmq.client.Channel*/@RabbitListener(queues = ApiConstants.TEST)@RabbitHandlerpublic void test(QueueMessage queueMessage, Message message, Channel channel) {String env=isProdEnv()?"正式":isTestEnv()?"测试":active;log.info("====={}== test Mq Message={}",env, queueMessage);// String consumerTag = message.getMessageProperties().getConsumerTag();long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("发送时间是:"+ queueMessage.getTaskDate());System.out.println("当前时间是:"+ LocalDateTime.now().toLocalDate()+" "+LocalDateTime.now().toLocalTime());// 手动ACKtry {channel.basicAck(deliveryTag, false);} catch (IOException e) {log.error("MQ手动ACK错误: ", e);}} catch (Exception e) {log.error("test queue 失败");}}
}    

DateTimeUtil

/*** 日期工具类*/
public class DateTimeUtil {/*** yyyy-MM-dd HH:mm:ss*/public static final String FORMAT_DATETIME = "yyyy-MM-dd HH:mm:ss";/*** discription: */public static String getLocalDateTime(LocalDateTime localDateTime) {DateTimeFormatter df = DateTimeFormatter.ofPattern(DateTimeUtil.FORMAT_DATETIME);if (localDateTime != null) {String localTime = df.format(localDateTime);return localTime;}return null;}
}    

测试

@RestController
@RequestMapping(value = "/test")
public class TestController {@Autowiredprivate SendMqMessage sendMqMessage;@RequestMapping(value = "/testMqMessage", method = RequestMethod.GET)public ResultEntity testMqMessage(@RequestParam(value = "second",defaultValue = "20",required = false) Long second){QueueMessage queueMessage = new QueueMessage(QueueEnum.DELAY_TEST,"123");//设置20秒后更新【默认】queueMessage.setTaskDate(DateTimeUtil.getLocalDateTime(LocalDateTime.now().plusSeconds(second)));sendMqMessage.sendTestMessage(queueMessage);return "发送成功";}
}    

注意点

//如果你想要为每个消息动态设置过期时间,你需要在【消息级别:更加细粒度控制】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点://在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间//这里,expiration属性的值是以毫秒为单位的过期时间戳。当这个时间戳过去后,消息就会变为死信//这样每条消息都有自己的过期时间,不用受死信队列的x-message-ttl的影响,死信队列的x-message-ttl这个设置对队列中所有的消息有效【队列级别】//在RabbitMQ中,如果同时在队列级别和消息级别设置了TTL(x-message-ttl 和 expiration 属性),那么将会遵循以下原则:// 1. 消息级别的TTL(expiration)优先:如果消息自身携带了TTL属性,那么即使队列设置了x-message-ttl,也会以消息本身的TTL为准。消息过期后,会被当作死信处理。// 2. 队列级别的TTL(x-message-ttl)作为默认值:只有当消息没有携带TTL属性时,才会使用队列级别的x-message-ttl作为消息的过期时间。// 因此,在你的场景中,如果同时设置了队列级别的x-message-ttl和消息级别的message.getMessageProperties().setExpiration(delayInMs+""),那么将会以消息级别的TTL为准。

这篇关于【spring boot结合rabbit mq 到点执行,可精确到秒】的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/755445

相关文章

Java中流式并行操作parallelStream的原理和使用方法

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流... 目录Java中流式并行操作parallelStream0. 问题的产生1. 什么是parallelS

Java中Redisson 的原理深度解析

《Java中Redisson的原理深度解析》Redisson是一个高性能的Redis客户端,它通过将Redis数据结构映射为Java对象和分布式对象,实现了在Java应用中方便地使用Redis,本文... 目录前言一、核心设计理念二、核心架构与通信层1. 基于 Netty 的异步非阻塞通信2. 编解码器三、

Linux kill正在执行的后台任务 kill进程组使用详解

《Linuxkill正在执行的后台任务kill进程组使用详解》文章介绍了两个脚本的功能和区别,以及执行这些脚本时遇到的进程管理问题,通过查看进程树、使用`kill`命令和`lsof`命令,分析了子... 目录零. 用到的命令一. 待执行的脚本二. 执行含子进程的脚本,并kill2.1 进程查看2.2 遇到的

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

一篇文章彻底搞懂macOS如何决定java环境

《一篇文章彻底搞懂macOS如何决定java环境》MacOS作为一个功能强大的操作系统,为开发者提供了丰富的开发工具和框架,下面:本文主要介绍macOS如何决定java环境的相关资料,文中通过代码... 目录方法一:使用 which命令方法二:使用 Java_home工具(Apple 官方推荐)那问题来了,

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三

Java中的.close()举例详解

《Java中的.close()举例详解》.close()方法只适用于通过window.open()打开的弹出窗口,对于浏览器的主窗口,如果没有得到用户允许是不能关闭的,:本文主要介绍Java中的.... 目录当你遇到以下三种情况时,一定要记得使用 .close():用法作用举例如何判断代码中的 input