从零开始最短路径学习Hadoop之02----处理气象数据的第一个MapReduce程序

本文主要是介绍从零开始最短路径学习Hadoop之02----处理气象数据的第一个MapReduce程序,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

编写一个气象数据挖掘的MapReduce程序

1. 气象数据在哪里?
NCDC  美国国家气候数据中心
获取数据的方式在www.hadoopbook.com里给出了,是这里 http://hadoopbook.com/code.html
两个示例的数据在这里下载 https://github.com/tomwhite/hadoop-book/tree/master/input/ncdc/all  ,文件名分别是1901,1902
原始记录是许多 小文件,现在已经按照年份被拼接成上面的1901, 1902这两个单独的文件。

2. 最重要的是第5节,不想看长篇大论的,可以直接看第5节就行。

3. 用Hadoop分析数据
3.1 分析一条记录:
0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999
记录的含义:
0029029070
99999
19010101  观察日期
0600  观察时间
4+64333  经度
+023450 纬度
F M-12
    +0005
    99999
    V020
    270
    1
    N
    0159
    1
    99999
    9
    9
    N
    000000
    1
    N
    9-0078
   1
    + 99999
    102001ADDGF108991999999999999999999

    3.2 源代码文件MaxTemperatureMapper.java
package com.ifis;import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>{private static final int MISSING = 9999;public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException{String line = value.toString();String year = line.substring(15,19);int airTemperature;if(line.charAt(87) == '+'){airTemperature = Integer.parseInt(line.substring(88, 92));}else{airTemperature = Integer.parseInt(line.substring(87, 92));}String quality = line.substring(92, 93);if(airTemperature != MISSING && quality.matches("[01459]")){output.collect(new Text(year), new IntWritable(airTemperature));}}
}

    3.3 源代码MaxTemperatureReducer.java
package com.ifis;import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;public class MaxTemperatureReducer extends MapReduceBaseimplements Reducer<Text, IntWritable, Text, IntWritable>{public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException{int maxValue = Integer.MIN_VALUE;while(values.hasNext()){maxValue = Math.max(maxValue, values.next().get());}output.collect(key, new IntWritable(maxValue));}
}

    3.4 源代码MaxTemperature.java
