自定义InputFormat和OutputFormat案例

2023-12-14 06:18

本文主要是介绍自定义InputFormat和OutputFormat案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、自定义InputFormat

  InputFormat是输入流,在前面的例子中使用的是文件输入输出流FileInputFormat和FileOutputFormat,而FileInputFormat和FileOutputFormat它们默认使用的是继承它们的子类TextInputFormat和TextOutputFormat,以Text的方式去读取数据。

  当我们遇到许多小文件,要将他们整理合成为一个文件SequenceFile(存储了多个小文件),且文件内的存储格式为:文件路径+文件内容,这时我们可以通过封装自定义的InputFormat输入流来实现需求。

  思路如下:

    1.自定义FuncFileInputFormat类继承FileInputFormat(参数类型为NullWritable和BytesWritable),并重写isSplitable和createRecordReader方法;

    2.isSplitable方法中return false即可表示不切割,createRecordReader方法中要返回一个RecordReader类,这是我们要自定义的对输入文件的业务逻辑,所以创建FuncRecordReader类;

    3.FuncRecordReader类继承RecordReader类,参数类型同为NullWritable和BytesWritable,重写initialize、nextKeyValue、getCurrentKey、getCurrentValue、getProcess、close方法;

    4.Mapper:初始化setup方法,通过context拿到切片、获取路径、将路径写入定义的全局变量Text t,然后在map阶段将t和value输出到reducer;

    5.Reducer:遍历values,输出key,value;

    6.Driver:在设置完Mapper和Reducer类后,添加设置setInputFormatClass为FuncFileInputFormat、设置setOutputFormatClass为SequenceFileOutputFormat。

  代码如下:

/*** @author: PrincessHug* @date: 2019/3/29, 20:49* @Blog: https://www.cnblogs.com/HelloBigTable/*/
public class FuncFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {@Overrideprotected boolean isSplitable(JobContext context, Path filename) {return false;}@Overridepublic RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {FuncRecordReader recordReader = new FuncRecordReader();return recordReader;}
}public class FuncRecordReader  extends RecordReader<NullWritable, BytesWritable> {boolean isProcess = false;FileSplit split;Configuration conf;BytesWritable value = new BytesWritable();//初始化@Overridepublic void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {//初始化切片文件this.split = (FileSplit) inputSplit;//初始化配置信息conf = taskAttemptContext.getConfiguration();}//获取下一个文件@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {if (!isProcess){//根据切片的长度来创建缓冲区byte[] buf = new byte[(int) split.getLength()];FSDataInputStream fis = null;FileSystem fs = null;try {//获取路径Path path = split.getPath();//根据路径获取文件系统fs = path.getFileSystem(conf);//拿到输入流fis = fs.open(path);//数据拷贝IOUtils.readFully(fis,buf,0,buf.length);//拷贝缓存到最终的输出value.set(buf,0,buf.length);} catch (IOException e) {e.printStackTrace();} finally {IOUtils.closeStream(fis);IOUtils.closeStream(fs);}isProcess = true;return true;}return false;}@Overridepublic NullWritable getCurrentKey() throws IOException, InterruptedException {return NullWritable.get();}@Overridepublic BytesWritable getCurrentValue() throws IOException, InterruptedException {return value;}@Overridepublic float getProgress() throws IOException, InterruptedException {return 0;}@Overridepublic void close() throws IOException {}
}public class SequencceFileMapper extends Mapper<NullWritable, BytesWritable, Text,BytesWritable> {Text t = new Text();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {//拿到切片信息FileSplit split = (FileSplit) context.getInputSplit();//路径Path path = split.getPath();//即带路径有待名称t.set(path.toString());}@Overrideprotected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {context.write(t,value);}
}public class SequenceFileReducer extends Reducer<Text, BytesWritable,Text,BytesWritable> {@Overrideprotected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {for (BytesWritable v:values){context.write(key,v);}}
}public class SequenceFileDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//1.获取job信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);//2.获取Jar包job.setJarByClass(SequenceFileDriver.class);//3.获取Mapper、Redcuer类job.setMapperClass(SequencceFileMapper.class);job.setReducerClass(SequenceFileReducer.class);//4.设置自定义读取方法job.setInputFormatClass(FuncFileInputFormat.class);//5.设置默认的输出方式job.setOutputFormatClass(SequenceFileOutputFormat.class);//6.获取Mapper输出数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(BytesWritable.class);//7.获取Reducer输出数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);//8.设置输入存在的路径与处理后的结果路径FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\inputformat\\in"));FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\inputformat\\out"));//9.提交任务if (job.waitForCompletion(true)){System.out.println("运行完成!");}else {System.out.println("运行失败!");}}
}

 

  

