【Hadoop】9.MapReduce框架原理-OutputFormat数据输出

2023-11-09 13:32

本文主要是介绍【Hadoop】9.MapReduce框架原理-OutputFormat数据输出,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在前面,我们知道了多种输入模式,输出也一样。OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。

OutputFormat 接口实现类
  1. 文本输出TextOutputFormat
    默认的输出格式是TextOutputFormat它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString() 方法把它们转换为字符串
  2. SequenceFileOutputFormat
    将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为他的格式紧凑,很容易被压缩
  3. 自定义OutputFormat
    根据自己的需求,自定义实现。
自定义OutputFormat实现过程

步骤:

  1. 自定义一个类继承FileOutputFormat
  2. 改写RecordWriter,具体改写输出数据的方法write()

示例:
在这里插入图片描述

CustomOutputDriver

package com.xing.MapReduce.CustomOutputFormat;import com.xing.MapReduce.Flowsum.FlowBean;
import com.xing.MapReduce.Flowsum.FlowBeanMapper;
import com.xing.MapReduce.Flowsum.FlowDriver;
import com.xing.MapReduce.Flowsum.FlowReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class CustomOutputDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1");String in = "E:\\hdfs\\data\\customout\\input\\demo.txt";String out = "E:\\hdfs\\data\\customout\\out";Path inPath = new Path(in);Path outPath = new Path(out);Configuration configuration = new Configuration();FileSystem fs = FileSystem.get(configuration);if (fs.exists(outPath)) {if (fs.delete(outPath, true)){System.out.println("success delete outfile");}}Job job = Job.getInstance(configuration);job.setJobName("CustomOutput");job.setJarByClass(CustomOutputDriver.class);job.setMapperClass(CustomOutputMapper.class);job.setReducerClass(CustomOutputReduce.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 设置自定义FileOutputFormat类job.setOutputFormatClass(CustomFileOutputFormat.class);FileInputFormat.setInputPaths(job, inPath);FileOutputFormat.setOutputPath(job, outPath);boolean rel = job.waitForCompletion(true);if (rel) {System.out.println("success");}}}

CustomOutputMapper

package com.xing.MapReduce.CustomOutputFormat;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class CustomOutputMapper extends Mapper<LongWritable,Text,Text,NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {context.write(value,NullWritable.get() );}
}

CustomOutputReduce

package com.xing.MapReduce.CustomOutputFormat;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class CustomOutputReduce extends Reducer<Text,NullWritable,Text,NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {for (NullWritable value : values) {context.write(key,value );}}
}

CustomFileOutputFormat

package com.xing.MapReduce.CustomOutputFormat;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class CustomFileOutputFormat extends FileOutputFormat<Text,NullWritable> {/***  返回自定义的Writer* @param context* @return* @throws IOException* @throws InterruptedException*/public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {return new CustomRecordWriter(context);}
}

CustomRecordWriter

