Java如何从Redis中批量读取数据

2025-05-31 03:50
文章标签 java redis 批量 读取数据

本文主要是介绍Java如何从Redis中批量读取数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《Java如何从Redis中批量读取数据》:本文主要介绍Java如何从Redis中批量读取数据的情况,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教...

一.背景概述

本周接到一个新的需求:从用户dau日志文件中读取用户uid,然后到Redis中获取对应的用户数据。用户的uid存储于login_day_20220913.txt文件,共1亿2千多万条数据,数量达1.4G。

要求:尽量在2小时内获得结果,在数据处理过程中,Redis服务器QPS尽量低,不超过某个阈值,不然会触发监控报警。数据从Redis从库读取,只提供一个端口。

二.分析与实现

由于之前做过相同数据量的统计需求,所以从一开始就确定单线程完成此次数据处理也是可以的。实际上,对多线程和并发的使用需要慎之又慎,特别是在业务繁忙的系统或环境下。

接触Redis的朋友都知道,Redis是支持批量读取的,其中常用的两个方法:mget()和hmget()。

本次处理的数据不是哈希结构,所以确定使用mget()。

此时,我自然而然地问了同事一个问题,那就是mget批量处理数据的最佳参数范围是多少?因为mget()接受一个字符串数组参数,也就是说字符串数组的长度最佳为多少?

同事并没有给我明确的答案,只是说他们日常每批次处理10000条,建议我自己可以尝试一下,于是我打算试试50000条数据。

主要代码如下:

package com.sina.weibo;

import com.sina.weibo.util.FileUtils;
import com.sina.weibo.util.ListUtil;
import org.apache.commons.lang3.time.StopWatch;
import redis.clients.jedis.Jedis;

import Java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class Application {
    /** dau数据读取路径 */
    private static String dauDataPath = "/data1/sinawap/var/logs/wapcommon/place/user_position/dau/login_day_20220913.txt";

    /** 结果输出路径 */
    private static String outputPath = "/data1/bingqing5/importcampusdata/output/campus_data.txt";

    /** 已处理过的uid数据存储路径 */
    private static String processedUidDataPath = "/data1/bingqing5/importcampusdata/process/processed_uid.txt";

    public static void main(String[] args) {
        StopWatch stopWatch = new StopWatch();
        // 开始时间
        stopWatch.start();
        System.out.println("================程序开始===============");
        transfer(dauDataPath, processedUidDataPath, outputPath);
        System.out.println("================程序结束===============");
        // 结束时间
        stopWatch.stop();
        // 统计执行时间(秒)
        System.out.println("执行时长:" + stopWatch.getTime(TimeUnit.SECONDS) + " 秒.");
    }

    private static void transfer(String dauDataPath, String processedUidDataPath, String outputPath) {
        List<String> dauDataList = FileUtils.readInfoFromFile(dauDataPath);
        List<List<String>> bucket = ListUtil.splitList(dauDataList, 50000);
        Jedis jedis = new Jedis("rdsxxxxx.xxxx.xxxx.xxxx.com.cn",50000);
        List<String> processedUidDataList = FileUtils.readInfoFromFile(processedUidDataPath);
        LinkedHashSet<String> linkedHashSet = ListUtil.getLinkedHashSet(processedUidDataList);
        for (List<String> list : bucket) {
            List<String> jsonStrList = jedis.mget(list.toArray(new String[list.size()]));
            for (int i = 0; i < list.size(); i++) {
                if (!linkedHashSet.contains(list.get(i))) {
                    String uid = list.get(i);
                    FileUtils.appendInfoToFile(processedUidDataPath, uid);
                    String jsonStr = jsonStrList.get(i);
                    if (jsonStr == null || jsonStr == "") continue;
                    String content = uid + "\t" + jsonStr;
                    FileUtils.appendInfoToFile(outputPath, content);
                }
            }
            System.out.println(list.size());
        }
    }
}

三.发现问题与屡次改进

3.1.QPS过高而且波动很大

上述代码上线后没多久,就被同事找来,说QPS过高,开始的时候瞬间达到近100k,之后稳定在70k~100k之间。因为担心影响其他业务,于是把jar包暂停,着手优化

于是,我多次修改如下代码:

List<List<String>> bucket = ListUtil.splitList(dauDataList, 50000);

将50000,调整为10000,5000,1000,500,100等值逐一尝试。

