本文主要是介绍redis和redission分布式锁原理及区别说明,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《redis和redission分布式锁原理及区别说明》文章对比了synchronized、乐观锁、Redis分布式锁及Redission锁的原理与区别,指出在集群环境下synchronized失效,...
redis和redission分布式锁原理及区别
我最近做租车项目,在处理分布式时用到分布式锁,我发现很多同事都在网上找分布式锁的资料,但是看的资料都不是很全,所以在这里我谈谈自己的分布式锁理解。
结合我的其中某一业务需求:多个用户在同一个区域内发现只有一辆可租的车,最终结果肯定只有一位用户租车成功,这就产生了多线程(多个用户)抢同一资源的问题。
1、有的同伴想到了synchronized关键字锁
暂且抛开性能问题,项目为了高可用,都会做集群部署,那么synchronized就失去了加锁的意义,这里多嘴解释一下:
2、有的小伙伴可能想到了乐观锁
没错!!乐观锁可以解决的我的问题,但是在高并发的场景,频繁的操作数据库,数据库的资源是很珍贵的,并且还存在性能的问题。但是我这里简单说下乐观锁的使用:
- 我们在车的表中添加一个字段:version(int类型)(建议使用这个名称,这样别人看到就会直觉这是乐观锁字段,也可以使用别的名称)
- 查询出该车的数据,数据中就有version字段,假如version=1
select * from u_car where car_id = 10;
- 修改该车的状态为锁定
update u_car set status = 2,version = version +1 where car_id = 10 and version = 1
在修改的时候将version作为参数,如果其他用户锁车,那么version已经发生变化(version = version +1),所以version = 1不成立,修改失败
乐观锁不是本次的终点,但还是简单说下;
3、使用redis的分布式锁
public boolean lock(String key, V v, int expireTime){ //获取锁 //在redis早期版本中,设置key和key的存活时间是分开的,设置key成功,但是设置存活时间时服务宕机,那么你的key就永远不会过期,有BUG //后来redis将加锁和设置时间用同一个命令 //这里是重点,redis.setNx(key,value,time)方法是原子性的,设置key成功说明锁车成功,如果失败说明该车被别人租了 boolean b = false; try { b = redis.setNx(key, v, expireTime); } catch (Exception e) { log.error(e.getMessage(), e); } return b; } publphpic boolean unlock(String key){ return redwww.chinasem.cnis.delete(key); } }
但是这样写还是存在BUG的,我的key设置了加锁时间为5秒,但是我的业务逻辑5秒还没有执行完成,key过期了,那么其他用户执行redis.setNx(key, v, expireTime)时就成功了,将该车锁定,又产生了抢资源;我们想一下,如果我能够在业务逻辑没有执行完的时候,让锁过期后能够延长锁的时间,是不是就解决了上面的BUG;
实现这个锁的延长,非要自己动手的话就得另启一个线程来监听我们的业务线程,每隔1秒监测当前业务线程是否执行完成,如果没有就获取key的存活时间,时间小于一个阈值时,就自动给key设置N秒;当然,我们可以不用自己动手,redission已经帮我们实现key的时间时间过期问题;
4、使用redission的分布式锁
//引入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.10.6</version> </dependency>
redisson支持单点、集群等模式,这里选择单点的。
- application.yml配置好redis的连接:
spring: redis: host: 127.0.0.1 port: 6379 password:
- 配置redisson的客户端bean
@Configuration public class RedisConfig { @Value("${spring.redis.host}") private String host; @Bean(name = {"redisTemplate", "stringRedisTemplate"}) public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) { StringRedisTemplate redisTemplate = new StringRedisTemplate(); redisTemplate.setConnectionFactory(factory); return redisTemplate; } @Bean public Redisson redisson() { Config config = new Config(); config.useSingleServer().setAddress("redis://" + host + ":6379"); return (Redisson) Redisson.create(config); } }
- 加锁使用
private Logger log = LoggerFactory.getLogger(getClass()); @Resource private Redisson redisson; //加锁 public Boolean lock(String key,long waitTime,long leaseTime){ Boolean b = false; try { RLock rLock = redisson.getLock(key); //说下参数 waitTime:锁的存活时间 leaseTime:锁的延长时间 后面的参数是单位 b = rLock.tryLock(waitTime,leaseTime,TimeUnit.SECONDS); } catch (Exception e) { log.error(e.getMessage(), e); } } return b; } //释放锁 public void unlock(String key){ try { RLock rLock = redisson.getLock(key); if(null!=lock){ lock.unlock(); lock.forceUnlock(); fileLog.info("unlock succesed");China编程 } } catch (Exception e) { fileLog.error(e.getMessage(), e); } }
- 带大家看下tryLock方法的实现源码:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); //尝试获取锁,如果没取到锁,则获取锁的剩余超时时间 Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } //如果waitTime已经超时了,就返回false time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(threadId); return false; } try { time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } //进入死循环,反复去调用tryAcquire尝试获取锁,ttl为null时就是别的线程已经unlock了 while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { China编程 acquireFailed(threadId); return false; } // waiting for message China编程 currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
可以看到,其中主要的逻辑就是尝试加锁,成功了就返回true,失败了就进入死循环反复去尝试加锁。中途还有一些超时的判断。逻辑还是比较简单的。
- 再看看tryAcquire方法
- 这个方法的调用栈也是比较多,之后会进入下面这个方法
上面的Lua(俗称胶水语言)脚本比较重要,主要是为了执行命令的原子性解释一下:
- KEYS[1]代表你的key
- ARGV[1]代表你的key的存活时间,默认存活30秒
- ARGV[2]代表的是请求加锁的客户端ID,后面的1则理解为加锁的次数,简单理解就是 如果该客户端多次对key加锁时,就会执行hincrby原子加1命令
第一段if就是判断你的key是否存在,如果不存在,就执行redis call(hset key ARGV[2],1)加锁和设置redis call(pexpire key ARGV[1])存活时间;
当第二个客户来加锁时,第一个if判断已存在key,就执行第二个if判断key的hash是否存在客户端2的ID,很明显不是;
则进入到最后的return返回该key的剩余存活时间
当加锁成功后会在后台启动一个watch dog(看门狗)线程,key的默认存活时间为30秒,则watch dog每隔10秒钟就会检查一下客户端1是否还持有该锁,如果持有,就会不断的延长锁key的存活时间
所以这里建议大家在设置key的存活时间时,最好大于10秒,延续时间也大于等于10秒
所以,总体流程应该是这样的。
总结
这篇关于redis和redission分布式锁原理及区别说明的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!