Spark+Hbase 亿级流量分析实战(小巧高性能的ETL)

2024-01-10 10:48

本文主要是介绍Spark+Hbase 亿级流量分析实战(小巧高性能的ETL),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在上一篇文章 大猪 已经介绍了日志存储设计方案 ,我们数据已经落地到数据中心上了,那接下来如何ETL呢?毕竟可是生产环境级别的,可不能乱来。其实只要解决几个问题即可,不必要引入很大级别的组件来做,当然了各有各的千秋,本文主要从 易懂小巧简洁高性能 这三个方面去设计出发点,顺便还实现了一个精巧的 Filebeat。

9028759-f64f78b25e312817.png

要实现的功能就是扫描每天的增量日志并写入Hbase中

9028759-1e383558758292e8.png

需要搞定下面几个不务正业的小老弟

9028759-c294a9d51452c16b.png
  1. 需要把文件中的每一行数据都取出来
  2. 能处理超过10G以上的大日志文件,并且只能占用机器一定的内存,越小越好
  3. 从上图可以看到标黄的是已经写入Hbase的数据,不能重复读取
  4. 非活跃文件不能扫,因为文件过多会影响整体读取IO性能
  5. 读取中的过程要保证增量数据不能录入,因为要保证offset的时候写入mysql稳定不跳跃
9028759-4085e8474dcefb43.png

大猪 根据线上的生产环境一一把上面的功能重新分析给实现一下。

从第一点看还是比较简单的嘛?但是我们要结合上面的 5 个问题来看才行。

总结一句话就是:要实现一个高性能而且能随时重启继续工作的 loghub ETL 程序

实际也必需这样做,因为生产环境容不得马虎,不然就等着被BOSS

9028759-a51c023de6fb202d.png
9028759-bfc932f12f86e08f.png

需要有一个读取所有日志文件方法

9028759-1ea5a6cf0b8a638f.png

还要实现一个保存并读取文件进度的方法

9028759-1853910d9211c234.png

由于不能把一个日志文件全部读入内存进行处理
所以还需要一个能根据索引一行一行接着读取数据的方法

9028759-d689a95bd39e85e4.png

最后剩下一个Hbase的连接池小工具

9028759-682336126ccefe66.png

几个核心方法已经写完了,接着是我们的主程序

