Hadoop3教程(十八):MapReduce之MapJoin案例分析

2023-10-19 08:45

本文主要是介绍Hadoop3教程(十八):MapReduce之MapJoin案例分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • (118)MapJoin案例需求分析
    • ReduceJoin的问题
    • 如何解决ReduceJoin的问题
    • 如何将一个文件主动缓存到集群的内存里
  • (119)MapJoin案例代码实现
  • 参考文献

(118)MapJoin案例需求分析

ReduceJoin的问题

在ReduceJoin中,合并的操作是在Reduce阶段进行的,所以相比Map阶段,Reduce阶段的处理压力过大。另外,相同的产品ID的数据会进入同一个Reducer中,如果这个产品ID下数据过多,其他产品ID的数据很少,那么会导致前面那个Reducer压力过大,这就是数据倾斜问题。

如何解决ReduceJoin的问题

那如何解决这种问题呢?

比较好的方法是不使用ReduceJoin,使用MapJoin,即在Map阶段实现拼接。

思路简单来说,就是将产品码表放进内存,orders.txt正常切片进入mapper,然后mapper处理的时候,就逐行对orders.txt里的数据进行产品码值的替换。

基于这种方式,MapJoin的适用场景也就很明显了,MapJoin适用于一张或多张表特别小(不能把内存撑爆了),一张表特别大的场景

如何将一个文件主动缓存到集群的内存里

那问题来了,在Hadoop里怎么把一张表主动缓存到内存当中,且还能在map()里调用呢?

首先我们需要在驱动类里,指定将文件加载到缓存:

//缓存普通文件到Task运行节点。
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
//如果是集群运行,需要设置HDFS路径
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));// MapJoin的话就不需要Reduce阶段了
job.setNumReduceTasks(0);

然后在自定义Mapper类的setup()里,按以下流程编写代码,以读取缓存的文件数据:

//1. 获取缓存的文件;
// 2.循环读取缓存文件中每一行;
// 3. 切割;
// 4. 缓存数据到集合;

setup()执行完成后,才会执行map()

所以我们最后在map()里,获取一行后,截取到pid,从内存中码表拿到产品中文名,拼接给出就可以。

(119)MapJoin案例代码实现

过了一遍教程,其实就是对上一小节的代码实现。

总的来说,就是只有一个Map阶段,在Map阶段中,在map()处理之前,先把码表读进内存中,然后map()在一行一行读取后,直接使用内存中的码表对指定字段进行替换即可。

对我来讲用处不大,所以这里直接跳过,但还是补充一下代码:

在MapJoinDriver驱动类中添加缓存文件:

package com.atguigu.mapreduce.mapjoin;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;public class MapJoinDriver {public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {// 1 获取job信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 设置加载jar包路径job.setJarByClass(MapJoinDriver.class);// 3 关联mapperjob.setMapperClass(MapJoinMapper.class);// 4 设置Map输出KV类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);// 5 设置最终输出KV类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 加载缓存数据job.addCacheFile(new URI("file:///D:/input/tablecache/pd.txt"));// Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0job.setNumReduceTasks(0);// 6 设置输入输出路径FileInputFormat.setInputPaths(job, new Path("D:\\input"));FileOutputFormat.setOutputPath(job, new Path("D:\\output"));// 7 提交boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}

在MapJoinMapper类中的setup方法中读取缓存文件,并在map()里进行替换:

package com.atguigu.mapreduce.mapjoin;import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {private Map<String, String> pdMap = new HashMap<>();private Text text = new Text();//任务开始前将pd数据缓存进pdMap@Overrideprotected void setup(Context context) throws IOException, InterruptedException {//通过缓存文件得到小表数据pd.txtURI[] cacheFiles = context.getCacheFiles();Path path = new Path(cacheFiles[0]);//获取文件系统对象,并开流FileSystem fs = FileSystem.get(context.getConfiguration());FSDataInputStream fis = fs.open(path);//通过包装流转换为reader,方便按行读取BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));//逐行读取,按行处理String line;while (StringUtils.isNotEmpty(line = reader.readLine())) {//切割一行    
//01	小米String[] split = line.split("\t");pdMap.put(split[0], split[1]);}//关流IOUtils.closeStream(reader);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//读取大表数据    
//1001	01	1String[] fields = value.toString().split("\t");//通过大表每行数据的pid,去pdMap里面取出pnameString pname = pdMap.get(fields[1]);//将大表每行数据的pid替换为pnametext.set(fields[0] + "\t" + pname + "\t" + fields[2]);//写出context.write(text,NullWritable.get());}
}

参考文献

  1. 【尚硅谷大数据Hadoop教程,hadoop3.x搭建到集群调优,百万播放】

这篇关于Hadoop3教程(十八):MapReduce之MapJoin案例分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/238691

相关文章

关于MyISAM和InnoDB对比分析

《关于MyISAM和InnoDB对比分析》:本文主要介绍关于MyISAM和InnoDB对比分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录开篇:从交通规则看存储引擎选择理解存储引擎的基本概念技术原理对比1. 事务支持:ACID的守护者2. 锁机制:并发控制的艺

Nexus安装和启动的实现教程

《Nexus安装和启动的实现教程》:本文主要介绍Nexus安装和启动的实现教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、Nexus下载二、Nexus安装和启动三、关闭Nexus总结一、Nexus下载官方下载链接:DownloadWindows系统根

MyBatis Plus 中 update_time 字段自动填充失效的原因分析及解决方案(最新整理)

《MyBatisPlus中update_time字段自动填充失效的原因分析及解决方案(最新整理)》在使用MyBatisPlus时,通常我们会在数据库表中设置create_time和update... 目录前言一、问题现象二、原因分析三、总结:常见原因与解决方法对照表四、推荐写法前言在使用 MyBATis

六个案例搞懂mysql间隙锁

《六个案例搞懂mysql间隙锁》MySQL中的间隙是指索引中两个索引键之间的空间,间隙锁用于防止范围查询期间的幻读,本文主要介绍了六个案例搞懂mysql间隙锁,具有一定的参考价值,感兴趣的可以了解一下... 目录概念解释间隙锁详解间隙锁触发条件间隙锁加锁规则案例演示案例一:唯一索引等值锁定存在的数据案例二:

Python主动抛出异常的各种用法和场景分析

《Python主动抛出异常的各种用法和场景分析》在Python中,我们不仅可以捕获和处理异常,还可以主动抛出异常,也就是以类的方式自定义错误的类型和提示信息,这在编程中非常有用,下面我将详细解释主动抛... 目录一、为什么要主动抛出异常?二、基本语法:raise关键字基本示例三、raise的多种用法1. 抛

github打不开的问题分析及解决

《github打不开的问题分析及解决》:本文主要介绍github打不开的问题分析及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、找到github.com域名解析的ip地址二、找到github.global.ssl.fastly.net网址解析的ip地址三

CnPlugin是PL/SQL Developer工具插件使用教程

《CnPlugin是PL/SQLDeveloper工具插件使用教程》:本文主要介绍CnPlugin是PL/SQLDeveloper工具插件使用教程,具有很好的参考价值,希望对大家有所帮助,如有错... 目录PL/SQL Developer工具插件使用安装拷贝文件配置总结PL/SQL Developer工具插

Mysql的主从同步/复制的原理分析

《Mysql的主从同步/复制的原理分析》:本文主要介绍Mysql的主从同步/复制的原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录为什么要主从同步?mysql主从同步架构有哪些?Mysql主从复制的原理/整体流程级联复制架构为什么好?Mysql主从复制注意

Java中的登录技术保姆级详细教程

《Java中的登录技术保姆级详细教程》:本文主要介绍Java中登录技术保姆级详细教程的相关资料,在Java中我们可以使用各种技术和框架来实现这些功能,文中通过代码介绍的非常详细,需要的朋友可以参考... 目录1.登录思路2.登录标记1.会话技术2.会话跟踪1.Cookie技术2.Session技术3.令牌技

java -jar命令运行 jar包时运行外部依赖jar包的场景分析

《java-jar命令运行jar包时运行外部依赖jar包的场景分析》:本文主要介绍java-jar命令运行jar包时运行外部依赖jar包的场景分析,本文给大家介绍的非常详细,对大家的学习或工作... 目录Java -jar命令运行 jar包时如何运行外部依赖jar包场景:解决:方法一、启动参数添加: -Xb