java DelayQueue 关联redis

2024-05-29 05:08
文章标签 java redis 关联 delayqueue

本文主要是介绍java DelayQueue 关联redis,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

关联redis的DelayQueue

 

java 的延时队列相信很多人都用过,因为这东西应用场景太多了,但是有一点,就是如果重启服务器,那么延时队列里的东西都没了,特别是超多数据量的延时队列,这就是个大问题。

 

因此,有很多解决方案,比如出现了所谓的"redis延时队列",但是此延时队列并非我们想要的,因为他还是需要一个定时任务来执行,而我们真正想要的是这么个队列,有任务的时候,立刻以最快的速度取出,没有任务的时候线程阻塞,不额外占用cpu时间,做到系统负责应答,而非主动查询!!这点很关键,特别是高并发,大量数据的情况下系统负载又很大,这种方式就更加有优势了!!!

 

说了这么多,简单说一下思路,那就是用空间换取系统的效率。

 

思路是这样实现的还是基于java的延时队列

我们将收到的消息转化成自定义的消息对象(DelayQueueObject),放入延时队列,然后将他们在redis中进行备份,当延时队列中取出元素的时候,在redis中删除对应的key,需要删除某个元素的时候,用key从redis中取出对象,然后去延时队列中删除,中间用一些手段保证redis和延时队列中的元素一直统一,可以借鉴数据库的事务实现的思路。

但是!!!

需要重写DelayQueueObject.equals()方法,因为从redis中取出来的对象,并非是队列中的对象,所以默认的equals()是直接判断对象的地址的,不是同一个对象,当然不相等,队列中没有redis中的对象,所以删除会失败的!!!

而delayQueue.remove()方法调用的是PriorityQueue.remove(),而PriorityQueue.remove()方法又是利用对象的.equals()方法进行对象相等的比较而取出对象在队列中的位置,然后移除的,因此只要重写DelayQueueObject的.equals()方法即可让反序列化的对象和队列中的对象比较是否相等,具体.equals()怎么写,还需要看其中的元素是什么,找到对应的方法即可。

 

当然最好的办法还是自己封装一个延时队列,将操作合并在延时队列里面,这样用起来很方便,当然我的这种处理方式也还不错,如果大家不喜欢也不要喷我哈,欢迎大家讨论。

 

