改造JedisCluster使其支持pipeline操作

2024-06-02 15:32

本文主要是介绍改造JedisCluster使其支持pipeline操作,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!


Redis 管道技术
Redis 管道技术可以在服务端未响应时,客户端可以继续向服务端发送请求,并最终一次性读取所有服务端的响应。

管道技术的优势
管道技术最显著的优势是提高了 redis 服务的性能。redis本身性能是很高的,单个redis命令的执行时间很短,大量的redis操作,网络IO的耗时非常大,而redis管道技术大大的减少了程序和redis的交互 次数,性能提升非常明显。

管道的使用----Jedis
public class PipelineTest {

    public static void main(String[] args){
        Jedis jedis = new Jedis("localhost",6060);
        Pipeline pipeline = jedis.pipelined();
        pipeline.set("xiajw","test");
        pipeline.set("hello","world");
        List<Object> result = pipeline.syncAndReturnAll();
        jedis.close();
    }
}

JedisCluster改造
在使用Jedis的时候,pipeline的使用非常简单。但是实际上,大家大部分情况会使用集群JedisCluster来进行操作,当我去JedisCluster类里面查找的时候发现,没有pipeline方法。JedisCluster本身是不支持管道操作的。

为什么不支持,以下仅代表个人观点。JedisCluster底层其实就是对Jedis的调用。redis集群中的每个主节点管理着各自的slot区间,而数据的存储,会计算key所在的slot,然后调用slot所在的redis实例去操作。JedisCluster本身如果支持管道的话,由于要分步调用不同的redis管道,redis本身对于事务的支持也基本没有,本身很难保证所有操作的一致性,因为是分批发送的,如果方法里提供,可能会让大家产生误解,以为它就是一致的,所以干脆不提供。

改造思路
1、改造JedisCluster,使其connectionHandler对外能够访问。因为JedisClusterConnectionHandler的成员变量JedisClusterInfoCache存储着redis集群的信息以及连接池。我们需要获取连接池。
2、改造JedisClusterConnectionHandler,使其能够根据slot返回JedisPool。
3、将大量的redis操作,根据key计算出相应所在slot,根据slot获取相应的redis实例,然后按照 Map<JedisPool,List>的结构存储起来。然后遍历调用。

JedisClusterConnectionHandler改造

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
import redis.clients.jedis.exceptions.JedisNoReachableClusterNodeException;

import java.util.Set;

public class JedisSlotAdvancedConnectionHandler extends JedisSlotBasedConnectionHandler {

    public JedisSlotAdvancedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int timeout) {
        super(nodes, poolConfig, timeout);
    }

    public JedisPool getJedisPoolFromSlot(int slot){
        JedisPool connectionPool = cache.getSlotPool(slot);
        if(connectionPool != null){
            return connectionPool;
        }else{
            renewSlotCache();
            connectionPool = cache.getSlotPool(slot);
            if(connectionPool != null){
                return connectionPool;
            }else{
                throw new JedisNoReachableClusterNodeException("No reachable node in cluster for slot "+slot);
            }
        }
    }
}

JedisCluster改造

import com.ai.ocbs.util.ConfigUtil;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.util.Set;

public class JedisClusterPipeline extends JedisCluster{

    public JedisClusterPipeline(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig) {
        super(nodes, poolConfig);
        super.connectionHandler = new JedisSlotAdvancedConnectionHandler(nodes,poolConfig, ConfigUtil.m_timeout);
    }

    public JedisSlotAdvancedConnectionHandler getConnectionHandler(){
        return (JedisSlotAdvancedConnectionHandler)this.connectionHandler;
    }

    /**
     * 刷新集群信息,当集群信息发生变更时调用
     * @param
     * @return
     */
    public void refreshCluster() {
        connectionHandler.renewSlotCache();
    }
}

使用
以hget为例

public static List<byte[]> hgetPipeline(JedisClusterPipeline jedisClusterPipeline,List<String> keyFields){
        List<byte[]> result = new ArrayList<>();
        Map<JedisPool,List<String>> poolKeys = new HashMap<>();
        JedisSlotAdvancedConnectionHandler jedisSlotAdvancedConnectionHandler = jedisClusterPipeline.getConnectionHandler();
        jedisClusterPipeline.refreshCluster();
        for(int i=0;i<keyFields.size();i++){
            String[] keyField = keyFields.get(i).split(",");
            if(keyField.length!=2)
                continue;
            byte[] key = keyField[0].getBytes();
            int slot = JedisClusterCRC16.getSlot(key);
            JedisPool jedisPool = jedisSlotAdvancedConnectionHandler.getJedisPoolFromSlot(slot);
            if(poolKeys.keySet().contains(jedisPool)){
                List<String> keys = poolKeys.get(jedisPool);
                keys.add(keyFields.get(i));
            }else{
                List<String> keys = new ArrayList<>();
                keys.add(keyFields.get(i));
                poolKeys.put(jedisPool,keys);
            }
        }
        for(JedisPool jedisPool:poolKeys.keySet()){
            Jedis jedis = jedisPool.getResource();
            Pipeline pipeline = jedis.pipelined();
            List<String> keys = poolKeys.get(jedisPool);
            for(int i = 0; i < keys.size();){
                for(int j=0;j<ConfigUtil.m_pipe_size&&i< keys.size();j++){
                    String[] keyField = keys.get(i).split(",");
                    if(keyField.length!=2)
                        continue;
                    byte[] key = keyField[0].getBytes();
                    byte[] field = keyField[1].getBytes();
                    pipeline.hget(key,field);
                    i++;
                }
                List<Object> responses = pipeline.syncAndReturnAll();
                if(responses != null && !responses.isEmpty()){
                    for(Object response : responses){
                        if(!(response instanceof byte[])) {
                            //logger.info();
                            continue;
                        }
                        byte[] valueData = (byte[])response;
                        result.add(valueData);
                    }
                }
            }
            jedis.close();
        }
        return result;
    }

