ES实现百亿级数据实时分析实战案例

2024-09-06 20:08

本文主要是介绍ES实现百亿级数据实时分析实战案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

背景

我们小组前段时间接到一个需求,希望能够按照小时为单位,看到每个实验中各种特征(单个或组合)的覆盖率、正样本占比、负样本占比。我简单解释一下这三种指标的定义:

  • 覆盖率:所有样本中出现某一特征的样本的比例

  • 正样本占比:所有出现该特征的样本中,正样本的比例

  • 负样本占比:所有出现该特征的样本中,负样本的比例

光看这三个指标,大家可能会觉得这个需求很简单,无非就是一个简单的筛选、聚合而已。

如果真的这么简单,我也没必要写这篇文章单独记录了。问题的关键就在于,每小时有将近1亿的数据量,而我们需要保存7天的数据,数据总量预计超过了100亿

技术方案

在了解清楚需求后,我们小组马上对技术方案展开讨论,讨论过程中出现了3种方案:

  • 第一种:用Spark流式计算,计算每一种可能单个或组合特征的相关指标

  • 第二种:收到客户端请求后,遍历HDFS中相关数据,进行离线计算

  • 第三种:将数据按照实验+小时分索引存入ES,收到客户端请求后,实时计算返回

首先,第一种方案直接被diss,原因是一个实验一般会出现几百、上千个特征,而这些特征的组合何止几亿种,全部计算的话,可行性暂且不论,光是对资源的消耗就无法承受。

第二种方案,虽然技术上是可行的,但离线计算所需时间较长,对用户来说,体验并不理想。并且,为了计算目标1%的数据而要遍历所有数据,对资源也存在很大浪费。

第三种方案,将数据按照实验+小时分索引后,可以将每个索引包含的数据量降到1000万以下,再借助ES在查询、聚合方面高效的能力,应该可以实现秒级响应,并且用户体验也会非常好。

技术方案由此确定。

技术架构

1.用Spark从Kafka中接入原始数据,之后对数据进行解析,转换成我们的目标格式

2.将数据按照实验+小时分索引存入ES中

3.接受到用户请求后,将请求按照实验+特征+小时组合,创建多个异步任务,由这些异步任务并行从ES中过滤并聚合相关数据,得到结果

4.将异步任务的结果进行合并,返回给前端进行展示

代码实现

异步任务

// 启动并行任务final Map<String,List<Future<GetCoverageTask.Result>>> futures = Maps.newHashMap();for(String metric : metrics) { // 遍历要计算的指标final SampleRatio sampleRatio = getSampleRatio(metric);for (String exptId : expts) { // 遍历目标实验列表for (String id : features) { // 遍历要分析的特征final String name = getMetricsName(exptId, sampleRatio, id);final List<Future<GetCoverageTask.Result>> resultList = Lists.newArrayList();for (Date hour : coveredHours) { // 将时间按照小时进行拆分final String fieldName = getFieldName(isFect ? Constants.FACET_COLLECT : Constants.FEATURE_COLLECT, id);final GetCoverageTask task = new GetCoverageTask(exptId, fieldName, sampleRatio, hour);// 启动并行任务final Future<GetCoverageTask.Result> future = TaskExecutor.submit(task);resultList.add(future);}futures.put(name, resultList);}}}final QueryRes queryRes = new QueryRes();final Iterator<Map.Entry<String, List<Future<GetCoverageTask.Result>>>> it = futures.entrySet().iterator();while (it.hasNext()){// 省略结果处理流程}

指标计算

// 1\. 对文档进行聚合运行,分别得到基础文档的数量,以及目标文档数量final AggregationBuilder[] agg = getAggregationBuilder(sampleRatio, fieldName);final SearchSourceBuilder searchBuilder = new SearchSourceBuilder();searchBuilder.aggregation(agg[0]).aggregation(agg[1]).size(0);// 2\. 得到覆盖率final String indexName = getIndexName(exptId, hour);final Search search = new Search.Builder(searchBuilder.toString()).addIndex(indexName).addType(getType()).build();final SearchResult result = jestClient.execute(search);if(result.getResponseCode() != HttpUtils.STATUS_CODE_200){// 请求出错log.warn(result.getErrorMessage());return 0f;}final MetricAggregation aggregations = result.getAggregations();// 3\. 解析结果final long dividend ;if(SampleRatio.ALL == sampleRatio){dividend = aggregations.getValueCountAggregation(Constants.DIVIDEND).getValueCount();}else {dividend = aggregations.getFilterAggregation(Constants.DIVIDEND).getCount();}// 防止出现被除数为0时程序异常if(dividend <= 0){return 0f;}long divisor = aggregations.getFilterAggregation(Constants.DIVISOR).getCount();return divisor / (float)dividend;

