MapReduce高级编程之自定义InputFormat

2024-03-19 13:58

本文主要是介绍MapReduce高级编程之自定义InputFormat,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

InputFormat是MapReduce中一个很常用的概念,它在程序的运行中到底起到了什么作用呢?
InputFormat其实是一个接口,包含了两个方法:
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
}
这两个方法有分别完成着以下工作:
方法getSplits将输入数据切分成splits, splits的个数即为map tasks的个数,splits的大小默认为块大小,即64M
方法getSplits将每个split解析成records, 再依次将record解析成<K,V>对
也就是说InputFormat完成以下工作:
InputFile --> splits--> <K,V>
系统常用的 InputFormat 又有哪些呢?
其中TextInputFormat便是最常用的,它的<K,V>就代表<行偏移,该行内容>
然而系统所提供的这几种固定的将 InputFile转换为<K,V>的方式有时候并不能满足我们的需求:
此时需要我们自定义InputFormat ,从而使 Hadoop框架按照我们预设的方式来将
InputFile解析为<K,V>
在领会自定义InputFormat 之前,需要弄懂一下几个抽象类、接口及其之间的关系:
InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class),
RecordReader(interface), LineRecordReader(class)的关系
FileInputFormat implements InputFormat
TextInputFormat extends FileInputFormat
TextInputFormat.getRecordReader calls LineRecordReader
LineRecordReader  implements RecordReader
对于InputFormat接口,上面已经有详细的描述
再看看FileInputFormat,它实现了InputFormat接口中的getSplits方法,而将getRecordReader与isSplitable留给具体类(如TextInputFormat)实现,isSplitable方法通常不用修改,所以只需要在自定义的InputFormat中实现
getRecordReader方法即可,而该方法的核心是调用LineRecordReader(即由LineRecorderReader类来实现 "将每个split解析成records, 再依次将record解析成<K,V>对"),该方法实现了接口RecordReader
public interface RecordReader<K, V> {
booleannext(K key, V value) throws IOException;
KcreateKey();
VcreateValue();
longgetPos() throws IOException;
public voidclose() throws IOException;
floatgetProgress() throws IOException;
}

因此自定义InputFormat的核心是自定义一个实现接口RecordReader类似于LineRecordReader的类,该类的核心也正是重写接口RecordReader中的几大方法,

定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader
示例,数据每一行为 “物体,x坐标,y坐标,z坐标
ball 3.5,12.7,9.0
car 15,23.76,42.23
device 0.0,12.4,-67.1
每一行将要被解析为<Text, Point3D>(Point3D是我们在上一篇日志中自定义的数据类型)
方式一,自定义的RecordReader使用中LineRecordReader,
public class ObjectPositionInputFormat extends
FileInputFormat<Text, Point3D> {
public RecordReader<Text, Point3D> getRecordReader (
InputSplit input, JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(input.toString());
return new ObjPosRecordReader(job, (FileSplit)input);
}
}
class ObjPosRecordReader implements RecordReader<Text, Point3D> {
private LineRecordReader lineReader;
private LongWritable lineKey;
private Text lineValue;
public ObjPosRecordReader (JobConf job, FileSplit split) throws IOException {
lineReader = new LineRecordReader(job, split);
lineKey = lineReader.createKey();
lineValue = lineReader.createValue();
}
public boolean next (Text key, Point3D value) throws IOException {
// get the next line
if (!lineReader.next(lineKey, lineValue)) {
return false ;
}
// parse the lineValue which is in the format:
// objName, x, y, z
String [] pieces = lineValue.toString().split( "," );
if (pieces.length != 4) {
throw new IOException( "Invalid record received");
}
// try to parse floating point components of value
float fx, fy, fz;
try {
fx = Float.parseFloat(pieces[1].trim());
fy = Float.parseFloat(pieces[2].trim());
fz = Float.parseFloat(pieces[3].trim());
} catch (NumberFormatException nfe) {
throw new IOException( "Error parsing floating point value in record" );
}
// now that we know we'll succeed, overwrite the output objects
key.set(pieces[0].trim()); // objName is the output key.
value.x = fx;
value.y = fy;
value.z = fz;
return true ;
}
public Text createKey () {
return new Text( "" );
}
public Point3D createValue () {
return new Point3D();
}
public long getPos () throws IOException {
return lineReader.getPos();
}
public void close () throws IOException {
lineReader.close();
}
public float getProgress () throws IOException {
return lineReader.getProgress();
}
}
方式二:自定义的RecordReader中使用LineReader,
public class ObjectPositionInputFormat extends FileInputFormat<Text, Point3D> {
@ Override
protected boolean isSplitable (JobContext context, Path filename) {
// TODO Auto-generated method stub
return false ;
}
@ Override
public RecordReader<Text, Point3D> createRecordReader (InputSplit inputsplit,
TaskAttemptContext context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
return new objPosRecordReader();
}
public static class objPosRecordReader extends RecordReader<Text,Point3D>{
public LineReader in;
public Text lineKey;
public Point3D lineValue;
public StringTokenizer token= null ;
public Text line;
@ Override
public void close () throws IOException {
// TODO Auto-generated method stub
}
@ Override
public Text getCurrentKey () throws IOException, InterruptedException {
// TODO Auto-generated method stub
System.out.println( "key" );
//lineKey.set(token.nextToken());
System.out.println( "hello" );
return lineKey;
}
@ Override
public Point3D getCurrentValue () throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return lineValue;
}
@ Override
public float getProgress () throws IOException, InterruptedException {
// TODO Auto-generated method stub
return 0;
}
@ Override
public void initialize (InputSplit input, TaskAttemptContext context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
FileSplit split=(FileSplit)input;
Configuration job=context.getConfiguration();
Path file=split.getPath();
FileSystem fs=file.getFileSystem(job);
FSDataInputStream filein=fs.open(file);
in= new LineReader(filein,job);
line= new Text();
lineKey= new Text();
lineValue= new Point3D();
}
@ Override
public boolean nextKeyValue () throws IOException, InterruptedException {
// TODO Auto-generated method stub
int linesize=in.readLine(line);
if (linesize==0)
return false ;
token= new StringTokenizer(line.toString());
String []temp= new String[2];
if (token.hasMoreElements()){
temp[0]=token.nextToken();
if (token.hasMoreElements()){
temp[1]=token.nextToken();
}
}
System.out.println(temp[0]);
System.out.println(temp[1]);
String []points=temp[1].split( "," );
System.out.println(points[0]);
System.out.println(points[1]);
System.out.println(points[2]);
lineKey.set(temp[0]);
lineValue.set(Float.parseFloat(points[0]),Float.parseFloat(points[1]), Float.parseFloat(points[2]));
System.out.println( "pp" );
return true ;
}
}
}
从以上可以看出,自定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader,而在其中可能会到LineReader/LineRecordReader/KeyValueLineRecordReader
因此,要自定义InputFormat,这三个类的源码就必须很熟悉~

