COLA-statemachine事务失效踩坑

2023-10-31 14:30

本文主要是介绍COLA-statemachine事务失效踩坑,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

cola-statemachine是阿里开源项目COLA中的轻量级状态机组件。最大的特点是无状态、采用纯Java实现,用Fluent Interface(连贯接口)定义状态和事件,可用于管理状态转换场景。比如:订单状态、支付状态等简单有限状态场景。在实际使用的过程中我曾发现状态机内事务不生效的问题,经过排查得到解决,以此记录一下。博客地址

问题场景

一个简单的基于cola的状态机可能如下

  • 创建状态机
public StateMachine<State, Event, Context> stateMachine() {StateMachineBuilder<State, Event, Context> builder = StateMachineBuilderFactory.create();builder.externalTransition().from(State.TEST).to(State.DEPLOY).on(Event.PASS).when(passCondition()).perform(passAction());return builder.build("testMachine");
}

上述代码翻译过来是

State.TEST状态转化到State.DEPLOY状态,在Event.PASS事件下,当满足passCondition()条件时,执行passAction()内的逻辑

  • 执行状态机
/*** 根据当前状态、事件、上下文,进行状态流转** @param State 当前状态* @param Event 当前事件* @param Context 当前上下文*/
public void fire(State state, Event event, Context context) {StateMachine<State, Event, Context> stateMachine = StateMachineFactory.get("testMachine");stateMachine.fireEvent(state, event, context);
}

上述代码在纯Java环境可以很好的运行,一般来说,开发者会进一步结合Spring来完善多个状态机的获取

过程中通常会将状态机进行@Bean注入,将passCondition()passAction()独立出Service以期望在后续操作中更好的利用Spring的特性

简单改造后的状态机代码可能如下

@Component
public class StateMachine {@Autowiredprivate ConditionService conditionService;@Autowiredprivate ActionService actionService;@Beanpublic StateMachine<State, Event, Context> stateMachine() {StateMachineBuilder<State, Event, Context> builder = StateMachineBuilderFactory.create();builder.externalTransition().from(State.TEST).to(State.DEPLOY).on(Event.PASS).when(conditionService.passCondition()).perform(actionService.passAction());return builder.build("testMachine");}
}

假设ConditionService的实现为

当上下文不为空就满足条件,为空则不满足条件

@Service
public class ConditionServiceImpl implements ConditionService {/*** 通过条件** @return Condition*/@Overridepublic Condition<Context> passCondition() {return context -> {if (context!=null) {return true;}return false;};}

假设ActionService的实现为

更新金额,同时更新状态,之后推送通知事件进行后续异步操作

@Service
public class ActionServiceImpl implements ActionService {@Autowiredprivate PriceManager priceManager;@Autowiredprivate StatusManager statusManager;@Autowiredprivate ApplicationEventPublisher applicationEventPublisher;/*** 通过执行动作** @return Action*/@Overridepublic Action<State, Event, Context> passAction() {return (from, to, event, context) -> {priceManager.updatePrice(context.getPrice());statusManager.updateStatus(to.getCode());NoticeEvent noticeEvent = context.toNoticeEvent();applicationEventPublisher.publishEvent(noticeEvent);};}
}

NoticeListener监听者

假设这里只是记录操作日志

@Component
public class NoticeListener {@Autowiredprivate LogManager logManager;@Async(value = "EventExecutor")@EventListener(classes = NoticeEvent.class)@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)public void noticeEventAction(NoticeEvent noticeEvent) {logManager.log(noticeEvent);}
}

上述代码正常运行时没有问题,但这时候有的同学就会想到,想要金额和状态的更新具有一致性,不能更新了金额之后更新状态失败了。

想要保证两个操作的一致性,最简单的方式就是加上@Transactional注解,使得两个操作要么一起成功,要么一起失败

于是ActionService的代码在改动后可能是这样的

@Service
public class ActionServiceImpl implements ActionService {@Autowiredprivate PriceManager priceManager;@Autowiredprivate StatusManager statusManager;/*** 通过执行动作** @return Action*/@Override@Transactional(rollbackFor = Exception.class)public Action<State, Event, Context> passAction() {return (from, to, event, context) -> {priceManager.updatePrice(context.getPrice());statusManager.updateStatus(to.getCode());NoticeEvent noticeEvent = context.toNoticeEvent();applicationEventPublisher.publishEvent(noticeEvent);};}
}

对应的NoticeListener改为@TransactionalEventListener,以适应在上文事务提交后再执行

