HadoopSpark解决二次排序问题(Hadoop篇)

2024-05-27 12:58

本文主要是介绍HadoopSpark解决二次排序问题(Hadoop篇),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

问题描述

  二次排序就是对每一个key对应的value进行排序,也就是对MapReduce的输出(KEY, Value(v1,v2,v3,......,vn))中的Value(v1,v2,v3,......,vn)值进行排序(升序或者降序),使得Value(s1,s2,s3,......,sn),si ∈ (v1,v2,v3,......,vn)且s1 < s2 < s3 < ...... < sn。假设我们有以下输入文件(逗号分割的分别是年,月,总数):

[root@iteblog.com /tmp ] # vim data.txt
2015,1,24
2015,3,56
2015,1,3
2015,2,-43
2015,4,5
2015,3,46
2014,2,64
2015,1,4
2015,1,21
2015,2,35
2015,2,0

我们期望的输出结果是

2014-2  64
2015-1  3,4,21,24
2015-2  -43,0,35
2015-3  46,56
2015-4  5

  但是Hadoop默认的输出结果只能对Key进行排序,其中Value中的值次序是不定的;也就是说,Hadoop默认的输出可能如下:

2014-2  64
2015-1  21,4,3,24
2015-2  0,35,-43
2015-3  46,56
2015-4  5

解决方案

  针对这个问题我们有两种方法来解决:(1)、将每个Key对应的Value全部存储到内存(这个只会存储到单台机器),然后对这些Value进行相应的排序。但是如果Value的数据量非常大,导致单台内存无法存储这些数据,这将会导致程序出现java.lang.OutOfMemoryError,所以这个方法不是很通用。(2)、这种方法将Value中的值和旧的Key组成一个新的Key,这样我们就可以利用Reduce来排序这个Key,其生成的结果就是我们需要的。过程如下:
  1、原始的键值对是(k,v)。这里的k就是就的key,也可以 称为natural key;
  2、我们可以将k和v组合成新的key(可以称为composite key),也就是((k,v), v)
  3、自定义分区函数,将k相同的键值对发送到同一个Reduce中;
  4、自定义分组函数,将k相同的键值对当作一个分组。
  文字比较枯燥,我们来看看下面实例:
  1、原始数据是

[root@iteblog.com /tmp ] # vim data.txt
2015,1,24
2015,3,56
2015,1,3
2015,2,-43
2015,4,5
2015,3,46
2014,2,64
2015,1,4
2015,1,21
2015,2,35
2015,2,0

我们将年、月组成key(natural key),总数作为value,结果变成:

(2015-1,24)
(2015-3,56)
(2015-1,3)
(2015-2,-43)
(2015-4,5)
(2015-3,46)
(2014-2,64)
(2015-1,4)
(2015-1,21)
(2015-2,35)
(2015-2,0)

  2、将value和key(natural key)组成新的key(composite key),如下:

((2015-1,24),24)
((2015-3,56),56)
((2015-1,3),3)
((2015-2,-43),-43)
((2015-4,5),5)
((2015-3,46),46)
((2014-2,64),64)
((2015-1,4),4)
((2015-1,21),21)
((2015-2,35),35)
((2015-2,0),0)

  3、自定义分区函数,将k相同的键值对发送到同一个Reduce中,结果如下:

[((2014-2,64),64)]
[((2015-1,24),24),((2015-1,3),3),((2015-1,4),4),((2015-1,21),21)]
[((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)]
[((2015-3,56),56),((2015-3,46),46)]
[((2015-4,5),5)]

  4、自定义组排序函数,结果如下:

[((2014-2,64),64)]
[((2015-1,3),3),((2015-1,4),4),((2015-1,21),21),((2015-1,24),24)]
[((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)]
[((2015-3,46),46),((2015-3,56),56)]
[((2015-4,5),5)]

  5、自定义分组函数,结果如下:

((2014-2,64),(64))
((2015-1,24),(3,4,21,24))
((2015-2,35),(-43,0,35))
((2015-3,56),(46,56))
((2015-4,5),(5))

  6、最后输出的结果就是我们要的:

2014-2  64
2015-1  3,4,21,24
2015-2  -43,0,35
2015-3  46,56
2015-4  5

代码实例

  下面将贴出使用MapReduce解决这个问题的代码:

package com.iteblog;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
  * User: 过往记忆
  * Date: 2015-08-05
  * Time: 下午23:49
  * bolg: http://www.iteblog.com
  * 本文地址:http://www.iteblog.com/archives/1415
  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
  * 过往记忆博客微信公共帐号:iteblog_hadoop
  */
public class Entry implements WritableComparable<Entry> {
     private String yearMonth;
     private int count;
     public Entry() {
     }
     @Override
     public int compareTo(Entry entry) {
         int result = this .yearMonth.compareTo(entry.getYearMonth());
         if (result == 0 ) {
             result = compare(count, entry.getCount());
         }
         return result;
     }
     @Override
     public void write(DataOutput dataOutput) throws IOException {
         dataOutput.writeUTF(yearMonth);
         dataOutput.writeInt(count);
     }
     @Override
     public void readFields(DataInput dataInput) throws IOException {
         this .yearMonth = dataInput.readUTF();
         this .count = dataInput.readInt();
     }
     public String getYearMonth() {
         return yearMonth;
     }
     public void setYearMonth(String yearMonth) {
         this .yearMonth = yearMonth;
     }
     public int getCount() {
         return count;
     }
     public void setCount( int count) {
         this .count = count;
     }
     public static int compare( int a, int b) {
         return a < b ? - 1 : (a > b ? 1 : 0 );
     }
     @Override
     public String toString() {
         return yearMonth;
     }
}

  上面就是将旧的Key(natural key)和Value组合成新的Key(composite key)的代码,接下来看下自定义的分区类:

package com.iteblog;
import org.apache.hadoop.mapreduce.Partitioner;
public class EntryPartitioner extends Partitioner<Entry, Integer> {
     @Override
     public int getPartition(Entry entry, Integer integer, int numberPartitions) {
         return Math.abs((entry.getYearMonth().hashCode() % numberPartitions));
     }
}

  这个类使得natural key相同的数据分派到同一个Reduce中。然后看下自定义分组类:

package com.iteblog;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
  * User: 过往记忆
  * Date: 2015-08-05
  * Time: 下午23:49
  * bolg: http://www.iteblog.com
  * 本文地址:http://www.iteblog.com/archives/1415
  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
  * 过往记忆博客微信公共帐号:iteblog_hadoop
  */
public class EntryGroupingComparator extends WritableComparator {
     public EntryGroupingComparator() {
         super (Entry. class , true );
     }
     @Override
     public int compare(WritableComparable a, WritableComparable b) {
         Entry a1 = (Entry) a;
         Entry b1 = (Entry) b;
         return a1.getYearMonth().compareTo(b1.getYearMonth());
     }
}

  只要是natural key相同,我们就认为是同一个分组,这样Reduce内部才可以对Value中的值进行排序。接下来看下Map类

public class SecondarySortMapper extends Mapper<LongWritable, Text, Entry, Text> {
     private Entry entry = new Entry();
     private Text value = new Text();
     @Override
     protected void map(LongWritable key, Text lines, Context context)
             throws IOException, InterruptedException {
         String line = lines.toString();
         String[] tokens = line.split( "," );
         // YYYY = tokens[0]
         // MM = tokens[1]
         // count = tokens[2]
         String yearMonth = tokens[ 0 ] + "-" + tokens[ 1 ];
         int count = Integer.parseInt(tokens[ 2 ]);
         entry.setYearMonth(yearMonth);
         entry.setCount(count);
         value.set(tokens[ 2 ]);
         context.write(entry, value);
     }
}

  其实就是解析每一行的数据,然后将旧的Key(natural key)和Value组合成新的Key(composite key)。接下来看下Reduce类实现

public class SecondarySortReducer extends Reducer<Entry, Text, Entry, Text> {
     @Override
     protected void reduce(Entry key, Iterable<Text> values, Context context)
             throws IOException, InterruptedException {
         StringBuilder builder = new StringBuilder();
         for (Text value : values) {
             builder.append(value.toString());
             builder.append( "," );
         }
         context.write(key, new Text(builder.toString()));
     }
}

builder存储的就是排序好的Value序列,最后来看看启动程序的使用:

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(Iteblog. class );
job.setJobName( "SecondarySort" );
FileInputFormat.setInputPaths(job, new Path(args[ 0 ]));
FileOutputFormat.setOutputPath(job, new Path(args[ 1 ]));
job.setOutputKeyClass(Entry. class );
job.setOutputValueClass(Text. class );
job.setMapperClass(SecondarySortMapper. class );
job.setReducerClass(SecondarySortReducer. class );
job.setPartitionerClass(EntryPartitioner. class );
job.setGroupingComparatorClass(EntryGroupingComparator. class );

关键看上面第12-15行的代码。下面是运行这个程序的方法和结果:

[root@iteblog.com /hadoop ] # bin/hadoop jar /tmp/iteblog-1.0-SNAPSHOT.jar 
     com.iteblog.Main  /iteblog/data .txt /iteblog/output
[root@iteblog.com /hadoop ] # bin/hadoop fs -cat /iteblog/output/pa*
2014-2  64,
2015-1  3,4,21,24,
2015-2  -43,0,35,
2015-3  46,56,
2015-4  5,
明天我将使用Spark来解决这个问题,敬请关注本博客。
本博客文章除特别声明,全部都是原创!
尊重原创,转载请注明: 转载自过往记忆(http://www.iteblog.com/)

这篇关于HadoopSpark解决二次排序问题(Hadoop篇)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1007543

相关文章

Vue3绑定props默认值问题

《Vue3绑定props默认值问题》使用Vue3的defineProps配合TypeScript的interface定义props类型,并通过withDefaults设置默认值,使组件能安全访问传入的... 目录前言步骤步骤1:使用 defineProps 定义 Props步骤2:设置默认值总结前言使用T

504 Gateway Timeout网关超时的根源及完美解决方法

《504GatewayTimeout网关超时的根源及完美解决方法》在日常开发和运维过程中,504GatewayTimeout错误是常见的网络问题之一,尤其是在使用反向代理(如Nginx)或... 目录引言为什么会出现 504 错误?1. 探索 504 Gateway Timeout 错误的根源 1.1 后端

Web服务器-Nginx-高并发问题

《Web服务器-Nginx-高并发问题》Nginx通过事件驱动、I/O多路复用和异步非阻塞技术高效处理高并发,结合动静分离和限流策略,提升性能与稳定性... 目录前言一、架构1. 原生多进程架构2. 事件驱动模型3. IO多路复用4. 异步非阻塞 I/O5. Nginx高并发配置实战二、动静分离1. 职责2

解决升级JDK报错:module java.base does not“opens java.lang.reflect“to unnamed module问题

《解决升级JDK报错:modulejava.basedoesnot“opensjava.lang.reflect“tounnamedmodule问题》SpringBoot启动错误源于Jav... 目录问题描述原因分析解决方案总结问题描述启动sprintboot时报以下错误原因分析编程异js常是由Ja

深度剖析SpringBoot日志性能提升的原因与解决

《深度剖析SpringBoot日志性能提升的原因与解决》日志记录本该是辅助工具,却为何成了性能瓶颈,SpringBoot如何用代码彻底破解日志导致的高延迟问题,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言第一章:日志性能陷阱的底层原理1.1 日志级别的“双刃剑”效应1.2 同步日志的“吞吐量杀手”

MySQL 表空却 ibd 文件过大的问题及解决方法

《MySQL表空却ibd文件过大的问题及解决方法》本文给大家介绍MySQL表空却ibd文件过大的问题及解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录一、问题背景:表空却 “吃满” 磁盘的怪事二、问题复现:一步步编程还原异常场景1. 准备测试源表与数据

解决Nginx启动报错Job for nginx.service failed because the control process exited with error code问题

《解决Nginx启动报错Jobfornginx.servicefailedbecausethecontrolprocessexitedwitherrorcode问题》Nginx启... 目录一、报错如下二、解决原因三、解决方式总结一、报错如下Job for nginx.service failed bec

SysMain服务可以关吗? 解决SysMain服务导致的高CPU使用率问题

《SysMain服务可以关吗?解决SysMain服务导致的高CPU使用率问题》SysMain服务是超级预读取,该服务会记录您打开应用程序的模式,并预先将它们加载到内存中以节省时间,但它可能占用大量... 在使用电脑的过程中,CPU使用率居高不下是许多用户都遇到过的问题,其中名为SysMain的服务往往是罪魁

MySQ中出现幻读问题的解决过程

《MySQ中出现幻读问题的解决过程》文章解析MySQLInnoDB通过MVCC与间隙锁机制在可重复读隔离级别下解决幻读,确保事务一致性,同时指出性能影响及乐观锁等替代方案,帮助开发者优化数据库应用... 目录一、幻读的准确定义与核心特征幻读 vs 不可重复读二、mysql隔离级别深度解析各隔离级别的实现差异

C++ vector越界问题的完整解决方案

《C++vector越界问题的完整解决方案》在C++开发中,std::vector作为最常用的动态数组容器,其便捷性与性能优势使其成为处理可变长度数据的首选,然而,数组越界访问始终是威胁程序稳定性的... 目录引言一、vector越界的底层原理与危害1.1 越界访问的本质原因1.2 越界访问的实际危害二、基