二、自定义OutputFormat

  需求:目前我们有一个网站ip的文件,每行都有一个网站的ip地址,要求我们将含有“www.baidu.com”的ip地址取出放入一个结果文件,其他的地址放入另一个结果文件。

  思路:1.首先Mapper、Reduer就是简单的读取数据、写出数据;

    2.自定义FuncFileOutputFormat,重写它的getRecordWriter方法,返回一个FIleRecordWriter对象,这里我们再定义一个FileRecordWriter,重写FileRecordWriter、write、close方法;

    3.Driver:再设置Reducer输出后添加设置setOutputFormatClass为我们自定义的FuncFileOutputFormat即可;

  代码如下:

/*** @author: PrincessHug* @date: 2019/3/30, 14:44* @Blog: https://www.cnblogs.com/HelloBigTable/*/
public class FileMapper extends Mapper<LongWritable, Text, IntWritable, Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {context.write(new IntWritable(1),new value);}
}public class FileReducer extends Reducer<IntWritable, Text,Text,NullWritable> {@Overrideprotected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for (Text k:values){String s = k.toString() + "\n";context.write(new Text(s),NullWritable.get());}}
}public class FuncFileOutputFormat extends FileOutputFormat<Text, NullWritable> {@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {return new FileRecordWriter(taskAttemptContext);}
}public class FileRecordWriter extends RecordWriter<Text, NullWritable> {Configuration conf = null;FSDataOutputStream baidulog = null;FSDataOutputStream otherlog = null;//定义数据输出路径public FileRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {//获取配置信息和文件系统conf = taskAttemptContext.getConfiguration();FileSystem fs = FileSystem.get(conf);//定义输出路径itstarlog = fs.create(new Path("G:\\mapreduce\\outputformat\\out\\itstart\\baidu.logs"));otherlog = fs.create(new Path("G:\\mapreduce\\outputformat\\out\\other\\other.logs"));}//数据输出@Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {if (key.toString().contains("baidu")){baidulog.write(key.getBytes());}else {otherlog.write(key.getBytes());}}//关闭资源@Overridepublic void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {if (itstarlog != null){itstarlog.close();}if (otherlog != null){otherlog.close();}}
}public class FileDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//配置、jobConfiguration conf = new Configuration();Job job = Job.getInstance(conf);//jar包job.setJarByClass(FileDriver.class);//Mapper、Reducerjob.setMapperClass(FileMapper.class);job.setReducerClass(FileReducer.class);//Mapper输出job.setMapOutputKeyClass(IntWritable.class);job.setMapOutputValueClass(Text.class);//Reudcer输出job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//自定义输出类job.setOutputFormatClass(FuncFileOutputFormat.class);//文件输入输出流FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\outputformat\\in"));FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\outputformat\\out"));//提交任务if (job.waitForCompletion(true)){System.out.println("运行完成!");}else {System.out.println("运行失败!");}}
}

  

 

转载于:https://www.cnblogs.com/HelloBigTable/p/10638866.html

这篇关于自定义InputFormat和OutputFormat案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/491432

相关文章

C#中通过Response.Headers设置自定义参数的代码示例