聚合

int label = 0;final ExistsQueryBuilder existsQuery = QueryBuilders.existsQuery(fieldName);// 包含指定特征的正样本数量final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();final List<QueryBuilder> must = boolQuery.must();// 计算样本数量TermQueryBuilder labelQuery = null;if(SampleRatio.POSITIVE == sampleRatio) {// 计算正样本数量label = 1;labelQuery = QueryBuilders.termQuery(Constants.LABEL, label);must.add(labelQuery);}else if(SampleRatio.NEGATIVE == sampleRatio) {// 计算负样本数量labelQuery = QueryBuilders.termQuery(Constants.LABEL, label);must.add(labelQuery);}must.add(existsQuery);final ValueCountAggregationBuilder existsCountAgg = AggregationBuilders.count(sampleRatio.getField());existsCountAgg.field(fieldName);final FilterAggregationBuilder filterAgg = AggregationBuilders.filter(aggName, boolQuery);filterAgg.subAggregation(existsCountAgg);return filterAgg;
上线效果

上线后表现完全满足预期,平均请求耗时在3秒左右,用户体验良好。感谢各位小伙伴的辛苦付出~~

下图是ES中部分索引的信息:

突破性能瓶颈!ElasticSearch百亿级数据检索优化案例

ElasticSearch读写底层原理及性能调优

一文俯瞰Elasticsearch核心原理

文章不错?点个【在看】吧! ????

这篇关于ES实现百亿级数据实时分析实战案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1142996

相关文章

C++中unordered_set哈希集合的实现

《C++中unordered_set哈希集合的实现》std::unordered_set是C++标准库中的无序关联容器,基于哈希表实现,具有元素唯一性和无序性特点,本文就来详细的介绍一下unorder... 目录一、概述二、头文件与命名空间三、常用方法与示例1. 构造与析构2. 迭代器与遍历3. 容量相关4

C++中悬垂引用(Dangling Reference) 的实现

《C++中悬垂引用(DanglingReference)的实现》C++中的悬垂引用指引用绑定的对象被销毁后引用仍存在的情况,会导致访问无效内存,下面就来详细的介绍一下产生的原因以及如何避免,感兴趣... 目录悬垂引用的产生原因1. 引用绑定到局部变量,变量超出作用域后销毁2. 引用绑定到动态分配的对象,对象

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

Nginx分布式部署流程分析

《Nginx分布式部署流程分析》文章介绍Nginx在分布式部署中的反向代理和负载均衡作用,用于分发请求、减轻服务器压力及解决session共享问题,涵盖配置方法、策略及Java项目应用,并提及分布式事... 目录分布式部署NginxJava中的代理代理分为正向代理和反向代理正向代理反向代理Nginx应用场景

Python版本信息获取方法详解与实战

《Python版本信息获取方法详解与实战》在Python开发中,获取Python版本号是调试、兼容性检查和版本控制的重要基础操作,本文详细介绍了如何使用sys和platform模块获取Python的主... 目录1. python版本号获取基础2. 使用sys模块获取版本信息2.1 sys模块概述2.1.1

Python实现字典转字符串的五种方法

《Python实现字典转字符串的五种方法》本文介绍了在Python中如何将字典数据结构转换为字符串格式的多种方法,首先可以通过内置的str()函数进行简单转换;其次利用ison.dumps()函数能够... 目录1、使用json模块的dumps方法:2、使用str方法:3、使用循环和字符串拼接:4、使用字符

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

Redis中的有序集合zset从使用到原理分析

《Redis中的有序集合zset从使用到原理分析》Redis有序集合(zset)是字符串与分值的有序映射,通过跳跃表和哈希表结合实现高效有序性管理,适用于排行榜、延迟队列等场景,其时间复杂度低,内存占... 目录开篇:排行榜背后的秘密一、zset的基本使用1.1 常用命令1.2 Java客户端示例二、zse