def run(logPath: File, defaultOffsetDay: String): Unit = {val sdfstr = Source.fromFile(seekDayFile).getLines().mkStringval offsetDay = Option(if (sdfstr == "") null else sdfstr)//读取设置读取日期的倒数一天之后的日期文件夹val noneOffsetFold = logPath.listFiles().filter(_.getName >= LocalDate.parse(offsetDay.getOrElse(defaultOffsetDay)).minusDays(1).toString).sortBy(f => LocalDate.parse(f.getName).toEpochDay)//读取文件夹中的所有日志文件,并取出索引进行匹配val filesPar = noneOffsetFold.flatMap(files(_, file => file.getName.endsWith(".log"))).map(file => (file, seeks().getOrDefault(MD5Hash.getMD5AsHex(file.getAbsolutePath.getBytes()), 0), file.length())).filter(tp2 => {//过滤出新文件,与有增量的日志文件val fileMd5 = MD5Hash.getMD5AsHex(tp2._1.getAbsolutePath.getBytes())val result = offsets.asScala.filter(m => fileMd5.equals(m._1))result.isEmpty || tp2._3 > result.head._2}).parfilesPar.tasksupport = poolval willUpdateOffset = new util.HashMap[String, Long]()val formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")var logTime:String = nullfilesPar.foreach(tp3 => {val hbaseClient = HbasePool.getTable//因为不能全量读取数据,所有只能一条一条读取,批量提出交给HbaseClient的客户端的mutate方式优雅处理//foreach 里面的部分就是我们的业务处理部分lines(tp3._1, tp3._2, tp3._3, () => {willUpdateOffset.put(tp3._1.getAbsolutePath, tp3._3)offsets.put(MD5Hash.getMD5AsHex(tp3._1.getAbsolutePath.getBytes), tp3._3)}).foreach(line => {val jsonObject = parse(line)val time = (jsonObject \ "time").extract[Long]val data = jsonObject \ "data"val dataMap = data.values.asInstanceOf[Map[String, Any]].filter(_._2 != null).map(x => x._1 -> x._2.toString)val uid = dataMap("uid")logTime = time.getLocalDateTime.toStringval rowkey = uid.take(2) + "|" + time.getLocalDateTime.format(formatter) + "|" + uid.substring(2, 8)val row = new Put(Bytes.toBytes(rowkey))dataMap.foreach(tp2 => row.addColumn(Bytes.toBytes("info"), Bytes.toBytes(tp2._1), Bytes.toBytes(tp2._2)))hbaseClient.mutate(row)})hbaseClient.flush()})//更新索引到文件上writeSeek(willUpdateOffset)//更新索引日期到文件上writeSeekDay(noneOffsetFold.last.getName)//把 logTime offset 写到mysql中,方便Spark+Hbase程序读取并计算}

程序很精简,没有任何没用的功能在里面,线上的生产环境就应该是这子的了。
大家还可以根据需求加入程序退出发邮件通知功能之类的。
真正去算了一下也就100行功能代码,而且占用极小的内存,都不到100M,很精很精。

9028759-31d65a54c8cfda02.png

传送门 完整ETL程序源码

心明眼亮的你、从此刻开始。

9028759-7586fee586f2075b.png
9028759-4c3677dea1cf2734.png

这篇关于Spark+Hbase 亿级流量分析实战(小巧高性能的ETL)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/590528

相关文章

MySQL 内存使用率常用分析语句

《MySQL内存使用率常用分析语句》用户整理了MySQL内存占用过高的分析方法,涵盖操作系统层确认及数据库层bufferpool、内存模块差值、线程状态、performance_schema性能数据... 目录一、 OS层二、 DB层1. 全局情况2. 内存占js用详情最近连续遇到mysql内存占用过高导致

java中pdf模版填充表单踩坑实战记录(itextPdf、openPdf、pdfbox)

《java中pdf模版填充表单踩坑实战记录(itextPdf、openPdf、pdfbox)》:本文主要介绍java中pdf模版填充表单踩坑的相关资料,OpenPDF、iText、PDFBox是三... 目录准备Pdf模版方法1:itextpdf7填充表单(1)加入依赖(2)代码(3)遇到的问题方法2:pd

深度解析Nginx日志分析与499状态码问题解决

《深度解析Nginx日志分析与499状态码问题解决》在Web服务器运维和性能优化过程中,Nginx日志是排查问题的重要依据,本文将围绕Nginx日志分析、499状态码的成因、排查方法及解决方案展开讨论... 目录前言1. Nginx日志基础1.1 Nginx日志存放位置1.2 Nginx日志格式2. 499

PyTorch中的词嵌入层(nn.Embedding)详解与实战应用示例

《PyTorch中的词嵌入层(nn.Embedding)详解与实战应用示例》词嵌入解决NLP维度灾难,捕捉语义关系,PyTorch的nn.Embedding模块提供灵活实现,支持参数配置、预训练及变长... 目录一、词嵌入(Word Embedding)简介为什么需要词嵌入?二、PyTorch中的nn.Em

在IntelliJ IDEA中高效运行与调试Spring Boot项目的实战步骤

《在IntelliJIDEA中高效运行与调试SpringBoot项目的实战步骤》本章详解SpringBoot项目导入IntelliJIDEA的流程,教授运行与调试技巧,包括断点设置与变量查看,奠定... 目录引言:为良驹配上好鞍一、为何选择IntelliJ IDEA?二、实战:导入并运行你的第一个项目步骤1

Olingo分析和实践之EDM 辅助序列化器详解(最佳实践)

《Olingo分析和实践之EDM辅助序列化器详解(最佳实践)》EDM辅助序列化器是ApacheOlingoOData框架中无需完整EDM模型的智能序列化工具,通过运行时类型推断实现灵活数据转换,适用... 目录概念与定义什么是 EDM 辅助序列化器?核心概念设计目标核心特点1. EDM 信息可选2. 智能类

Spring Boot3.0新特性全面解析与应用实战

《SpringBoot3.0新特性全面解析与应用实战》SpringBoot3.0作为Spring生态系统的一个重要里程碑,带来了众多令人兴奋的新特性和改进,本文将深入解析SpringBoot3.0的... 目录核心变化概览Java版本要求提升迁移至Jakarta EE重要新特性详解1. Native Ima

Olingo分析和实践之OData框架核心组件初始化(关键步骤)

《Olingo分析和实践之OData框架核心组件初始化(关键步骤)》ODataSpringBootService通过初始化OData实例和服务元数据,构建框架核心能力与数据模型结构,实现序列化、URI... 目录概述第一步:OData实例创建1.1 OData.newInstance() 详细分析1.1.1

Spring Boot 与微服务入门实战详细总结

《SpringBoot与微服务入门实战详细总结》本文讲解SpringBoot框架的核心特性如快速构建、自动配置、零XML与微服务架构的定义、演进及优缺点,涵盖开发环境准备和HelloWorld实战... 目录一、Spring Boot 核心概述二、微服务架构详解1. 微服务的定义与演进2. 微服务的优缺点三

Olingo分析和实践之ODataImpl详细分析(重要方法详解)

《Olingo分析和实践之ODataImpl详细分析(重要方法详解)》ODataImpl.java是ApacheOlingoOData框架的核心工厂类,负责创建序列化器、反序列化器和处理器等组件,... 目录概述主要职责类结构与继承关系核心功能分析1. 序列化器管理2. 反序列化器管理3. 处理器管理重要方