如何在Scala中读取Hadoop集群上的gz压缩文件

2024-05-15 03:18

本文主要是介绍如何在Scala中读取Hadoop集群上的gz压缩文件,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

存在Hadoop集群上的文件,大部分都会经过压缩,如果是压缩后的文件,我们直接在应用程序中如何读取里面的数据?答案是肯定的,但是比普通的文本读取要稍微复杂一点,需要使用到Hadoop的压缩工具类支持,比如处理gz,snappy,lzo,bz压缩的,前提是首先我们的Hadoop集群得支持上面提到的各种压缩文件。

本次就给出一个读取gz压缩文件的例子核心代码:

def readHdfsWriteKafkaByDate(fs:FileSystem,date:String,conf:Configuration,topic:String,finishTimeStamp:Long):Unit={//访问hdfs文件,只读取gz结尾的压缩文件,如果是.tmp结尾的不会读取val path=new Path("/collect_data/userlog/"+date+"/log*.gz")//实例化压缩工厂编码类val factory = new CompressionCodecFactory(conf)//读取通配路径val items=fs.globStatus(path)var count=0//遍历每一个路径文件items.foreach(f=>{//打印全路径println(f.getPath)//通过全路径获取其编码val codec = factory.getCodec(f.getPath())//获取编码//读取成数据流var  stream:InputStream = null;if(codec!=null){//如果编码识别直接从编码创建输入流stream = codec.createInputStream(fs.open(f.getPath()))}else{//如果不识别则直接打开stream = fs.open(f.getPath())}val writer=new StringWriter()//将字节流转成字符串流IOUtils.copy(stream,writer,"UTF-8")//得到字符串内容val raw=writer.toString//根据字符串内容split出所有的行数据,至此解压数据完毕val raw_array=raw.split("\n")//遍历数据      raw_array.foreach(line=>{val array = line.split("--",2) //拆分数组val map = JSON.parseObject(array(1)).asScalaval userId = map.get("userId").getOrElse("").asInstanceOf[String] //为空为非法数据val time = map.get("time").getOrElse("") //为空为非法数据if(StringUtils.isNotEmpty(userId)&&(time+"").toLong<=finishTimeStamp){//只有数据pushToKafka(topic,userId,line)count=count+1}})})}

压缩和解压模块用的工具包是apache-commons下面的类:

import org.apache.commons.io.IOUtils
import org.apache.commons.lang.StringUtils

如果想在Windows上调试,可以直接设置HDFS的地址即可

-     val conf = new Configuration()//获取hadoop的conf
//    conf.set("fs.defaultFS","hdfs://192.168.10.14:8020/")//windows上调试用

至此数据已经解压并读取完毕,其实并不是很复杂,用java代码和上面的代码也差不多类似,如果直接用原生的api读取会稍微复杂,但如果我们使用Hive,Spark框架的时候,框架内部会自动帮我们完成压缩文件的读取或者写入,对用户透明,当然底层也是封装了不同压缩格式的读取和写入代码,这样以来使用者将会方便许多。

参考文章:

https://blog.matthewrathbone.com/2013/12/28/reading-data-from-hdfs-even-if-it-is-compressed


有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。

输入图片说明



这篇关于如何在Scala中读取Hadoop集群上的gz压缩文件的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/990656

相关文章

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

Python使用openpyxl读取Excel的操作详解

《Python使用openpyxl读取Excel的操作详解》本文介绍了使用Python的openpyxl库进行Excel文件的创建、读写、数据操作、工作簿与工作表管理,包括创建工作簿、加载工作簿、操作... 目录1 概述1.1 图示1.2 安装第三方库2 工作簿 workbook2.1 创建:Workboo

Java中读取YAML文件配置信息常见问题及解决方法

《Java中读取YAML文件配置信息常见问题及解决方法》:本文主要介绍Java中读取YAML文件配置信息常见问题及解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要... 目录1 使用Spring Boot的@ConfigurationProperties2. 使用@Valu

Jenkins分布式集群配置方式

《Jenkins分布式集群配置方式》:本文主要介绍Jenkins分布式集群配置方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1.安装jenkins2.配置集群总结Jenkins是一个开源项目,它提供了一个容易使用的持续集成系统,并且提供了大量的plugin满

Redis分片集群、数据读写规则问题小结

《Redis分片集群、数据读写规则问题小结》本文介绍了Redis分片集群的原理,通过数据分片和哈希槽机制解决单机内存限制与写瓶颈问题,实现分布式存储和高并发处理,但存在通信开销大、维护复杂及对事务支持... 目录一、分片集群解android决的问题二、分片集群图解 分片集群特征如何解决的上述问题?(与哨兵模

SpringBoot连接Redis集群教程

《SpringBoot连接Redis集群教程》:本文主要介绍SpringBoot连接Redis集群教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1. 依赖2. 修改配置文件3. 创建RedisClusterConfig4. 测试总结1. 依赖 <de

SpringBoot读取ZooKeeper(ZK)属性的方法实现

《SpringBoot读取ZooKeeper(ZK)属性的方法实现》本文主要介绍了SpringBoot读取ZooKeeper(ZK)属性的方法实现,强调使用@ConfigurationProperti... 目录1. 在配置文件中定义 ZK 属性application.propertiesapplicati

Python中文件读取操作漏洞深度解析与防护指南

《Python中文件读取操作漏洞深度解析与防护指南》在Web应用开发中,文件操作是最基础也最危险的功能之一,这篇文章将全面剖析Python环境中常见的文件读取漏洞类型,成因及防护方案,感兴趣的小伙伴可... 目录引言一、静态资源处理中的路径穿越漏洞1.1 典型漏洞场景1.2 os.path.join()的陷

Nginx使用Keepalived部署web集群(高可用高性能负载均衡)实战案例

《Nginx使用Keepalived部署web集群(高可用高性能负载均衡)实战案例》本文介绍Nginx+Keepalived实现Web集群高可用负载均衡的部署与测试,涵盖架构设计、环境配置、健康检查、... 目录前言一、架构设计二、环境准备三、案例部署配置 前端 Keepalived配置 前端 Nginx

Redis高可用-主从复制、哨兵模式与集群模式详解

《Redis高可用-主从复制、哨兵模式与集群模式详解》:本文主要介绍Redis高可用-主从复制、哨兵模式与集群模式的使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝... 目录Redis高可用-主从复制、哨兵模式与集群模式概要一、主从复制(Master-Slave Repli