Hadoop案例(八)辅助排序和二次排序案例(GroupingComparator)

2023-11-01 17:40

本文主要是介绍Hadoop案例(八)辅助排序和二次排序案例(GroupingComparator),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

辅助排序和二次排序案例(GroupingComparator)

1.需求

有如下订单数据

订单id

商品id

成交金额

0000001

Pdt_01

222.8

0000001

Pdt_05

25.8

0000002

Pdt_03

522.8

0000002

Pdt_04

122.4

0000002

Pdt_05

722.4

0000003

Pdt_01

222.8

0000003

Pdt_02

33.8

现在需要求出每一个订单中最贵的商品。

2.数据准备

GroupingComparator.txt

   Pdt_01    222.8Pdt_05    722.4Pdt_05    25.8Pdt_01    222.8Pdt_01    33.8Pdt_03    522.8Pdt_04    122.4

输出数据预期:

3    222.8
part-r-00000.txt
2    722.4
part-r-00001.txt
1    222.8
part-r-00002.txt

3.分析

(1)利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce。

(2)在reduce端利用groupingcomparator将订单id相同的kv聚合成组,然后取第一个即是最大值。

 

4.实现

定义订单信息OrderBean

package com.xyg.mapreduce.order;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;public class OrderBean implements WritableComparable<OrderBean> {private int order_id; // 订单id号private double price; // 价格public OrderBean() {super();}public OrderBean(int order_id, double price) {super();this.order_id = order_id;this.price = price;}@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(order_id);out.writeDouble(price);}@Overridepublic void readFields(DataInput in) throws IOException {order_id = in.readInt();price = in.readDouble();}@Overridepublic String toString() {return order_id + "\t" + price;}public int getOrder_id() {return order_id;}public void setOrder_id(int order_id) {this.order_id = order_id;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}// 二次排序
    @Overridepublic int compareTo(OrderBean o) {int result = order_id > o.getOrder_id() ? 1 : -1;if (order_id > o.getOrder_id()) {result = 1;} else if (order_id < o.getOrder_id()) {result = -1;} else {// 价格倒序排序result = price > o.getPrice() ? -1 : 1;}return result;}
}

编写OrderSortMapper处理流程

package com.xyg.mapreduce.order;
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {OrderBean k = new OrderBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行String line = value.toString();// 2 截取String[] fields = line.split("\t"); // 3 封装对象 k.setOrder_id(Integer.parseInt(fields[0])); k.setPrice(Double.parseDouble(fields[2])); // 4 写出 context.write(k, NullWritable.get()); } }

编写OrderSortReducer处理流程

package com.xyg.mapreduce.order;
import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer;public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {@Overrideprotected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get());} }

编写OrderSortDriver处理流程

package com.xyg.mapreduce.order;import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class OrderDriver {public static void main(String[] args) throws Exception, IOException {// 1 获取配置信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 设置jar包加载路径job.setJarByClass(OrderDriver.class);// 3 加载map/reduce类job.setMapperClass(OrderMapper.class);job.setReducerClass(OrderReducer.class);// 4 设置map输出数据key和value类型job.setMapOutputKeyClass(OrderBean.class);job.setMapOutputValueClass(NullWritable.class);// 5 设置最终输出数据的key和value类型job.setOutputKeyClass(OrderBean.class);job.setOutputValueClass(NullWritable.class);// 6 设置输入数据和输出数据路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 10 设置reduce端的分组job.setGroupingComparatorClass(OrderGroupingComparator.class);// 7 设置分区job.setPartitionerClass(OrderPartitioner.class);// 8 设置reduce个数job.setNumReduceTasks(3);// 9 提交boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}OrderSortDriver

编写OrderSortPartitioner处理流程

package com.xyg.mapreduce.order;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner;public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {@Overridepublic int getPartition(OrderBean key, NullWritable value, int numReduceTasks) { return (key.getOrder_id() & Integer.MAX_VALUE) % numReduceTasks;} }

编写OrderSortGroupingComparator处理流程

package com.xyg.mapreduce.order;
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator;public class OrderGroupingComparator extends WritableComparator {protected OrderGroupingComparator() {super(OrderBean.class, true);}@SuppressWarnings("rawtypes")@Overridepublic int compare(WritableComparable a, WritableComparable b) { OrderBean aBean = (OrderBean) a; OrderBean bBean = (OrderBean) b; int result; if (aBean.getOrder_id() > bBean.getOrder_id()) { result = 1; } else if (aBean.getOrder_id() < bBean.getOrder_id()) { result = -1; } else { result = 0; } return result; } }

转载于:https://www.cnblogs.com/frankdeng/p/9256249.html

这篇关于Hadoop案例(八)辅助排序和二次排序案例(GroupingComparator)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/324640

相关文章

RabbitMQ消费端单线程与多线程案例讲解

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe... 目录 一、基础概念详细解释:举个例子:✅ 单消费者 + 单线程消费❌ 单消费者 + 多线程消费❌ 多

Olingo分析和实践之EDM 辅助序列化器详解(最佳实践)

《Olingo分析和实践之EDM辅助序列化器详解(最佳实践)》EDM辅助序列化器是ApacheOlingoOData框架中无需完整EDM模型的智能序列化工具,通过运行时类型推断实现灵活数据转换,适用... 目录概念与定义什么是 EDM 辅助序列化器?核心概念设计目标核心特点1. EDM 信息可选2. 智能类

MySql基本查询之表的增删查改+聚合函数案例详解

《MySql基本查询之表的增删查改+聚合函数案例详解》本文详解SQL的CURD操作INSERT用于数据插入(单行/多行及冲突处理),SELECT实现数据检索(列选择、条件过滤、排序分页),UPDATE... 目录一、Create1.1 单行数据 + 全列插入1.2 多行数据 + 指定列插入1.3 插入否则更

Python通用唯一标识符模块uuid使用案例详解

《Python通用唯一标识符模块uuid使用案例详解》Pythonuuid模块用于生成128位全局唯一标识符,支持UUID1-5版本,适用于分布式系统、数据库主键等场景,需注意隐私、碰撞概率及存储优... 目录简介核心功能1. UUID版本2. UUID属性3. 命名空间使用场景1. 生成唯一标识符2. 数

PostgreSQL的扩展dict_int应用案例解析

《PostgreSQL的扩展dict_int应用案例解析》dict_int扩展为PostgreSQL提供了专业的整数文本处理能力,特别适合需要精确处理数字内容的搜索场景,本文给大家介绍PostgreS... 目录PostgreSQL的扩展dict_int一、扩展概述二、核心功能三、安装与启用四、字典配置方法

Python中re模块结合正则表达式的实际应用案例

《Python中re模块结合正则表达式的实际应用案例》Python中的re模块是用于处理正则表达式的强大工具,正则表达式是一种用来匹配字符串的模式,它可以在文本中搜索和匹配特定的字符串模式,这篇文章主... 目录前言re模块常用函数一、查看文本中是否包含 A 或 B 字符串二、替换多个关键词为统一格式三、提

Golang如何对cron进行二次封装实现指定时间执行定时任务

《Golang如何对cron进行二次封装实现指定时间执行定时任务》:本文主要介绍Golang如何对cron进行二次封装实现指定时间执行定时任务问题,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录背景cron库下载代码示例【1】结构体定义【2】定时任务开启【3】使用示例【4】控制台输出总结背景

Python get()函数用法案例详解

《Pythonget()函数用法案例详解》在Python中,get()是字典(dict)类型的内置方法,用于安全地获取字典中指定键对应的值,它的核心作用是避免因访问不存在的键而引发KeyError错... 目录简介基本语法一、用法二、案例:安全访问未知键三、案例:配置参数默认值简介python是一种高级编

MySQL中的索引结构和分类实战案例详解

《MySQL中的索引结构和分类实战案例详解》本文详解MySQL索引结构与分类,涵盖B树、B+树、哈希及全文索引,分析其原理与优劣势,并结合实战案例探讨创建、管理及优化技巧,助力提升查询性能,感兴趣的朋... 目录一、索引概述1.1 索引的定义与作用1.2 索引的基本原理二、索引结构详解2.1 B树索引2.2

一文详解Java Stream的sorted自定义排序

《一文详解JavaStream的sorted自定义排序》Javastream中的sorted方法是用于对流中的元素进行排序的方法,它可以接受一个comparator参数,用于指定排序规则,sorte... 目录一、sorted 操作的基础原理二、自定义排序的实现方式1. Comparator 接口的 Lam