本文主要是介绍java如何实现高并发场景下三级缓存的数据一致性,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下...
下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:
1.缓存结构:
- 本地缓存:使用Caffeine实现,最大容量10,000,写入后10分钟过期
- 分布式缓存:使用Redisson的RMap结构操作Redis
- 数据库:作为最终数据源
2.数据读取流程:
- 先查本地缓存,命中则返回
- 未命中则查Redis,命中则更新本地缓存并返回
- 仍未命中则获取锁,再次检查两级缓存(双重检查)
- 最后从数据库读取,更新两级缓存后返回
3.数据更新流程:
- 使用分布式锁保证写操作原子性
- 先更新数据库
- 删除本地缓存和Redis缓存
- 通过Redis Pub/Sub发布缓存清除消息给集群内其他节点
- 执行延迟双删(100毫秒后再次删除Redis缓存)
4.并发控制:
- 读取时使用本地锁(ReentrantLock)防止缓存击穿
- 更新时使用Redisson分布式锁(RLock)保证跨节点原子性
- 锁使用完成后从ConcurrentHashMap中移除
5.集群同步:
- 使用Redis的RTopic实现消息发布订阅
- 接收到清除消息时自动删除本地缓存
- 确保集群内各节点缓存一致性
该实现综合运用了延迟双删、发布订阅、锁机制和TTL等多种策略,保障了高并发场景下三级缓存的数据一致性,尤其适合分布式微服务架构。
实战代码:
import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.redisson.api.*; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.CacheBuilder; @Service public class CacheService { // 本地一级缓存(Caffeine) private final Cache<String, Object> localCache; // Rphpedisson客户端,用于分布式操作 private final RedissonClient redissonClient; // 锁缓存,用于控制并发 private final ConcurrentHashMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<>(); // 延迟任务执行器 private final ScheduledExecutorService scheduledExecutorService; // 主题订阅,用于接收集群消息 private final RTopic cacheClearTopic; @Autowired 编程 public CacheService(RedissonClient redissonClient) { this.redissonClient = redissonClient; this.localCache = CacheBuilder.newBuilder() .maximumSize(10_000) .expireAfterWrite(10, TimeUnit.MINUTES) .build(); this.scheduledExecutorService = Executors.newScheduledThreadPool(5); this.cacheClearTopic = redissonClient.getTopiwww.chinasem.cnc("cache:clear"); // 注册消息监听器 cacheClearTopic.addListener(String.class, (channel, key) -> { localCache.invalidate(key); }); } // 读取缓存 public Object get(String key) { // 1. 先查本地缓存 Object value = localCache.getIfPresent(key); if (value != null) { return value; } // 2. 本地缓存未命中,查Redis RMap<String, Object> redisMap = redissonClient.getMap("cache"); value = redisMap.get(key); if (value != null) { localCache.put(key, value); return value; } // 3. Redis未命中,查数据库 ReentrantLock lock = lockMap.computephpIfAbsent(key, k -> new ReentrantLock()); lock.lock(); try { // 双重检查 value = localCache.getIfPresent(key); if (value != null) { return value; } value = redisMap.get(key); if (value != null) { localCache.put(key, value); return value; } // 从数据库读取 VMGPmRs value = readFromDatabase(key); if (value != null) { // 放入Redis并设置TTL redisMap.put(key, value, 300, TimeUnit.SECONDS); // 放入本地缓存 localCache.put(key, value); } return value; } finally { lock.unlock(); lockMap.remove(key); } } // 更新数据 public void update(String key, Object value) { // 使用分布式锁保证写操作的原子性 RLock lock = redissonClient.getLock("writeLock:" + key); lock.lock(); try { // 1. 更新数据库 boolean success = updateDatabase(key, value); if (success) { // 2. 先删除本地缓存 localCache.invalidate(key); // 3. 删除Redis缓存 RMap<String, Object> redisMap = redissonClient.getMap("cache"); redisMap.remove(key); // 4. 发布清除缓存的消息到集群 cacheClearTopic.publish(key); // 5. 延迟双删 scheduledExecutorService.schedule(() -> { redisMap.remove(key); }, 100, TimeUnit.MILLISECONDS); } } finally { lock.unlock(); } } // 从数据库读取数据(示例方法) private Object readFromDatabase(String key) { // 实际实现中会查询数据库 return "data_from_db_" + key; } // 更新数据库(示例方法) private boolean updateDatabase(String key, Object value) { // 实际实现中会更新数据库 return true; } }
redisson配置
import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient() { Config config = new Config(); // 单机模式配置 config.useSingleServer() .setAddress("redis://localhost:6379") .setConnectionMinimumIdleSize(5) .setConnectionPoolSize(50); // 集群模式配置示例 /* config.useClusterServers() .addNodeAddress("redis://node1:6379", "redis://node2:6379") .setScanInterval(2000) .setMasterConnectionMinimumIdleSize(10) .setMasterConnectionPoolSize(64) .setSlaveConnectionMinimumIdleSize(10) .setSlaveConnectionPoolSize(64); */ return Redisson.create(config); } }
到此这篇关于java如何实现高并发场景下三级缓存的数据一致性的文章就介绍到这了,更多相关java三级缓存内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!
这篇关于java如何实现高并发场景下三级缓存的数据一致性的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!