我这个人就是不爱多说话,希望谅解,直接上代码,不懂的私聊

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;import com.fasterxml.jackson.annotation.JsonIgnoreProperties;import net.sf.json.JSONArray;/*** 用于延时队列的对象* * @author TY** @param <E> 队列中存放的数据类型*/public class DelayQueueObject<E> implements Delayed, Serializable {/*** */private static final long serialVersionUID = 1L;private String id;private E object;private long delayTime;public DelayQueueObject() {}public DelayQueueObject(String id, E object, long delayTime) {this.id = id;this.object = object;this.delayTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();}/*** 设置变形后的DelayTime* * @param delayTime*/public void setDecoratedDelayTime(long delayTime) {this.delayTime = delayTime;}/*** 从redis中取出后转换成对象使用* * @param delayQueueObject* @return* @throws Exception*/public static <E> String serializeToString(DelayQueueObject<E> delayQueueObject) throws Exception {ByteArrayOutputStream byteOut = new ByteArrayOutputStream();ObjectOutputStream objOut = new ObjectOutputStream(byteOut);objOut.writeObject(delayQueueObject);// 此处只能是ISO-8859-1,但是不会影响中文使用String objectStr = byteOut.toString("ISO-8859-1");return objectStr;}/*** 将对象存入reids中使用* * @param delayQueueObjectStr* @return* @throws Exception*/@SuppressWarnings("unchecked")public static <E> DelayQueueObject<E> deserializeToObject(String delayQueueObjectStr) throws Exception {// 判断参数if (delayQueueObjectStr == null || "".equals(delayQueueObjectStr)) {return null;}ByteArrayInputStream byteIn = new ByteArrayInputStream(delayQueueObjectStr.getBytes("ISO-8859-1"));ObjectInputStream objIn = new ObjectInputStream(byteIn);return (DelayQueueObject<E>) objIn.readObject();}/** 用于从delayQueue队列中取出元素时使用 */@Overridepublic boolean equals(Object obj) {if (this == obj) {return true;}if (obj == null) {return false;}if (getClass() != obj.getClass()) {return false;}@SuppressWarnings("unchecked")DelayQueueObject<E> other = (DelayQueueObject<E>) obj;if (delayTime != other.delayTime) {return false;}if (id == null) {if (other.id != null) {return false;}} else if (!id.equals(other.id)) {return false;}if (object == null) {if (other.object != null) {return false;}} else if (!object.equals(other.object)) {return false;}return true;}@Overridepublic String toString() {String objectStr;objectStr = JSONArray.fromObject(object).toString();return "{\"id\":" + id + ", \"object\":" + objectStr + ",\"delayTime\":" + delayTime + "}";}@Overridepublic int compareTo(Delayed delayed) {return (int) (this.getDelay(TimeUnit.MILLISECONDS) - delayed.getDelay(TimeUnit.MILLISECONDS));}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.delayTime - System.nanoTime(), TimeUnit.NANOSECONDS);}public String getId() {return id;}public void setId(String id) {this.id = id;}public E getObject() {return object;}public void setObject(E object) {this.object = object;}public long getDelayTime() {return delayTime;}public void setDelayTime(long delayTime) {this.delayTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();}public static long getSerialversionuid() {return serialVersionUID;}}
/*** * 采用Redis备份恢复机制* * Component默认单利,完美!!* * @author TY 2018.09.21* */
@Component
public class OfflineDelayQueue {/** 延时消息队列map,用于对应delayObjectQueue */@Autowiredprivate RedisTemplate redisTemplate;/** 延时消息队列,用于处理offline */private DelayQueue<DelayQueueObject<List<String>>> delayQueue;/*** 启动线程,随系统启动,从reids初始化OfflineDelayQueue,用于加载系统重启前的OfflineDelayQueue* * @throws Exception*/@PostConstructprivate void initOfflineDelayQueue() throws Exception {delayQueue = new DelayQueue<DelayQueueObject<List<String>>>();Set<String> keys = redisTemplate.keys(Const.REDIS_OFFLINE_DELAY_QUEUE_KEY_HEAD + "*");Iterator<String> key = keys.iterator();while (key.hasNext()) {DelayQueueObject<List<String>> tempDelayQueueObject = DelayQueueObject.deserializeToObject(redisTemplate.get(key.next()));delayQueue.offer(tempDelayQueueObject);}}/*** 将delayQueueObject加入延时队列* * @param delayQueueObject* @throws Exception*/public void addToDelayQueue(DelayQueueObject<List<String>> delayQueueObject) throws Exception {// 如果延时队列添加报错,则抛出异常,上层处理delayQueue.offer(delayQueueObject);try {redisTemplate.set(Const.REDIS_OFFLINE_DELAY_QUEUE_KEY_HEAD + delayQueueObject.getId(),Const.REDIS_OFFLINE_DELAY_QUEUE_ACTIVE, DelayQueueObject.serializeToString(delayQueueObject));} catch (Exception e) {// 如果redis添加报错,则将队列元素删除,并向上层抛出异常delayQueue.remove(delayQueueObject);throw new Exception();}}/*** 根据id从延时队列中删除元素* * @param id* @return* @throws Exception*/public Boolean removeFromDelayQueue(String id) throws Exception {Boolean back = false;// redis中id对应的队列中的元素DelayQueueObject<List<String>> tempDelayQueueObject = DelayQueueObject.deserializeToObject(redisTemplate.get(Const.REDIS_OFFLINE_DELAY_QUEUE_KEY_HEAD + id));if (tempDelayQueueObject != null) {// 如果队列移除报报错,直接抛出异常,上层处理back = delayQueue.remove(tempDelayQueueObject);Boolean flag = false;try {// redis中没有相应元素不会抛异常flag = redisTemplate.del(Const.REDIS_OFFLINE_DELAY_QUEUE_KEY_HEAD + id);} catch (Exception e) {if (flag == false) {// 如果reids移除报错,再将元素加回去,并向上层抛出异常delayQueue.offer(tempDelayQueueObject);throw new Exception();}}}return back;}/*** 从队列中take元素* * @return* @throws Exception*/public DelayQueueObject<List<String>> takeFromDelayQueue() throws Exception {// 如果队列移除报报错,直接抛出异常,上层处理DelayQueueObject<List<String>> delayQueueObject = delayQueue.take();Boolean flag = false;try {flag = redisTemplate.del(Const.REDIS_OFFLINE_DELAY_QUEUE_KEY_HEAD + delayQueueObject.getId());} catch (Exception e) {if (flag == false) {// 如果map移除报错,再将元素加回去,并向上层抛出异常delayQueue.offer(delayQueueObject);throw new Exception();}}return delayQueueObject;}}
/*** DelayQueueConsumer* * @author TY 2018.09.21**/
@Component
public class DelayQueueConsumer implements Runnable {@Autowiredprivate OfflineDelayQueue offlineDelayQueue;/** 启动线程,随系统启动 */@PostConstructprivate void StartDelayQueueConsumer() {new Thread(this).start();}@Overridepublic void run() {System.out.println("---------------------------OfflineDelayQueue检测线程启动---------------------------");while (offlineDelayQueue != null && !Thread.currentThread().isInterrupted()) {try {DelayQueueObject<List<String>> delayQueueObject = offlineDelayQueue.takeFromDelayQueue();//自己发挥哦。。。//} catch (Exception e) {e.printStackTrace();// 防止死循环try {// 休眠10秒钟Thread.sleep(10000);} catch (InterruptedException e1) {e1.printStackTrace();}}}}
}

 

这篇关于java DelayQueue 关联redis的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1012728

相关文章

javax.net.ssl.SSLHandshakeException:异常原因及解决方案

《javax.net.ssl.SSLHandshakeException:异常原因及解决方案》javax.net.ssl.SSLHandshakeException是一个SSL握手异常,通常在建立SS... 目录报错原因在程序中绕过服务器的安全验证注意点最后多说一句报错原因一般出现这种问题是因为目标服务器

Java实现删除文件中的指定内容

《Java实现删除文件中的指定内容》在日常开发中,经常需要对文本文件进行批量处理,其中,删除文件中指定内容是最常见的需求之一,下面我们就来看看如何使用java实现删除文件中的指定内容吧... 目录1. 项目背景详细介绍2. 项目需求详细介绍2.1 功能需求2.2 非功能需求3. 相关技术详细介绍3.1 Ja

springboot项目中整合高德地图的实践

《springboot项目中整合高德地图的实践》:本文主要介绍springboot项目中整合高德地图的实践,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一:高德开放平台的使用二:创建数据库(我是用的是mysql)三:Springboot所需的依赖(根据你的需求再

spring中的ImportSelector接口示例详解

《spring中的ImportSelector接口示例详解》Spring的ImportSelector接口用于动态选择配置类,实现条件化和模块化配置,关键方法selectImports根据注解信息返回... 目录一、核心作用二、关键方法三、扩展功能四、使用示例五、工作原理六、应用场景七、自定义实现Impor

SpringBoot3应用中集成和使用Spring Retry的实践记录

《SpringBoot3应用中集成和使用SpringRetry的实践记录》SpringRetry为SpringBoot3提供重试机制,支持注解和编程式两种方式,可配置重试策略与监听器,适用于临时性故... 目录1. 简介2. 环境准备3. 使用方式3.1 注解方式 基础使用自定义重试策略失败恢复机制注意事项

SpringBoot整合Flowable实现工作流的详细流程

《SpringBoot整合Flowable实现工作流的详细流程》Flowable是一个使用Java编写的轻量级业务流程引擎,Flowable流程引擎可用于部署BPMN2.0流程定义,创建这些流程定义的... 目录1、流程引擎介绍2、创建项目3、画流程图4、开发接口4.1 Java 类梳理4.2 查看流程图4

一文详解如何在idea中快速搭建一个Spring Boot项目

《一文详解如何在idea中快速搭建一个SpringBoot项目》IntelliJIDEA作为Java开发者的‌首选IDE‌,深度集成SpringBoot支持,可一键生成项目骨架、智能配置依赖,这篇文... 目录前言1、创建项目名称2、勾选需要的依赖3、在setting中检查maven4、编写数据源5、开启热

Java对异常的认识与异常的处理小结

《Java对异常的认识与异常的处理小结》Java程序在运行时可能出现的错误或非正常情况称为异常,下面给大家介绍Java对异常的认识与异常的处理,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参... 目录一、认识异常与异常类型。二、异常的处理三、总结 一、认识异常与异常类型。(1)简单定义-什么是

Redis Cluster模式配置

《RedisCluster模式配置》:本文主要介绍RedisCluster模式配置,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录分片 一、分片的本质与核心价值二、分片实现方案对比 ‌三、分片算法详解1. ‌范围分片(顺序分片)‌2. ‌哈希分片3. ‌虚

SpringBoot项目配置logback-spring.xml屏蔽特定路径的日志

《SpringBoot项目配置logback-spring.xml屏蔽特定路径的日志》在SpringBoot项目中,使用logback-spring.xml配置屏蔽特定路径的日志有两种常用方式,文中的... 目录方案一:基础配置(直接关闭目标路径日志)方案二:结合 Spring Profile 按环境屏蔽关