QPS确实逐步降下来了,但是即便是每次处理1000条,QPS也有40K左右。

3.2.程序中断,抛异常

最终以每批次读取500条数据,将代码上线。但是程序总是中断报错,抛出异常:

Java如何从Redis中批量读取数据

而这时候已处理的数据量达到几千万条。

最初怀疑是因为jedis对象没有调用close方法,于是修改代码如下:

package com.sina.weibo;

import com.sina.weibo.util.FileUtils;
import com.sina.weibo.util.ListUtil;
import org.apache.commons.lang3.time.StopWatch;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class Application {
    /** dau数据读取路径 */
    private static String dauDataPath = "/data1/sinawap/var/logs/wapcommon/place/user_position/dau/login_day_20220913.txt";

    /** 结果输出路径 */
    private static String outputPath = "/data1/bingqing5/importcampusdata/output/campus_data.txt";

    /** 已处理过的uid数据存储路径 */
    private static String processedUidDataPath = "/data1/bingqing5/importcampusdata/process/processed_uid.txt";

    public static void main(String[] args) {
        StopWatch stopWatch = new StopWatch();
        // 开始时间
        stopWatch.start();
        System.out.println("================程序开始===============");
        transfer(dauDataPath, processedUidDataPath, outputPath);
        System.out.println("================程序结束===============");
      编程  // 结束时间
        stopWatch.stop();
        // 统计执行时间(秒)
        System.out.println("执行时长:" + stopWatch.getTime(TimeUnit.SECONDS) + " 秒.");
    }

    private static void transfer(String dauDataPath, String processedUidDataPath, String outputPath) {
        List<String> dauDataList = FileUtils.readInfoFromFile(dauDataPath);
        List<List<String>> bucket = ListUtil.splitList(dauDataList, 50000);
        List<String> processedUidDataList = FileUtils.readInfoFromFile(processedUidDataPath);
        LinkedHashSet<String> linkedHashSet = ListUtil.getLinkedHashSet(processedUidDataList);
        for (List<String> list : bucket) {
            Jedis jedis = new Jedis(rdsxxxxx.xxxx.xxxx.xxxx.com.cn", 50000);
            List<String> jsonStrList = jedis.mget(list.toArray(new String[list.size()]));
            for (int i = 0; i < list.size(); i++) {
                if (!linkedHashSet.contains(list.get(i))) {
                    String uid = list.get(i);
                    FileUtils.appendInfoToFile(processedUidDataPath, uid);
                    String jsonStr = jsonStrList.get(i);
                    if (jsonStr == null || jsonStr == "") continue;
                    String content = uid + "\t" + jsonStr;
                    FileUtils.appendInfoToFile(outputPath, content);
                }
            }
            jedis.close();
            System.out.println(list.size());
        }
    }
}

修改后跑程序依旧没有任何改善,继续修改,代码如下:

package com.sina.weibo;

import com.sina.weibo.util.FileUtils;
import com.sina.weibo.util.ListUtil;
import org.apache.commons.lang3.time.StopWatch;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class A {
    /** dau数据读取路径 */
    private static String dauDataPath = "/data1/sinawap/var/logs/wapcommon/place/user_position/dau/login_day_20220913.txt";

    /** 结果输出路径 */
    private static String outputPath = "/data1/bingqing5/importcampusdata/output/campus_data.txt";

    /** 已处理过的uid数据存储路径 */
    private static String processedUidDataPath = "/data1/bingqing5/importcampusdata/process/processed_uid.txt";

    public static void main(String[] args) {
        StopWatch stopWatch = new StopWatch();
        // 开始时间
        stopWatch.start();
        System.out.println("================程序开始===============");
        transfer(dauDataPath, processedUidDataPath, outputPath);
        System.out.println("================程序结束===============");
        // 结束时间
        stopWatch.stop();
        // 统计执行时间(秒)
        System.out.println("执行时长:" + stopWatch.getTime(TimeUnit.SECONDS) + " 秒.");
    }

    private static void transfer(String dauDataPath, String processedUidDataPath, String outputPath) {
        List<String> dauDataList = FileUtils.readInfoFromFile(dauDataPath);
        List<List<String>> bucket = ListUtil.splitList(dauDataList, 50000);
        List<String> processedUidDataList = FileUtils.readInfoFromFile(processedUidDataPath);
        LinkedHashSet<String> linkedHashSet = ListUtil.getLinkedHashSet(processedUidDataList);
        for (List<String> list : bucket) {
            Jedis jedis = new Jedis("rdsxxxxx.xxxx.xxxx.xxxx.com.cn", 50000);
            List<String> jsonStrList = jedis.mget(list.toArray(new String[list.size()]));
            for (int i = 0; i < list.size(); i++) {
                if (!linkedHashSet.contains(list.get(i))) {
                    String uid = list.get(i);
                    FileUtils.appendInfoToFile(processedUidDataPath, uid);
                    String jsonStr = jsonStrList.get(i);
                    if (jsonStr == null || jsonStr == "") continue;
                    String content = uid + "\t" + jsonStr;
                    FileUtils.appendInfoToFile(outputPath, content);
                }
   python         }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                jedis.close();
            }
            System.out.println(list.size());
        }
    }
}

