hadoop入门3:MR实现Join逻辑

2024-06-07 12:32
文章标签 实现 入门 逻辑 mr join hadoop

本文主要是介绍hadoop入门3:MR实现Join逻辑,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

如果看详细的代码模板,请看我的hadoop入门1里有详细的模板,也有详细的解释

今天用两组数据进行join;其实数据很简单,

订单表:                                    

id     日期        产品id   数量         

1001 20180923 a001 2
1002 20180923 a002 1
1003 20180923 a001 3
1004 20180923 a003 1
1005 20180923 a003 2

产品表:

产品id  产品名称  分类id  价格

a001 华为手机 1000 2799
a002 惠普笔记本 1000 8799
a003 苹果平板 1000 5799

需求是:需要这两张表进行关联一张表;在SQL中很简单:select * from order o left join product t on o.pid = t.id;

用hadoop实现:具体请看代码:

1、创建关联表的映射类:

/*** Project Name:hadoopMapReduce* File Name:InfoBean.java* Package Name:com.zsy.mr.rjoin* Date:2018年9月23日下午5:17:59* Copyright (c) 2018, zhaoshouyun All Rights Reserved.*
*/package com.zsy.mr.rjoin;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class InfoBean implements WritableComparable<InfoBean> {private int orderId;private String dateString;private String pId;private int amount;private String pName;private int categoryId ;private float price;private String flag ;//0订单   1商品@Overridepublic void readFields(DataInput input) throws IOException {this.orderId =  input.readInt();this.dateString = input.readUTF();this.pId = input.readUTF();this.amount = input.readInt();this.pName = input.readUTF();this.categoryId = input.readInt();this.price = input.readFloat();this.flag = input.readUTF();		}/*** private int orderId;private String dateString;private int pId;private int amount;private String pName;private int categoryId ;private float price;*/@Overridepublic void write(DataOutput output) throws IOException {output.writeInt(orderId);output.writeUTF(dateString);output.writeUTF(pId);output.writeInt(amount);output.writeUTF(pName);output.writeInt(categoryId);output.writeFloat(price);output.writeUTF(flag);}@Overridepublic int compareTo(InfoBean o) {return this.price > o.price ? -1 : 1;}public int getOrderId() {return orderId;}public void setOrderId(int orderId) {this.orderId = orderId;}public String getDateString() {return dateString;}public void setDateString(String dateString) {this.dateString = dateString;}public int getAmount() {return amount;}public void setAmount(int amount) {this.amount = amount;}public String getpId() {return pId;}public void setpId(String pId) {this.pId = pId;}public String getpName() {return pName;}public void setpName(String pName) {this.pName = pName;}public int getCategoryId() {return categoryId;}public void setCategoryId(int categoryId) {this.categoryId = categoryId;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}public float getPrice() {return price;}public void setPrice(float price) {this.price = price;}/*** Creates a new instance of InfoBean.** @param orderId* @param dateString* @param pId* @param amount* @param pName* @param categoryId* @param price*/public void set(int orderId, String dateString, String pId, int amount, String pName, int categoryId, float price, String flag) {this.orderId = orderId;this.dateString = dateString;this.pId = pId;this.amount = amount;this.pName = pName;this.categoryId = categoryId;this.price = price;this.flag = flag;}/*** Creates a new instance of InfoBean.**/public InfoBean() {}@Overridepublic String toString() {return orderId + "\t" + dateString + "\t" + amount + "\t" + pId+ "\t" + pName + "\t" + categoryId + "\t" + price+"\t"+flag;}}

2、编写具体的业务

/*** Project Name:hadoopMapReduce* File Name:Rjoin.java* Package Name:com.zsy.mr.rjoin* Date:2018年9月23日下午5:16:11* Copyright (c) 2018, zhaoshouyun All Rights Reserved.*
*/
/*** Project Name:hadoopMapReduce* File Name:Rjoin.java* Package Name:com.zsy.mr.rjoin* Date:2018年9月23日下午5:16:11* Copyright (c) 2018, zhaoshouyun All Rights Reserved.**/package com.zsy.mr.rjoin;import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** ClassName: Rjoin * Function: TODO ADD FUNCTION. * date: 2018年9月23日 下午5:16:11 * @author zhaoshouyun* @version * @since 1.0*/
public class RJoin {static class RJoinMapper extends Mapper<LongWritable, Text, Text, InfoBean>{InfoBean bean = new InfoBean();Text text = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, InfoBean>.Context context)throws IOException, InterruptedException {//由于读取文件后,获取的内容不好区分,是订单文件还是产品文件,我们可以通过分区来获取文件名来去人,我的订单文件名是包含order的FileSplit  split = (FileSplit) context.getInputSplit();//获取文件名称String fileName = split.getPath().getName();//通过空格分割String[] strs = value.toString().split(" ");String flag = "";//标记String pId = "";//产品idif(fileName.contains("order")){//处理订单信息//订单idint orderId = Integer.parseInt(strs[0]);String dateString = strs[1];//产品id pId = strs[2];int amount = Integer.parseInt(strs[3]);flag = "0";bean.set(orderId, dateString, pId, amount, "", 0, 0, flag);}else{//处理产品信息pId  = strs[0];String pName = strs[1];int categoryId = Integer.parseInt(strs[2]);float price = Float.parseFloat(strs[3]);flag = "1";bean.set(0, "", pId, 0, pName, categoryId, price, flag);}text.set(pId);context.write(text, bean);}}static class RJoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable>{@Overrideprotected void reduce(Text key, Iterable<InfoBean> infoBeans,Reducer<Text, InfoBean, InfoBean, NullWritable>.Context context) throws IOException, InterruptedException {InfoBean pBean = new InfoBean();List<InfoBean> list = new ArrayList<>();for (InfoBean infoBean : infoBeans) {if("1".equals(infoBean.getFlag())){//flag 0是订单信息  1是产品信息try {BeanUtils.copyProperties(pBean, infoBean);//数据必须进行拷贝,不可直接赋值} catch (IllegalAccessException | InvocationTargetException e) {e.printStackTrace();}}else{//处理订单信息InfoBean orderBean = new InfoBean();try {BeanUtils.copyProperties(orderBean, infoBean);} catch (IllegalAccessException | InvocationTargetException e) {e.printStackTrace();}//由于订单和产品的关系是多对一的关系,所有订单要用list临时存放起来list.add(orderBean);}}			for (InfoBean orderBean : list) {orderBean.setCategoryId(pBean.getCategoryId());orderBean.setpName(pBean.getpName());orderBean.setPrice(pBean.getPrice());//写出context.write(orderBean, NullWritable.get());}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();/*conf.set("mapreduce.framework.name", "yarn");conf.set("yarn.resoucemanger.hostname", "hadoop01");*/Job job = Job.getInstance(conf);job.setJarByClass(RJoin.class);//指定本业务job要使用的业务类job.setMapperClass(RJoinMapper.class);job.setReducerClass(RJoinReducer.class);//指定mapper输出的k v类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(InfoBean.class);//指定最终输出kv类型(reduce输出类型)job.setOutputKeyClass(InfoBean.class);job.setOutputValueClass(NullWritable.class);//指定job的输入文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));//指定job的输出结果目录FileOutputFormat.setOutputPath(job, new Path(args[1]));//将job中配置的相关参数,以及job所有的java类所在 的jar包,提交给yarn去运行//job.submit();无结果返回,建议不使用它boolean res = job.waitForCompletion(true);System.exit(res?0:1);}}

输入文件:

输出结果:

 

 

本次运行时在本地的eclipse运行的,本地运行正常,放到集群里也就没什么问题了

 

 

 

这篇关于hadoop入门3:MR实现Join逻辑的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1039179

相关文章

C++中unordered_set哈希集合的实现

《C++中unordered_set哈希集合的实现》std::unordered_set是C++标准库中的无序关联容器,基于哈希表实现,具有元素唯一性和无序性特点,本文就来详细的介绍一下unorder... 目录一、概述二、头文件与命名空间三、常用方法与示例1. 构造与析构2. 迭代器与遍历3. 容量相关4

Linux join命令的使用及说明

《Linuxjoin命令的使用及说明》`join`命令用于在Linux中按字段将两个文件进行连接,类似于SQL的JOIN,它需要两个文件按用于匹配的字段排序,并且第一个文件的换行符必须是LF,`jo... 目录一. 基本语法二. 数据准备三. 指定文件的连接key四.-a输出指定文件的所有行五.-o指定输出

C++中悬垂引用(Dangling Reference) 的实现

《C++中悬垂引用(DanglingReference)的实现》C++中的悬垂引用指引用绑定的对象被销毁后引用仍存在的情况,会导致访问无效内存,下面就来详细的介绍一下产生的原因以及如何避免,感兴趣... 目录悬垂引用的产生原因1. 引用绑定到局部变量,变量超出作用域后销毁2. 引用绑定到动态分配的对象,对象

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

Python实现字典转字符串的五种方法

《Python实现字典转字符串的五种方法》本文介绍了在Python中如何将字典数据结构转换为字符串格式的多种方法,首先可以通过内置的str()函数进行简单转换;其次利用ison.dumps()函数能够... 目录1、使用json模块的dumps方法:2、使用str方法:3、使用循环和字符串拼接:4、使用字符

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

Linux挂载linux/Windows共享目录实现方式

《Linux挂载linux/Windows共享目录实现方式》:本文主要介绍Linux挂载linux/Windows共享目录实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录文件共享协议linux环境作为服务端(NFS)在服务器端安装 NFS创建要共享的目录修改 NFS 配

通过React实现页面的无限滚动效果

《通过React实现页面的无限滚动效果》今天我们来聊聊无限滚动这个现代Web开发中不可或缺的技术,无论你是刷微博、逛知乎还是看脚本,无限滚动都已经渗透到我们日常的浏览体验中,那么,如何优雅地实现它呢?... 目录1. 早期的解决方案2. 交叉观察者:IntersectionObserver2.1 Inter