总结
redis管道可以极大的提升性能。批量操作的数据量越大,性能提升越明显。在改造之后,几百万数据量的情况下,性能提升相当恐怖。
 

这篇关于改造JedisCluster使其支持pipeline操作的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1024361

相关文章

Java操作Word文档的全面指南

《Java操作Word文档的全面指南》在Java开发中,操作Word文档是常见的业务需求,广泛应用于合同生成、报表输出、通知发布、法律文书生成、病历模板填写等场景,本文将全面介绍Java操作Word文... 目录简介段落页头与页脚页码表格图片批注文本框目录图表简介Word编程最重要的类是org.apach

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

mysql表操作与查询功能详解

《mysql表操作与查询功能详解》本文系统讲解MySQL表操作与查询,涵盖创建、修改、复制表语法,基本查询结构及WHERE、GROUPBY等子句,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随... 目录01.表的操作1.1表操作概览1.2创建表1.3修改表1.4复制表02.基本查询操作2.1 SE

c++中的set容器介绍及操作大全

《c++中的set容器介绍及操作大全》:本文主要介绍c++中的set容器介绍及操作大全,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录​​一、核心特性​​️ ​​二、基本操作​​​​1. 初始化与赋值​​​​2. 增删查操作​​​​3. 遍历方

MySQL追踪数据库表更新操作来源的全面指南

《MySQL追踪数据库表更新操作来源的全面指南》本文将以一个具体问题为例,如何监测哪个IP来源对数据库表statistics_test进行了UPDATE操作,文内探讨了多种方法,并提供了详细的代码... 目录引言1. 为什么需要监控数据库更新操作2. 方法1:启用数据库审计日志(1)mysql/mariad

springboot如何通过http动态操作xxl-job任务

《springboot如何通过http动态操作xxl-job任务》:本文主要介绍springboot如何通过http动态操作xxl-job任务的问题,具有很好的参考价值,希望对大家有所帮助,如有错... 目录springboot通过http动态操作xxl-job任务一、maven依赖二、配置文件三、xxl-

Oracle 数据库数据操作如何精通 INSERT, UPDATE, DELETE

《Oracle数据库数据操作如何精通INSERT,UPDATE,DELETE》在Oracle数据库中,对表内数据进行增加、修改和删除操作是通过数据操作语言来完成的,下面给大家介绍Oracle数... 目录思维导图一、插入数据 (INSERT)1.1 插入单行数据,指定所有列的值语法:1.2 插入单行数据,指

k8s上运行的mysql、mariadb数据库的备份记录(支持x86和arm两种架构)

《k8s上运行的mysql、mariadb数据库的备份记录(支持x86和arm两种架构)》本文记录在K8s上运行的MySQL/MariaDB备份方案,通过工具容器执行mysqldump,结合定时任务实... 目录前言一、获取需要备份的数据库的信息二、备份步骤1.准备工作(X86)1.准备工作(arm)2.手

SQL中JOIN操作的条件使用总结与实践

《SQL中JOIN操作的条件使用总结与实践》在SQL查询中,JOIN操作是多表关联的核心工具,本文将从原理,场景和最佳实践三个方面总结JOIN条件的使用规则,希望可以帮助开发者精准控制查询逻辑... 目录一、ON与WHERE的本质区别二、场景化条件使用规则三、最佳实践建议1.优先使用ON条件2.WHERE用

华为鸿蒙HarmonyOS 5.1官宣7月开启升级! 首批支持名单公布

《华为鸿蒙HarmonyOS5.1官宣7月开启升级!首批支持名单公布》在刚刚结束的华为Pura80系列及全场景新品发布会上,除了众多新品的发布,还有一个消息也点燃了所有鸿蒙用户的期待,那就是Ha... 在今日的华为 Pura 80 系列及全场景新品发布会上,华为宣布鸿蒙 HarmonyOS 5.1 将于 7