MapReduce高级编程之自定义InputFormat

2024-03-19 13:58

本文主要是介绍MapReduce高级编程之自定义InputFormat,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

InputFormat是MapReduce中一个很常用的概念,它在程序的运行中到底起到了什么作用呢?
InputFormat其实是一个接口,包含了两个方法:
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
}
这两个方法有分别完成着以下工作:
方法getSplits将输入数据切分成splits, splits的个数即为map tasks的个数,splits的大小默认为块大小,即64M
方法getSplits将每个split解析成records, 再依次将record解析成<K,V>对
也就是说InputFormat完成以下工作:
InputFile --> splits--> <K,V>
系统常用的 InputFormat 又有哪些呢?
其中TextInputFormat便是最常用的,它的<K,V>就代表<行偏移,该行内容>
然而系统所提供的这几种固定的将 InputFile转换为<K,V>的方式有时候并不能满足我们的需求:
此时需要我们自定义InputFormat ,从而使 Hadoop框架按照我们预设的方式来将
InputFile解析为<K,V>
在领会自定义InputFormat 之前,需要弄懂一下几个抽象类、接口及其之间的关系:
InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class),
RecordReader(interface), LineRecordReader(class)的关系
FileInputFormat implements InputFormat
TextInputFormat extends FileInputFormat
TextInputFormat.getRecordReader calls LineRecordReader
LineRecordReader  implements RecordReader
对于InputFormat接口,上面已经有详细的描述
再看看FileInputFormat,它实现了InputFormat接口中的getSplits方法,而将getRecordReader与isSplitable留给具体类(如TextInputFormat)实现,isSplitable方法通常不用修改,所以只需要在自定义的InputFormat中实现
getRecordReader方法即可,而该方法的核心是调用LineRecordReader(即由LineRecorderReader类来实现 "将每个split解析成records, 再依次将record解析成<K,V>对"),该方法实现了接口RecordReader
public interface RecordReader<K, V> {
booleannext(K key, V value) throws IOException;
KcreateKey();
VcreateValue();
longgetPos() throws IOException;
public voidclose() throws IOException;
floatgetProgress() throws IOException;
}

因此自定义InputFormat的核心是自定义一个实现接口RecordReader类似于LineRecordReader的类,该类的核心也正是重写接口RecordReader中的几大方法,

定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader
示例,数据每一行为 “物体,x坐标,y坐标,z坐标
ball 3.5,12.7,9.0
car 15,23.76,42.23
device 0.0,12.4,-67.1
每一行将要被解析为<Text, Point3D>(Point3D是我们在上一篇日志中自定义的数据类型)
方式一,自定义的RecordReader使用中LineRecordReader,
public class ObjectPositionInputFormat extends
FileInputFormat<Text, Point3D> {
public RecordReader<Text, Point3D> getRecordReader (
InputSplit input, JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(input.toString());
return new ObjPosRecordReader(job, (FileSplit)input);
}
}
class ObjPosRecordReader implements RecordReader<Text, Point3D> {
private LineRecordReader lineReader;
private LongWritable lineKey;
private Text lineValue;
public ObjPosRecordReader (JobConf job, FileSplit split) throws IOException {
lineReader = new LineRecordReader(job, split);
lineKey = lineReader.createKey();
lineValue = lineReader.createValue();
}
public boolean next (Text key, Point3D value) throws IOException {
// get the next line
if (!lineReader.next(lineKey, lineValue)) {
return false ;
}
// parse the lineValue which is in the format:
// objName, x, y, z
String [] pieces = lineValue.toString().split( "," );
if (pieces.length != 4) {
throw new IOException( "Invalid record received");
}
// try to parse floating point components of value
float fx, fy, fz;
try {
fx = Float.parseFloat(pieces[1].trim());
fy = Float.parseFloat(pieces[2].trim());
fz = Float.parseFloat(pieces[3].trim());
} catch (NumberFormatException nfe) {
throw new IOException( "Error parsing floating point value in record" );
}
// now that we know we'll succeed, overwrite the output objects
key.set(pieces[0].trim()); // objName is the output key.
value.x = fx;
value.y = fy;
value.z = fz;
return true ;
}
public Text createKey () {
return new Text( "" );
}
public Point3D createValue () {
return new Point3D();
}
public long getPos () throws IOException {
return lineReader.getPos();
}
public void close () throws IOException {
lineReader.close();
}
public float getProgress () throws IOException {
return lineReader.getProgress();
}
}
方式二:自定义的RecordReader中使用LineReader,
public class ObjectPositionInputFormat extends FileInputFormat<Text, Point3D> {
@ Override
protected boolean isSplitable (JobContext context, Path filename) {
// TODO Auto-generated method stub
return false ;
}
@ Override
public RecordReader<Text, Point3D> createRecordReader (InputSplit inputsplit,
TaskAttemptContext context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
return new objPosRecordReader();
}
public static class objPosRecordReader extends RecordReader<Text,Point3D>{
public LineReader in;
public Text lineKey;
public Point3D lineValue;
public StringTokenizer token= null ;
public Text line;
@ Override
public void close () throws IOException {
// TODO Auto-generated method stub
}
@ Override
public Text getCurrentKey () throws IOException, InterruptedException {
// TODO Auto-generated method stub
System.out.println( "key" );
//lineKey.set(token.nextToken());
System.out.println( "hello" );
return lineKey;
}
@ Override
public Point3D getCurrentValue () throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return lineValue;
}
@ Override
public float getProgress () throws IOException, InterruptedException {
// TODO Auto-generated method stub
return 0;
}
@ Override
public void initialize (InputSplit input, TaskAttemptContext context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
FileSplit split=(FileSplit)input;
Configuration job=context.getConfiguration();
Path file=split.getPath();
FileSystem fs=file.getFileSystem(job);
FSDataInputStream filein=fs.open(file);
in= new LineReader(filein,job);
line= new Text();
lineKey= new Text();
lineValue= new Point3D();
}
@ Override
public boolean nextKeyValue () throws IOException, InterruptedException {
// TODO Auto-generated method stub
int linesize=in.readLine(line);
if (linesize==0)
return false ;
token= new StringTokenizer(line.toString());
String []temp= new String[2];
if (token.hasMoreElements()){
temp[0]=token.nextToken();
if (token.hasMoreElements()){
temp[1]=token.nextToken();
}
}
System.out.println(temp[0]);
System.out.println(temp[1]);
String []points=temp[1].split( "," );
System.out.println(points[0]);
System.out.println(points[1]);
System.out.println(points[2]);
lineKey.set(temp[0]);
lineValue.set(Float.parseFloat(points[0]),Float.parseFloat(points[1]), Float.parseFloat(points[2]));
System.out.println( "pp" );
return true ;
}
}
}
从以上可以看出,自定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader,而在其中可能会到LineReader/LineRecordReader/KeyValueLineRecordReader
因此,要自定义InputFormat,这三个类的源码就必须很熟悉~

