Hadoop确实是处理海量离线数据的利器

2024-03-18 20:32

本文主要是介绍Hadoop确实是处理海量离线数据的利器,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

不得不说,Hadoop确实是处理海量离线数据的利器,当然,凡是一个东西有优点必定也有缺点,hadoop的缺点也很多,比如对流式计算,实时计算,DAG具有依赖关系的计算,支持都不友好,所以,由此诞生了很多新的分布式计算框架,Storm,Spark,Tez,impala,drill,等等,他们都是针对特定问题提出一种解决方案,新框架的的兴起,并不意味者他们就可以替代hadoop,一手独大,HDFS和MapReduce依旧是很优秀的,特别是对离线海量数据的处理。

hadoop在如下的几种应用场景里,用的还是非常广泛的,1,搜索引擎建索引,2,topK热关键词统计,3,海量日志的数据分析等等。
散仙,今天的这个例子的场景要对几亿的单词或短语做统计和并按词频排序,当然这些需求都是类似WordCount问题,如果你把Hadoop自带的WordCount的例子,给搞懂了,基本上做一些IP,热词的统计与分析就很很容易了,WordCount问题,确实是一个非常具有代表性的例子。


下面进入正题,先来分析下散仙这个例子的需求,总共需要二步来完成,第一步就是对短语的统计,第二步就是对结果集的排序。所以如果使用MapReduce来完成的话,就得需要2个作业来完成这件事情,第一个作业来统计词频,第二个来负责进行排序,当然这两者之间是有依赖关系的,第二个作业的执行,需要依赖第一个作业的结果,这就是典型的M,R,R的问题并且作业之间具有依赖关系,这种问题使用MapReduce来完成,效率可能有点低,如果使用支持DAG作业的Tez来做这件事情,那么就很简单了。不过本篇散仙,要演示的例子还是基于MapReduce来完成的,有兴趣的朋友,可以研究一下使用Tez。


对于第一个作业,我们只需要改写wordcount的例子,即可,因为散仙的需求里面涉及短语的统计,所以定义的格式为,短语和短语之间使用分号隔开,(默认的格式是按单词统计的,以空格为分割符)在map时只需要,按分号打散成数组,进行处理即可,测试的数据内容如下:



map里面的核心代码如下:

Java代码 复制代码  收藏代码
  1. /** 
  2.    * 统计词频的map端 
  3.    * 代码 
  4.    *  
  5.    * **/  
  6.   public void map(Object key, Text value, Context context  
  7.                   ) throws IOException, InterruptedException {  
  8.    
  9.     String [] data=value.toString().split(";");//按每行的分号拆分短语  
  10.     for(String s:data){  
  11.         if(s.trim().length()>0){//忽略空字符  
  12.         word.set(s);  
  13.         context.write(word, one);  
  14.         }  
  15.     }  
  16.   
  17.   }  
  /*** 统计词频的map端* 代码* * **/public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String [] data=value.toString().split(";");//按每行的分号拆分短语for(String s:data){if(s.trim().length()>0){//忽略空字符word.set(s);context.write(word, one);}}}



reduce端的核心代码如下:

Java代码 复制代码  收藏代码
  1. /** 
  2.      * reduce端的 
  3.      * 代码 
  4.      * **/  
  5.     public void reduce(Text key, Iterable<IntWritable> values,   
  6.                        Context context  
  7.                        ) throws IOException, InterruptedException {  
  8.       int sum = 0;  
  9.       for (IntWritable val : values) {  
  10.         sum += val.get();//累加词频  
  11.       }  
  12.       result.set(sum);  
  13.       context.write(new Text(key.toString()+"::"), result);//为方便短语排序,以双冒号分隔符间隔  
  14.     }  
  15.   }  
/*** reduce端的* 代码* **/public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();//累加词频}result.set(sum);context.write(new Text(key.toString()+"::"), result);//为方便短语排序,以双冒号分隔符间隔}}


main函数里面的代码如下:

Java代码 复制代码  收藏代码
  1. public static void main(String[] args) throws Exception {  
  2.   Configuration conf = new Configuration();  
  3.   String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
  4.   if (otherArgs.length != 2) {  
  5.     System.err.println("Usage: wordcount <in> <out>");  
  6.     System.exit(2);  
  7.   }  
  8.   Job job = new Job(conf, "word count");  
  9.   job.setJarByClass(WordCount.class);  
  10.   job.setMapperClass(TokenizerMapper.class);  
  11.   job.setCombinerClass(IntSumReducer.class);  
  12.   job.setReducerClass(IntSumReducer.class);  
  13.   job.setOutputKeyClass(Text.class);  
  14.   job.setOutputValueClass(IntWritable.class);  
  15.   FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
  16.   FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
  17.   System.exit(job.waitForCompletion(true) ? 0 : 1);  
  18. }  
  public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}


运行结果,如下所示:

Java代码 复制代码  收藏代码
  1. a good student::    1  
  2. good student::  3  
  3. patient::   2  
  4. patient a:: 1  
a good student::	1
good student::	3
patient::	2
patient a::	1



下面,散仙来分析下排序作业的代码,如上图所示hadoop默认的排序,是基于key排序的,如果是字符类型的则基于字典表排序,如果是数值类型的则基于数字大小排序,两种方式都是按默认的升序排列的,如果想要降序输出,就需要我们自己写个排序组件了,散仙会在下面的代码给出例子,因为我们是要基于词频排序的,所以需要反转K,V来实现对词频的排序,map端代码如下:

Java代码 复制代码  收藏代码
  1. /** 
  2.          * 排序作业 
  3.          * map的实现 
  4.          *  
  5.          * **/  
  6.         @Override  
  7.         protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {  
  8.             String s[]=value.toString().split("::");//按两个冒号拆分每行数据  
  9.             word.set(s[0]);//  
  10.             one.set(Integer.parseInt(s[1].trim()));//  
  11.             context.write(one, word);//注意,此部分,需要反转K,V顺序  
  12.         }  
/*** 排序作业* map的实现* * **/@Overrideprotected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {String s[]=value.toString().split("::");//按两个冒号拆分每行数据word.set(s[0]);//one.set(Integer.parseInt(s[1].trim()));//context.write(one, word);//注意,此部分,需要反转K,V顺序}


reduce端代码如下:

Java代码 复制代码  收藏代码
  1. /*** 
  2.   *  
  3.   * 排序作业的 
  4.   * reduce代码 
  5.   * **/       
  6.         @Override  
  7.         protected void reduce(IntWritable arg0, Iterable<Text> arg1, Context arg2)  
  8.                 throws IOException, InterruptedException {  
  9.             for(Text t:arg1){  
  10.                 result.set(t.toString());  
  11.                  arg2.write(result, arg0);  
  12.             }  
  13.         }  
/**** * 排序作业的* reduce代码* **/		@Overrideprotected void reduce(IntWritable arg0, Iterable<Text> arg1, Context arg2)throws IOException, InterruptedException {for(Text t:arg1){result.set(t.toString());arg2.write(result, arg0);}}




下面,我们再来看下排序组件的代码:

Java代码 复制代码  收藏代码
  1. /*** 
  2.  * 按词频降序排序 
  3.  * 的类 
  4.  *  
  5.  * **/  
  6.     public static class DescSort extends  WritableComparator{  
  7.   
  8.          public DescSort() {  
  9.              super(IntWritable.class,true);//注册排序组件  
  10.         }  
  11.          @Override  
  12.         public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,  
  13.                 int arg4, int arg5) {  
  14.             return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用负号来完成降序  
  15.         }  
  16.            
  17.          @Override  
  18.         public int compare(Object a, Object b) {  
  19.        
  20.             return   -super.compare(a, b);//注意使用负号来完成降序  
  21.         }  
  22.           
  23.     }  
