Java延时订单处理(上)- - 抛砖引玉

2024-02-05 01:18

本文主要是介绍Java延时订单处理(上)- - 抛砖引玉,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

更多最新文章欢迎大家访问我的个人博客😄:豆腐别馆

一、前言

在订单系统中,我们常常有如下需求:下单N分钟内(本文以三十分钟为例)未付款订单要自动取消,同时要恢复库存。在这里,如何保证延时订单的实时性将是我们首先需要跨过的第一道坎。那么该如何处理?请往下看 ↓

二、使用Timer处理延时订单

Timer: Java util里自带的一个类,可异步延时执行。

1. 代码实现
......
// 此处略去下单逻辑
......// 下单成功后,延迟执行检查订单状态,恢复库存
Timer timer = new Timer();
final Long orderId= returnOrder.getId();
timer.schedule(new TimerTask() {@Overridepublic void run() {// 处理延时订单disposeTimeOut(orderId);}
}, (30 * 60 * 1000); // 延迟三十分钟执行disposeTimeOut()方法

◇ 关于disposeTimeOut(String orderId),里面的主要逻辑为判断当前订单是否已付款,如果:

  • **未付款:**取消订单、恢复库存
  • **已付款:**不做任何操作
2. 服务重启处理

正常来讲,当下完订单三十分钟后即去调用判断处理超时订单并不会有什么问题,但是如果在这下完单的三十分钟内,服务器宕机或者重启了怎么办?(假设只有一台服务器)
在Spring中,其为我们提供了一个InitializingBean接口,实现该接口即可实现在服务启动时执行重写的方法。那么要做的无非就是在服务器启动时,扫描全部待支付订单,如果:

  • **已超时:**取消订单、恢复库存
  • **未超时:**获取超时时间继续放入Timer中,待超过订单超时时间后继续调用上述的disposeTimeOut()方法。
/*** 处理延时订单*/
public class DisposeTimeOutOrder implements InitializingBean {protected Log log = LogFactory.getLog(this.getClass());@Resourceprivate OrderService orderService;@Overridepublic void afterPropertiesSet() throws Exception {DisposeThread thread = new DisposeThread();// 使用多线程,避免占用服务器启动时间new Thread(thread).start();}/*** 处理待支付且未超时/已超时订单*/class DisposeThread implements Runnable {@Overridepublic void run() {String now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());int timeout = 0;try {log.debug("服务器启动完毕,开始扫描处理待支付已超时订单...记录时间:" + now);// 处理待支付且已超时订单timeout = orderService.disposeTimeOut();log.info("【成功】处理待支付已超时订单成功记录数:" + timeout + ",记录时间:" + now);// 处理待支付且未超时订单List<DzOrder> waitPayList = orderService.getWaitPayOrder();int waitCount = 0;if (CollectionsUtil.isNotNull(waitPayList)) {waitCount = waitPayList.size();for(DzOrder order : waitPayList) {Timer timer = new Timer();final Long orderId = order.getId();timer.schedule(new TimerTask() {@Overridepublic void run() {// 处理待支付且未超时订单orderService.disposeTimeOut(orderId);}}, (order.getPaymentTimeout().getTime()));}}log.debug("服务器启动完毕,开始扫描处理待支付未超时订单,共发现记录数为:" + waitCount + "!已推入检查队列准备到期检查...记录时间:" + now);} catch (Exception e) {log.debug("【异常】服务器启动完毕,处理待支付订单异常,记录时间:" + now);e.printStackTrace();}}}
}

至此关于Timer处理延时订单已完毕,但是等等,虽然业务逻辑没问题、甚至服务重启看似也能解决,但是我们知道Timer其本身也是存在着问题的。

3. 存在问题

1、Timer在执行所有定时任务时只会创建一个线程。如果某个任务的执行时间长度大于其周期时间长度,那么就会导致这一次的任务还在执行,而下一个周期的任务已经需要开始执行了,当然在一个线程内这两个任务只能顺序执行,有两种情况:对于之前需要执行但还没有执行的任务,一是当前任务执行完马上执行那些任务(按顺序来),二是干脆把那些任务丢掉,不去执行它们。至于具体采取哪种做法,需要看是调用schedule还是scheduleAtFixedRate。

2、Timer线程是不会捕获异常的,如果TimerTask抛出的了未检查异常则会导致Timer线程终止,同时Timer也不会重新恢复线程的执行,他会错误的认为整个Timer线程都会取消。同时,已经被安排单尚未执行的TimerTask也不会再执行了,新的任务也不能被调度。故如果TimerTask抛出未检查的异常,Timer将会产生无法预料的行为。

针对上述问题,我们可以使用ScheduledExecutorService替代。但是由于ScheduledExecutorService是多线程处理,即不同任务会被分放到其线程池中的不同线程,因此当订单数据量稍微增长,随着线程的消耗,就容易出现无可用线程池甚至内存溢出等异常。
因此此处,还可以使用DelayQueue进行处理。再往下看:

三、使用DelayQueue处理延时订单

**DelayQueue:**顾名思义,即延时队列,同样是Java util里自带的一个类。里面的put()、及take()方法可为我们实现队列延时抓取。

1. DelayQueue核心源码
/*** Inserts the specified element into this delay queue. As the queue is* unbounded this method will never block.** @param e the element to add* @throws NullPointerException {@inheritDoc}*/
public void put(E e) {offer(e);
}/*** Inserts the specified element into this delay queue.** @param e the element to add* @return <tt>true</tt>* @throws NullPointerException if the specified element is null*/
public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}
}/*** Retrieves and removes the head of this queue, waiting if necessary* until an element with an expired delay is available on this queue.** @return the head of this queue* @throws InterruptedException {@inheritDoc}*/public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null)available.await();else {long delay = first.getDelay(TimeUnit.NANOSECONDS);if (delay <= 0)return q.poll();else if (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();}}

