如何在Scala中读取Hadoop集群上的gz压缩文件

2024-05-15 03:18

本文主要是介绍如何在Scala中读取Hadoop集群上的gz压缩文件,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

存在Hadoop集群上的文件,大部分都会经过压缩,如果是压缩后的文件,我们直接在应用程序中如何读取里面的数据?答案是肯定的,但是比普通的文本读取要稍微复杂一点,需要使用到Hadoop的压缩工具类支持,比如处理gz,snappy,lzo,bz压缩的,前提是首先我们的Hadoop集群得支持上面提到的各种压缩文件。

本次就给出一个读取gz压缩文件的例子核心代码:

def readHdfsWriteKafkaByDate(fs:FileSystem,date:String,conf:Configuration,topic:String,finishTimeStamp:Long):Unit={//访问hdfs文件,只读取gz结尾的压缩文件,如果是.tmp结尾的不会读取val path=new Path("/collect_data/userlog/"+date+"/log*.gz")//实例化压缩工厂编码类val factory = new CompressionCodecFactory(conf)//读取通配路径val items=fs.globStatus(path)var count=0//遍历每一个路径文件items.foreach(f=>{//打印全路径println(f.getPath)//通过全路径获取其编码val codec = factory.getCodec(f.getPath())//获取编码//读取成数据流var  stream:InputStream = null;if(codec!=null){//如果编码识别直接从编码创建输入流stream = codec.createInputStream(fs.open(f.getPath()))}else{//如果不识别则直接打开stream = fs.open(f.getPath())}val writer=new StringWriter()//将字节流转成字符串流IOUtils.copy(stream,writer,"UTF-8")//得到字符串内容val raw=writer.toString//根据字符串内容split出所有的行数据,至此解压数据完毕val raw_array=raw.split("\n")//遍历数据      raw_array.foreach(line=>{val array = line.split("--",2) //拆分数组val map = JSON.parseObject(array(1)).asScalaval userId = map.get("userId").getOrElse("").asInstanceOf[String] //为空为非法数据val time = map.get("time").getOrElse("") //为空为非法数据if(StringUtils.isNotEmpty(userId)&&(time+"").toLong<=finishTimeStamp){//只有数据pushToKafka(topic,userId,line)count=count+1}})})}

压缩和解压模块用的工具包是apache-commons下面的类:

import org.apache.commons.io.IOUtils
import org.apache.commons.lang.StringUtils

如果想在Windows上调试,可以直接设置HDFS的地址即可

-     val conf = new Configuration()//获取hadoop的conf
//    conf.set("fs.defaultFS","hdfs://192.168.10.14:8020/")//windows上调试用

至此数据已经解压并读取完毕,其实并不是很复杂,用java代码和上面的代码也差不多类似,如果直接用原生的api读取会稍微复杂,但如果我们使用Hive,Spark框架的时候,框架内部会自动帮我们完成压缩文件的读取或者写入,对用户透明,当然底层也是封装了不同压缩格式的读取和写入代码,这样以来使用者将会方便许多。

参考文章:

https://blog.matthewrathbone.com/2013/12/28/reading-data-from-hdfs-even-if-it-is-compressed


有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。

输入图片说明



这篇关于如何在Scala中读取Hadoop集群上的gz压缩文件的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/990656

相关文章

如何使用 Python 读取 Excel 数据

《如何使用Python读取Excel数据》:本文主要介绍使用Python读取Excel数据的详细教程,通过pandas和openpyxl,你可以轻松读取Excel文件,并进行各种数据处理操... 目录使用 python 读取 Excel 数据的详细教程1. 安装必要的依赖2. 读取 Excel 文件3. 读

Spring Boot读取配置文件的五种方式小结

《SpringBoot读取配置文件的五种方式小结》SpringBoot提供了灵活多样的方式来读取配置文件,这篇文章为大家介绍了5种常见的读取方式,文中的示例代码简洁易懂,大家可以根据自己的需要进... 目录1. 配置文件位置与加载顺序2. 读取配置文件的方式汇总方式一:使用 @Value 注解读取配置方式二

基于Python实现读取嵌套压缩包下文件的方法

《基于Python实现读取嵌套压缩包下文件的方法》工作中遇到的问题,需要用Python实现嵌套压缩包下文件读取,本文给大家介绍了详细的解决方法,并有相关的代码示例供大家参考,需要的朋友可以参考下... 目录思路完整代码代码优化思路打开外层zip压缩包并遍历文件:使用with zipfile.ZipFil

Redis分片集群的实现

《Redis分片集群的实现》Redis分片集群是一种将Redis数据库分散到多个节点上的方式,以提供更高的性能和可伸缩性,本文主要介绍了Redis分片集群的实现,具有一定的参考价值,感兴趣的可以了解一... 目录1. Redis Cluster的核心概念哈希槽(Hash Slots)主从复制与故障转移2.

解决Java中基于GeoTools的Shapefile读取乱码的问题

《解决Java中基于GeoTools的Shapefile读取乱码的问题》本文主要讨论了在使用Java编程语言进行地理信息数据解析时遇到的Shapefile属性信息乱码问题,以及根据不同的编码设置进行属... 目录前言1、Shapefile属性字段编码的情况:一、Shp文件常见的字符集编码1、System编码

利用Python实现添加或读取Excel公式

《利用Python实现添加或读取Excel公式》Excel公式是数据处理的核心工具,从简单的加减运算到复杂的逻辑判断,掌握基础语法是高效工作的起点,下面我们就来看看如何使用Python进行Excel公... 目录python Excel 库安装Python 在 Excel 中添加公式/函数Python 读取

Python如何实现读取csv文件时忽略文件的编码格式

《Python如何实现读取csv文件时忽略文件的编码格式》我们再日常读取csv文件的时候经常会发现csv文件的格式有多种,所以这篇文章为大家介绍了Python如何实现读取csv文件时忽略文件的编码格式... 目录1、背景介绍2、库的安装3、核心代码4、完整代码1、背景介绍我们再日常读取csv文件的时候经常

C#中读取XML文件的四种常用方法

《C#中读取XML文件的四种常用方法》Xml是Internet环境中跨平台的,依赖于内容的技术,是当前处理结构化文档信息的有力工具,下面我们就来看看C#中读取XML文件的方法都有哪些吧... 目录XML简介格式C#读取XML文件方法使用XmlDocument使用XmlTextReader/XmlTextWr

Java读取InfluxDB数据库的方法详解

《Java读取InfluxDB数据库的方法详解》本文介绍基于Java语言,读取InfluxDB数据库的方法,包括读取InfluxDB的所有数据库,以及指定数据库中的measurement、field、... 首先,创建一个Java项目,用于撰写代码。接下来,配置所需要的依赖;这里我们就选择可用于与Infl

C#读取本地网络配置信息全攻略分享

《C#读取本地网络配置信息全攻略分享》在当今数字化时代,网络已深度融入我们生活与工作的方方面面,对于软件开发而言,掌握本地计算机的网络配置信息显得尤为关键,而在C#编程的世界里,我们又该如何巧妙地读取... 目录一、引言二、C# 读取本地网络配置信息的基础准备2.1 引入关键命名空间2.2 理解核心类与方法