/**** 按词频降序排序* 的类* * **/public static class DescSort extends  WritableComparator{public DescSort() {super(IntWritable.class,true);//注册排序组件}@Overridepublic int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,int arg4, int arg5) {return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用负号来完成降序}@Overridepublic int compare(Object a, Object b) {return   -super.compare(a, b);//注意使用负号来完成降序}}



main方法里面的实现代码如下所示:

Java代码 复制代码  收藏代码
  1. public static void main(String[] args) throws Exception{  
  2.           
  3.           
  4.           
  5.           Configuration conf = new Configuration();  
  6.             String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
  7.             if (otherArgs.length != 2) {  
  8.                 System.err.println("Usage: wordcount <in> <out>");  
  9.                 System.exit(2);  
  10.               }  
  11.            Job job=new Job(conf, "sort");  
  12.            job.setOutputKeyClass(IntWritable.class);  
  13.            job.setOutputValueClass(Text.class);  
  14.            job.setMapperClass(SortIntValueMapper.class);  
  15.            job.setReducerClass(SortIntValueReducer.class)   ;  
  16.            job.setSortComparatorClass(DescSort.class);//加入排序组件  
  17.            job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);  
  18.            job.setOutputFormatClass(TextOutputFormat.class);  
  19.            FileInputFormat.setInputPaths(job, new Path(args[0]));  
  20.            FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  21.            
  22.            System.exit(job.waitForCompletion(true) ? 0 : 1);  
  23.     }  
public static void main(String[] args) throws Exception{Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}Job job=new Job(conf, "sort");job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(Text.class);job.setMapperClass(SortIntValueMapper.class);job.setReducerClass(SortIntValueReducer.class)	;job.setSortComparatorClass(DescSort.class);//加入排序组件job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}



输出结果,如下所示:

Java代码 复制代码  收藏代码
  1. good student    3  
  2. patient 2  
  3. a good student  1  
  4. patient a   1  
good student	3
patient	2
a good student	1
patient a	1



至此,我们可以成功实现,统计并排序的业务,当然这种类型的需求非常多而且常见,如对某个海量日志IP的分析,散仙上面的例子使用的只是测试的数据,而真实数据是对几亿或几十亿的短语构建语料库使用,配置集群方面,可以根据自己的需求,配置集群的节点个数以及map,reduce的个数,而代码,只需要我们写好,提交给hadoop集群执行即可。

最后在简单总结一下,数据处理过程中,格式是需要提前定制好的,也就是说你得很清楚的你的格式代表什么意思,另外一点,关于hadoop的中文编码问题,这个是内部固定的UTF-8格式,如果你是GBK的文件编码,则需要自己单独在map或reduce过程中处理一下,否则输出的结果可能是乱码,最好的方法就是统一成UTF-8格式,否则,很容易出现一些编码问题的。

这篇关于Hadoop确实是处理海量离线数据的利器的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/823555

相关文章

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

Python进行JSON和Excel文件转换处理指南

《Python进行JSON和Excel文件转换处理指南》在数据交换与系统集成中,JSON与Excel是两种极为常见的数据格式,本文将介绍如何使用Python实现将JSON转换为格式化的Excel文件,... 目录将 jsON 导入为格式化 Excel将 Excel 导出为结构化 JSON处理嵌套 JSON:

Spring Boot 中的默认异常处理机制及执行流程

《SpringBoot中的默认异常处理机制及执行流程》SpringBoot内置BasicErrorController,自动处理异常并生成HTML/JSON响应,支持自定义错误路径、配置及扩展,如... 目录Spring Boot 异常处理机制详解默认错误页面功能自动异常转换机制错误属性配置选项默认错误处理

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

SpringBoot 异常处理/自定义格式校验的问题实例详解

《SpringBoot异常处理/自定义格式校验的问题实例详解》文章探讨SpringBoot中自定义注解校验问题,区分参数级与类级约束触发的异常类型,建议通过@RestControllerAdvice... 目录1. 问题简要描述2. 异常触发1) 参数级别约束2) 类级别约束3. 异常处理1) 字段级别约束

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

Java堆转储文件之1.6G大文件处理完整指南

《Java堆转储文件之1.6G大文件处理完整指南》堆转储文件是优化、分析内存消耗的重要工具,:本文主要介绍Java堆转储文件之1.6G大文件处理的相关资料,文中通过代码介绍的非常详细,需要的朋友可... 目录前言文件为什么这么大?如何处理这个文件?分析文件内容(推荐)删除文件(如果不需要)查看错误来源如何避