RedisCluster-Pipeline操作,提升10倍以上响应速度2021-03-15

2024-06-02 15:32

本文主要是介绍RedisCluster-Pipeline操作,提升10倍以上响应速度2021-03-15,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!


文章目录
什么是pipeLine 为什么使用pipeLine ?
为什么RedisCluster无法使用pipeline?
如何基于JedisCluster扩展pipeline?
性能对比(提升10倍以上):
本文中的代码来自我正在写的分布式缓存框架(主要解决缓存使用中的各种痛点:缓存穿透\redis-cluster pipeline\注解使用等等)。后续内部推广使用后、成熟后会开源回馈大家。

什么是pipeLine 为什么使用pipeLine ?
管道(pipeline)将客户端 client 与服务器端的交互明确划分为单向的发送请求(Send Request)和接收响应(Receive Response):用户可以将多个操作连续发给服务器,但在此期间服务器端并不对每个操作命令发送响应数据;全部请求发送完毕后用户关闭请求,开始接收响应获取每个操作命令的响应结果。

管道(pipeline)在某些场景下非常有用,比如有多个操作命令需要被迅速提交至服务器端,但用户并不依赖每个操作返回的响应结果,对结果响应也无需立即获得,那么管道就可以用来作为优化性能的批处理工具。性能提升的原因主要是减少了 TCP 连接中交互往返的开销。

不过在程序中使用管道请注意,使用 pipeline 时客户端将独占与服务器端的连接,此期间将不能进行其他“非管道”类型操作,直至 pipeline 被关闭;如果要同时执行其他操作,可以为 pipeline 操作单独建立一个连接,将其与常规操作分离开来。

从原理上来看,pipeline就是用一个redis 的Socket连接 去多次执行redis命令(发送请求)而不必等待响应,当所有请求都执行完毕后再一次性的从这个socket中读取请求。期间减少了在网络上的无用等待,通常会有3-10倍以上的速度提升:

    //非pipeline
    [req1]
         [==waiting===]
                          [resp1]
                                [req2]
                                     [====waiting=====]
                                                      [resp2]

    //pipeline
    [req1][==waiting===]
         [req2][==waiting===]
                     [resp1] [resp2]

pipeline代码示例

@Test

  public void pipeline() throws UnsupportedEncodingException {

    Pipeline p = jedis.pipelined();

    p.set("foo", "bar");

    p.get("foo");
      
    for(int i=0;i<10;i++){
        
        p.set("foo"+i, "bar");
    }


    List<Object> results = p.syncAndReturnAll();

  }

为什么RedisCluster无法使用pipeline?
主要是因为redis-cluster的hash分片,如下图一个3master-3slave 的 redisCluster:

这里写图片描述

具体的redis命令,会根据key计算出一个槽位(slot),然后根据槽位去特定的节点redis上执行操作。

其中master1代表了 0~5460的槽位,master2代表了 5461~10922的槽位,master1代表了 10923~16383的槽位。

    master1(slave1): 0~5460

    master2(slave2):5461~10922

    master3(slave3):10923~16383


以以下代码为例:

        for(int i=0;i<10;i++){
            p.set("foo"+i, "bar");
        }

那么pipeline中每个单独的操作,需要根据“key”运算一个槽位(JedisClusterCRC16.getSlot(key)),然后根据槽位去特定的机器执行命令。也就是说一次pipeline操作会使用多个节点的redis连接,而目前JedisCluster是无法支持的。

如何基于JedisCluster扩展pipeline?
设计思路(ShardedJedis、redisson也可供参考,):

1.首先要根据key计算出此次pipeline会使用到的节点对于的连接(也就是jedis对象,通常每个节点对应一个Pool)。

2.相同槽位的key,使用同一个jedis.pipeline去执行 命令。

3.合并此次pipeline所有的response返回。

4.连接释放返回到池中。

也就是讲一个JedisCluster下的pipeline分解为每个单节点下独立的jedisPipeline操作,最后合并response返回。

分享以下部分核心代码:

/**
 * @author zhangshuo
 */
@Slf4j
public class JedisClusterPipeLine extends PipelineBase implements Closeable {

......
......

    private final Queue<Client> orderedClients = new LinkedList<Client>();

    /** 一次pipeline过程中使用到的jedis缓存 */
    private final Map<JedisPool, Jedis> poolToJedisMap = new HashMap<JedisPool, Jedis>();

    private final JedisSlotBasedConnectionHandler connectionHandler;
    private final JedisClusterInfoCache clusterInfoCache;

    public JedisClusterPipeLine(JedisCluster jedisCluster) {
        this.connectionHandler = ClassUtils.getValue(jedisCluster, SLOT_BASED__CONNECTION_HANDLER_FIELD);
        this.clusterInfoCache = ClassUtils.getValue(connectionHandler, CLUSTER_INFO_CACHE_FIELD);
    }

    @Override
    protected Client getClient(String key) {

        return getClient(SafeEncoder.encode(key));
    }

    @Override
    protected Client getClient(byte[] key) {

        Client client;
        log.debug("size of orderedClients : {} , size of poolToJedis : {} ", orderedClients.size(),
                poolToJedisMap.size());

        int slot = JedisClusterCRC16.getSlot(key);

        JedisPool pool = clusterInfoCache.getSlotPool(slot);

        Jedis borrowedJedis = poolToJedisMap.get(pool);

        if (null == borrowedJedis) {
            borrowedJedis = pool.getResource();
            poolToJedisMap.put(pool, borrowedJedis);
        }

        client = borrowedJedis.getClient();

        orderedClients.add(client);

        return client;
    }

