MapReduce的自定义inputFormat(合并小文件)

2024-04-07 12:38

本文主要是介绍MapReduce的自定义inputFormat(合并小文件),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

需求
无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案

分析(小文件的优化无非以下几种方式:)
1、 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
2、 在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并
3、 在mapreduce处理时,可采用combineInputFormat提高效率

实现(本节实现的是上述第二种方式)
自定义一个InputFormat
改写RecordReader,实现一次读取一个完整文件封装为KV
在输出时使用SequenceFileOutPutFormat输出合并文件

自定义InputFromat:

public class Inputfromat extends FileInputFormat {@Override
public class Inputfromat extends FileInputFormat {@Overridepublic RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {RR rr = new RR();rr.initialize(inputSplit,context);return rr;}
}

自定义RecordReader(RR):

public class RR extends RecordReader {private FileSplit fileSplit;private Configuration configuration;private BytesWritable value = new BytesWritable();private Boolean b=false;@Overridepublic void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {fileSplit = (FileSplit) inputSplit;configuration = taskAttemptContext.getConfiguration();}@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {if (!b){Path path = fileSplit.getPath();FileSystem fileSystem = path.getFileSystem(configuration);FSDataInputStream open = fileSystem.open(path);byte[] contents = new byte[(int) fileSplit.getLength()];IOUtils.readFully(open,contents,0, contents.length);value.set(contents,0,contents.length);b=true;return true;}return false;}@Overridepublic Object getCurrentKey() throws IOException, InterruptedException {return new Text(fileSplit.getPath().getName());}@Overridepublic Object getCurrentValue() throws IOException, InterruptedException {return value;}@Overridepublic float getProgress() throws IOException, InterruptedException {return 0;}@Overridepublic void close() throws IOException {}
}

定义map:

public class Map extends Mapper<Text, BytesWritable,Text,BytesWritable> {@Overrideprotected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {context.write(key,value);}
}

定义程序运行main方法:

public class Dirver {public static void main(String[] args)throws Exception {Job job = Job.getInstance(new Configuration(), "inputfromat");job.setJarByClass(Dirver.class);job.setMapperClass(Map.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);job.setInputFormatClass(Inputfromat.class);Inputfromat.addInputPath(job,new Path("E:\\自定义inputformat_小文件合并\\input"));job.setOutputFormatClass(SequenceFileOutputFormat.class);SequenceFileOutputFormat.setOutputPath(job,new Path("E:\\output"));boolean b = job.waitForCompletion(true);System.exit(b?0:1);}
}

这篇关于MapReduce的自定义inputFormat(合并小文件)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/882624

相关文章

C#实现一键批量合并PDF文档

《C#实现一键批量合并PDF文档》这篇文章主要为大家详细介绍了如何使用C#实现一键批量合并PDF文档功能,文中的示例代码简洁易懂,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言效果展示功能实现1、添加文件2、文件分组(书签)3、定义页码范围4、自定义显示5、定义页面尺寸6、PDF批量合并7、其他方法

Vite 打包目录结构自定义配置小结

《Vite打包目录结构自定义配置小结》在Vite工程开发中,默认打包后的dist目录资源常集中在asset目录下,不利于资源管理,本文基于Rollup配置原理,本文就来介绍一下通过Vite配置自定义... 目录一、实现原理二、具体配置步骤1. 基础配置文件2. 配置说明(1)js 资源分离(2)非 JS 资

聊聊springboot中如何自定义消息转换器

《聊聊springboot中如何自定义消息转换器》SpringBoot通过HttpMessageConverter处理HTTP数据转换,支持多种媒体类型,接下来通过本文给大家介绍springboot中... 目录核心接口springboot默认提供的转换器如何自定义消息转换器Spring Boot 中的消息

MySQL进行分片合并的实现步骤

《MySQL进行分片合并的实现步骤》分片合并是指在分布式数据库系统中,将不同分片上的查询结果进行整合,以获得完整的查询结果,下面就来具体介绍一下,感兴趣的可以了解一下... 目录环境准备项目依赖数据源配置分片上下文分片查询和合并代码实现1. 查询单条记录2. 跨分片查询和合并测试结论分片合并(Shardin

Python自定义异常的全面指南(入门到实践)

《Python自定义异常的全面指南(入门到实践)》想象你正在开发一个银行系统,用户转账时余额不足,如果直接抛出ValueError,调用方很难区分是金额格式错误还是余额不足,这正是Python自定义异... 目录引言:为什么需要自定义异常一、异常基础:先搞懂python的异常体系1.1 异常是什么?1.2

Linux中的自定义协议+序列反序列化用法

《Linux中的自定义协议+序列反序列化用法》文章探讨网络程序在应用层的实现,涉及TCP协议的数据传输机制、结构化数据的序列化与反序列化方法,以及通过JSON和自定义协议构建网络计算器的思路,强调分层... 目录一,再次理解协议二,序列化和反序列化三,实现网络计算器3.1 日志文件3.2Socket.hpp

C语言自定义类型之联合和枚举解读

《C语言自定义类型之联合和枚举解读》联合体共享内存,大小由最大成员决定,遵循对齐规则;枚举类型列举可能值,提升可读性和类型安全性,两者在C语言中用于优化内存和程序效率... 目录一、联合体1.1 联合体类型的声明1.2 联合体的特点1.2.1 特点11.2.2 特点21.2.3 特点31.3 联合体的大小1

基于Python实现进阶版PDF合并/拆分工具

《基于Python实现进阶版PDF合并/拆分工具》在数字化时代,PDF文件已成为日常工作和学习中不可或缺的一部分,本文将详细介绍一款简单易用的PDF工具,帮助用户轻松完成PDF文件的合并与拆分操作... 目录工具概述环境准备界面说明合并PDF文件拆分PDF文件高级技巧常见问题完整源代码总结在数字化时代,PD

pandas数据的合并concat()和merge()方式

《pandas数据的合并concat()和merge()方式》Pandas中concat沿轴合并数据框(行或列),merge基于键连接(内/外/左/右),concat用于纵向或横向拼接,merge用于... 目录concat() 轴向连接合并(1) join='outer',axis=0(2)join='o

springboot自定义注解RateLimiter限流注解技术文档详解

《springboot自定义注解RateLimiter限流注解技术文档详解》文章介绍了限流技术的概念、作用及实现方式,通过SpringAOP拦截方法、缓存存储计数器,结合注解、枚举、异常类等核心组件,... 目录什么是限流系统架构核心组件详解1. 限流注解 (@RateLimiter)2. 限流类型枚举 (