@Component
public class NoticeListener {@Autowiredprivate LogManager logManager;@Async(value = "EventExecutor")@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = NoticeEvent.class)@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)public void noticeEventAction(NoticeEvent noticeEvent) {logManager.log(noticeEvent);}
}

修改完成后在单测中发现了2个现象

  1. 如果其中一个更新失败了,另外一个并没有回滚
  2. 如果两个都没有更新失败,NoticeListener并没有成功监听到事件

在确认ActionServiceNoticeListener无配置遗漏的地方,无典型事务失效场景,搜索半天@TransactionalEventListener监听不起作用的原因无果后,我又仔细检查了StateMachine类中whenperform的调用,也都是通过@Autowired的类进行调用的,没有产生AOP的自调用问题。代码改造后看起来很正常,按理来说不应该出现这个问题。

在百思不得其解的时候,我发现本地的日志输出稍微和平时有些不一样,在执行上述Action逻辑时,没有mybatis-plus的事务相关日志。于是想到可能@Transactional根本没有切到Action方法。

再仔细扫了眼Action逻辑可以看出写法是采用的匿名方法形式

@Override
@Transactional(rollbackFor = Exception.class)
public Action<State, Event, Context> passAction() {return (from, to, event, context) -> {priceManager.updatePrice(context.getPrice());statusManager.updateStatus(to.getCode());};
}

实际上非匿名方法写法等价于

@Override
@Transactional(rollbackFor = Exception.class)
public Action<State, Event, Context> passAction() {Action<State, Event, Context> action = new Action<>() {@Overridepublic void execute(State from, State to, Event event, Context context) {priceManager.updatePrice(context.getPrice());statusManager.updateStatus(to.getCode());}}return action;
}

可以看到匿名方法实际为execute

我在状态机的使用过程中并没有直接调用该方法,所以只能是由框架内部调用的。

问题剖析

重新回到状态机开始执行的地方

public void fire(State state, Event event, Context context) {StateMachine<State, Event, Context> stateMachine = StateMachineFactory.get("testMachine");stateMachine.fireEvent(state, event, context);
}

跟进去fireEvent方法,可以看到第36行判断当前的状态、时间、上下文是否能够转移,如果能够进行转移则进入到第43

在这里插入图片描述

之后便是校验的逻辑,当我们的action不为空的时候,便执行91行的action.execute()

在这里插入图片描述

这时候我们可以看到此时的action实际上就是ActionSeriveImpl,而真正的execute实现也在ActionSeriveImpl中,于是产生了AOP自调用问题,由于无法获取到代理对象事务切面自然就不会生效了

这里的action变量则是由状态机定义时所赋值的,点击setAction方法,全局只有2个地方使用到了,一个在批量的状态流转的实现类中,一个在单个的状态流转的实现类中

在这里插入图片描述

批量流转

@Override
public void perform(Action<S, E, C> action) {for(Transition transition : transitions){transition.setAction(action);}
}

单个流转

@Override
public void perform(Action<S, E, C> action) {transition.setAction(action);
}

代码很简单,注意函数签名都为perform,这就是状态机定义时的连贯接口

@Bean
public StateMachine<State, Event, Context> stateMachine() {StateMachineBuilder<State, Event, Context> builder = StateMachineBuilderFactory.create();builder.externalTransition().from(State.TEST).to(State.DEPLOY).on(Event.PASS).when(conditionService.passCondition()).perform(actionService.passAction());return builder.build("testMachine");
}

在这里actionService.passAction()看上去是一次service调用,实际上并没有实际调用execute方法

passAction的接口定义为Action<State, Event, Context>,这里仅仅是将定义好的action函数通过perform接口赋值到状态机内部而已。真正的执行,需要在fireEvent之后。

解决方法

在了解了问题所在之后,便是想办法进行解决。

通常来说一个AOP自调用的解决方法可以为如下2点

  1. 在自调用类中注入自己(仅限低版本Springboot,在高版本中会有循环依赖检测)
  2. 采用AopContext.currentProxy()获取当前类的代理对象,用代理对象进行自身方法的调用

很可惜,两种方法在当前场景都不适用,因为自调用在COLA框架内部,如果为了解决这个问题去再包装框架就有点大动干戈了。

既然没有声明式事务,直接采用编程式事务就好了

改进后的Action代码如下

