hadoop CombineFileInputFormat的使用

2024-06-16 20:18

本文主要是介绍hadoop CombineFileInputFormat的使用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

CombineFileInputFormat作用:将多个小文件打包成一个InputSplit提供给一个Map处理,避免因为大量小文件问题,启动大量任务

CombineFileInputFormat是一种新的inputformat,用于将多个文件合并成一个单独的split,另外,它会考虑数据的存储位置。旧版本的MultiFileInputFormat是按文件单位切分,可能造成split不均匀,如果有一个大文件则会单独由一个map处理,严重偏慢

CombineFileInputFormat是个抽象类,需要手工实现

1、Hive中可以设置:

set mapred.max.split.size=256000000; //合并的每个map大小

Set mapred.min.split.size.per.node=256000000 //控制一个节点上split的至少的大小,mapred.max.split.size大小切分文件后,剩余大小如果超过mapred.min.split.size.per.node则作为一个分片,否则保留等待rack层处理

Set  Mapred.min.split.size.per.rack=256000000  // 控制一个交换机下split至少的大小,合并碎片文件,按mapred.max.split.size分割,最后若剩余大小超过 Mapred.min.split.size.per.rack则作为单独的一分片

最后合并不同rack下的碎片,按mapred.max.split.size分割,剩下的碎片无论大小作为一个split

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

 

2、Mapreduce中使用

自定义类MyMultiFileInputFormat,代码参考其他博客

 

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
public class MyMultiFileInputFormat extends CombineFileInputFormat<MultiFileInputWritableComparable, Text>  
{
public RecordReader<MultiFileInputWritableComparable,Text> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException 
{
return new org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader<MultiFileInputWritableComparable, Text>
((CombineFileSplit)split, context, CombineFileLineRecordReader.class);
//CombineFileLineRecordReader.class为自定义类,一个split可能对应多个path则系统自带类..input.CombineFileRecordReader会通过java反射,针对不同的path分别构建自定义的CombineFileLineRecordReader去读key,value数据,具体看input.CombineFileRecordReader类源码
}
}

