mapreduce实现全局排序

2024-06-16 20:18
文章标签 实现 全局 排序 mapreduce

本文主要是介绍mapreduce实现全局排序,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

直接附代码,说明都在源码里了。

 

package com.hadoop.totalsort;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.QuickSort;
public class SamplerInputFormat extends FileInputFormat<Text, Text> {  
static final String PARTITION_FILENAME = "_partition.lst";  
static final String SAMPLE_SIZE = "terasort.partitions.sample";  
private static JobConf lastConf = null;  
private static InputSplit[] lastResult = null;  
static class TextSampler implements IndexedSortable {  
public ArrayList<Text> records = new ArrayList<Text>();  
public int compare(int arg0, int arg1) {  
Text right = records.get(arg0);  
Text left = records.get(arg1);  
return right.compareTo(left);  
}  
public void swap(int arg0, int arg1) {  
Text right = records.get(arg0);  
Text left = records.get(arg1);  
records.set(arg0, left);  
records.set(arg1, right);  
}  
public void addKey(Text key) {  
records.add(new Text(key));  
}  
//将采集出来的key数据排序
public Text[] createPartitions(int numPartitions) {  
int numRecords = records.size();  
if (numPartitions > numRecords) {  
throw new IllegalArgumentException("Requested more partitions than input keys (" + numPartitions +  
" > " + numRecords + ")");  
}  
new QuickSort().sort(this, 0, records.size());  
float stepSize = numRecords / (float) numPartitions;  //采集的时候应该是采了100条记录,从10个分片查找的,此处再取numPartitions-1条
Text[] result = new Text[numPartitions - 1];  
for (int i = 1; i < numPartitions; ++i) {  
result[i - 1] = records.get(Math.round(stepSize * i));  
}  
return result;  
}  
}  
public static void writePartitionFile(JobConf conf, Path partFile) throws IOException {  
//前段代码从分片中采集数据,通过sampler.addKey存入TextSampler中的records数组
SamplerInputFormat inputFormat = new SamplerInputFormat();  
TextSampler sampler = new TextSampler();  
Text key = new Text();  
Text value = new Text();  
int partitions = conf.getNumReduceTasks(); // Reducer任务的个数   
long sampleSize = conf.getLong(SAMPLE_SIZE, 100); // 采集数据-键值对的个数   
InputSplit[] splits = inputFormat.getSplits(conf, conf.getNumMapTasks());// 获得数据分片   
int samples = Math.min(10, splits.length);// 采集分片的个数   ,采集10个分片
long recordsPerSample = sampleSize / samples;// 每个分片采集的键值对个数   
int sampleStep = splits.length / samples; // 采集分片的步长   ,总的分片个数/要采集的分片个数
long records = 0;  
for (int i = 0; i < samples; i++) {  //1...10分片数
RecordReader<Text, Text> reader = inputFormat.getRecordReader(splits[sampleStep * i], conf, null);  
while (reader.next(key, value)) {  
sampler.addKey(key);   //将key值增加到sampler的records数组
records += 1;  
if ((i + 1) * recordsPerSample <= records) {  //目的是均匀采集各分片的条数,比如采集到第5个分片,那么记录条数应该小于5个分片应该的条数
break;  
}  
}  
}  
FileSystem outFs = partFile.getFileSystem(conf);  
if (outFs.exists(partFile)) {  
outFs.delete(partFile, false);  
}  
SequenceFile.Writer writer = SequenceFile.createWriter(outFs, conf, partFile, Text.class, NullWritable.class);  
NullWritable nullValue = NullWritable.get();  
for (Text split : sampler.createPartitions(partitions)) {  //调用createPartitions方法,排序采集出来的数据,并取partitions条
writer.append(split, nullValue);  
}  
writer.close();  
}  
static class TeraRecordReader implements RecordReader<Text, Text> {  
private LineRecordReader in;  
private LongWritable junk = new LongWritable();  
private Text line = new Text();  
private static int KEY_LENGTH = 10;  
public TeraRecordReader(Configuration job, FileSplit split) throws IOException {  
in = new LineRecordReader(job, split);  
}  
public void close() throws IOException {  
in.close();  
}  
public Text createKey() {  
// TODO Auto-generated method stub   
return new Text();  
}  
public Text createValue() {  
return new Text();  
}  
public long getPos() throws IOException {  
// TODO Auto-generated method stub   
return in.getPos();  
}  
public float getProgress() throws IOException {  
// TODO Auto-generated method stub   
return in.getProgress();  
}  
public boolean next(Text arg0, Text arg1) throws IOException {  
if (in.next(junk, line)) {   //调用父类方法,将value值赋给key
// if (line.getLength() < KEY_LENGTH) {   
arg0.set(line);  
arg1.clear();  
//                } else {   
//                    byte[] bytes = line.getBytes(); // 默认知道读取要比较值的前10个字节 作为key   
//                                                    // 后面的字节作为value;   
//                    arg0.set(bytes, 0, KEY_LENGTH);   
//                    arg1.set(bytes, KEY_LENGTH, line.getLength() - KEY_LENGTH);   
//                }   
return true;  
} else {  
return false;  
}  
}  
}  
@Override  
public InputSplit[] getSplits(JobConf conf, int splits) throws IOException {  
if (conf == lastConf) {  
return lastResult;  
}  
lastConf = conf;  
lastResult = super.getSplits(lastConf, splits);  
return lastResult;  
}  
public org.apache.hadoop.mapred.RecordReader<Text, Text> getRecordReader(InputSplit arg0, JobConf arg1,  
Reporter arg2) throws IOException {  
return new TeraRecordReader(arg1, (FileSplit) arg0);  
}  
}  


 

