【spring boot结合rabbit mq 到点执行,可精确到秒】

2024-02-28 12:04

本文主要是介绍【spring boot结合rabbit mq 到点执行,可精确到秒】,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【spring boot结合rabbit mq 到点执行,可精确到秒】

  • 创建队列枚举
  • 创建自定义的队列消息pojo
  • 创建队列和延迟队列
  • 发送mq 消息
  • 接收mq 消息
  • DateTimeUtil
  • 测试
  • 注意点

创建队列枚举

public enum QueueEnum {/*** 各种异步消息频道*/TEST(1,"test","队列频道"),DELAY_TEST(2,"delay_test","延迟延迟频道"),;private Integer code;private String channel;private String desc;QueueEnum(Integer code, String channel, String desc) {this.code = code;this.channel = channel;this.desc = desc;}public Integer getCode() {return code;}public void setCode(Integer code) {this.code = code;}public String getChannel() {return channel;}public void setChannel(String channel) {this.channel = channel;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}public static String findChannelByCode(Integer code) {QueueEnum[] queueEnums = QueueEnum.values();for (QueueEnum queueEnum : queueEnums) {if (code == queueEnum.getCode()) {return queueEnum.getChannel();}}return "";}
}

创建自定义的队列消息pojo


import java.io.Serializable;
import java.time.LocalDate;/**** 队列消息** 注意:涉及序列化问题,请勿将此类移动与修改* @author linjianhui*/
public class QueueMessage implements Serializable {private static final long serialVersionUID = 1L;//自定义的队列枚举private QueueEnum queueEnum;private String activityId;/*** 任务日期- yyyy-MM-dd* 任务日期- yyyy-MM-dd HH:mm:ss*/private String taskDate;private String msgId;public String getActivityId() {return activityId;}public String getTaskDate() {return taskDate==null? LocalDate.now().toString():taskDate;}public void setQueueEnum(QueueEnum queueEnum) {this.queueEnum = queueEnum;}public void setActivityId(String activityId) {this.activityId = activityId;}public void setTaskDate(String taskDate) {this.taskDate = taskDate;}public String getMsgId() {return msgId;}public void setMsgId(String msgId) {this.msgId = msgId;}public QueueEnum getQueueEnum() {return queueEnum;}public QueueMessage() {}public QueueMessage(QueueEnum queueEnum, String activityId) {this.queueEnum = queueEnum;this.activityId = activityId;}public QueueMessage(QueueEnum queueEnum, String activityId,String msgId) {this.queueEnum = queueEnum;this.activityId = activityId;this.msgId=msgId;}@Overridepublic String toString() {final StringBuilder sb = new StringBuilder("QueueMessage{");sb.append("queueEnum=").append(queueEnum);sb.append(", activityId='").append(activityId).append('\'');sb.append(", taskDate='").append(taskDate).append('\'');sb.append(", mgsId='").append(msgId).append('\'');sb.append('}');return sb.toString();}

创建队列和延迟队列

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;import java.util.HashMap;@Configuration
//保证队列的创建优先于监听队列
@Order(1)
public class TestRabbitConfig {@Bean("testQueue")public Queue testQueue() {return new Queue(QueueEnum.TEST.getChannel());}@Bean("testExchange")public DirectExchange testExchange() {return new DirectExchange(QueueEnum.TEST.getChannel());}/*** 将队列绑定到exchange,使用指定的路由key* @return*/@BeanBinding bindingtestQueueToExchange(@Qualifier("testQueue") Queue testQueue, @Qualifier("testExchange")DirectExchange testExchange) {return BindingBuilder.bind(testQueue).to(testExchange).with(QueueEnum.TEST.getChannel());}/*** 描述:定义延迟更新队列【死信队列】*  当队列到期后就会通过死信交换机和路由key,路由到指定队列* x-message-ttl 消息定时时间* x-max-length  队列最大长度* x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange* x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送* @param* @return*/@Bean("delayTestQueue")public Queue delayTestQueue() {HashMap<String, Object> arguments = new HashMap<>(4);//设置延15天// arguments.put("x-message-ttl", 15*24*6*10*60*1000);//需要时可以打开// x-message-ttl这个设置对队列中所有的消息有效【属于队列级别】//如果你想要【为每个消息动态设置过期时间】,你需要在【消息级别】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点://在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间// arguments.put("x-message-ttl", 10*60*1000);//10分钟arguments.put("x-max-length", 500000);arguments.put("x-dead-letter-exchange", QueueEnum.TEST.getChannel());arguments.put("x-dead-letter-routing-key", QueueEnum.TEST.getChannel());return new Queue(QueueEnum.DELAY_TEST.getChannel(), true, false, false, arguments);}/*** 描述:定义延迟更新队列交换机* @param* @return*/@Bean("delayTestExchange")public DirectExchange delayTestExchange() {return new DirectExchange(QueueEnum.DELAY_TEST.getChannel());}/*** 描述:绑定延迟更新队列到exchange* @param* @return*/@BeanBinding bindingDelayTestQueueToExchange(@Qualifier("delayTestQueue")Queue delayTestQueue, @Qualifier("delayTestExchange")DirectExchange delayTestExchange) {return BindingBuilder.bind(delayTestQueue).to(delayTestExchange).with(QueueEnum.DELAY_TEST.getChannel());}

发送mq 消息


import com.alibaba.fastjson.JSON;
import com.project.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.time.Duration;
import java.time.LocalDateTime;/*** 描述:发送消息*/
@Component
@Slf4j(topic = "sendMqTask")
public class SendMqMessage {@AutowiredRabbitTemplate rabbitTemplate;public void sendTestMessage(QueueMessage queueMessage) {String messageId = StringUtil.getUniqueId("mq-");queueMessage.setMsgId(messageId);rabbitTemplate.convertAndSend(queueMessage.getQueueEnum().getChannel(), queueMessage.getQueueEnum().getChannel(), queueMessage, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 计算时间差long delayInMs = Duration.between(LocalDateTime.now(), DateTimeUtil.fromString2LocalDateTime(queueMessage.getTaskDate())).toMillis();//如果你想要为每个消息动态设置过期时间,你需要在【消息级别:更加细粒度控制】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点://在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间//这里,expiration属性的值是以毫秒为单位的过期时间戳。当这个时间戳过去后,消息就会变为死信//这样每条消息都有自己的过期时间,不用受死信队列的x-message-ttl的影响,死信队列的x-message-ttl这个设置对队列中所有的消息有效【队列级别】//在RabbitMQ中,如果同时在队列级别和消息级别设置了TTL(x-message-ttl 和 expiration 属性),那么将会遵循以下原则:// 1. 消息级别的TTL(expiration)优先:如果消息自身携带了TTL属性,那么即使队列设置了x-message-ttl,也会以消息本身的TTL为准。消息过期后,会被当作死信处理。// 2. 队列级别的TTL(x-message-ttl)作为默认值:只有当消息没有携带TTL属性时,才会使用队列级别的x-message-ttl作为消息的过期时间。// 因此,在你的场景中,如果同时设置了队列级别的x-message-ttl和消息级别的message.getMessageProperties().setExpiration(delayInMs+""),那么将会以消息级别的TTL为准。//设置消息多长时间后过期message.getMessageProperties().setExpiration(delayInMs+"");return message;}});}
}    

接收mq 消息

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.exceptions.PersistenceException;
import org.mybatis.spring.MyBatisSystemException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;/*** 描述:消息消费监听*/
@Component
@Order(2)
@Slf4j(topic = "receiveMqTask")
public class ReceiveMqMessage {// private static final Logger MQ_LOG = LoggerFactory.getLogger("mqTask");@Value("${spring.profiles.active}")private String active;/*** 判断是否是正式环境** @return*/private boolean isProdEnv() {return "prod".equals(active);}/*** 判断是否是测试环境** @return*/private boolean isTestEnv() {return "test".equals(active);}/*** 监听消息队列* @param queueMessage* @param message : org.springframework.amqp.core.Message* @param channel : com.rabbitmq.client.Channel*/@RabbitListener(queues = ApiConstants.TEST)@RabbitHandlerpublic void test(QueueMessage queueMessage, Message message, Channel channel) {String env=isProdEnv()?"正式":isTestEnv()?"测试":active;log.info("====={}== test Mq Message={}",env, queueMessage);// String consumerTag = message.getMessageProperties().getConsumerTag();long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("发送时间是:"+ queueMessage.getTaskDate());System.out.println("当前时间是:"+ LocalDateTime.now().toLocalDate()+" "+LocalDateTime.now().toLocalTime());// 手动ACKtry {channel.basicAck(deliveryTag, false);} catch (IOException e) {log.error("MQ手动ACK错误: ", e);}} catch (Exception e) {log.error("test queue 失败");}}
}    

DateTimeUtil

/*** 日期工具类*/
public class DateTimeUtil {/*** yyyy-MM-dd HH:mm:ss*/public static final String FORMAT_DATETIME = "yyyy-MM-dd HH:mm:ss";/*** discription: */public static String getLocalDateTime(LocalDateTime localDateTime) {DateTimeFormatter df = DateTimeFormatter.ofPattern(DateTimeUtil.FORMAT_DATETIME);if (localDateTime != null) {String localTime = df.format(localDateTime);return localTime;}return null;}
}    

测试

@RestController
@RequestMapping(value = "/test")
public class TestController {@Autowiredprivate SendMqMessage sendMqMessage;@RequestMapping(value = "/testMqMessage", method = RequestMethod.GET)public ResultEntity testMqMessage(@RequestParam(value = "second",defaultValue = "20",required = false) Long second){QueueMessage queueMessage = new QueueMessage(QueueEnum.DELAY_TEST,"123");//设置20秒后更新【默认】queueMessage.setTaskDate(DateTimeUtil.getLocalDateTime(LocalDateTime.now().plusSeconds(second)));sendMqMessage.sendTestMessage(queueMessage);return "发送成功";}
}    

注意点

//如果你想要为每个消息动态设置过期时间,你需要在【消息级别:更加细粒度控制】设置Time To Live (TTL)。在Spring AMQP中,你可以通过设置MessageProperties的expiration属性来实现这一点://在convertAndSend传入MessagePostProcessor实现类,覆盖其方法postProcessMessage(Message message),使用message.getMessageProperties().setExpiration(delayInMs+"");来为消息单独设置过期时间//这里,expiration属性的值是以毫秒为单位的过期时间戳。当这个时间戳过去后,消息就会变为死信//这样每条消息都有自己的过期时间,不用受死信队列的x-message-ttl的影响,死信队列的x-message-ttl这个设置对队列中所有的消息有效【队列级别】//在RabbitMQ中,如果同时在队列级别和消息级别设置了TTL(x-message-ttl 和 expiration 属性),那么将会遵循以下原则:// 1. 消息级别的TTL(expiration)优先:如果消息自身携带了TTL属性,那么即使队列设置了x-message-ttl,也会以消息本身的TTL为准。消息过期后,会被当作死信处理。// 2. 队列级别的TTL(x-message-ttl)作为默认值:只有当消息没有携带TTL属性时,才会使用队列级别的x-message-ttl作为消息的过期时间。// 因此,在你的场景中,如果同时设置了队列级别的x-message-ttl和消息级别的message.getMessageProperties().setExpiration(delayInMs+""),那么将会以消息级别的TTL为准。

这篇关于【spring boot结合rabbit mq 到点执行,可精确到秒】的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/755445

相关文章

Java实现字节字符转bcd编码

《Java实现字节字符转bcd编码》BCD是一种将十进制数字编码为二进制的表示方式,常用于数字显示和存储,本文将介绍如何在Java中实现字节字符转BCD码的过程,需要的小伙伴可以了解下... 目录前言BCD码是什么Java实现字节转bcd编码方法补充总结前言BCD码(Binary-Coded Decima

SpringBoot全局域名替换的实现

《SpringBoot全局域名替换的实现》本文主要介绍了SpringBoot全局域名替换的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录 项目结构⚙️ 配置文件application.yml️ 配置类AppProperties.Ja

Java使用Javassist动态生成HelloWorld类

《Java使用Javassist动态生成HelloWorld类》Javassist是一个非常强大的字节码操作和定义库,它允许开发者在运行时创建新的类或者修改现有的类,本文将简单介绍如何使用Javass... 目录1. Javassist简介2. 环境准备3. 动态生成HelloWorld类3.1 创建CtC

JavaScript中的高级调试方法全攻略指南

《JavaScript中的高级调试方法全攻略指南》什么是高级JavaScript调试技巧,它比console.log有何优势,如何使用断点调试定位问题,通过本文,我们将深入解答这些问题,带您从理论到实... 目录观点与案例结合观点1观点2观点3观点4观点5高级调试技巧详解实战案例断点调试:定位变量错误性能分

Java实现将HTML文件与字符串转换为图片

《Java实现将HTML文件与字符串转换为图片》在Java开发中,我们经常会遇到将HTML内容转换为图片的需求,本文小编就来和大家详细讲讲如何使用FreeSpire.DocforJava库来实现这一功... 目录前言核心实现:html 转图片完整代码场景 1:转换本地 HTML 文件为图片场景 2:转换 H

Java使用jar命令配置服务器端口的完整指南

《Java使用jar命令配置服务器端口的完整指南》本文将详细介绍如何使用java-jar命令启动应用,并重点讲解如何配置服务器端口,同时提供一个实用的Web工具来简化这一过程,希望对大家有所帮助... 目录1. Java Jar文件简介1.1 什么是Jar文件1.2 创建可执行Jar文件2. 使用java

C++统计函数执行时间的最佳实践

《C++统计函数执行时间的最佳实践》在软件开发过程中,性能分析是优化程序的重要环节,了解函数的执行时间分布对于识别性能瓶颈至关重要,本文将分享一个C++函数执行时间统计工具,希望对大家有所帮助... 目录前言工具特性核心设计1. 数据结构设计2. 单例模式管理器3. RAII自动计时使用方法基本用法高级用法

SpringBoot实现不同接口指定上传文件大小的具体步骤

《SpringBoot实现不同接口指定上传文件大小的具体步骤》:本文主要介绍在SpringBoot中通过自定义注解、AOP拦截和配置文件实现不同接口上传文件大小限制的方法,强调需设置全局阈值远大于... 目录一  springboot实现不同接口指定文件大小1.1 思路说明1.2 工程启动说明二 具体实施2

Python实现精确小数计算的完全指南

《Python实现精确小数计算的完全指南》在金融计算、科学实验和工程领域,浮点数精度问题一直是开发者面临的重大挑战,本文将深入解析Python精确小数计算技术体系,感兴趣的小伙伴可以了解一下... 目录引言:小数精度问题的核心挑战一、浮点数精度问题分析1.1 浮点数精度陷阱1.2 浮点数误差来源二、基础解决

Java实现在Word文档中添加文本水印和图片水印的操作指南

《Java实现在Word文档中添加文本水印和图片水印的操作指南》在当今数字时代,文档的自动化处理与安全防护变得尤为重要,无论是为了保护版权、推广品牌,还是为了在文档中加入特定的标识,为Word文档添加... 目录引言Spire.Doc for Java:高效Word文档处理的利器代码实战:使用Java为Wo