RedisCluster-Pipeline操作,提升10倍以上响应速度2021-03-15

2024-06-02 15:32

本文主要是介绍RedisCluster-Pipeline操作,提升10倍以上响应速度2021-03-15,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!


文章目录
什么是pipeLine 为什么使用pipeLine ?
为什么RedisCluster无法使用pipeline?
如何基于JedisCluster扩展pipeline?
性能对比(提升10倍以上):
本文中的代码来自我正在写的分布式缓存框架(主要解决缓存使用中的各种痛点:缓存穿透\redis-cluster pipeline\注解使用等等)。后续内部推广使用后、成熟后会开源回馈大家。

什么是pipeLine 为什么使用pipeLine ?
管道(pipeline)将客户端 client 与服务器端的交互明确划分为单向的发送请求(Send Request)和接收响应(Receive Response):用户可以将多个操作连续发给服务器,但在此期间服务器端并不对每个操作命令发送响应数据;全部请求发送完毕后用户关闭请求,开始接收响应获取每个操作命令的响应结果。

管道(pipeline)在某些场景下非常有用,比如有多个操作命令需要被迅速提交至服务器端,但用户并不依赖每个操作返回的响应结果,对结果响应也无需立即获得,那么管道就可以用来作为优化性能的批处理工具。性能提升的原因主要是减少了 TCP 连接中交互往返的开销。

不过在程序中使用管道请注意,使用 pipeline 时客户端将独占与服务器端的连接,此期间将不能进行其他“非管道”类型操作,直至 pipeline 被关闭;如果要同时执行其他操作,可以为 pipeline 操作单独建立一个连接,将其与常规操作分离开来。

从原理上来看,pipeline就是用一个redis 的Socket连接 去多次执行redis命令(发送请求)而不必等待响应,当所有请求都执行完毕后再一次性的从这个socket中读取请求。期间减少了在网络上的无用等待,通常会有3-10倍以上的速度提升:

    //非pipeline
    [req1]
         [==waiting===]
                          [resp1]
                                [req2]
                                     [====waiting=====]
                                                      [resp2]

    //pipeline
    [req1][==waiting===]
         [req2][==waiting===]
                     [resp1] [resp2]

pipeline代码示例

@Test

  public void pipeline() throws UnsupportedEncodingException {

    Pipeline p = jedis.pipelined();

    p.set("foo", "bar");

    p.get("foo");
      
    for(int i=0;i<10;i++){
        
        p.set("foo"+i, "bar");
    }


    List<Object> results = p.syncAndReturnAll();

  }

为什么RedisCluster无法使用pipeline?
主要是因为redis-cluster的hash分片,如下图一个3master-3slave 的 redisCluster:

这里写图片描述

具体的redis命令,会根据key计算出一个槽位(slot),然后根据槽位去特定的节点redis上执行操作。

其中master1代表了 0~5460的槽位,master2代表了 5461~10922的槽位,master1代表了 10923~16383的槽位。

    master1(slave1): 0~5460

    master2(slave2):5461~10922

    master3(slave3):10923~16383


以以下代码为例:

        for(int i=0;i<10;i++){
            p.set("foo"+i, "bar");
        }

那么pipeline中每个单独的操作,需要根据“key”运算一个槽位(JedisClusterCRC16.getSlot(key)),然后根据槽位去特定的机器执行命令。也就是说一次pipeline操作会使用多个节点的redis连接,而目前JedisCluster是无法支持的。

如何基于JedisCluster扩展pipeline?
设计思路(ShardedJedis、redisson也可供参考,):

1.首先要根据key计算出此次pipeline会使用到的节点对于的连接(也就是jedis对象,通常每个节点对应一个Pool)。

2.相同槽位的key,使用同一个jedis.pipeline去执行 命令。

3.合并此次pipeline所有的response返回。

4.连接释放返回到池中。

也就是讲一个JedisCluster下的pipeline分解为每个单节点下独立的jedisPipeline操作,最后合并response返回。

分享以下部分核心代码:

/**
 * @author zhangshuo
 */
@Slf4j
public class JedisClusterPipeLine extends PipelineBase implements Closeable {

......
......

    private final Queue<Client> orderedClients = new LinkedList<Client>();

    /** 一次pipeline过程中使用到的jedis缓存 */
    private final Map<JedisPool, Jedis> poolToJedisMap = new HashMap<JedisPool, Jedis>();

    private final JedisSlotBasedConnectionHandler connectionHandler;
    private final JedisClusterInfoCache clusterInfoCache;

    public JedisClusterPipeLine(JedisCluster jedisCluster) {
        this.connectionHandler = ClassUtils.getValue(jedisCluster, SLOT_BASED__CONNECTION_HANDLER_FIELD);
        this.clusterInfoCache = ClassUtils.getValue(connectionHandler, CLUSTER_INFO_CACHE_FIELD);
    }

    @Override
    protected Client getClient(String key) {

        return getClient(SafeEncoder.encode(key));
    }

    @Override
    protected Client getClient(byte[] key) {

        Client client;
        log.debug("size of orderedClients : {} , size of poolToJedis : {} ", orderedClients.size(),
                poolToJedisMap.size());

        int slot = JedisClusterCRC16.getSlot(key);

        JedisPool pool = clusterInfoCache.getSlotPool(slot);

        Jedis borrowedJedis = poolToJedisMap.get(pool);

        if (null == borrowedJedis) {
            borrowedJedis = pool.getResource();
            poolToJedisMap.put(pool, borrowedJedis);
        }

        client = borrowedJedis.getClient();

        orderedClients.add(client);

        return client;
    }