这篇关于MapReduce高级编程之自定义InputFormat的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/826210

相关文章

MySQL的JDBC编程详解

《MySQL的JDBC编程详解》:本文主要介绍MySQL的JDBC编程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言一、前置知识1. 引入依赖2. 认识 url二、JDBC 操作流程1. JDBC 的写操作2. JDBC 的读操作总结前言本文介绍了mysq

JavaScript中的高级调试方法全攻略指南

《JavaScript中的高级调试方法全攻略指南》什么是高级JavaScript调试技巧,它比console.log有何优势,如何使用断点调试定位问题,通过本文,我们将深入解答这些问题,带您从理论到实... 目录观点与案例结合观点1观点2观点3观点4观点5高级调试技巧详解实战案例断点调试:定位变量错误性能分

Vite 打包目录结构自定义配置小结

《Vite打包目录结构自定义配置小结》在Vite工程开发中,默认打包后的dist目录资源常集中在asset目录下,不利于资源管理,本文基于Rollup配置原理,本文就来介绍一下通过Vite配置自定义... 目录一、实现原理二、具体配置步骤1. 基础配置文件2. 配置说明(1)js 资源分离(2)非 JS 资

从基础到高级详解Python数值格式化输出的完全指南

《从基础到高级详解Python数值格式化输出的完全指南》在数据分析、金融计算和科学报告领域,数值格式化是提升可读性和专业性的关键技术,本文将深入解析Python中数值格式化输出的相关方法,感兴趣的小伙... 目录引言:数值格式化的核心价值一、基础格式化方法1.1 三种核心格式化方式对比1.2 基础格式化示例

Android协程高级用法大全

《Android协程高级用法大全》这篇文章给大家介绍Android协程高级用法大全,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友跟随小编一起学习吧... 目录1️⃣ 协程作用域(CoroutineScope)与生命周期绑定Activity/Fragment 中手

Python异步编程之await与asyncio基本用法详解

《Python异步编程之await与asyncio基本用法详解》在Python中,await和asyncio是异步编程的核心工具,用于高效处理I/O密集型任务(如网络请求、文件读写、数据库操作等),接... 目录一、核心概念二、使用场景三、基本用法1. 定义协程2. 运行协程3. 并发执行多个任务四、关键

AOP编程的基本概念与idea编辑器的配合体验过程

《AOP编程的基本概念与idea编辑器的配合体验过程》文章简要介绍了AOP基础概念,包括Before/Around通知、PointCut切入点、Advice通知体、JoinPoint连接点等,说明它们... 目录BeforeAroundAdvise — 通知PointCut — 切入点Acpect — 切面

聊聊springboot中如何自定义消息转换器

《聊聊springboot中如何自定义消息转换器》SpringBoot通过HttpMessageConverter处理HTTP数据转换,支持多种媒体类型,接下来通过本文给大家介绍springboot中... 目录核心接口springboot默认提供的转换器如何自定义消息转换器Spring Boot 中的消息

深度解析Python yfinance的核心功能和高级用法

《深度解析Pythonyfinance的核心功能和高级用法》yfinance是一个功能强大且易于使用的Python库,用于从YahooFinance获取金融数据,本教程将深入探讨yfinance的核... 目录yfinance 深度解析教程 (python)1. 简介与安装1.1 什么是 yfinance?

C#异步编程ConfigureAwait的使用小结

《C#异步编程ConfigureAwait的使用小结》本文介绍了异步编程在GUI和服务器端应用的优势,详细的介绍了async和await的关键作用,通过实例解析了在UI线程正确使用await.Conf... 异步编程是并发的一种形式,它有两大好处:对于面向终端用户的GUI程序,提高了响应能力对于服务器端应