《C#中通过Response.Headers设置自定义参数的代码示例》:本文主要介绍C#中通过Response.Headers设置自定义响应头的方法,涵盖基础添加、安全校验、生产实践及调试技巧,强... 目录一、基础设置方法1. 直接添加自定义头2. 批量设置模式二、高级配置技巧1. 安全校验机制2. 类型

Java中的分布式系统开发基于 Zookeeper 与 Dubbo 的应用案例解析

《Java中的分布式系统开发基于Zookeeper与Dubbo的应用案例解析》本文将通过实际案例,带你走进基于Zookeeper与Dubbo的分布式系统开发,本文通过实例代码给大家介绍的非常详... 目录Java 中的分布式系统开发基于 Zookeeper 与 Dubbo 的应用案例一、分布式系统中的挑战二

Java 中的 equals 和 hashCode 方法关系与正确重写实践案例

《Java中的equals和hashCode方法关系与正确重写实践案例》在Java中,equals和hashCode方法是Object类的核心方法,广泛用于对象比较和哈希集合(如HashMa... 目录一、背景与需求分析1.1 equals 和 hashCode 的背景1.2 需求分析1.3 技术挑战1.4

Java中实现对象的拷贝案例讲解

《Java中实现对象的拷贝案例讲解》Java对象拷贝分为浅拷贝(复制值及引用地址)和深拷贝(递归复制所有引用对象),常用方法包括Object.clone()、序列化及JSON转换,需处理循环引用问题,... 目录对象的拷贝简介浅拷贝和深拷贝浅拷贝深拷贝深拷贝和循环引用总结对象的拷贝简介对象的拷贝,把一个

Java中最全最基础的IO流概述和简介案例分析

《Java中最全最基础的IO流概述和简介案例分析》JavaIO流用于程序与外部设备的数据交互,分为字节流(InputStream/OutputStream)和字符流(Reader/Writer),处理... 目录IO流简介IO是什么应用场景IO流的分类流的超类类型字节文件流应用简介核心API文件输出流应用文

SpringBoot AspectJ切面配合自定义注解实现权限校验的示例详解

《SpringBootAspectJ切面配合自定义注解实现权限校验的示例详解》本文章介绍了如何通过创建自定义的权限校验注解,配合AspectJ切面拦截注解实现权限校验,本文结合实例代码给大家介绍的非... 目录1. 创建权限校验注解2. 创建ASPectJ切面拦截注解校验权限3. 用法示例A. 参考文章本文

MyBatis分页查询实战案例完整流程

《MyBatis分页查询实战案例完整流程》MyBatis是一个强大的Java持久层框架,支持自定义SQL和高级映射,本案例以员工工资信息管理为例,详细讲解如何在IDEA中使用MyBatis结合Page... 目录1. MyBATis框架简介2. 分页查询原理与应用场景2.1 分页查询的基本原理2.1.1 分

Vite 打包目录结构自定义配置小结

《Vite打包目录结构自定义配置小结》在Vite工程开发中,默认打包后的dist目录资源常集中在asset目录下,不利于资源管理,本文基于Rollup配置原理,本文就来介绍一下通过Vite配置自定义... 目录一、实现原理二、具体配置步骤1. 基础配置文件2. 配置说明(1)js 资源分离(2)非 JS 资

深度解析Java @Serial 注解及常见错误案例

《深度解析Java@Serial注解及常见错误案例》Java14引入@Serial注解,用于编译时校验序列化成员,替代传统方式解决运行时错误,适用于Serializable类的方法/字段,需注意签... 目录Java @Serial 注解深度解析1. 注解本质2. 核心作用(1) 主要用途(2) 适用位置3

Java 正则表达式的使用实战案例

《Java正则表达式的使用实战案例》本文详细介绍了Java正则表达式的使用方法,涵盖语法细节、核心类方法、高级特性及实战案例,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要... 目录一、正则表达式语法详解1. 基础字符匹配2. 字符类([]定义)3. 量词(控制匹配次数)4. 边