上线以后,观测发现QPS区域稳定,但是程序会空跑,也就是从头开始将已处理的数据也要逐一读取一次,很多时候都没有跑到上次程序处理的地方就已经被迫退出。

linkedHashSet本来是用来标记上次程序运行停止的地方,但是似乎并没有完全发挥作用。

于是修改代码,加入一个新的list集合,用于存放还没有处理过的数据,代码如下:

package com.sina.weibo;

import com.sina.weibo.util.FileUtils;
import com.sina.weibo.util.ListUtil;
import org.apache.commons.lang3.time.StopWatch;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @author bingqing5 
 * @date 2022/09/14 15:00
 * @version 1.0
 */

public class Application {

    /** dau数据读取路径 */
    private static String dauDataPath = "/data1/sinawap/var/logs/wapcommon/place/user_position/dau/login_day_20220913.txt";

    /** 结果输出路径 */
    private static String outputPath = "/data1/bingqing5/importcampusdata/output/campus_data.txt";
    
    /** 已处理过的uid数据存储路径 */
    private static String processedUidDataPath = "/data1/bingqing5/importcampusdata/process/processed_uid.txt";

    public static void main(String[] args) {
        StopWatch stopWatch = new StopWatch();
        // 开始时间
        stopWatch.start();
        System.out.println("================程序开始===============");
//        transfer(dauDataPath, processedUidDataPath, outputPath);
        List<String> dauDataList = FileUtils.readInfoFromFile(dauDataPath);
//        List<List<String>> bucket = ListUtil.splitList(dauDataList, 50000);
//        Jedis jedis = new Jedis("rdsxxxxx.xxxx.xxxx.xxxx.com.cn", 50000);
        List<String> processedUidDataList = FileUtils.readInfoFromFile(processedUidDataPath);
        LinkedHashSet<String> linkedHashSet = ListUtil.getLinkedHashSet(processedUidDataList);
        List<String> uidList = new ArrayList<>();
        for (String uid : dauDataList) {
            if (linkedHashSet.contains(uid)) {
                continue;
            } else {
                uidList.add(uid);
            }
        }

        List<List<String>> bucket;
        if (uidList.size() != 0) {
            bucket = ListUtil.splitList(uidList, 10000);
        } else {
            bucket = new ArrayList<>();
        }

        for (List<String> list : bucket) {
            Jedis jedis = new Jedis("rdsxxxxx.xxxx.xxxx.xxxx.com.cn", 50000);
            List<String> jsonStrList = jedis.mget(list.toArray(new String[list.size()]));
            for (int i = 0; i < list.size(); i++) {
                if (!linkedHashSet.contains(list.get(i))) {
                    String uid = list.get(i);
                    FileUtils.appendInfoToFile(processedUidDataPath, uid);
                    String jsonStr = jsonStrList.get(i);
                    if (jsonStr == null || jsonStr == "") continue;
                    String content = uid + "\t" + jsonStr;
            javascript        FileUtils.appendInfoToFile(outputPath, content);
                }
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                jedis.close();
            }
            System.out.println(list.size());
        }
        System.out.println("================程序结束===============");
        // 结束时间
        stopWatch.stop();
        // 统计执行时间(秒)
        System.out.println("执行时长:" + stopWatch.getTime(TimeUnit.SECONDS) + " 秒.");
    }


}

终于这次修改后,上线代码,代码平稳运行。

此时查看QPS,发现10000的批读取量,QPS文档在25K以下,此前同样的数据量,QPS能达到40K。