相信了解过或是英文好的朋友看过源码已经知道它们的各自用处,其中最关键的便是take()方法,源码中的介绍如下:

Retrieves and removes the head of this queue, waiting if necessary until an element with an expired delay is available on this queue.

用专20的英文水平翻译后便是:

在必要时阻塞等待,直到该队列上有一个具有过期延迟的元素可用

2. 实现思路
  1. 客户下单,订单数据(如订单状态、订单过期时间等)保存进数据库的同时存进DelayedQueue延时队列
  2. 延时队列按订单超时时刻进行排序,最快过期的队列最先出队。
  3. 订单到期出队,到数据库查询订单数据,同时根据订单状态处理到期订单。如过期未支付,则修改订单状态为已过期。

好了,话不多说,上代码:

3. 代码实现

(1)首先声明一个DelayedVo实现Delayed接口

package com.yby.duanzu.service.impl.core;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** 存放延迟队列*/
public class DelayedVo<T> implements Delayed {// 到期时间,单位为毫秒,实际计算为纳秒private long activeTime;// 业务数据private T data;// activeTime:过期时长,单位为毫秒public DelayedVo(long activeTime, T data) {super();this.activeTime = activeTime;this.data = data;}public long getActiveTime() {return activeTime;}public T getData() {return data;}/*** 返回激活日期的剩余时间,时间单位由单位参数指定*/@Overridepublic long getDelay(TimeUnit unit) {long excessTime = unit.convert(this.activeTime - System.currentTimeMillis(), unit);return excessTime;}/*** Delated接口继承了Comparable接口,剩余时间排序由小到大排序(纳秒)*/@Overridepublic int compareTo(Delayed delayed) {long excessTime = getDelay(TimeUnit.NANOSECONDS) - delayed.getDelay(TimeUnit.NANOSECONDS);return excessTime == 0 ? 0 : ((excessTime < 0 ? -1 : 1));}}

(2)使用延时队列处理延时订单

package com.yby.duanzu.service.impl.core;import java.util.Date;
import java.util.concurrent.DelayQueue;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;import com.yby.api.common.RandomUtil;
import com.yby.duanzu.po.DzOrder;
import com.yby.duanzu.service.core.OrderService;/*** 使用阻塞队列实现延时订单*/
@Service
@Qualifier("DelayQueueServiceImpl")
public class DelayQueueServiceImpl implements DelayQueueService {private Log logger = LogFactory.getLog(this.getClass());@Resourceprivate OrderService orderService;private Thread takeOrder;// 负责保存限时订单的队列private static DelayQueue<DelayedVo<DzOrder>> delayOrder = new DelayQueue<DelayedVo<DzOrder>>();/*** 进行延时处理的方法* * @param order*            要进行延时处理的订单* @param expireTime*            延时时长,单位为毫秒*/@Overridepublic void orderDelay(DzOrder order, long expireTime) {DelayedVo<DzOrder> delayedOrder = new DelayedVo<DzOrder>(expireTime, order);// 将订单推入队列delayOrder.put(delayedOrder);logger.info("订单[超时时长:" + expireTime / 1000 + "秒]被推入检查队列,订单详情:" + order);}/*** 处理到期的订单线程*/private class TakeOrder implements Runnable {private OrderService orderService;public TakeOrder(OrderService orderService) {super();this.orderService = orderService;}@Overridepublic void run() {logger.info("处理到期订单线程已经启动");// 检查当前线程是否中断while (!Thread.currentThread().isInterrupted()) {try {// take():获取队列,在必要时阻塞等待,直到该队列上有一个具有过期延迟的元素可用。DelayedVo<DzOrder> delayedOrder = delayOrder.take();if (delayedOrder != null) {// 处理待支付且支付超时订单orderService.disposeTimeOut(delayedOrder.getData().getId());}} catch (Exception e) {logger.error("The thread is Interrupted!");}}logger.info("处理到期订单线程准备关闭...");}}// @PostConstruct:当整个bean被初始化完成后执行@PostConstructpublic void init() {takeOrder = new Thread(new TakeOrder(orderService));takeOrder.start();}// 销毁示例之前调用@PreDestroypublic void close() {takeOrder.interrupt();}
}

(3)方法调用

......
// 此处略去下单逻辑
......// 下单成功后,将订单放入延时队列
delayQueueService.orderDelay(returnOrder, returnOrder.getPaymentTimeout().getTime());......
......
4. 服务重启处理

同样的,DelayQueue尽管特殊,可以进行延时处理,但说到底其还是一个队列,是队列,没做持久化,那么就还是得存放在内存当中,那么就一样面临服务重启后数据丢失的风险。
依旧使用InitializingBean(或可使用Spring中的@PostConstruct注解),在服务启动时扫描全部待支付订单,如果:

  • **已超时:**取消订单、恢复库存
  • **未超时:**获取超时时间继续放入DelayQueue中,待超过订单中支付超时时间后继续调用上述的disposeTimeOut()方法。
package com.yby.duanzu.service.impl.core;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;import javax.annotation.Resource;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;import com.yby.duanzu.po.DzOrder;
import com.yby.duanzu.service.core.OrderService;
import com.yby.duanzu.util.CollectionsUtil;/*** 处理支付超时订单*/
public class DisposeTimeOutOrder implements InitializingBean {protected Log log = LogFactory.getLog(this.getClass());@Resourceprivate OrderService orderService;@Autowired@Qualifier("delayQueueService")private DelayQueueService delayQueueService;@Overridepublic void afterPropertiesSet() throws Exception {DisposeThread thread = new DisposeThread();// 使用多线程,避免占用服务器启动时间new Thread(thread).start();}/*** 处理待支付且未超时/已超时订单*/class DisposeThread implements Runnable {@Overridepublic void run() {String now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());int timeout = 0;try {log.debug("服务器启动完毕,开始扫描处理待支付已超时订单...记录时间:" + now);// 处理待支付且已超时订单timeout = orderService.disposeTimeOut();log.info("【成功】处理待支付已超时订单成功记录数:" + timeout + ",记录时间:" + now);// 处理待支付且未超时订单List<DzOrder> waitPayList = orderService.getWaitPayOrder();int waitCount = 0;if (CollectionsUtil.isNotNull(waitPayList)) {waitCount = waitPayList.size();for (DzOrder order : waitPayList) {long expireTime = order.getPaymentTimeout().getTime() - (new Date().getTime());// 放入延时队列delayQueueService.orderDelay(order, expireTime);}}log.debug("服务器启动完毕,开始扫描处理待支付未超时订单,共发现记录数为:" + waitCount + "!已推入检查队列准备到期检查...记录时间:" + now);} catch (Exception e) {log.debug("【异常】服务器启动完毕,处理待支付订单异常,记录时间:" + now);e.printStackTrace();}}}
}

至此关于DelayQueue处理延时订单也已完毕,但是,这样是否就已经完美了么?

5. 存在问题

尽管DelayQueue避免了Timer中关于时间问题及发生异常使其它任务一起终止的问题,但是由于DelayQueue是一个无限容量的队列容器,即只要你有足够的内存,那么就可以存放无限的数据。如果在队列失效时间内存放过多的数据,那么对内存一样是种损耗 ,且程序里放着一个死循环,就算有阻塞队列的存在,也怎么着都感觉不是个滋味。那么是否还有其它方式可以解决?答案当然必须还是肯定的,所以事情还远未结束。

四、远未结束

实际上,除了上述提到的未解决的问题,还有一个缺陷就是:如果在集群环境下,无论是Timer还是DelayQueue,如果出现应用重启,在上述的解决思路中,全局扫描订单将会把原先应该由其它服务管理的部分一起给扫描,那么这样势必将引起服务间订单数据的抢夺。
再者,除了在前言中提到的实时性、正文中一再提到的内存消耗、并发吞吐量也应该是我们所需要考虑的问题,那么如何既优雅又高效地解决?

… …

请看下回分解♪(*)

这篇关于Java延时订单处理(上)- - 抛砖引玉的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/679363

相关文章

SpringBoot实现RSA+AES自动接口解密的实战指南

《SpringBoot实现RSA+AES自动接口解密的实战指南》在当今数据泄露频发的网络环境中,接口安全已成为开发者不可忽视的核心议题,RSA+AES混合加密方案因其安全性高、性能优越而被广泛采用,本... 目录一、项目依赖与环境准备1.1 Maven依赖配置1.2 密钥生成与配置二、加密工具类实现2.1

在Java中实现线程之间的数据共享的几种方式总结

《在Java中实现线程之间的数据共享的几种方式总结》在Java中实现线程间数据共享是并发编程的核心需求,但需要谨慎处理同步问题以避免竞态条件,本文通过代码示例给大家介绍了几种主要实现方式及其最佳实践,... 目录1. 共享变量与同步机制2. 轻量级通信机制3. 线程安全容器4. 线程局部变量(ThreadL

Python调用LibreOffice处理自动化文档的完整指南

《Python调用LibreOffice处理自动化文档的完整指南》在数字化转型的浪潮中,文档处理自动化已成为提升效率的关键,LibreOffice作为开源办公软件的佼佼者,其命令行功能结合Python... 目录引言一、环境搭建:三步构建自动化基石1. 安装LibreOffice与python2. 验证安装

分布式锁在Spring Boot应用中的实现过程

《分布式锁在SpringBoot应用中的实现过程》文章介绍在SpringBoot中通过自定义Lock注解、LockAspect切面和RedisLockUtils工具类实现分布式锁,确保多实例并发操作... 目录Lock注解LockASPect切面RedisLockUtils工具类总结在现代微服务架构中,分布

Java使用Thumbnailator库实现图片处理与压缩功能

《Java使用Thumbnailator库实现图片处理与压缩功能》Thumbnailator是高性能Java图像处理库,支持缩放、旋转、水印添加、裁剪及格式转换,提供易用API和性能优化,适合Web应... 目录1. 图片处理库Thumbnailator介绍2. 基本和指定大小图片缩放功能2.1 图片缩放的

Spring Boot集成/输出/日志级别控制/持久化开发实践

《SpringBoot集成/输出/日志级别控制/持久化开发实践》SpringBoot默认集成Logback,支持灵活日志级别配置(INFO/DEBUG等),输出包含时间戳、级别、类名等信息,并可通过... 目录一、日志概述1.1、Spring Boot日志简介1.2、日志框架与默认配置1.3、日志的核心作用

破茧 JDBC:MyBatis 在 Spring Boot 中的轻量实践指南

《破茧JDBC:MyBatis在SpringBoot中的轻量实践指南》MyBatis是持久层框架,简化JDBC开发,通过接口+XML/注解实现数据访问,动态代理生成实现类,支持增删改查及参数... 目录一、什么是 MyBATis二、 MyBatis 入门2.1、创建项目2.2、配置数据库连接字符串2.3、入

Springboot项目启动失败提示找不到dao类的解决

《Springboot项目启动失败提示找不到dao类的解决》SpringBoot启动失败,因ProductServiceImpl未正确注入ProductDao,原因:Dao未注册为Bean,解决:在启... 目录错误描述原因解决方法总结***************************APPLICA编

深度解析Spring Security 中的 SecurityFilterChain核心功能

《深度解析SpringSecurity中的SecurityFilterChain核心功能》SecurityFilterChain通过组件化配置、类型安全路径匹配、多链协同三大特性,重构了Spri... 目录Spring Security 中的SecurityFilterChain深度解析一、Security

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.