    @Override
    public void close() {
        for (Jedis jedis : poolToJedisMap.values()) {
            jedis.close();
        }

        clean();
        orderedClients.clear();
        poolToJedisMap.clear();
    }

    public void sync() {
        for (Client client : orderedClients) {
            generateResponse(client.getOne());
        }
    }

    /**
     * go through all the responses and generate the right response type (warning :
     * usually it is a waste of time).
     * 
     * @return A list of all the responses in the order
     */
    public List<Object> syncAndReturnAll() {
        List<Object> formatted = new ArrayList<Object>();
        for (Client client : orderedClients) {
            formatted.add(generateResponse(client.getOne()).get());
        }
        return formatted;
    }

    public void refreshNodesInfo() {
        connectionHandler.renewSlotCache();
    }
......
......
}

性能对比(提升10倍以上):
    @Test
    public void jedisTest() throws UnsupportedEncodingException {
 
        long start2 = System.currentTimeMillis();
 
        try (JedisClusterClient<Object> jc = jedisClusterClient) {
            for (int i = 0; i < 100; i++) {
                jc.set("NO." + i, "value" + i);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis() - start2);// 5688ms
 
    }
 
    /**
     *
     */
    @Test
    public void clusterPipeline() {
        long start = System.currentTimeMillis();
        try (JedisClusterPipeLine pipeline = jedisClusterClient.pipelined()) {
            for (int i = 0; i < 100; i++) {
 
                pipeline.set("NO." + i, "value" + i);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis() - start);// 174ms
    }
}

结论:对于批量操作,响应提升明显:如上本机测试中,提升了约50倍。
 

这篇关于RedisCluster-Pipeline操作,提升10倍以上响应速度2021-03-15的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1024362

相关文章

SQL中JOIN操作的条件使用总结与实践

《SQL中JOIN操作的条件使用总结与实践》在SQL查询中,JOIN操作是多表关联的核心工具,本文将从原理,场景和最佳实践三个方面总结JOIN条件的使用规则,希望可以帮助开发者精准控制查询逻辑... 目录一、ON与WHERE的本质区别二、场景化条件使用规则三、最佳实践建议1.优先使用ON条件2.WHERE用

Linux链表操作方式

《Linux链表操作方式》:本文主要介绍Linux链表操作方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、链表基础概念与内核链表优势二、内核链表结构与宏解析三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势六、典型应用场景七、调试技巧与

Java Multimap实现类与操作的具体示例

《JavaMultimap实现类与操作的具体示例》Multimap出现在Google的Guava库中,它为Java提供了更加灵活的集合操作,:本文主要介绍JavaMultimap实现类与操作的... 目录一、Multimap 概述Multimap 主要特点:二、Multimap 实现类1. ListMult

Python中文件读取操作漏洞深度解析与防护指南

《Python中文件读取操作漏洞深度解析与防护指南》在Web应用开发中,文件操作是最基础也最危险的功能之一,这篇文章将全面剖析Python环境中常见的文件读取漏洞类型,成因及防护方案,感兴趣的小伙伴可... 目录引言一、静态资源处理中的路径穿越漏洞1.1 典型漏洞场景1.2 os.path.join()的陷

Python使用Code2flow将代码转化为流程图的操作教程

《Python使用Code2flow将代码转化为流程图的操作教程》Code2flow是一款开源工具,能够将代码自动转换为流程图,该工具对于代码审查、调试和理解大型代码库非常有用,在这篇博客中,我们将深... 目录引言1nVflRA、为什么选择 Code2flow?2、安装 Code2flow3、基本功能演示

Python中OpenCV与Matplotlib的图像操作入门指南

《Python中OpenCV与Matplotlib的图像操作入门指南》:本文主要介绍Python中OpenCV与Matplotlib的图像操作指南,本文通过实例代码给大家介绍的非常详细,对大家的学... 目录一、环境准备二、图像的基本操作1. 图像读取、显示与保存 使用OpenCV操作2. 像素级操作3.

python操作redis基础

《python操作redis基础》Redis(RemoteDictionaryServer)是一个开源的、基于内存的键值对(Key-Value)存储系统,它通常用作数据库、缓存和消息代理,这篇文章... 目录1. Redis 简介2. 前提条件3. 安装 python Redis 客户端库4. 连接到 Re

Java Stream.reduce()方法操作实际案例讲解

《JavaStream.reduce()方法操作实际案例讲解》reduce是JavaStreamAPI中的一个核心操作,用于将流中的元素组合起来产生单个结果,:本文主要介绍JavaStream.... 目录一、reduce的基本概念1. 什么是reduce操作2. reduce方法的三种形式二、reduce

MySQL表空间结构详解表空间到段页操作

《MySQL表空间结构详解表空间到段页操作》在MySQL架构和存储引擎专题中介绍了使用不同存储引擎创建表时生成的表空间数据文件,在本章节主要介绍使用InnoDB存储引擎创建表时生成的表空间数据文件,对... 目录️‍一、什么是表空间结构1.1 表空间与表空间文件的关系是什么?️‍二、用户数据在表空间中是怎么

Python对PDF书签进行添加,修改提取和删除操作

《Python对PDF书签进行添加,修改提取和删除操作》PDF书签是PDF文件中的导航工具,通常包含一个标题和一个跳转位置,本教程将详细介绍如何使用Python对PDF文件中的书签进行操作... 目录简介使用工具python 向 PDF 添加书签添加书签添加嵌套书签Python 修改 PDF 书签Pytho