    @Override
    public void close() {
        for (Jedis jedis : poolToJedisMap.values()) {
            jedis.close();
        }

        clean();
        orderedClients.clear();
        poolToJedisMap.clear();
    }

    public void sync() {
        for (Client client : orderedClients) {
            generateResponse(client.getOne());
        }
    }

    /**
     * go through all the responses and generate the right response type (warning :
     * usually it is a waste of time).
     * 
     * @return A list of all the responses in the order
     */
    public List<Object> syncAndReturnAll() {
        List<Object> formatted = new ArrayList<Object>();
        for (Client client : orderedClients) {
            formatted.add(generateResponse(client.getOne()).get());
        }
        return formatted;
    }

    public void refreshNodesInfo() {
        connectionHandler.renewSlotCache();
    }
......
......
}

性能对比(提升10倍以上):
    @Test
    public void jedisTest() throws UnsupportedEncodingException {
 
        long start2 = System.currentTimeMillis();
 
        try (JedisClusterClient<Object> jc = jedisClusterClient) {
            for (int i = 0; i < 100; i++) {
                jc.set("NO." + i, "value" + i);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis() - start2);// 5688ms
 
    }
 
    /**
     *
     */
    @Test
    public void clusterPipeline() {
        long start = System.currentTimeMillis();
        try (JedisClusterPipeLine pipeline = jedisClusterClient.pipelined()) {
            for (int i = 0; i < 100; i++) {
 
                pipeline.set("NO." + i, "value" + i);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis() - start);// 174ms
    }
}

结论:对于批量操作,响应提升明显:如上本机测试中,提升了约50倍。
 

这篇关于RedisCluster-Pipeline操作,提升10倍以上响应速度2021-03-15的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1024362

相关文章

python panda库从基础到高级操作分析

《pythonpanda库从基础到高级操作分析》本文介绍了Pandas库的核心功能,包括处理结构化数据的Series和DataFrame数据结构,数据读取、清洗、分组聚合、合并、时间序列分析及大数据... 目录1. Pandas 概述2. 基本操作:数据读取与查看3. 索引操作:精准定位数据4. Group

Python操作PDF文档的主流库使用指南

《Python操作PDF文档的主流库使用指南》PDF因其跨平台、格式固定的特性成为文档交换的标准,然而,由于其复杂的内部结构,程序化操作PDF一直是个挑战,本文主要为大家整理了Python操作PD... 目录一、 基础操作1.PyPDF2 (及其继任者 pypdf)2.PyMuPDF / fitz3.Fre

Python对接支付宝支付之使用AliPay实现的详细操作指南

《Python对接支付宝支付之使用AliPay实现的详细操作指南》支付宝没有提供PythonSDK,但是强大的github就有提供python-alipay-sdk,封装里很多复杂操作,使用这个我们就... 目录一、引言二、准备工作2.1 支付宝开放平台入驻与应用创建2.2 密钥生成与配置2.3 安装ali

MySQL 强制使用特定索引的操作

《MySQL强制使用特定索引的操作》MySQL可通过FORCEINDEX、USEINDEX等语法强制查询使用特定索引,但优化器可能不采纳,需结合EXPLAIN分析执行计划,避免性能下降,注意版本差异... 目录1. 使用FORCE INDEX语法2. 使用USE INDEX语法3. 使用IGNORE IND

Python使用openpyxl读取Excel的操作详解

《Python使用openpyxl读取Excel的操作详解》本文介绍了使用Python的openpyxl库进行Excel文件的创建、读写、数据操作、工作簿与工作表管理,包括创建工作簿、加载工作簿、操作... 目录1 概述1.1 图示1.2 安装第三方库2 工作簿 workbook2.1 创建:Workboo

Ubuntu 24.04启用root图形登录的操作流程

《Ubuntu24.04启用root图形登录的操作流程》Ubuntu默认禁用root账户的图形与SSH登录,这是为了安全,但在某些场景你可能需要直接用root登录GNOME桌面,本文以Ubuntu2... 目录一、前言二、准备工作三、设置 root 密码四、启用图形界面 root 登录1. 修改 GDM 配

JSONArray在Java中的应用操作实例

《JSONArray在Java中的应用操作实例》JSONArray是org.json库用于处理JSON数组的类,可将Java对象(Map/List)转换为JSON格式,提供增删改查等操作,适用于前后端... 目录1. jsONArray定义与功能1.1 JSONArray概念阐释1.1.1 什么是JSONA

PowerShell中15个提升运维效率关键命令实战指南

《PowerShell中15个提升运维效率关键命令实战指南》作为网络安全专业人员的必备技能,PowerShell在系统管理、日志分析、威胁检测和自动化响应方面展现出强大能力,下面我们就来看看15个提升... 目录一、PowerShell在网络安全中的战略价值二、网络安全关键场景命令实战1. 系统安全基线核查

Java操作Word文档的全面指南

《Java操作Word文档的全面指南》在Java开发中,操作Word文档是常见的业务需求,广泛应用于合同生成、报表输出、通知发布、法律文书生成、病历模板填写等场景,本文将全面介绍Java操作Word文... 目录简介段落页头与页脚页码表格图片批注文本框目录图表简介Word编程最重要的类是org.apach

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os