@Service
public class ActionServiceImpl implements ActionService {@Autowiredprivate PriceManager priceManager;@Autowiredprivate StatusManager statusManager;@Autowiredprivate DataSourceTransactionManager dataSourceManager;/*** 通过执行动作** @return Action*/@Override@Transactional(rollbackFor = Exception.class)public Action<State, Event, Context> passAction() {return (from, to, event, context) -> {TransactionStatus begin = dataSourceManager.getTransaction(new DefaultTransactionAttribute());try {priceManager.updatePrice(context.getPrice());statusManager.updateStatus(to.getCode());NoticeEvent noticeEvent = context.toNoticeEvent();applicationEventPublisher.publishEvent(noticeEvent);dataSourceManager.commit(begin);} catch (Exception e) {dataSourceManager.rollback(begin);}};}
}

需要注意的是,applicationEventPublisher.publishEvent(noticeEvent);需要放在dataSourceManager.commit(begin);前,这样@TransactionalEventListener才能正确监听到,如果放在commit之后,上文事务会做完提交和释放SqlSession的动作,后续的监听者无法监听一个已释放的事务。

对应的控制台日志为

Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@295854a]
Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@295854a]
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@295854a]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@295854a]

总结

有的时候Spring代码写多了,看起来代码和平时没区别,实际上在特殊场景还是会踩坑,当事务和其他框架结合时一定要注意潜在的事务问题,做好单元测试。

这篇关于COLA-statemachine事务失效踩坑的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/316002

相关文章

MyBatis Plus 中 update_time 字段自动填充失效的原因分析及解决方案(最新整理)

《MyBatisPlus中update_time字段自动填充失效的原因分析及解决方案(最新整理)》在使用MyBatisPlus时,通常我们会在数据库表中设置create_time和update... 目录前言一、问题现象二、原因分析三、总结:常见原因与解决方法对照表四、推荐写法前言在使用 MyBATis

MySQL 事务的概念及ACID属性和使用详解

《MySQL事务的概念及ACID属性和使用详解》MySQL通过多线程实现存储工作,因此在并发访问场景中,事务确保了数据操作的一致性和可靠性,下面通过本文给大家介绍MySQL事务的概念及ACID属性和... 目录一、什么是事务二、事务的属性及使用2.1 事务的 ACID 属性2.2 为什么存在事务2.3 事务

Spring Boot 事务详解(事务传播行为、事务属性)

《SpringBoot事务详解(事务传播行为、事务属性)》SpringBoot提供了强大的事务管理功能,通过@Transactional注解可以方便地配置事务的传播行为和属性,本文将详细介绍Spr... 目录Spring Boot 事务详解引言声明式事务管理示例编程式事务管理示例事务传播行为1. REQUI

MySQL中的事务隔离级别详解

《MySQL中的事务隔离级别详解》在MySQL中,事务(Transaction)是一个执行单元,它要么完全执行,要么完全回滚,以保证数据的完整性和一致性,下面给大家介绍MySQL中的事务隔离级别详解,... 目录一、事务并发问题二、mysql 事务隔离级别1. READ UNCOMMITTED(读未提交)2

如何合理使用Spring的事务方式

《如何合理使用Spring的事务方式》:本文主要介绍如何合理使用Spring的事务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、介绍1.1、底层构造1.1.事务管理器1.2.事务定义信息1.3.事务状态1.4.联系1.2、特点1.3、原理2. Sprin

Idea插件MybatisX失效的问题解决

《Idea插件MybatisX失效的问题解决》:本文主要介绍Idea插件MybatisX失效的问题解决,详细的介绍了4种问题的解决方法,具有一定的参考价值,感兴趣的可以了解一下... 目录一、重启idea或者卸载重装MyBATis插件(无需多言)二、检查.XML文件与.Java(该文件后缀Idea可能会隐藏

C++迭代器失效的避坑指南

《C++迭代器失效的避坑指南》在C++中,迭代器(iterator)是一种类似指针的对象,用于遍历STL容器(如vector、list、map等),迭代器失效是指在对容器进行某些操作后... 目录1. 什么是迭代器失效?2. 哪些操作会导致迭代器失效?2.1 vector 的插入操作(push_back,

MySQL索引失效问题及解决方案

《MySQL索引失效问题及解决方案》:本文主要介绍MySQL索引失效问题及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql索引失效一、概要二、常见的导致MpythonySQL索引失效的原因三、如何诊断MySQL索引失效四、如何解决MySQL索引失

通过Spring层面进行事务回滚的实现

《通过Spring层面进行事务回滚的实现》本文主要介绍了通过Spring层面进行事务回滚的实现,包括声明式事务和编程式事务,具有一定的参考价值,感兴趣的可以了解一下... 目录声明式事务回滚:1. 基础注解配置2. 指定回滚异常类型3. ​不回滚特殊场景编程式事务回滚:1. ​使用 TransactionT

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优