package com.hadoop.totalsort;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SamplerSort extends Configured implements Tool {  
// 自定义的Partitioner   
public static class TotalOrderPartitioner implements Partitioner<Text, Text> {  
private Text[] splitPoints;  
public TotalOrderPartitioner() {  
}  
public int getPartition(Text arg0, Text arg1, int arg2) {  
// TODO Auto-generated method stub   
return findPartition(arg0);  
}  
public void configure(JobConf arg0) {  
try {  
FileSystem fs = FileSystem.getLocal(arg0);  
Path partFile = new Path(SamplerInputFormat.PARTITION_FILENAME);  
splitPoints = readPartitions(fs, partFile, arg0); // 读取采集文件   
} catch (IOException ie) {  
throw new IllegalArgumentException("can't read paritions file", ie);  
}  
}  
public int findPartition(Text key) // 分配可以到多个reduce   
{  
int len = splitPoints.length;  
for (int i = 0; i < len; i++) {  
int res = key.compareTo(splitPoints[i]);  
if (res > 0 && i < len - 1) {  
continue;  
} else if (res == 0) {  
return i;  
} else if (res < 0) {  
return i;  
} else if (res > 0 && i == len - 1) {  
return i + 1;  
}  
}  
return 0;  
}  
private static Text[] readPartitions(FileSystem fs, Path p, JobConf job) throws IOException {  
SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);  
List<Text> parts = new ArrayList<Text>();  
Text key = new Text();  
NullWritable value = NullWritable.get();  
while (reader.next(key, value)) {  
parts.add(key);  
}  
reader.close();  
return parts.toArray(new Text[parts.size()]);  
}  
}  
public int run(String[] args) throws Exception {  
JobConf job = (JobConf) getConf();  
// job.set(name, value);   
Path inputDir = new Path(args[0]);  
inputDir = inputDir.makeQualified(inputDir.getFileSystem(job));  
Path partitionFile = new Path(inputDir, SamplerInputFormat.PARTITION_FILENAME);  
URI partitionUri = new URI(partitionFile.toString() +  
"#" + SamplerInputFormat.PARTITION_FILENAME);  
SamplerInputFormat.setInputPaths(job, new Path(args[0]));  
FileOutputFormat.setOutputPath(job, new Path(args[1]));  
job.setJobName("SamplerTotalSort");  
job.setJarByClass(SamplerSort.class);  
job.setOutputKeyClass(Text.class);  
job.setOutputValueClass(Text.class);  
job.setInputFormat(SamplerInputFormat.class);  
job.setOutputFormat(TextOutputFormat.class);  
job.setPartitionerClass(TotalOrderPartitioner.class);  
job.setNumReduceTasks(4);  
SamplerInputFormat.writePartitionFile(job, partitionFile); // 数据采集并写入文件   
DistributedCache.addCacheFile(partitionUri, job); // 将这个文件作为共享文件   
DistributedCache.createSymlink(job);  
// SamplerInputFormat.setFinalSync(job, true);   
JobClient.runJob(job);  
return 0;  
}  
public static void main(String[] args) throws Exception {  
int res = ToolRunner.run(new JobConf(), new SamplerSort(), args);  
System.exit(res);  
}  
}  


 

 