Java如何从Redis中批量读取数据

3.3.内存消耗过大

在上次修改后,程序平稳运行,期间我查看了机器状态,发现我跑的jar包竟然消耗了32%左右的内存,那台机器也不过62G的总内存。虽然不缺内存资源,但是还是决定趁着程序在跑的期间,回顾一下代码。

List<List<String>> bucket = ListUtil.splitList(dauDataList, 10000);

上面这行代码是将所有的用户uid数据按照10000的大小均等分割,每次遍历,要重复创建同一类Jedis对象,也会消耗大量内存。

另外,下面这段程序:

 List<String> uidList = new ArrayList<>();
        for (String uid : dauDataList) {
            if (linkedHashSet.contains(uid)) {
                continue;
            } else {
                uidList.add(uid);
            }
        }

已经对处理过的数据做过筛选,在循环中再次做如下判断:

if (!linkedHashSet.contains(list.get(i))) {
                   
}

也是多次一举,会增加耗时。

综合以上考虑,我做了修改,代码如下:

package com.sina.weibo;

import com.sina.weibo.util.FileUtils;
import com.sina.weibo.util.ListUtil;
import org.apache.commons.lang3.time.StopWatch;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @author bingqing5
 * @date 2022/09/14 15:00
 * @version 1.0
 */

public class Application {

    /** dau数据读取路径 */
    private static String dauDataPath = "/data1/sinawap/var/logs/wapcommon/place/user_position/dau/login_day_20220913.txt";

    /** 结果输出路径 */
//    private static String outputPath = "/data1/bingqing5/redis_test/output/campus_data.txt";
    private static String outputPath = "/data1/bingqing/redis_test/output/campus_data.txt";

    /** 已处理过的uid数据存储路径 */
//    private static String processedUidDataPath = "/data1/bingqing5/redis_test/process/processed_uid.txt";
    private static String processedUidDataPath = "/data1/bingqing/redis_test/process/processed_uid.txt";

    public static void main(String[] args) {
        StopWatch stopWatch = new StopWatch();
        // 开始时间
        stopWatch.start();
        System.out.println("================程序开始===============");
        transfer(dauDataPath, processedUidDataPath, outputPath);
        System.out.println("================程序结束===============");
        // 结束时间
        stopWatch.stop();
        // 统计执行时间(秒)
        System.out.println("执行时长:" + stopWatch.getTime(TimeUnit.SECONDS) + " 秒.");
    }

    private static void transfer(String dauDataPath, String processedUidDataPath, String outputPath) {
        List<String> dauDataList = FileUtils.readInfoFromFile(dauDataPath);
        Jedis jedis = new Jedis("rdsxxxxx.xxxx.xxxx.xxxx.com.cn",www.chinasem.cn 50000);
        List<String> processedUidDataList = FileUtils.readInfoFromFile(processedUidDataPath);
        LinkedHashSet<String> linkedHashSet = ListUtil.getLinkedHashSet(processedUidDataList);
        List<String> uidList = new ArrayList<>();
        for (String uid : dauDataList) {
            if (linkedHashSet.contains(uid)) {
                continue;
            } else {
                uidList.add(uid);
            }
        }
        List<List<String>> bucket;
        if (uidList.size() != 0) {
            bucket = ListUtil.splitList(uidList, 50000);
        } else {
            bucket = new ArrayList<>();
        }

        for (List<String> list : bucket) {
            List<String> jsonStrList = jedis.mget(list.toArray(new String[list.size()]));
            for (int i = 0; i < list.size(); i++) {
                String uid = list.get(i);
                FileUtils.appendInfoToFile(processedUidDataPath, uid);
                String jsonStr = jsonStrList.get(i);
                if (jsonStr == null || jsonStr == "") continue;
                String content = uid + "\t" + jsonStr;
                FileUtils.appendInfoToFile(outputPath, content);
            }

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                jedis.close();
            }
            System.out.println(list.size());
        }
    }
}

修改代码以后,替换掉原先运行的jphpar包,接着运行。发现内存消耗明显降低,稳定占总内存的20%。

然后尝试修改了mget参数量,修改为50000条,再次运行程序发现QPS稳定在40K左右。

Java如何从Redis中批量读取数据

总结

