hadoop入门7:自定义GroupingComparator进行分组

2024-06-07 12:32

本文主要是介绍hadoop入门7:自定义GroupingComparator进行分组,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

摘要:

GroupingComparator是在reduce阶段分组来使用的,由于reduce阶段,如果key相同的一组,只取第一个key作为key,迭代所有的values。 如果reduce的key是自定义的bean,我们只需要bean里面的某个属性相同就认为这样的key是相同的,这是我们就需要之定义GroupCoparator来“欺骗”reduce了。 我们需要理清楚的还有map阶段你的几个自定义: parttioner中的getPartition()这个是map阶段自定义分区, bean中定义CopmareTo()是在溢出和merge时用来来排序的。 

demo数据:

订单id            金额     产品名称

order_234578,4789,笔记本
order_123456,7789,笔记本
order_123456,1789,手机
order_234578,4789,手机
order_123456,3789,笔记本
order_00001,4789,笔记本
order_00002,7789,笔记本
order_00001,5789,洗衣机
order_00002,17789,服务器

根据上面的订单信息需要求出每一个订单中成交金额最大的一笔交易。

设计思路:

1、利用“订单id和金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce

2、在reduce端利用groupingcomparator将订单id相同的kv聚合成组,然后取第一个即是最大值

groupingcomparator代码:

package com.zsy.mr.groupingcomparator;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class ItemIdGroupingComparator extends WritableComparator {protected ItemIdGroupingComparator() {super(OrderBean.class,true);}@SuppressWarnings("rawtypes")@Overridepublic int compare(WritableComparable a, WritableComparable b) {OrderBean aBean = (OrderBean)a;OrderBean bOrderBean = (OrderBean)b;return aBean.getItemId().compareTo(bOrderBean.getItemId());}
}

Partitioner代码:

package com.zsy.mr.groupingcomparator;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {//相同的id会发往相同的partitioner,产生的分区数是根据用户设置的reducetask数保持一致,即numReduceTasks数是用户在设置的数字@Overridepublic int getPartition(OrderBean key, NullWritable value, int numReduceTasks) {return (key.getItemId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;}}

OrderBean代码:

package com.zsy.mr.groupingcomparator;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class OrderBean implements WritableComparable<OrderBean> {private String itemId;private String productName;private Float price;@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(itemId);out.writeUTF(productName);out.writeFloat(price);}@Overridepublic void readFields(DataInput in) throws IOException {this.itemId = in.readUTF();this.productName = in.readUTF();this.price = in.readFloat();}@Overridepublic int compareTo(OrderBean o) {// 如果订单号相同,在进行价格比较int result = this.itemId.compareTo(o.getItemId());if (result == 0) {result = -this.price.compareTo(o.price);}return result;}public String getItemId() {return itemId;}public void setItemId(String itemId) {this.itemId = itemId;}public String getProductName() {return productName;}public void setProductName(String productName) {this.productName = productName;}public float getPrice() {return price;}public void setPrice(float price) {this.price = price;}@Overridepublic String toString() {return "itemId=" + itemId + ", productName=" + productName + ", price=" + price;}}

GroupingCommparatorSort代码:

package com.zsy.mr.groupingcomparator;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import com.zsy.mr.groupingcomparator.GroupingCommparatorSort.GroupingCommparatorSortMapper.GroupingCommparatorSortReducer;public class GroupingCommparatorSort {static class GroupingCommparatorSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {OrderBean orderBean = new OrderBean();@Overrideprotected void map(LongWritable key, Text value,Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context)throws IOException, InterruptedException {String[] str = value.toString().split(",");orderBean.setItemId(str[0]);orderBean.setPrice(Float.parseFloat(str[1]));orderBean.setProductName(str[2]);context.write(orderBean, NullWritable.get());}static class GroupingCommparatorSortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {@Overrideprotected void reduce(OrderBean arg0, Iterable<NullWritable> arg1,Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context)throws IOException, InterruptedException {context.write(arg0, NullWritable.get());}}}/*** main:(这里用一句话描述这个方法的作用).* * @author zhaoshouyun* @param args* @since 1.0*/public static void main(String[] args) throws Exception {Configuration conf = new Configuration();/** conf.set("mapreduce.framework.name", "yarn");* conf.set("yarn.resoucemanger.hostname", "hadoop01");*/Job job = Job.getInstance(conf);job.setJarByClass(GroupingCommparatorSort.class);// 指定本业务job要使用的业务类job.setMapperClass(GroupingCommparatorSortMapper.class);job.setReducerClass(GroupingCommparatorSortReducer.class);// 指定mapper输出的k v类型 如果map的输出和reduce的输出一样,只需要设置输出即可// job.setMapOutputKeyClass(Text.class);// job.setMapOutputValueClass(FlowBean.class);// 指定最终输出kv类型(reduce输出类型)job.setOutputKeyClass(OrderBean.class);job.setOutputValueClass(NullWritable.class);// 指定job的输入文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));// 指定job的输出结果目录FileOutputFormat.setOutputPath(job, new Path(args[1]));// 设置setGroupingComparatorClassjob.setGroupingComparatorClass(ItemIdGroupingComparator.class);// 设置自定义的setPartitionerClassjob.setPartitionerClass(ItemIdPartitioner.class);// 设置reducetask任务数为2job.setNumReduceTasks(2);// 将job中配置的相关参数,以及job所有的java类所在 的jar包,提交给yarn去运行// job.submit();无结果返回,建议不使用它boolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}
}

运行结果-part-00000:

运行结果-part-00001:

这篇关于hadoop入门7:自定义GroupingComparator进行分组的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1039183

相关文章

springboot自定义注解RateLimiter限流注解技术文档详解

《springboot自定义注解RateLimiter限流注解技术文档详解》文章介绍了限流技术的概念、作用及实现方式,通过SpringAOP拦截方法、缓存存储计数器,结合注解、枚举、异常类等核心组件,... 目录什么是限流系统架构核心组件详解1. 限流注解 (@RateLimiter)2. 限流类型枚举 (

SpringBoot 异常处理/自定义格式校验的问题实例详解

《SpringBoot异常处理/自定义格式校验的问题实例详解》文章探讨SpringBoot中自定义注解校验问题,区分参数级与类级约束触发的异常类型,建议通过@RestControllerAdvice... 目录1. 问题简要描述2. 异常触发1) 参数级别约束2) 类级别约束3. 异常处理1) 字段级别约束

Spring Boot 与微服务入门实战详细总结

《SpringBoot与微服务入门实战详细总结》本文讲解SpringBoot框架的核心特性如快速构建、自动配置、零XML与微服务架构的定义、演进及优缺点,涵盖开发环境准备和HelloWorld实战... 目录一、Spring Boot 核心概述二、微服务架构详解1. 微服务的定义与演进2. 微服务的优缺点三

从入门到精通详解LangChain加载HTML内容的全攻略

《从入门到精通详解LangChain加载HTML内容的全攻略》这篇文章主要为大家详细介绍了如何用LangChain优雅地处理HTML内容,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录引言:当大语言模型遇见html一、HTML加载器为什么需要专门的HTML加载器核心加载器对比表二

一文解密Python进行监控进程的黑科技

《一文解密Python进行监控进程的黑科技》在计算机系统管理和应用性能优化中,监控进程的CPU、内存和IO使用率是非常重要的任务,下面我们就来讲讲如何Python写一个简单使用的监控进程的工具吧... 目录准备工作监控CPU使用率监控内存使用率监控IO使用率小工具代码整合在计算机系统管理和应用性能优化中,监

如何使用Lombok进行spring 注入

《如何使用Lombok进行spring注入》本文介绍如何用Lombok简化Spring注入,推荐优先使用setter注入,通过注解自动生成getter/setter及构造器,减少冗余代码,提升开发效... Lombok为了开发环境简化代码,好处不用多说。spring 注入方式为2种,构造器注入和setter

从入门到进阶讲解Python自动化Playwright实战指南

《从入门到进阶讲解Python自动化Playwright实战指南》Playwright是针对Python语言的纯自动化工具,它可以通过单个API自动执行Chromium,Firefox和WebKit... 目录Playwright 简介核心优势安装步骤观点与案例结合Playwright 核心功能从零开始学习

MySQL进行数据库审计的详细步骤和示例代码

《MySQL进行数据库审计的详细步骤和示例代码》数据库审计通过触发器、内置功能及第三方工具记录和监控数据库活动,确保安全、完整与合规,Java代码实现自动化日志记录,整合分析系统提升监控效率,本文给大... 目录一、数据库审计的基本概念二、使用触发器进行数据库审计1. 创建审计表2. 创建触发器三、Java

MySQL深分页进行性能优化的常见方法

《MySQL深分页进行性能优化的常见方法》在Web应用中,分页查询是数据库操作中的常见需求,然而,在面对大型数据集时,深分页(deeppagination)却成为了性能优化的一个挑战,在本文中,我们将... 目录引言:深分页,真的只是“翻页慢”那么简单吗?一、背景介绍二、深分页的性能问题三、业务场景分析四、

SpringBoot结合Docker进行容器化处理指南

《SpringBoot结合Docker进行容器化处理指南》在当今快速发展的软件工程领域,SpringBoot和Docker已经成为现代Java开发者的必备工具,本文将深入讲解如何将一个SpringBo... 目录前言一、为什么选择 Spring Bootjavascript + docker1. 快速部署与