package com.ifis;import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;public class MaxTemperature{public static void main(String[] args) throws IOException{if (args.length != 2){System.err.println("Usage: MaxTemperature <intput path> <output path>");System.exit(-1);}JobConf conf = new JobConf(MaxTemperature.class);conf.setJobName("Max Temperature");FileInputFormat.addInputPath(conf, new Path(args[0]));FileOutputFormat.setOutputPath(conf, new Path(args[1]));conf.setMapperClass(MaxTemperatureMapper.class);conf.setReducerClass(MaxTemperatureReducer.class);conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(IntWritable.class);JobClient.runJob(conf);}
}

    3.5 编译命令:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar  -d ./classes/ ./src/*.java
    3.6 打jar包:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ jar -cvf maxtemp.jar -C ./classes/ .
    3.7 用put命令将1901文件放到hdfs。
    3.8 执行:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2/p1$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar maxtemp.jar  com.ifis.MaxTemperature 1901 o4
    3.9 查看结果:brian@brian-laptop:~/work/learn/hadoop/hdp-train/class-2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o4/part-00000

4. 一个新版api的MapReducer。
    4.1 说明
    这个例子书里没有给出import的类和接口的细节。如果有些类或者接口不知道import,到这里找http://hadoop.apache.org/docs/current/api/
    从这里找到类名,然后点击进入,可以看到包的信息。
    如果是java的类和接口,在这里找http://www.javaweb.cc/JavaAPI1.6/
    注意,这个新的api里,各种类要import org.apache.hadoop.mapreduce包,而不是org.apache.hadoop.mapred包,如Context,Job等等。

    4.2 源代码
package com.ifis;import java.io.IOException;
import java.lang.InterruptedException;
import java.lang.Iterable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;public class NewMaxTemperature{static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{private static final int MISSING = 9999;public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{String line = value.toString();String year = line.substring(15, 19);int airTemperature;if (line.charAt(87) == '+'){airTemperature = Integer.parseInt(line.substring(88, 92));}else{airTemperature = Integer.parseInt(line.substring(87, 92));}String quality = line.substring(92, 93);if (airTemperature != MISSING && quality.matches("[01459]")){context.write(new Text(year), new IntWritable(airTemperature));}}}static class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable>{public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{int maxValue = Integer.MIN_VALUE;for(IntWritable v : values){maxValue = Math.max(maxValue, v.get());}context.write(key, new IntWritable(maxValue));}}public static void main(String[] args) throws Exception{if (args.length != 2){System.err.println("Usage NewMaxTemerapture <input path> <output path>");System.exit(-1);}Job job = new Job();job.setJarByClass(NewMaxTemperature.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.setMapperClass(MaxTemperatureMapper.class);job.setReducerClass(MaxTemperatureReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);System.exit(job.waitForCompletion(true)?0:1);}
}

4.3 编译: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes ./src/*.java
4.4 打包: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ jar -cvf newmaxtemp.jar -C ./classes/ .
4.5 将1901和1902两个文件put到hdfs上,分别更名为1901.dat和1902.dat
4.6 执行,从1901.dat和1902.dat这两个文件里找当年的最高温度: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar newmaxtemp.jar com.ifis.NewMaxTemperature 190*.dat o2
4.7 查看结果: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p2$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o2/part-r-00000

5. 将4的例子改造成mapper,reducer分开写的形式。
假设项目的目录是p3,此目录下的目录结构和文件结构如下:
|-- classes
    `-- src
        |-- MaxTemperature.java
        |-- MaxTemperatureMapper.java
        `-- MaxTemperatureReducer.java

5.1 源代码MaxTemperatureMapper.java
package com.ifis;import java.io.IOException;
import java.lang.InterruptedException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{private static final int MISSING = 9999;public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{String line = value.toString();String year = line.substring(15, 19);int airTemperature;if (line.charAt(87) == '+'){airTemperature = Integer.parseInt(line.substring(88, 92));}else{airTemperature = Integer.parseInt(line.substring(87, 92));}String quality = line.substring(92, 93);if (airTemperature != MISSING && quality.matches("[01459]")){context.write(new Text(year), new IntWritable(airTemperature));}}
}


5.2 源代码MaxTemperatureReducer.java
package com.ifis;import java.io.IOException;
import java.lang.InterruptedException;
import java.lang.Iterable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable>{public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{int maxValue = Integer.MIN_VALUE;for(IntWritable v : values){maxValue = Math.max(maxValue, v.get());}context.write(key, new IntWritable(maxValue));}
}

5.3 源代码MaxTemperature.java
package com.ifis;import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;public class MaxTemperature{public static void main(String[] args) throws Exception{if (args.length != 2){System.err.println("Usage NewMaxTemerapture <input path> <output path>");System.exit(-1);}Job job = new Job();job.setJarByClass(MaxTemperature.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.setMapperClass(MaxTemperatureMapper.class);job.setReducerClass(MaxTemperatureReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);System.exit(job.waitForCompletion(true)?0:1);}
}

5.4 编译: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ javac -classpath /home/brian/usr/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar -d ./classes/ ./src/*.java
5.5 打包: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ jar -cvf maxtemp.jar -C ./classes/ . 
5.6 要处理的数据文件1901.dat和1902.dat在上一步已经put到hdfs,这次不需要再做这个动作了。
5.7 执行: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop jar maxtemp.jar  com.ifis.MaxTemperature 190*.dat o3
5.8 查看执行结果: brian@brian-i3:~/all/work/learn/hadoop/hdp-train/class-2/p3$ ~/usr/hadoop/hadoop-1.1.2/bin/hadoop fs -cat o3/part-r-00000 

6. Combiner可以提高map阶段的效率。注意,求最大值和最小值可以通过combiner提升效率,但求平均值就不行了。

7. Hadoop的streaming,通过unix标准流的方式,允许用其他语言写MapReduce程序。具体细节在这里不讲了,知道有这个功能就行。我们主要关注java语言。

8. 请熟悉第5节的源代码形式,熟悉到可以手工默写出来,这是MapReducer的主要形式,对本文来说,能掌握这个就足够了。

这篇关于从零开始最短路径学习Hadoop之02----处理气象数据的第一个MapReduce程序的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1050870

相关文章

批量导入txt数据到的redis过程

《批量导入txt数据到的redis过程》用户通过将Redis命令逐行写入txt文件,利用管道模式运行客户端,成功执行批量删除以Product*匹配的Key操作,提高了数据清理效率... 目录批量导入txt数据到Redisjs把redis命令按一条 一行写到txt中管道命令运行redis客户端成功了批量删除k

Java使用Thumbnailator库实现图片处理与压缩功能

《Java使用Thumbnailator库实现图片处理与压缩功能》Thumbnailator是高性能Java图像处理库,支持缩放、旋转、水印添加、裁剪及格式转换,提供易用API和性能优化,适合Web应... 目录1. 图片处理库Thumbnailator介绍2. 基本和指定大小图片缩放功能2.1 图片缩放的

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

Python进行JSON和Excel文件转换处理指南

《Python进行JSON和Excel文件转换处理指南》在数据交换与系统集成中,JSON与Excel是两种极为常见的数据格式,本文将介绍如何使用Python实现将JSON转换为格式化的Excel文件,... 目录将 jsON 导入为格式化 Excel将 Excel 导出为结构化 JSON处理嵌套 JSON:

python设置环境变量路径实现过程

《python设置环境变量路径实现过程》本文介绍设置Python路径的多种方法:临时设置(Windows用`set`,Linux/macOS用`export`)、永久设置(系统属性或shell配置文件... 目录设置python路径的方法临时设置环境变量(适用于当前会话)永久设置环境变量(Windows系统

Spring Boot 中的默认异常处理机制及执行流程

《SpringBoot中的默认异常处理机制及执行流程》SpringBoot内置BasicErrorController,自动处理异常并生成HTML/JSON响应,支持自定义错误路径、配置及扩展,如... 目录Spring Boot 异常处理机制详解默认错误页面功能自动异常转换机制错误属性配置选项默认错误处理

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

SpringBoot 异常处理/自定义格式校验的问题实例详解

《SpringBoot异常处理/自定义格式校验的问题实例详解》文章探讨SpringBoot中自定义注解校验问题,区分参数级与类级约束触发的异常类型,建议通过@RestControllerAdvice... 目录1. 问题简要描述2. 异常触发1) 参数级别约束2) 类级别约束3. 异常处理1) 字段级别约束