本篇算是笔者刚接触Redis不久的一篇随手记。通过本次需求的开发经历,让我对Redis有了直观的了解,同时也理解了代码优化在实际生产工作和开发中的潜在价值。

关于Redis,在快速直接从Redis读取数据的场景中,尤其是数据量大的时候,为了防止QPS过高,最好在处理一批次数据后空出一定的时间间隔,比如可以让线程暂时休眠一定时间间隔,再进行下批次读取和处理。

关于代码优化,尽量创建可重复使用的对象,非必要不添加同类对象,避免大量创建对象带来的资源消耗,本次经历也算是很鲜明的体会到这点。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持China编程(www.chinasem.cn)。

这篇关于Java如何从Redis中批量读取数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1154871

相关文章

Python实现批量CSV转Excel的高性能处理方案

《Python实现批量CSV转Excel的高性能处理方案》在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一... 目录一、场景需求二、技术方案三、核心代码四、批量处理方案五、性能优化六、使用示例完整代码七、小结一、

Java实现将HTML文件与字符串转换为图片

《Java实现将HTML文件与字符串转换为图片》在Java开发中,我们经常会遇到将HTML内容转换为图片的需求,本文小编就来和大家详细讲讲如何使用FreeSpire.DocforJava库来实现这一功... 目录前言核心实现:html 转图片完整代码场景 1:转换本地 HTML 文件为图片场景 2:转换 H

Java使用jar命令配置服务器端口的完整指南

《Java使用jar命令配置服务器端口的完整指南》本文将详细介绍如何使用java-jar命令启动应用,并重点讲解如何配置服务器端口,同时提供一个实用的Web工具来简化这一过程,希望对大家有所帮助... 目录1. Java Jar文件简介1.1 什么是Jar文件1.2 创建可执行Jar文件2. 使用java

C#实现一键批量合并PDF文档

《C#实现一键批量合并PDF文档》这篇文章主要为大家详细介绍了如何使用C#实现一键批量合并PDF文档功能,文中的示例代码简洁易懂,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言效果展示功能实现1、添加文件2、文件分组(书签)3、定义页码范围4、自定义显示5、定义页面尺寸6、PDF批量合并7、其他方法

SpringBoot实现不同接口指定上传文件大小的具体步骤

《SpringBoot实现不同接口指定上传文件大小的具体步骤》:本文主要介绍在SpringBoot中通过自定义注解、AOP拦截和配置文件实现不同接口上传文件大小限制的方法,强调需设置全局阈值远大于... 目录一  springboot实现不同接口指定文件大小1.1 思路说明1.2 工程启动说明二 具体实施2

Java实现在Word文档中添加文本水印和图片水印的操作指南

《Java实现在Word文档中添加文本水印和图片水印的操作指南》在当今数字时代,文档的自动化处理与安全防护变得尤为重要,无论是为了保护版权、推广品牌,还是为了在文档中加入特定的标识,为Word文档添加... 目录引言Spire.Doc for Java:高效Word文档处理的利器代码实战:使用Java为Wo

SpringBoot日志级别与日志分组详解

《SpringBoot日志级别与日志分组详解》文章介绍了日志级别(ALL至OFF)及其作用,说明SpringBoot默认日志级别为INFO,可通过application.properties调整全局或... 目录日志级别1、级别内容2、调整日志级别调整默认日志级别调整指定类的日志级别项目开发过程中,利用日志

Java中的抽象类与abstract 关键字使用详解

《Java中的抽象类与abstract关键字使用详解》:本文主要介绍Java中的抽象类与abstract关键字使用详解,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录一、抽象类的概念二、使用 abstract2.1 修饰类 => 抽象类2.2 修饰方法 => 抽象方法,没有

SpringBoot 多环境开发实战(从配置、管理与控制)

《SpringBoot多环境开发实战(从配置、管理与控制)》本文详解SpringBoot多环境配置,涵盖单文件YAML、多文件模式、MavenProfile分组及激活策略,通过优先级控制灵活切换环境... 目录一、多环境开发基础(单文件 YAML 版)(一)配置原理与优势(二)实操示例二、多环境开发多文件版

Spring 中的切面与事务结合使用完整示例

《Spring中的切面与事务结合使用完整示例》本文给大家介绍Spring中的切面与事务结合使用完整示例,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录 一、前置知识:Spring AOP 与 事务的关系 事务本质上就是一个“切面”二、核心组件三、完