Tx-lcn分布式事务框架初体验

2023-12-24 10:38

本文主要是介绍Tx-lcn分布式事务框架初体验,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Tx-lcn分布式事务框架初体验

  • 架构学习
    • 底层通讯
      • 客户端注册
      • 事务消息通知
  • 事务开启注解类型
  • 事务锁底层实现

架构学习

TX-LCN 由两大模块组成,TxClient、TxManager
TxManager 独立服务部署
TxClient 作为模块的依赖框架,提供了 TX-LCN 的标准支持,事务发起方和参与方都属于 TxClient
在这里插入图片描述

底层通讯

客户端注册

http协议进行客户端注册并获取netty通信地址

所属类:com.codingapi.tx.config.ConfigReaderpublic String getTxUrl() {try {txManagerTxUrlService =  spring.getBean(TxManagerTxUrlService.class);}catch (Exception e){logger.debug("load default txManagerTxUrlService ");}if(txManagerTxUrlService == null){txManagerTxUrlService = new TxManagerTxUrlService() {private final String configName = "tx.properties";private final String configKey = "url";@Overridepublic String getTxUrl() {return ConfigUtils.getString(configName,configKey);}};logger.debug("load default txManagerTxUrlService");}else{logger.debug("load txManagerTxUrlService");}return txManagerTxUrlService.getTxUrl();//这个类需要自己根据业务实现}
所属类:com.codingapi.tx.netty.service.impl.MQTxManagerServiceImpl
@Override
public String httpGetServer() {String url = configReader.getTxUrl() + "getServer";return managerHelper.httpGet(url);
}
所属类:com.codingapi.tx.netty.service.impl.MQTxManagerServiceImpl
@Override
public String httpGetServer() {String url = configReader.getTxUrl() + "getServer";return managerHelper.httpGet(url);
}

事务消息通知

底层消息事务通知使用netty-tcp进行通信

所属类:com.codingapi.tx.netty.service.impl.NettyDistributeServiceImplprivate void getTxServer() {//获取负载均衡服务地址String json = null;while (StringUtils.isEmpty(json)) {json = txManagerService.httpGetServer();logger.info("get txManager ->" + json);if (StringUtils.isEmpty(json)) {logger.error("TxManager服务器无法访问.");try {Thread.sleep(1000 * 2);} catch (InterruptedException e) {e.printStackTrace();}}}TxServer txServer = TxServer.parser(json);if (txServer != null) {logger.debug("txServer -> " + txServer);logger.info(txServer.toString());Constants.txServer = txServer;logger.info(Constants.txServer.toString());connectCont = 0;}}
所属类:com.codingapi.tx.netty.service.impl.NettyServiceImpl@Overridepublic synchronized void start() {if (isStarting) {return;}isStarting = true;nettyDistributeService.loadTxServer();String host = Constants.txServer.getHost();int port = Constants.txServer.getPort();final int heart = Constants.txServer.getHeart();int delay = Constants.txServer.getDelay();final TransactionHandler transactionHandler = new TransactionHandler(threadPool,nettyControlService, delay);workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap(); // (1)b.group(workerGroup); // (2)b.channel(NioSocketChannel.class); // (3)b.option(ChannelOption.SO_KEEPALIVE, true); // (4)b.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast("timeout", new IdleStateHandler(heart, heart, heart, TimeUnit.SECONDS));ch.pipeline().addLast(new LengthFieldPrepender(4, false));ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));ch.pipeline().addLast(transactionHandler);}});// Start the client.logger.info("connection txManager-socket-> host:" + host + ",port:" + port);ChannelFuture future = b.connect(host, port); // (5)future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {if (!channelFuture.isSuccess()) {channelFuture.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {isStarting = false;start();}}, 5, TimeUnit.SECONDS);}}});} catch (Exception e) {logger.error(e.getLocalizedMessage());}}

事务开启注解类型

@TxTransaction(isStart = false/true)
1.使用切面来进行事务控制

public class AspectBeforeServiceImpl implements AspectBeforeService {@Autowiredprivate TransactionServerFactoryService transactionServerFactoryService;private Logger logger = LoggerFactory.getLogger(AspectBeforeServiceImpl.class);public Object around(String groupId, ProceedingJoinPoint point) throws Throwable {MethodSignature signature = (MethodSignature) point.getSignature();Method method = signature.getMethod();Class<?> clazz = point.getTarget().getClass();Object[] args = point.getArgs();Method thisMethod = clazz.getMethod(method.getName(), method.getParameterTypes());TxTransaction transaction = thisMethod.getAnnotation(TxTransaction.class);TxTransactionLocal txTransactionLocal = TxTransactionLocal.current();logger.debug("around--> groupId-> " +groupId+",txTransactionLocal->"+txTransactionLocal);TransactionInvocation invocation = new TransactionInvocation(clazz, thisMethod.getName(), thisMethod.toString(), args, method.getParameterTypes());TxTransactionInfo info = new TxTransactionInfo(transaction,txTransactionLocal,invocation,groupId);TransactionServer server = transactionServerFactoryService.createTransactionServer(info);return server.execute(point, info);//事务控制}
}

2.事务控制类包含三个实现
事务控制接口类:com.codingapi.tx.aop.service.TransactionServer
实现:
在这里插入图片描述

事务锁底层实现

事务锁任务task中使用lock 和 condition 条件锁,主要是通过条件锁等待进行阻塞,收到消息后通知解锁来进行事务控制,事务的最终控制是在收到回滚消息后抛出异常实现:

收到netty消息:

消息业务处理实现类:com.codingapi.tx.netty.service.impl.NettyControlServiceImpl@Overridepublic void executeService(final ChannelHandlerContext ctx,final String json) {if (StringUtils.isNotEmpty(json)) {JSONObject resObj = JSONObject.parseObject(json);if (resObj.containsKey("a")) {// tm发送数据给tx模块的处理指令transactionControlService.notifyTransactionMsg(ctx,resObj,json);}else{//tx发送数据给tm的响应返回数据String key = resObj.getString("k");responseMsg(key,resObj);}}}
------------------>通知消息@Overridepublic void notifyTransactionMsg(ChannelHandlerContext ctx,JSONObject resObj, String json) {String action = resObj.getString("a");String key = resObj.getString("k");IActionService actionService = spring.getBean(action, IActionService.class);String res = actionService.execute(resObj, json);JSONObject data = new JSONObject();data.put("k", key);data.put("a", action);JSONObject params = new JSONObject();params.put("d", res);data.put("p", params);SocketUtils.sendMsg(ctx, data.toString());logger.debug("send notify data ->" + data.toString());}
----------------------->通知更新任务状态,并解锁@Overridepublic String execute(JSONObject resObj, String json) {String res;//通知提醒final int state = resObj.getInteger("c");String taskId = resObj.getString("t");if(transactionControl.executeTransactionOperation()) {TaskGroup task = TaskGroupManager.getInstance().getTaskGroup(taskId);logger.info("accept notify data ->" + json);if (task != null) {if (task.isAwait()) {   //已经等待res = notifyWaitTask(task, state);} else {int index = 0;while (true) {if (index > 500) {res = "0";break;}if (task.isAwait()) {   //已经等待res = notifyWaitTask(task, state);break;}index++;try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}} else {res = "0";}}else{//非事务操作res = "1";transactionControl.autoNoTransactionOperation();}logger.info("accept notify response res ->" + res);return res;}---------------------->更新task状态,并解锁private String notifyWaitTask(TaskGroup task, int state) {String res;task.setState(state);task.signalTask();int count = 0;while (true) {if (task.isRemove()) {if (task.getState() == TaskState.rollback.getCode()|| task.getState() == TaskState.commit.getCode()) {res = "1";} else {res = "0";}break;}if (count > 1000) {//已经通知了,有可能失败.res = "2";break;}count++;try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}return res;}------------------>事务切面处理收到解锁消息,抛出异常@Overridepublic Object execute(final ProceedingJoinPoint point, final TxTransactionInfo info) throws Throwable {String kid = KidUtils.generateShortUuid();String txGroupId = info.getTxGroupId();logger.debug("--->begin running transaction,groupId:" + txGroupId);long t1 = System.currentTimeMillis();boolean isHasIsGroup =  transactionControl.hasGroup(txGroupId);TxTransactionLocal txTransactionLocal = new TxTransactionLocal();txTransactionLocal.setGroupId(txGroupId);txTransactionLocal.setHasStart(false);txTransactionLocal.setKid(kid);txTransactionLocal.setHasIsGroup(isHasIsGroup);txTransactionLocal.setMaxTimeOut(Constants.txServer.getCompensateMaxWaitTime());TxTransactionLocal.setCurrent(txTransactionLocal);try {Object res = point.proceed();//写操作 处理if(!txTransactionLocal.isReadOnly()) {String methodStr = info.getInvocation().getMethodStr();TxGroup resTxGroup = txManagerService.addTransactionGroup(txGroupId, kid, isHasIsGroup, methodStr);//已经进入过该模块的,不再执行此方法if(!isHasIsGroup) {String type = txTransactionLocal.getType();TxTask waitTask = TaskGroupManager.getInstance().getTask(kid, type);//lcn 连接已经开始等待时.//等待锁释放while (waitTask != null && !waitTask.isAwait()) {TimeUnit.MILLISECONDS.sleep(1);}if (resTxGroup == null) {//通知业务回滚事务if (waitTask != null) {//修改事务组状态异常waitTask.setState(-1);waitTask.signalTask();//抛出异常消息回滚throw new ServiceException("update TxGroup error, groupId:" + txGroupId);}}}}return res;} catch (Throwable e) {throw e;} finally {TxTransactionLocal.setCurrent(null);long t2 = System.currentTimeMillis();logger.debug("<---end running transaction,groupId:" + txGroupId+",execute time:"+(t2-t1));}}

这篇关于Tx-lcn分布式事务框架初体验的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/531502

相关文章

Nginx分布式部署流程分析

《Nginx分布式部署流程分析》文章介绍Nginx在分布式部署中的反向代理和负载均衡作用,用于分发请求、减轻服务器压力及解决session共享问题,涵盖配置方法、策略及Java项目应用,并提及分布式事... 目录分布式部署NginxJava中的代理代理分为正向代理和反向代理正向代理反向代理Nginx应用场景

Java 线程池+分布式实现代码

《Java线程池+分布式实现代码》在Java开发中,池通过预先创建并管理一定数量的资源,避免频繁创建和销毁资源带来的性能开销,从而提高系统效率,:本文主要介绍Java线程池+分布式实现代码,需要... 目录1. 线程池1.1 自定义线程池实现1.1.1 线程池核心1.1.2 代码示例1.2 总结流程2. J

Spring的基础事务注解@Transactional作用解读

《Spring的基础事务注解@Transactional作用解读》文章介绍了Spring框架中的事务管理,核心注解@Transactional用于声明事务,支持传播机制、隔离级别等配置,结合@Tran... 目录一、事务管理基础1.1 Spring事务的核心注解1.2 注解属性详解1.3 实现原理二、事务事

MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决

《MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决》MyBatis默认开启一级缓存,同一事务中循环调用查询方法时会重复使用缓存数据,导致获取的序列主键值均为1,... 目录问题原因解决办法如果是存储过程总结问题myBATis有如下代码获取序列作为主键IdMappe

Java 缓存框架 Caffeine 应用场景解析

《Java缓存框架Caffeine应用场景解析》文章介绍Caffeine作为高性能Java本地缓存框架,基于W-TinyLFU算法,支持异步加载、灵活过期策略、内存安全机制及统计监控,重点解析其... 目录一、Caffeine 简介1. 框架概述1.1 Caffeine的核心优势二、Caffeine 基础2

详解Spring中REQUIRED事务的回滚机制详解

《详解Spring中REQUIRED事务的回滚机制详解》在Spring的事务管理中,REQUIRED是最常用也是默认的事务传播属性,本文就来详细的介绍一下Spring中REQUIRED事务的回滚机制,... 目录1. REQUIRED 的定义2. REQUIRED 下的回滚机制2.1 异常触发回滚2.2 回

Spring 中的切面与事务结合使用完整示例

《Spring中的切面与事务结合使用完整示例》本文给大家介绍Spring中的切面与事务结合使用完整示例,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录 一、前置知识:Spring AOP 与 事务的关系 事务本质上就是一个“切面”二、核心组件三、完

GSON框架下将百度天气JSON数据转JavaBean

《GSON框架下将百度天气JSON数据转JavaBean》这篇文章主要为大家详细介绍了如何在GSON框架下实现将百度天气JSON数据转JavaBean,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录前言一、百度天气jsON1、请求参数2、返回参数3、属性映射二、GSON属性映射实战1、类对象映

Redis实现分布式锁全过程

《Redis实现分布式锁全过程》文章介绍Redis实现分布式锁的方法,包括使用SETNX和EXPIRE命令确保互斥性与防死锁,Redisson客户端提供的便捷接口,以及Redlock算法通过多节点共识... 目录Redis实现分布式锁1. 分布式锁的基本原理2. 使用 Redis 实现分布式锁2.1 获取锁

Redis分布式锁中Redission底层实现方式

《Redis分布式锁中Redission底层实现方式》Redission基于Redis原子操作和Lua脚本实现分布式锁,通过SETNX命令、看门狗续期、可重入机制及异常处理,确保锁的可靠性和一致性,是... 目录Redis分布式锁中Redission底层实现一、Redission分布式锁的基本使用二、Red