package com.xing.MapReduce.CustomOutputFormat;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class CustomRecordWriter extends RecordWriter<Text, NullWritable> {private FileSystem fs;private Configuration conf;private FSDataOutputStream fos1;private FSDataOutputStream fos2;CustomRecordWriter(){}CustomRecordWriter(TaskAttemptContext context) {// 初始化一些属性try {conf = context.getConfiguration();fs = FileSystem.get(conf);fos1 = fs.create(new Path("E:\\hdfs\\data\\customout\\output\\http.txt"));fos2 = fs.create(new Path("E:\\hdfs\\data\\customout\\output\\other.txt"));} catch (IOException e) {e.printStackTrace();}}/***  判断key值是否含有HTTP,有则输出到http.txt 其他都输出到other.txt文件* @param text* @param nullWritable* @throws IOException* @throws InterruptedException*/public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {if (text.toString().toUpperCase().contains("HTTP")){System.out.println("yes");// \r\n 为windows的换行 fos1.write(text.toString().concat("\r\n").getBytes());}else {System.out.println("no");fos2.write(text.toString().concat("\r\n").getBytes());}}/***  关闭流* @param taskAttemptContext* @throws IOException* @throws InterruptedException*/public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {IOUtils.closeStream(fos1);IOUtils.closeStream(fos2);}
}

输出结果
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

这篇关于【Hadoop】9.MapReduce框架原理-OutputFormat数据输出的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/376425

相关文章

使用SpringBoot整合Sharding Sphere实现数据脱敏的示例

《使用SpringBoot整合ShardingSphere实现数据脱敏的示例》ApacheShardingSphere数据脱敏模块,通过SQL拦截与改写实现敏感信息加密存储,解决手动处理繁琐及系统改... 目录痛点一:痛点二:脱敏配置Quick Start——Spring 显示配置:1.引入依赖2.创建脱敏

Mysql的主从同步/复制的原理分析

《Mysql的主从同步/复制的原理分析》:本文主要介绍Mysql的主从同步/复制的原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录为什么要主从同步?mysql主从同步架构有哪些?Mysql主从复制的原理/整体流程级联复制架构为什么好?Mysql主从复制注意

详解如何使用Python构建从数据到文档的自动化工作流

《详解如何使用Python构建从数据到文档的自动化工作流》这篇文章将通过真实工作场景拆解,为大家展示如何用Python构建自动化工作流,让工具代替人力完成这些数字苦力活,感兴趣的小伙伴可以跟随小编一起... 目录一、Excel处理:从数据搬运工到智能分析师二、PDF处理:文档工厂的智能生产线三、邮件自动化:

Nacos注册中心和配置中心的底层原理全面解读

《Nacos注册中心和配置中心的底层原理全面解读》:本文主要介绍Nacos注册中心和配置中心的底层原理的全面解读,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录临时实例和永久实例为什么 Nacos 要将服务实例分为临时实例和永久实例?1.x 版本和2.x版本的区别

Python数据分析与可视化的全面指南(从数据清洗到图表呈现)

《Python数据分析与可视化的全面指南(从数据清洗到图表呈现)》Python是数据分析与可视化领域中最受欢迎的编程语言之一,凭借其丰富的库和工具,Python能够帮助我们快速处理、分析数据并生成高质... 目录一、数据采集与初步探索二、数据清洗的七种武器1. 缺失值处理策略2. 异常值检测与修正3. 数据

pandas实现数据concat拼接的示例代码

《pandas实现数据concat拼接的示例代码》pandas.concat用于合并DataFrame或Series,本文主要介绍了pandas实现数据concat拼接的示例代码,具有一定的参考价值,... 目录语法示例:使用pandas.concat合并数据默认的concat:参数axis=0,join=

C#代码实现解析WTGPS和BD数据

《C#代码实现解析WTGPS和BD数据》在现代的导航与定位应用中,准确解析GPS和北斗(BD)等卫星定位数据至关重要,本文将使用C#语言实现解析WTGPS和BD数据,需要的可以了解下... 目录一、代码结构概览1. 核心解析方法2. 位置信息解析3. 经纬度转换方法4. 日期和时间戳解析5. 辅助方法二、L

使用Python和Matplotlib实现可视化字体轮廓(从路径数据到矢量图形)

《使用Python和Matplotlib实现可视化字体轮廓(从路径数据到矢量图形)》字体设计和矢量图形处理是编程中一个有趣且实用的领域,通过Python的matplotlib库,我们可以轻松将字体轮廓... 目录背景知识字体轮廓的表示实现步骤1. 安装依赖库2. 准备数据3. 解析路径指令4. 绘制图形关键

解决mysql插入数据锁等待超时报错:Lock wait timeout exceeded;try restarting transaction

《解决mysql插入数据锁等待超时报错:Lockwaittimeoutexceeded;tryrestartingtransaction》:本文主要介绍解决mysql插入数据锁等待超时报... 目录报错信息解决办法1、数据库中执行如下sql2、再到 INNODB_TRX 事务表中查看总结报错信息Lock

apache的commons-pool2原理与使用实践记录

《apache的commons-pool2原理与使用实践记录》ApacheCommonsPool2是一个高效的对象池化框架,通过复用昂贵资源(如数据库连接、线程、网络连接)优化系统性能,这篇文章主... 目录一、核心原理与组件二、使用步骤详解(以数据库连接池为例)三、高级配置与优化四、典型应用场景五、注意事