Hadoop3:MapReduce中Reduce阶段自定义OutputFormat逻辑

2024-06-22 03:52

本文主要是介绍Hadoop3:MapReduce中Reduce阶段自定义OutputFormat逻辑,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、情景描述

我们知道,在MapTask阶段开始时,需要InputFormat来读取数据
而在ReduceTask阶段结束时,将处理完成的数据,输出到磁盘,此时就要用到OutputFormat

在之前的程序中,我们都没有设置过这部分配置
所以,采用的是默认输出格式:TextOutputFormat

在实际工作中,我们的输出不一定是到磁盘,可能是输出到MySQL、HBase

那么,如何实现自定义的OutputFormat
在这里插入图片描述

二、案例

1、源数据

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.atguigu.com
http://www.sohu.com
http://www.baidu.com
http://www.sina.com
http://www.sin2a.com
http://www.baidu.com
http://www.sin2desa.com
http://www.sindsafa.com

2、需求分析

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log

3、代码实现

LogMapper.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// http://www.baidu.com//http://www.google.com// (http://www.google.com, NullWritable)// 不做任何处理context.write(value, NullWritable.get());}
}

LogReducer.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {// http://www.baidu.com// http://www.baidu.com// 防止有相同数据,丢数据for (NullWritable value : values) {context.write(key, NullWritable.get());}}
}

LogRecordWriter.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class LogRecordWriter extends RecordWriter<Text, NullWritable> {private  FSDataOutputStream atguiguOut;private  FSDataOutputStream otherOut;public LogRecordWriter(TaskAttemptContext job) {// 创建两条流try {FileSystem fs = FileSystem.get(job.getConfiguration());atguiguOut = fs.create(new Path("D:\\hadoop\\atguigu.log"));otherOut = fs.create(new Path("D:\\hadoop\\other.log"));} catch (IOException e) {e.printStackTrace();}}@Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {String log = key.toString();// 具体写if (log.contains("atguigu")){atguiguOut.writeBytes(log+"\n");}else {otherOut.writeBytes(log+"\n");}}@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {// 关流IOUtils.closeStream(atguiguOut);IOUtils.closeStream(otherOut);}
}

LogOutputFormat.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {LogRecordWriter lrw = new LogRecordWriter(job);return lrw;}
}

LogDriver.java

package com.atguigu.mapreduce.outputformat;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(LogDriver.class);job.setMapperClass(LogMapper.class);job.setReducerClass(LogReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//设置自定义的outputformatjob.setOutputFormatClass(LogOutputFormat.class);FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputoutputformat"));//虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat//而fileoutputformat要输出一个_SUCCESS文件,所以在这还得指定一个输出目录FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output1111"));boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}

3、测试

在这里插入图片描述
在这里插入图片描述

三、总结

关键文件:
LogRecordWriter.java
LogOutputFormat.java
LogDriver.java

        //设置自定义的outputformatjob.setOutputFormatClass(LogOutputFormat.class);

这篇关于Hadoop3:MapReduce中Reduce阶段自定义OutputFormat逻辑的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1083214

相关文章

C#中通过Response.Headers设置自定义参数的代码示例

《C#中通过Response.Headers设置自定义参数的代码示例》:本文主要介绍C#中通过Response.Headers设置自定义响应头的方法,涵盖基础添加、安全校验、生产实践及调试技巧,强... 目录一、基础设置方法1. 直接添加自定义头2. 批量设置模式二、高级配置技巧1. 安全校验机制2. 类型

Docker多阶段镜像构建与缓存利用性能优化实践指南

《Docker多阶段镜像构建与缓存利用性能优化实践指南》这篇文章将从原理层面深入解析Docker多阶段构建与缓存机制,结合实际项目示例,说明如何有效利用构建缓存,组织镜像层次,最大化提升构建速度并减少... 目录一、技术背景与应用场景二、核心原理深入分析三、关键 dockerfile 解读3.1 Docke

SpringBoot AspectJ切面配合自定义注解实现权限校验的示例详解

《SpringBootAspectJ切面配合自定义注解实现权限校验的示例详解》本文章介绍了如何通过创建自定义的权限校验注解,配合AspectJ切面拦截注解实现权限校验,本文结合实例代码给大家介绍的非... 目录1. 创建权限校验注解2. 创建ASPectJ切面拦截注解校验权限3. 用法示例A. 参考文章本文

Vite 打包目录结构自定义配置小结

《Vite打包目录结构自定义配置小结》在Vite工程开发中,默认打包后的dist目录资源常集中在asset目录下,不利于资源管理,本文基于Rollup配置原理,本文就来介绍一下通过Vite配置自定义... 目录一、实现原理二、具体配置步骤1. 基础配置文件2. 配置说明(1)js 资源分离(2)非 JS 资

聊聊springboot中如何自定义消息转换器

《聊聊springboot中如何自定义消息转换器》SpringBoot通过HttpMessageConverter处理HTTP数据转换,支持多种媒体类型,接下来通过本文给大家介绍springboot中... 目录核心接口springboot默认提供的转换器如何自定义消息转换器Spring Boot 中的消息

mybatisplus的逻辑删除过程

《mybatisplus的逻辑删除过程》:本文主要介绍mybatisplus的逻辑删除过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录myBATisplus的逻辑删除1、在配置文件中添加逻辑删除的字段2、在实体类上加上@TableLogic3、业务层正常删除即

Python自定义异常的全面指南(入门到实践)

《Python自定义异常的全面指南(入门到实践)》想象你正在开发一个银行系统,用户转账时余额不足,如果直接抛出ValueError,调用方很难区分是金额格式错误还是余额不足,这正是Python自定义异... 目录引言:为什么需要自定义异常一、异常基础:先搞懂python的异常体系1.1 异常是什么?1.2

Linux中的自定义协议+序列反序列化用法

《Linux中的自定义协议+序列反序列化用法》文章探讨网络程序在应用层的实现,涉及TCP协议的数据传输机制、结构化数据的序列化与反序列化方法,以及通过JSON和自定义协议构建网络计算器的思路,强调分层... 目录一,再次理解协议二,序列化和反序列化三,实现网络计算器3.1 日志文件3.2Socket.hpp

C语言自定义类型之联合和枚举解读

《C语言自定义类型之联合和枚举解读》联合体共享内存,大小由最大成员决定,遵循对齐规则;枚举类型列举可能值,提升可读性和类型安全性,两者在C语言中用于优化内存和程序效率... 目录一、联合体1.1 联合体类型的声明1.2 联合体的特点1.2.1 特点11.2.2 特点21.2.3 特点31.3 联合体的大小1

springboot自定义注解RateLimiter限流注解技术文档详解

《springboot自定义注解RateLimiter限流注解技术文档详解》文章介绍了限流技术的概念、作用及实现方式,通过SpringAOP拦截方法、缓存存储计数器,结合注解、枚举、异常类等核心组件,... 目录什么是限流系统架构核心组件详解1. 限流注解 (@RateLimiter)2. 限流类型枚举 (