hadoop入门3:MR实现Join逻辑

2024-06-07 12:32
文章标签 实现 入门 逻辑 mr join hadoop

本文主要是介绍hadoop入门3:MR实现Join逻辑,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

如果看详细的代码模板,请看我的hadoop入门1里有详细的模板,也有详细的解释

今天用两组数据进行join;其实数据很简单,

订单表:                                    

id     日期        产品id   数量         

1001 20180923 a001 2
1002 20180923 a002 1
1003 20180923 a001 3
1004 20180923 a003 1
1005 20180923 a003 2

产品表:

产品id  产品名称  分类id  价格

a001 华为手机 1000 2799
a002 惠普笔记本 1000 8799
a003 苹果平板 1000 5799

需求是:需要这两张表进行关联一张表;在SQL中很简单:select * from order o left join product t on o.pid = t.id;

用hadoop实现:具体请看代码:

1、创建关联表的映射类:

/*** Project Name:hadoopMapReduce* File Name:InfoBean.java* Package Name:com.zsy.mr.rjoin* Date:2018年9月23日下午5:17:59* Copyright (c) 2018, zhaoshouyun All Rights Reserved.*
*/package com.zsy.mr.rjoin;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class InfoBean implements WritableComparable<InfoBean> {private int orderId;private String dateString;private String pId;private int amount;private String pName;private int categoryId ;private float price;private String flag ;//0订单   1商品@Overridepublic void readFields(DataInput input) throws IOException {this.orderId =  input.readInt();this.dateString = input.readUTF();this.pId = input.readUTF();this.amount = input.readInt();this.pName = input.readUTF();this.categoryId = input.readInt();this.price = input.readFloat();this.flag = input.readUTF();		}/*** private int orderId;private String dateString;private int pId;private int amount;private String pName;private int categoryId ;private float price;*/@Overridepublic void write(DataOutput output) throws IOException {output.writeInt(orderId);output.writeUTF(dateString);output.writeUTF(pId);output.writeInt(amount);output.writeUTF(pName);output.writeInt(categoryId);output.writeFloat(price);output.writeUTF(flag);}@Overridepublic int compareTo(InfoBean o) {return this.price > o.price ? -1 : 1;}public int getOrderId() {return orderId;}public void setOrderId(int orderId) {this.orderId = orderId;}public String getDateString() {return dateString;}public void setDateString(String dateString) {this.dateString = dateString;}public int getAmount() {return amount;}public void setAmount(int amount) {this.amount = amount;}public String getpId() {return pId;}public void setpId(String pId) {this.pId = pId;}public String getpName() {return pName;}public void setpName(String pName) {this.pName = pName;}public int getCategoryId() {return categoryId;}public void setCategoryId(int categoryId) {this.categoryId = categoryId;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}public float getPrice() {return price;}public void setPrice(float price) {this.price = price;}/*** Creates a new instance of InfoBean.** @param orderId* @param dateString* @param pId* @param amount* @param pName* @param categoryId* @param price*/public void set(int orderId, String dateString, String pId, int amount, String pName, int categoryId, float price, String flag) {this.orderId = orderId;this.dateString = dateString;this.pId = pId;this.amount = amount;this.pName = pName;this.categoryId = categoryId;this.price = price;this.flag = flag;}/*** Creates a new instance of InfoBean.**/public InfoBean() {}@Overridepublic String toString() {return orderId + "\t" + dateString + "\t" + amount + "\t" + pId+ "\t" + pName + "\t" + categoryId + "\t" + price+"\t"+flag;}}

2、编写具体的业务

/*** Project Name:hadoopMapReduce* File Name:Rjoin.java* Package Name:com.zsy.mr.rjoin* Date:2018年9月23日下午5:16:11* Copyright (c) 2018, zhaoshouyun All Rights Reserved.*
*/
/*** Project Name:hadoopMapReduce* File Name:Rjoin.java* Package Name:com.zsy.mr.rjoin* Date:2018年9月23日下午5:16:11* Copyright (c) 2018, zhaoshouyun All Rights Reserved.**/package com.zsy.mr.rjoin;import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** ClassName: Rjoin * Function: TODO ADD FUNCTION. * date: 2018年9月23日 下午5:16:11 * @author zhaoshouyun* @version * @since 1.0*/
public class RJoin {static class RJoinMapper extends Mapper<LongWritable, Text, Text, InfoBean>{InfoBean bean = new InfoBean();Text text = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, InfoBean>.Context context)throws IOException, InterruptedException {//由于读取文件后,获取的内容不好区分,是订单文件还是产品文件,我们可以通过分区来获取文件名来去人,我的订单文件名是包含order的FileSplit  split = (FileSplit) context.getInputSplit();//获取文件名称String fileName = split.getPath().getName();//通过空格分割String[] strs = value.toString().split(" ");String flag = "";//标记String pId = "";//产品idif(fileName.contains("order")){//处理订单信息//订单idint orderId = Integer.parseInt(strs[0]);String dateString = strs[1];//产品id pId = strs[2];int amount = Integer.parseInt(strs[3]);flag = "0";bean.set(orderId, dateString, pId, amount, "", 0, 0, flag);}else{//处理产品信息pId  = strs[0];String pName = strs[1];int categoryId = Integer.parseInt(strs[2]);float price = Float.parseFloat(strs[3]);flag = "1";bean.set(0, "", pId, 0, pName, categoryId, price, flag);}text.set(pId);context.write(text, bean);}}static class RJoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable>{@Overrideprotected void reduce(Text key, Iterable<InfoBean> infoBeans,Reducer<Text, InfoBean, InfoBean, NullWritable>.Context context) throws IOException, InterruptedException {InfoBean pBean = new InfoBean();List<InfoBean> list = new ArrayList<>();for (InfoBean infoBean : infoBeans) {if("1".equals(infoBean.getFlag())){//flag 0是订单信息  1是产品信息try {BeanUtils.copyProperties(pBean, infoBean);//数据必须进行拷贝,不可直接赋值} catch (IllegalAccessException | InvocationTargetException e) {e.printStackTrace();}}else{//处理订单信息InfoBean orderBean = new InfoBean();try {BeanUtils.copyProperties(orderBean, infoBean);} catch (IllegalAccessException | InvocationTargetException e) {e.printStackTrace();}//由于订单和产品的关系是多对一的关系,所有订单要用list临时存放起来list.add(orderBean);}}			for (InfoBean orderBean : list) {orderBean.setCategoryId(pBean.getCategoryId());orderBean.setpName(pBean.getpName());orderBean.setPrice(pBean.getPrice());//写出context.write(orderBean, NullWritable.get());}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();/*conf.set("mapreduce.framework.name", "yarn");conf.set("yarn.resoucemanger.hostname", "hadoop01");*/Job job = Job.getInstance(conf);job.setJarByClass(RJoin.class);//指定本业务job要使用的业务类job.setMapperClass(RJoinMapper.class);job.setReducerClass(RJoinReducer.class);//指定mapper输出的k v类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(InfoBean.class);//指定最终输出kv类型(reduce输出类型)job.setOutputKeyClass(InfoBean.class);job.setOutputValueClass(NullWritable.class);//指定job的输入文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));//指定job的输出结果目录FileOutputFormat.setOutputPath(job, new Path(args[1]));//将job中配置的相关参数,以及job所有的java类所在 的jar包,提交给yarn去运行//job.submit();无结果返回,建议不使用它boolean res = job.waitForCompletion(true);System.exit(res?0:1);}}

输入文件:

输出结果:

 

 

本次运行时在本地的eclipse运行的,本地运行正常,放到集群里也就没什么问题了

 

 

 

这篇关于hadoop入门3:MR实现Join逻辑的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1039179

相关文章

Python中edge-tts实现便捷语音合成

《Python中edge-tts实现便捷语音合成》edge-tts是一个功能强大的Python库,支持多种语言和声音选项,本文主要介绍了Python中edge-tts实现便捷语音合成,具有一定的参考价... 目录安装与环境设置文本转语音查找音色更改语音参数生成音频与字幕总结edge-tts 是一个功能强大的

Java实现按字节长度截取字符串

《Java实现按字节长度截取字符串》在Java中,由于字符串可能包含多字节字符,直接按字节长度截取可能会导致乱码或截取不准确的问题,下面我们就来看看几种按字节长度截取字符串的方法吧... 目录方法一:使用String的getBytes方法方法二:指定字符编码处理方法三:更精确的字符编码处理使用示例注意事项方

使用Python和PaddleOCR实现图文识别的代码和步骤

《使用Python和PaddleOCR实现图文识别的代码和步骤》在当今数字化时代,图文识别技术的应用越来越广泛,如文档数字化、信息提取等,PaddleOCR是百度开源的一款强大的OCR工具包,它集成了... 目录一、引言二、环境准备2.1 安装 python2.2 安装 PaddlePaddle2.3 安装

嵌入式Linux之使用设备树驱动GPIO的实现方式

《嵌入式Linux之使用设备树驱动GPIO的实现方式》:本文主要介绍嵌入式Linux之使用设备树驱动GPIO的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、设备树配置1.1 添加 pinctrl 节点1.2 添加 LED 设备节点二、编写驱动程序2.1

Android 实现一个隐私弹窗功能

《Android实现一个隐私弹窗功能》:本文主要介绍Android实现一个隐私弹窗功能,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 效果图如下:1. 设置同意、退出、点击用户协议、点击隐私协议的函数参数2. 《用户协议》、《隐私政策》设置成可点击的,且颜色要区分出来res/l

spring IOC的理解之原理和实现过程

《springIOC的理解之原理和实现过程》:本文主要介绍springIOC的理解之原理和实现过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、IoC 核心概念二、核心原理1. 容器架构2. 核心组件3. 工作流程三、关键实现机制1. Bean生命周期2.

Redis实现分布式锁全解析之从原理到实践过程

《Redis实现分布式锁全解析之从原理到实践过程》:本文主要介绍Redis实现分布式锁全解析之从原理到实践过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、背景介绍二、解决方案(一)使用 SETNX 命令(二)设置锁的过期时间(三)解决锁的误删问题(四)Re

Java根据IP地址实现归属地获取

《Java根据IP地址实现归属地获取》Ip2region是一个离线IP地址定位库和IP定位数据管理框架,这篇文章主要为大家详细介绍了Java如何使用Ip2region实现根据IP地址获取归属地,感兴趣... 目录一、使用Ip2region离线获取1、Ip2region简介2、导包3、下编程载xdb文件4、J

PyQt5+Python-docx实现一键生成测试报告

《PyQt5+Python-docx实现一键生成测试报告》作为一名测试工程师,你是否经历过手动填写测试报告的痛苦,本文将用Python的PyQt5和python-docx库,打造一款测试报告一键生成工... 目录引言工具功能亮点工具设计思路1. 界面设计:PyQt5实现数据输入2. 文档生成:python-

Android实现一键录屏功能(附源码)

《Android实现一键录屏功能(附源码)》在Android5.0及以上版本,系统提供了MediaProjectionAPI,允许应用在用户授权下录制屏幕内容并输出到视频文件,所以本文将基于此实现一个... 目录一、项目介绍二、相关技术与原理三、系统权限与用户授权四、项目架构与流程五、环境配置与依赖六、完整