这篇关于mapreduce实现全局排序的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1067441

相关文章

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2

MySQL中查找重复值的实现

《MySQL中查找重复值的实现》查找重复值是一项常见需求,比如在数据清理、数据分析、数据质量检查等场景下,我们常常需要找出表中某列或多列的重复值,具有一定的参考价值,感兴趣的可以了解一下... 目录技术背景实现步骤方法一:使用GROUP BY和HAVING子句方法二:仅返回重复值方法三:返回完整记录方法四:

IDEA中新建/切换Git分支的实现步骤

《IDEA中新建/切换Git分支的实现步骤》本文主要介绍了IDEA中新建/切换Git分支的实现步骤,通过菜单创建新分支并选择是否切换,创建后在Git详情或右键Checkout中切换分支,感兴趣的可以了... 前提:项目已被Git托管1、点击上方栏Git->NewBrancjsh...2、输入新的分支的

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

关于集合与数组转换实现方法

《关于集合与数组转换实现方法》:本文主要介绍关于集合与数组转换实现方法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、Arrays.asList()1.1、方法作用1.2、内部实现1.3、修改元素的影响1.4、注意事项2、list.toArray()2.1、方

使用Python实现可恢复式多线程下载器

《使用Python实现可恢复式多线程下载器》在数字时代,大文件下载已成为日常操作,本文将手把手教你用Python打造专业级下载器,实现断点续传,多线程加速,速度限制等功能,感兴趣的小伙伴可以了解下... 目录一、智能续传:从崩溃边缘抢救进度二、多线程加速:榨干网络带宽三、速度控制:做网络的好邻居四、终端交互

MySQL中的锁机制详解之全局锁,表级锁,行级锁

《MySQL中的锁机制详解之全局锁,表级锁,行级锁》MySQL锁机制通过全局、表级、行级锁控制并发,保障数据一致性与隔离性,全局锁适用于全库备份,表级锁适合读多写少场景,行级锁(InnoDB)实现高并... 目录一、锁机制基础:从并发问题到锁分类1.1 并发访问的三大问题1.2 锁的核心作用1.3 锁粒度分

java实现docker镜像上传到harbor仓库的方式

《java实现docker镜像上传到harbor仓库的方式》:本文主要介绍java实现docker镜像上传到harbor仓库的方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 前 言2. 编写工具类2.1 引入依赖包2.2 使用当前服务器的docker环境推送镜像2.2

C++20管道运算符的实现示例

《C++20管道运算符的实现示例》本文简要介绍C++20管道运算符的使用与实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录标准库的管道运算符使用自己实现类似的管道运算符我们不打算介绍太多,因为它实际属于c++20最为重要的

Java easyExcel实现导入多sheet的Excel

《JavaeasyExcel实现导入多sheet的Excel》这篇文章主要为大家详细介绍了如何使用JavaeasyExcel实现导入多sheet的Excel,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录1.官网2.Excel样式3.代码1.官网easyExcel官网2.Excel样式3.代码