Mapreduce的排序(全局排序、分区加排序、Combiner优化)

2023-12-14 06:18

本文主要是介绍Mapreduce的排序(全局排序、分区加排序、Combiner优化),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、MR排序的分类

  1.部分排序:MR会根据自己输出记录的KV对数据进行排序,保证输出到每一个文件内存都是经过排序的;

  2.全局排序;

  3.辅助排序:再第一次排序后经过分区再排序一次;

  4.二次排序:经过一次排序后又根据业务逻辑再次进行排序。

 

二、MR排序的接口——WritableComparable

  该接口继承了Hadoop的Writable接口和Java的Comparable接口,实现该接口要重写write、readFields、compareTo三个方法。

 

三、流量统计案例的排序与分区

/*** @author: PrincessHug* @date: 2019/3/24, 15:36* @Blog: https://www.cnblogs.com/HelloBigTable/*/
public class FlowSortBean implements WritableComparable<FlowSortBean> {private long upFlow;private long dwFlow;private long flowSum;public FlowSortBean() {}public FlowSortBean(long upFlow, long dwFlow) {this.upFlow = upFlow;this.dwFlow = dwFlow;this.flowSum = upFlow + dwFlow;}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDwFlow() {return dwFlow;}public void setDwFlow(long dwFlow) {this.dwFlow = dwFlow;}public long getFlowSum() {return flowSum;}public void setFlowSum(long flowSum) {this.flowSum = flowSum;}@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(dwFlow);out.writeLong(flowSum);}@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readLong();dwFlow = in.readLong();flowSum = in.readLong();}@Overridepublic String toString() {return upFlow + "\t" + dwFlow + "\t" + flowSum;}@Overridepublic int compareTo(FlowSortBean o) {return this.flowSum > o.getFlowSum() ? -1:1;}
}public class FlowSortMapper extends Mapper<LongWritable, Text,FlowSortBean,Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取数据String line = value.toString();//切分数据String[] fields = line.split("\t");//封装数据long upFlow = Long.parseLong(fields[1]);long dwFlow = Long.parseLong(fields[2]);//传输数据context.write(new FlowSortBean(upFlow,dwFlow),new Text(fields[0]));}
}public class FlowSortReducer extends Reducer<FlowSortBean,Text,Text,FlowSortBean> {@Overrideprotected void reduce(FlowSortBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {context.write(values.iterator().next(),key);}
}public class FlowSortPartitioner extends Partitioner<FlowSortBean, Text> {@Overridepublic int getPartition(FlowSortBean key, Text value, int i) {String phoneNum = value.toString().substring(0, 3);int partition = 4;if ("135".equals(phoneNum)){return 0;}else if ("137".equals(phoneNum)){return 1;}else if ("138".equals(phoneNum)){return 2;}else if ("139".equals(phoneNum)){return 3;}return partition;}
}public class FlowSortDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//设置配置,初始化Job类Configuration conf = new Configuration();Job job = Job.getInstance(conf);//设置执行类job.setJarByClass(FlowSortDriver.class);//设置Mapper、Reducer类job.setMapperClass(FlowSortMapper.class);job.setReducerClass(FlowSortReducer.class);//设置Mapper输出数据类型job.setMapOutputKeyClass(FlowSortBean.class);job.setMapOutputValueClass(Text.class);//设置Reducer输出数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowSortBean.class);//设置自定义分区job.setPartitionerClass(FlowSortPartitioner.class);job.setNumReduceTasks(5);//设置文件输入输出类型FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\flow\\flowsort\\in"));FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\flow\\flowsort\\partitionout"));//提交任务if (job.waitForCompletion(true)){System.out.println("运行完成!");}else {System.out.println("运行失败!");}}
}

  注意:再写Mapper类的时候,要注意KV对输出的数据类型,Key的类型一定要为FlowSortBean,因为在Mapper和Reducer之间进行的排序(只是排序)是通过Mapper输出的Key来进行排序的,而分区可以指定是通过Key或者Value。

 

四、Combiner合并

  Combiner是在MR之外的一个组件,可以用来在maptask输出到环形缓冲区溢写之后,分区排序完成时进行局部的汇总,可以减少网络传输量,进而优化MR程序。

  Combiner是用在当数据量到达一定规模之后的,小的数据量并不是很明显。

  例如WordCount程序,当单词文件的大小到达一定程度,可以使用自定义Combiner进行优化:

public class WordCountCombiner extends Reducer<Text,IntWritable,Text,IntWritable>{protected void reduce(Text key,Iterable<IntWritable> values,Context context){//计数int count = 0;//累加求和for(IntWritable v:values){count += v.get();}//输出context.write(key,new IntWritable(count));}
}

  然后再Driver类中设置使用Combiner类

job.setCombinerClass(WordCountCombiner.class);

  如果仔细观察,WordCount的自定义Combiner类与Reducer类是完全相同的,因为他们的逻辑是相同的,即在maptask之后的分区内先进行一次累加求和,然后到reducer后再进行总的累加求和,所以在设置Combiner时也可以这样:

job.setCombinerClass(WordCountReducer.class);

 

  注意:Combiner的应用一定要注意不能影响最终业务逻辑的情况下使用,比如在求平均值的时候:

  mapper输出两个分区:3,5,7  =>avg=5

            2,6    =>avg=4

  reducer合并输出:  5,4     =>avg=4.5  但是实际应该为4.6,错误!

  所以在使用Combiner时要注意其不会影响最中的结果!!!

 

转载于:https://www.cnblogs.com/HelloBigTable/p/10591267.html

这篇关于Mapreduce的排序(全局排序、分区加排序、Combiner优化)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/491430

相关文章

深入解析Java NIO在高并发场景下的性能优化实践指南

《深入解析JavaNIO在高并发场景下的性能优化实践指南》随着互联网业务不断演进,对高并发、低延时网络服务的需求日益增长,本文将深入解析JavaNIO在高并发场景下的性能优化方法,希望对大家有所帮助... 目录简介一、技术背景与应用场景二、核心原理深入分析2.1 Selector多路复用2.2 Buffer

SpringBoot利用树形结构优化查询速度

《SpringBoot利用树形结构优化查询速度》这篇文章主要为大家详细介绍了SpringBoot利用树形结构优化查询速度,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一个真实的性能灾难传统方案为什么这么慢N+1查询灾难性能测试数据对比核心解决方案:一次查询 + O(n)算法解决

小白也能轻松上手! 路由器设置优化指南

《小白也能轻松上手!路由器设置优化指南》在日常生活中,我们常常会遇到WiFi网速慢的问题,这主要受到三个方面的影响,首要原因是WiFi产品的配置优化不合理,其次是硬件性能的不足,以及宽带线路本身的质... 在数字化时代,网络已成为生活必需品,追剧、游戏、办公、学习都离不开稳定高速的网络。但很多人面对新路由器

MySQL深分页进行性能优化的常见方法

《MySQL深分页进行性能优化的常见方法》在Web应用中,分页查询是数据库操作中的常见需求,然而,在面对大型数据集时,深分页(deeppagination)却成为了性能优化的一个挑战,在本文中,我们将... 目录引言:深分页,真的只是“翻页慢”那么简单吗?一、背景介绍二、深分页的性能问题三、业务场景分析四、

Linux进程CPU绑定优化与实践过程

《Linux进程CPU绑定优化与实践过程》Linux支持进程绑定至特定CPU核心,通过sched_setaffinity系统调用和taskset工具实现,优化缓存效率与上下文切换,提升多核计算性能,适... 目录1. 多核处理器及并行计算概念1.1 多核处理器架构概述1.2 并行计算的含义及重要性1.3 并

MySQL 定时新增分区的实现示例

《MySQL定时新增分区的实现示例》本文主要介绍了通过存储过程和定时任务实现MySQL分区的自动创建,解决大数据量下手动维护的繁琐问题,具有一定的参考价值,感兴趣的可以了解一下... mysql创建好分区之后,有时候会需要自动创建分区。比如,一些表数据量非常大,有些数据是热点数据,按照日期分区MululbU

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2

MySQL中的锁机制详解之全局锁,表级锁,行级锁

《MySQL中的锁机制详解之全局锁,表级锁,行级锁》MySQL锁机制通过全局、表级、行级锁控制并发,保障数据一致性与隔离性,全局锁适用于全库备份,表级锁适合读多写少场景,行级锁(InnoDB)实现高并... 目录一、锁机制基础:从并发问题到锁分类1.1 并发访问的三大问题1.2 锁的核心作用1.3 锁粒度分

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

一文详解Java Stream的sorted自定义排序

《一文详解JavaStream的sorted自定义排序》Javastream中的sorted方法是用于对流中的元素进行排序的方法,它可以接受一个comparator参数,用于指定排序规则,sorte... 目录一、sorted 操作的基础原理二、自定义排序的实现方式1. Comparator 接口的 Lam