hadoop入门3:MR实现Join逻辑

2024-06-07 12:32
文章标签 实现 入门 逻辑 mr join hadoop

本文主要是介绍hadoop入门3:MR实现Join逻辑,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

如果看详细的代码模板,请看我的hadoop入门1里有详细的模板,也有详细的解释

今天用两组数据进行join;其实数据很简单,

订单表:                                    

id     日期        产品id   数量         

1001 20180923 a001 2
1002 20180923 a002 1
1003 20180923 a001 3
1004 20180923 a003 1
1005 20180923 a003 2

产品表:

产品id  产品名称  分类id  价格

a001 华为手机 1000 2799
a002 惠普笔记本 1000 8799
a003 苹果平板 1000 5799

需求是:需要这两张表进行关联一张表;在SQL中很简单:select * from order o left join product t on o.pid = t.id;

用hadoop实现:具体请看代码:

1、创建关联表的映射类:

/*** Project Name:hadoopMapReduce* File Name:InfoBean.java* Package Name:com.zsy.mr.rjoin* Date:2018年9月23日下午5:17:59* Copyright (c) 2018, zhaoshouyun All Rights Reserved.*
*/package com.zsy.mr.rjoin;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class InfoBean implements WritableComparable<InfoBean> {private int orderId;private String dateString;private String pId;private int amount;private String pName;private int categoryId ;private float price;private String flag ;//0订单   1商品@Overridepublic void readFields(DataInput input) throws IOException {this.orderId =  input.readInt();this.dateString = input.readUTF();this.pId = input.readUTF();this.amount = input.readInt();this.pName = input.readUTF();this.categoryId = input.readInt();this.price = input.readFloat();this.flag = input.readUTF();		}/*** private int orderId;private String dateString;private int pId;private int amount;private String pName;private int categoryId ;private float price;*/@Overridepublic void write(DataOutput output) throws IOException {output.writeInt(orderId);output.writeUTF(dateString);output.writeUTF(pId);output.writeInt(amount);output.writeUTF(pName);output.writeInt(categoryId);output.writeFloat(price);output.writeUTF(flag);}@Overridepublic int compareTo(InfoBean o) {return this.price > o.price ? -1 : 1;}public int getOrderId() {return orderId;}public void setOrderId(int orderId) {this.orderId = orderId;}public String getDateString() {return dateString;}public void setDateString(String dateString) {this.dateString = dateString;}public int getAmount() {return amount;}public void setAmount(int amount) {this.amount = amount;}public String getpId() {return pId;}public void setpId(String pId) {this.pId = pId;}public String getpName() {return pName;}public void setpName(String pName) {this.pName = pName;}public int getCategoryId() {return categoryId;}public void setCategoryId(int categoryId) {this.categoryId = categoryId;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}public float getPrice() {return price;}public void setPrice(float price) {this.price = price;}/*** Creates a new instance of InfoBean.** @param orderId* @param dateString* @param pId* @param amount* @param pName* @param categoryId* @param price*/public void set(int orderId, String dateString, String pId, int amount, String pName, int categoryId, float price, String flag) {this.orderId = orderId;this.dateString = dateString;this.pId = pId;this.amount = amount;this.pName = pName;this.categoryId = categoryId;this.price = price;this.flag = flag;}/*** Creates a new instance of InfoBean.**/public InfoBean() {}@Overridepublic String toString() {return orderId + "\t" + dateString + "\t" + amount + "\t" + pId+ "\t" + pName + "\t" + categoryId + "\t" + price+"\t"+flag;}}

2、编写具体的业务

/*** Project Name:hadoopMapReduce* File Name:Rjoin.java* Package Name:com.zsy.mr.rjoin* Date:2018年9月23日下午5:16:11* Copyright (c) 2018, zhaoshouyun All Rights Reserved.*
*/
/*** Project Name:hadoopMapReduce* File Name:Rjoin.java* Package Name:com.zsy.mr.rjoin* Date:2018年9月23日下午5:16:11* Copyright (c) 2018, zhaoshouyun All Rights Reserved.**/package com.zsy.mr.rjoin;import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** ClassName: Rjoin * Function: TODO ADD FUNCTION. * date: 2018年9月23日 下午5:16:11 * @author zhaoshouyun* @version * @since 1.0*/
public class RJoin {static class RJoinMapper extends Mapper<LongWritable, Text, Text, InfoBean>{InfoBean bean = new InfoBean();Text text = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, InfoBean>.Context context)throws IOException, InterruptedException {//由于读取文件后,获取的内容不好区分,是订单文件还是产品文件,我们可以通过分区来获取文件名来去人,我的订单文件名是包含order的FileSplit  split = (FileSplit) context.getInputSplit();//获取文件名称String fileName = split.getPath().getName();//通过空格分割String[] strs = value.toString().split(" ");String flag = "";//标记String pId = "";//产品idif(fileName.contains("order")){//处理订单信息//订单idint orderId = Integer.parseInt(strs[0]);String dateString = strs[1];//产品id pId = strs[2];int amount = Integer.parseInt(strs[3]);flag = "0";bean.set(orderId, dateString, pId, amount, "", 0, 0, flag);}else{//处理产品信息pId  = strs[0];String pName = strs[1];int categoryId = Integer.parseInt(strs[2]);float price = Float.parseFloat(strs[3]);flag = "1";bean.set(0, "", pId, 0, pName, categoryId, price, flag);}text.set(pId);context.write(text, bean);}}static class RJoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable>{@Overrideprotected void reduce(Text key, Iterable<InfoBean> infoBeans,Reducer<Text, InfoBean, InfoBean, NullWritable>.Context context) throws IOException, InterruptedException {InfoBean pBean = new InfoBean();List<InfoBean> list = new ArrayList<>();for (InfoBean infoBean : infoBeans) {if("1".equals(infoBean.getFlag())){//flag 0是订单信息  1是产品信息try {BeanUtils.copyProperties(pBean, infoBean);//数据必须进行拷贝,不可直接赋值} catch (IllegalAccessException | InvocationTargetException e) {e.printStackTrace();}}else{//处理订单信息InfoBean orderBean = new InfoBean();try {BeanUtils.copyProperties(orderBean, infoBean);} catch (IllegalAccessException | InvocationTargetException e) {e.printStackTrace();}//由于订单和产品的关系是多对一的关系,所有订单要用list临时存放起来list.add(orderBean);}}			for (InfoBean orderBean : list) {orderBean.setCategoryId(pBean.getCategoryId());orderBean.setpName(pBean.getpName());orderBean.setPrice(pBean.getPrice());//写出context.write(orderBean, NullWritable.get());}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();/*conf.set("mapreduce.framework.name", "yarn");conf.set("yarn.resoucemanger.hostname", "hadoop01");*/Job job = Job.getInstance(conf);job.setJarByClass(RJoin.class);//指定本业务job要使用的业务类job.setMapperClass(RJoinMapper.class);job.setReducerClass(RJoinReducer.class);//指定mapper输出的k v类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(InfoBean.class);//指定最终输出kv类型(reduce输出类型)job.setOutputKeyClass(InfoBean.class);job.setOutputValueClass(NullWritable.class);//指定job的输入文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));//指定job的输出结果目录FileOutputFormat.setOutputPath(job, new Path(args[1]));//将job中配置的相关参数,以及job所有的java类所在 的jar包,提交给yarn去运行//job.submit();无结果返回,建议不使用它boolean res = job.waitForCompletion(true);System.exit(res?0:1);}}

输入文件:

输出结果:

 

 

本次运行时在本地的eclipse运行的,本地运行正常,放到集群里也就没什么问题了

 

 

 

这篇关于hadoop入门3:MR实现Join逻辑的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1039179

相关文章

Java实现字节字符转bcd编码

《Java实现字节字符转bcd编码》BCD是一种将十进制数字编码为二进制的表示方式,常用于数字显示和存储,本文将介绍如何在Java中实现字节字符转BCD码的过程,需要的小伙伴可以了解下... 目录前言BCD码是什么Java实现字节转bcd编码方法补充总结前言BCD码(Binary-Coded Decima

SpringBoot全局域名替换的实现

《SpringBoot全局域名替换的实现》本文主要介绍了SpringBoot全局域名替换的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录 项目结构⚙️ 配置文件application.yml️ 配置类AppProperties.Ja

Python实现批量CSV转Excel的高性能处理方案

《Python实现批量CSV转Excel的高性能处理方案》在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一... 目录一、场景需求二、技术方案三、核心代码四、批量处理方案五、性能优化六、使用示例完整代码七、小结一、

Java实现将HTML文件与字符串转换为图片

《Java实现将HTML文件与字符串转换为图片》在Java开发中,我们经常会遇到将HTML内容转换为图片的需求,本文小编就来和大家详细讲讲如何使用FreeSpire.DocforJava库来实现这一功... 目录前言核心实现:html 转图片完整代码场景 1:转换本地 HTML 文件为图片场景 2:转换 H

C#使用Spire.Doc for .NET实现HTML转Word的高效方案

《C#使用Spire.Docfor.NET实现HTML转Word的高效方案》在Web开发中,HTML内容的生成与处理是高频需求,然而,当用户需要将HTML页面或动态生成的HTML字符串转换为Wor... 目录引言一、html转Word的典型场景与挑战二、用 Spire.Doc 实现 HTML 转 Word1

C#实现一键批量合并PDF文档

《C#实现一键批量合并PDF文档》这篇文章主要为大家详细介绍了如何使用C#实现一键批量合并PDF文档功能,文中的示例代码简洁易懂,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言效果展示功能实现1、添加文件2、文件分组(书签)3、定义页码范围4、自定义显示5、定义页面尺寸6、PDF批量合并7、其他方法

SpringBoot实现不同接口指定上传文件大小的具体步骤

《SpringBoot实现不同接口指定上传文件大小的具体步骤》:本文主要介绍在SpringBoot中通过自定义注解、AOP拦截和配置文件实现不同接口上传文件大小限制的方法,强调需设置全局阈值远大于... 目录一  springboot实现不同接口指定文件大小1.1 思路说明1.2 工程启动说明二 具体实施2

Python实现精确小数计算的完全指南

《Python实现精确小数计算的完全指南》在金融计算、科学实验和工程领域,浮点数精度问题一直是开发者面临的重大挑战,本文将深入解析Python精确小数计算技术体系,感兴趣的小伙伴可以了解一下... 目录引言:小数精度问题的核心挑战一、浮点数精度问题分析1.1 浮点数精度陷阱1.2 浮点数误差来源二、基础解决

Java实现在Word文档中添加文本水印和图片水印的操作指南

《Java实现在Word文档中添加文本水印和图片水印的操作指南》在当今数字时代,文档的自动化处理与安全防护变得尤为重要,无论是为了保护版权、推广品牌,还是为了在文档中加入特定的标识,为Word文档添加... 目录引言Spire.Doc for Java:高效Word文档处理的利器代码实战:使用Java为Wo

Java实现远程执行Shell指令

《Java实现远程执行Shell指令》文章介绍使用JSch在SpringBoot项目中实现远程Shell操作,涵盖环境配置、依赖引入及工具类编写,详解分号和双与号执行多指令的区别... 目录软硬件环境说明编写执行Shell指令的工具类总结jsch(Java Secure Channel)是SSH2的一个纯J