自定义CombineFileLineRecordReader类:

 

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;
@SuppressWarnings("deprecation")
public class CombineFileLineRecordReader extends RecordReader<MultiFileInputWritableComparable, Text> {
private long startOffset; // offset of the chunk;
private long end; // end of the chunk;
private long pos; // current pos
private FileSystem fs;
private Path path; // path of hdfs
private MultiFileInputWritableComparable key;
private Text value; // value should be string(hadoop Text)
private FSDataInputStream fileIn;
private LineReader reader;
public CombineFileLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException 
{
fs = FileSystem.get(context.getConfiguration());
this.path = split.getPath(index);
this.startOffset = split.getOffset(index);
this.end = startOffset + split.getLength(index);
boolean skipFirstLine = false;
fileIn = fs.open(path); // open the file
if (startOffset != 0) {
skipFirstLine = true;
--startOffset;
fileIn.seek(startOffset);
}
reader = new LineReader(fileIn);
if (skipFirstLine) // skip first line and re-establish "startOffset".
{
int readNum = reader.readLine(new Text(),0,(int) Math.min((long) Integer.MAX_VALUE, end - startOffset));
startOffset += readNum;
}
this.pos = startOffset;
}
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException 
{}
public void close() throws IOException 
{
reader.close();
}
public float getProgress() throws IOException 
{
if (startOffset == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos - startOffset) / (float) (end - startOffset));
}
}
public boolean nextKeyValue() throws IOException 
{
if (key == null) {
key = new MultiFileInputWritableComparable();
key.setFileName(path.getName());
}
key.setOffset(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
if (pos < end) {
newSize = reader.readLine(value);
pos += newSize;
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
public MultiFileInputWritableComparable getCurrentKey() throws IOException, InterruptedException 
{
return key;
}
public Text getCurrentValue() throws IOException, InterruptedException 
{
return value;
}
}


MultiFileInputWritableComparable类

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
@SuppressWarnings("rawtypes")
public class MultiFileInputWritableComparable implements WritableComparable {
private long offset;       //offset of this file block
private String fileName;   //filename of this block
public long getOffset() {
return offset;
}
public void setOffset(long offset) {
this.offset = offset;
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public void readFields(DataInput in) throws IOException {
this.offset = in.readLong();
this.fileName = Text.readString(in);
}
public void write(DataOutput out) throws IOException {
out.writeLong(offset);
Text.writeString(out, fileName);
}
public int compareTo(Object object) {
MultiFileInputWritableComparable that = (MultiFileInputWritableComparable)object;
int compare = this.fileName.compareTo(that.fileName);
if(compare == 0) {
return (int)Math.signum((double)(this.offset - that.offset));
}
return compare;
}
@Override
public boolean equals(Object object) {
if(object instanceof MultiFileInputWritableComparable)
return this.compareTo(object) == 0;
return false;
}
@Override
public int hashCode() {
assert false : "hashCode not designed";
return 42; //an arbitrary constant
}
}


测试

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@SuppressWarnings("deprecation")
public class MultiFileWordCount extends Configured implements Tool {
public static class MapClass extends 
Mapper<MultiFileInputWritableComparable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(MultiFileInputWritableComparable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTo

这篇关于hadoop CombineFileInputFormat的使用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1067444

相关文章

Java使用Javassist动态生成HelloWorld类

《Java使用Javassist动态生成HelloWorld类》Javassist是一个非常强大的字节码操作和定义库,它允许开发者在运行时创建新的类或者修改现有的类,本文将简单介绍如何使用Javass... 目录1. Javassist简介2. 环境准备3. 动态生成HelloWorld类3.1 创建CtC

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

Java使用jar命令配置服务器端口的完整指南

《Java使用jar命令配置服务器端口的完整指南》本文将详细介绍如何使用java-jar命令启动应用,并重点讲解如何配置服务器端口,同时提供一个实用的Web工具来简化这一过程,希望对大家有所帮助... 目录1. Java Jar文件简介1.1 什么是Jar文件1.2 创建可执行Jar文件2. 使用java

C#使用Spire.Doc for .NET实现HTML转Word的高效方案

《C#使用Spire.Docfor.NET实现HTML转Word的高效方案》在Web开发中,HTML内容的生成与处理是高频需求,然而,当用户需要将HTML页面或动态生成的HTML字符串转换为Wor... 目录引言一、html转Word的典型场景与挑战二、用 Spire.Doc 实现 HTML 转 Word1

Java中的抽象类与abstract 关键字使用详解

《Java中的抽象类与abstract关键字使用详解》:本文主要介绍Java中的抽象类与abstract关键字使用详解,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录一、抽象类的概念二、使用 abstract2.1 修饰类 => 抽象类2.2 修饰方法 => 抽象方法,没有

MyBatis ParameterHandler的具体使用

《MyBatisParameterHandler的具体使用》本文主要介绍了MyBatisParameterHandler的具体使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参... 目录一、概述二、源码1 关键属性2.setParameters3.TypeHandler1.TypeHa

Spring 中的切面与事务结合使用完整示例

《Spring中的切面与事务结合使用完整示例》本文给大家介绍Spring中的切面与事务结合使用完整示例,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录 一、前置知识:Spring AOP 与 事务的关系 事务本质上就是一个“切面”二、核心组件三、完

使用docker搭建嵌入式Linux开发环境

《使用docker搭建嵌入式Linux开发环境》本文主要介绍了使用docker搭建嵌入式Linux开发环境,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录1、前言2、安装docker3、编写容器管理脚本4、创建容器1、前言在日常开发全志、rk等不同

使用Python实现Word文档的自动化对比方案

《使用Python实现Word文档的自动化对比方案》我们经常需要比较两个Word文档的版本差异,无论是合同修订、论文修改还是代码文档更新,人工比对不仅效率低下,还容易遗漏关键改动,下面通过一个实际案例... 目录引言一、使用python-docx库解析文档结构二、使用difflib进行差异比对三、高级对比方

sky-take-out项目中Redis的使用示例详解

《sky-take-out项目中Redis的使用示例详解》SpringCache是Spring的缓存抽象层,通过注解简化缓存管理,支持Redis等提供者,适用于方法结果缓存、更新和删除操作,但无法实现... 目录Spring Cache主要特性核心注解1.@Cacheable2.@CachePut3.@Ca