HadoopSpark解决二次排序问题(Hadoop篇)

2024-05-27 12:58

本文主要是介绍HadoopSpark解决二次排序问题(Hadoop篇),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

问题描述

  二次排序就是对每一个key对应的value进行排序,也就是对MapReduce的输出(KEY, Value(v1,v2,v3,......,vn))中的Value(v1,v2,v3,......,vn)值进行排序(升序或者降序),使得Value(s1,s2,s3,......,sn),si ∈ (v1,v2,v3,......,vn)且s1 < s2 < s3 < ...... < sn。假设我们有以下输入文件(逗号分割的分别是年,月,总数):

[root@iteblog.com /tmp ] # vim data.txt
2015,1,24
2015,3,56
2015,1,3
2015,2,-43
2015,4,5
2015,3,46
2014,2,64
2015,1,4
2015,1,21
2015,2,35
2015,2,0

我们期望的输出结果是

2014-2  64
2015-1  3,4,21,24
2015-2  -43,0,35
2015-3  46,56
2015-4  5

  但是Hadoop默认的输出结果只能对Key进行排序,其中Value中的值次序是不定的;也就是说,Hadoop默认的输出可能如下:

2014-2  64
2015-1  21,4,3,24
2015-2  0,35,-43
2015-3  46,56
2015-4  5

解决方案

  针对这个问题我们有两种方法来解决:(1)、将每个Key对应的Value全部存储到内存(这个只会存储到单台机器),然后对这些Value进行相应的排序。但是如果Value的数据量非常大,导致单台内存无法存储这些数据,这将会导致程序出现java.lang.OutOfMemoryError,所以这个方法不是很通用。(2)、这种方法将Value中的值和旧的Key组成一个新的Key,这样我们就可以利用Reduce来排序这个Key,其生成的结果就是我们需要的。过程如下:
  1、原始的键值对是(k,v)。这里的k就是就的key,也可以 称为natural key;
  2、我们可以将k和v组合成新的key(可以称为composite key),也就是((k,v), v)
  3、自定义分区函数,将k相同的键值对发送到同一个Reduce中;
  4、自定义分组函数,将k相同的键值对当作一个分组。
  文字比较枯燥,我们来看看下面实例:
  1、原始数据是

[root@iteblog.com /tmp ] # vim data.txt
2015,1,24
2015,3,56
2015,1,3
2015,2,-43
2015,4,5
2015,3,46
2014,2,64
2015,1,4
2015,1,21
2015,2,35
2015,2,0

我们将年、月组成key(natural key),总数作为value,结果变成:

(2015-1,24)
(2015-3,56)
(2015-1,3)
(2015-2,-43)
(2015-4,5)
(2015-3,46)
(2014-2,64)
(2015-1,4)
(2015-1,21)
(2015-2,35)
(2015-2,0)

  2、将value和key(natural key)组成新的key(composite key),如下:

((2015-1,24),24)
((2015-3,56),56)
((2015-1,3),3)
((2015-2,-43),-43)
((2015-4,5),5)
((2015-3,46),46)
((2014-2,64),64)
((2015-1,4),4)
((2015-1,21),21)
((2015-2,35),35)
((2015-2,0),0)

  3、自定义分区函数,将k相同的键值对发送到同一个Reduce中,结果如下:

[((2014-2,64),64)]
[((2015-1,24),24),((2015-1,3),3),((2015-1,4),4),((2015-1,21),21)]
[((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)]
[((2015-3,56),56),((2015-3,46),46)]
[((2015-4,5),5)]

  4、自定义组排序函数,结果如下:

[((2014-2,64),64)]
[((2015-1,3),3),((2015-1,4),4),((2015-1,21),21),((2015-1,24),24)]
[((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)]
[((2015-3,46),46),((2015-3,56),56)]
[((2015-4,5),5)]

  5、自定义分组函数,结果如下:

((2014-2,64),(64))
((2015-1,24),(3,4,21,24))
((2015-2,35),(-43,0,35))
((2015-3,56),(46,56))
((2015-4,5),(5))

  6、最后输出的结果就是我们要的:

2014-2  64
2015-1  3,4,21,24
2015-2  -43,0,35
2015-3  46,56
2015-4  5

代码实例

  下面将贴出使用MapReduce解决这个问题的代码:

package com.iteblog;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
  * User: 过往记忆
  * Date: 2015-08-05
  * Time: 下午23:49
  * bolg: http://www.iteblog.com
  * 本文地址:http://www.iteblog.com/archives/1415
  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
  * 过往记忆博客微信公共帐号:iteblog_hadoop
  */
public class Entry implements WritableComparable<Entry> {
     private String yearMonth;
     private int count;
     public Entry() {
     }
     @Override
     public int compareTo(Entry entry) {
         int result = this .yearMonth.compareTo(entry.getYearMonth());
         if (result == 0 ) {
             result = compare(count, entry.getCount());
         }
         return result;
     }
     @Override
     public void write(DataOutput dataOutput) throws IOException {
         dataOutput.writeUTF(yearMonth);
         dataOutput.writeInt(count);
     }
     @Override
     public void readFields(DataInput dataInput) throws IOException {
         this .yearMonth = dataInput.readUTF();
         this .count = dataInput.readInt();
     }
     public String getYearMonth() {
         return yearMonth;
     }
     public void setYearMonth(String yearMonth) {
         this .yearMonth = yearMonth;
     }
     public int getCount() {
         return count;
     }
     public void setCount( int count) {
         this .count = count;
     }
     public static int compare( int a, int b) {
         return a < b ? - 1 : (a > b ? 1 : 0 );
     }
     @Override
     public String toString() {
         return yearMonth;
     }
}

  上面就是将旧的Key(natural key)和Value组合成新的Key(composite key)的代码,接下来看下自定义的分区类:

package com.iteblog;
import org.apache.hadoop.mapreduce.Partitioner;
public class EntryPartitioner extends Partitioner<Entry, Integer> {
     @Override
     public int getPartition(Entry entry, Integer integer, int numberPartitions) {
         return Math.abs((entry.getYearMonth().hashCode() % numberPartitions));
     }
}

  这个类使得natural key相同的数据分派到同一个Reduce中。然后看下自定义分组类:

package com.iteblog;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
  * User: 过往记忆
  * Date: 2015-08-05
  * Time: 下午23:49
  * bolg: http://www.iteblog.com
  * 本文地址:http://www.iteblog.com/archives/1415
  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
  * 过往记忆博客微信公共帐号:iteblog_hadoop
  */
public class EntryGroupingComparator extends WritableComparator {
     public EntryGroupingComparator() {
         super (Entry. class , true );
     }
     @Override
     public int compare(WritableComparable a, WritableComparable b) {
         Entry a1 = (Entry) a;
         Entry b1 = (Entry) b;
         return a1.getYearMonth().compareTo(b1.getYearMonth());
     }
}

  只要是natural key相同,我们就认为是同一个分组,这样Reduce内部才可以对Value中的值进行排序。接下来看下Map类

public class SecondarySortMapper extends Mapper<LongWritable, Text, Entry, Text> {
     private Entry entry = new Entry();
     private Text value = new Text();
     @Override
     protected void map(LongWritable key, Text lines, Context context)
             throws IOException, InterruptedException {
         String line = lines.toString();
         String[] tokens = line.split( "," );
         // YYYY = tokens[0]
         // MM = tokens[1]
         // count = tokens[2]
         String yearMonth = tokens[ 0 ] + "-" + tokens[ 1 ];
         int count = Integer.parseInt(tokens[ 2 ]);
         entry.setYearMonth(yearMonth);
         entry.setCount(count);
         value.set(tokens[ 2 ]);
         context.write(entry, value);
     }
}

  其实就是解析每一行的数据,然后将旧的Key(natural key)和Value组合成新的Key(composite key)。接下来看下Reduce类实现

public class SecondarySortReducer extends Reducer<Entry, Text, Entry, Text> {
     @Override
     protected void reduce(Entry key, Iterable<Text> values, Context context)
             throws IOException, InterruptedException {
         StringBuilder builder = new StringBuilder();
         for (Text value : values) {
             builder.append(value.toString());
             builder.append( "," );
         }
         context.write(key, new Text(builder.toString()));
     }
}

builder存储的就是排序好的Value序列,最后来看看启动程序的使用:

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(Iteblog. class );
job.setJobName( "SecondarySort" );
FileInputFormat.setInputPaths(job, new Path(args[ 0 ]));
FileOutputFormat.setOutputPath(job, new Path(args[ 1 ]));
job.setOutputKeyClass(Entry. class );
job.setOutputValueClass(Text. class );
job.setMapperClass(SecondarySortMapper. class );
job.setReducerClass(SecondarySortReducer. class );
job.setPartitionerClass(EntryPartitioner. class );
job.setGroupingComparatorClass(EntryGroupingComparator. class );

关键看上面第12-15行的代码。下面是运行这个程序的方法和结果:

[root@iteblog.com /hadoop ] # bin/hadoop jar /tmp/iteblog-1.0-SNAPSHOT.jar 
     com.iteblog.Main  /iteblog/data .txt /iteblog/output
[root@iteblog.com /hadoop ] # bin/hadoop fs -cat /iteblog/output/pa*
2014-2  64,
2015-1  3,4,21,24,
2015-2  -43,0,35,
2015-3  46,56,
2015-4  5,
明天我将使用Spark来解决这个问题,敬请关注本博客。
本博客文章除特别声明,全部都是原创!
尊重原创,转载请注明: 转载自过往记忆(http://www.iteblog.com/)

这篇关于HadoopSpark解决二次排序问题(Hadoop篇)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1007543

相关文章

SQL Server配置管理器无法打开的四种解决方法

《SQLServer配置管理器无法打开的四种解决方法》本文总结了SQLServer配置管理器无法打开的四种解决方法,文中通过图文示例介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的... 目录方法一:桌面图标进入方法二:运行窗口进入检查版本号对照表php方法三:查找文件路径方法四:检查 S

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

Java 线程安全与 volatile与单例模式问题及解决方案

《Java线程安全与volatile与单例模式问题及解决方案》文章主要讲解线程安全问题的五个成因(调度随机、变量修改、非原子操作、内存可见性、指令重排序)及解决方案,强调使用volatile关键字... 目录什么是线程安全线程安全问题的产生与解决方案线程的调度是随机的多个线程对同一个变量进行修改线程的修改操

Redis出现中文乱码的问题及解决

《Redis出现中文乱码的问题及解决》:本文主要介绍Redis出现中文乱码的问题及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1. 问题的产生2China编程. 问题的解决redihttp://www.chinasem.cns数据进制问题的解决中文乱码问题解决总结

Golang如何对cron进行二次封装实现指定时间执行定时任务

《Golang如何对cron进行二次封装实现指定时间执行定时任务》:本文主要介绍Golang如何对cron进行二次封装实现指定时间执行定时任务问题,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录背景cron库下载代码示例【1】结构体定义【2】定时任务开启【3】使用示例【4】控制台输出总结背景

全面解析MySQL索引长度限制问题与解决方案

《全面解析MySQL索引长度限制问题与解决方案》MySQL对索引长度设限是为了保持高效的数据检索性能,这个限制不是MySQL的缺陷,而是数据库设计中的权衡结果,下面我们就来看看如何解决这一问题吧... 目录引言:为什么会有索引键长度问题?一、问题根源深度解析mysql索引长度限制原理实际场景示例二、五大解决

Springboot如何正确使用AOP问题

《Springboot如何正确使用AOP问题》:本文主要介绍Springboot如何正确使用AOP问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录​一、AOP概念二、切点表达式​execution表达式案例三、AOP通知四、springboot中使用AOP导出

Python中Tensorflow无法调用GPU问题的解决方法

《Python中Tensorflow无法调用GPU问题的解决方法》文章详解如何解决TensorFlow在Windows无法识别GPU的问题,需降级至2.10版本,安装匹配CUDA11.2和cuDNN... 当用以下代码查看GPU数量时,gpuspython返回的是一个空列表,说明tensorflow没有找到

解决未解析的依赖项:‘net.sf.json-lib:json-lib:jar:2.4‘问题

《解决未解析的依赖项:‘net.sf.json-lib:json-lib:jar:2.4‘问题》:本文主要介绍解决未解析的依赖项:‘net.sf.json-lib:json-lib:jar:2.4... 目录未解析的依赖项:‘net.sf.json-lib:json-lib:jar:2.4‘打开pom.XM

XML重复查询一条Sql语句的解决方法

《XML重复查询一条Sql语句的解决方法》文章分析了XML重复查询与日志失效问题,指出因DTO缺少@Data注解导致日志无法格式化、空指针风险及参数穿透,进而引发性能灾难,解决方案为在Controll... 目录一、核心问题:从SQL重复执行到日志失效二、根因剖析:DTO断裂引发的级联故障三、解决方案:修复