这篇关于MapReduce高级编程之自定义InputFormat的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/826210

相关文章

Java并发编程之如何优雅关闭钩子Shutdown Hook

《Java并发编程之如何优雅关闭钩子ShutdownHook》这篇文章主要为大家详细介绍了Java如何实现优雅关闭钩子ShutdownHook,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起... 目录关闭钩子简介关闭钩子应用场景数据库连接实战演示使用关闭钩子的注意事项开源框架中的关闭钩子机制1.

Spring Security自定义身份认证的实现方法

《SpringSecurity自定义身份认证的实现方法》:本文主要介绍SpringSecurity自定义身份认证的实现方法,下面对SpringSecurity的这三种自定义身份认证进行详细讲解,... 目录1.内存身份认证(1)创建配置类(2)验证内存身份认证2.JDBC身份认证(1)数据准备 (2)配置依

Spring Boot 整合 SSE的高级实践(Server-Sent Events)

《SpringBoot整合SSE的高级实践(Server-SentEvents)》SSE(Server-SentEvents)是一种基于HTTP协议的单向通信机制,允许服务器向浏览器持续发送实... 目录1、简述2、Spring Boot 中的SSE实现2.1 添加依赖2.2 实现后端接口2.3 配置超时时

mysql中的group by高级用法

《mysql中的groupby高级用法》MySQL中的GROUPBY是数据聚合分析的核心功能,主要用于将结果集按指定列分组,并结合聚合函数进行统计计算,下面给大家介绍mysql中的groupby用法... 目录一、基本语法与核心功能二、基础用法示例1. 单列分组统计2. 多列组合分组3. 与WHERE结合使

shell编程之函数与数组的使用详解

《shell编程之函数与数组的使用详解》:本文主要介绍shell编程之函数与数组的使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录shell函数函数的用法俩个数求和系统资源监控并报警函数函数变量的作用范围函数的参数递归函数shell数组获取数组的长度读取某下的

MySQL高级查询之JOIN、子查询、窗口函数实际案例

《MySQL高级查询之JOIN、子查询、窗口函数实际案例》:本文主要介绍MySQL高级查询之JOIN、子查询、窗口函数实际案例的相关资料,JOIN用于多表关联查询,子查询用于数据筛选和过滤,窗口函... 目录前言1. JOIN(连接查询)1.1 内连接(INNER JOIN)1.2 左连接(LEFT JOI

前端高级CSS用法示例详解

《前端高级CSS用法示例详解》在前端开发中,CSS(层叠样式表)不仅是用来控制网页的外观和布局,更是实现复杂交互和动态效果的关键技术之一,随着前端技术的不断发展,CSS的用法也日益丰富和高级,本文将深... 前端高级css用法在前端开发中,CSS(层叠样式表)不仅是用来控制网页的外观和布局,更是实现复杂交

揭秘Python Socket网络编程的7种硬核用法

《揭秘PythonSocket网络编程的7种硬核用法》Socket不仅能做聊天室,还能干一大堆硬核操作,这篇文章就带大家看看Python网络编程的7种超实用玩法,感兴趣的小伙伴可以跟随小编一起... 目录1.端口扫描器:探测开放端口2.简易 HTTP 服务器:10 秒搭个网页3.局域网游戏:多人联机对战4.

Java并发编程必备之Synchronized关键字深入解析

《Java并发编程必备之Synchronized关键字深入解析》本文我们深入探索了Java中的Synchronized关键字,包括其互斥性和可重入性的特性,文章详细介绍了Synchronized的三种... 目录一、前言二、Synchronized关键字2.1 Synchronized的特性1. 互斥2.

使用Sentinel自定义返回和实现区分来源方式

《使用Sentinel自定义返回和实现区分来源方式》:本文主要介绍使用Sentinel自定义返回和实现区分来源方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Sentinel自定义返回和实现区分来源1. 自定义错误返回2. 实现区分来源总结Sentinel自定