序列化案例实操(统计每一个手机号耗费的总上行流量、总下行流量、总流量)

本文主要是介绍序列化案例实操(统计每一个手机号耗费的总上行流量、总下行流量、总流量),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 序列化概述
  • 自定义bean对象实现序列化接口(Writable)
  • 案例需求
  • 编写MapReduce程序
  • 运行结果


序列化概述

序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

自定义bean对象实现序列化接口(Writable)

在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。
具体实现bean对象序列化步骤如下7步:
(1)必须实现Writable接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public FlowBean() {super();
}

(3)重写序列化方法

@Override
public void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);
}

(4)重写反序列化方法

@Override
public void readFields(DataInput in) throws IOException {upFlow = in.readLong();downFlow = in.readLong();sumFlow = in.readLong();
}

(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用。
(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。

@Override
public int compareTo(FlowBean o) {// 倒序排列,从大到小return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

案例需求

统计每一个手机号耗费的总上行流量、总下行流量、总流量
输入总数据:
在这里插入图片描述
输入数据格式:
7 13560436666 120.196.100.99 1116 954 200
id 手机号码 网络ip 上行流量 下行流量 网络状态码
期望输出数据格式:
13560436666 1116 954 2070
手机号码 上行流量 下行流量 总流量

编写MapReduce程序

在这里插入图片描述
FlowBean:

package com.atxiaoyu.xuliehua;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class FlowBean implements Writable {private long upFlow; //上行流量private long downFlow; //下行流量private long sumFlow; //总流量//空参构造public FlowBean() {}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}public void setSumFlow() {this.sumFlow = this.upFlow+this.downFlow;}@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}@Overridepublic void readFields(DataInput in) throws IOException {this.upFlow=in.readLong();this.downFlow=in.readLong();this.sumFlow=in.readLong();}@Overridepublic String toString() {return upFlow+"\t"+downFlow+"\t"+sumFlow;}
}

FlowMapper:

package com.atxiaoyu.xuliehua;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowBean> {private  Text outK=new Text();private  FlowBean outV=new FlowBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 获取一行String line=value.toString();//切割String[] split=line.split("\t");//抓取想要的数据String phone=split[1];String up=split[split.length-3]; //上行流量String down=split[split.length-2]; //下行流量//封装outK.set(phone);outV.setUpFlow(Long.parseLong(up));outV.setDownFlow(Long.parseLong(down));outV.setSumFlow();// 写出context.write(outK,outV);}
}

FlowReducer:

package com.atxiaoyu.xuliehua;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FlowReducer extends Reducer<Text,FlowBean,Text,FlowBean> {private  FlowBean outV=new FlowBean();@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {//遍历集合累加值long totalUp=0;long totalDown=0;for (FlowBean value : values) {totalUp=totalUp+value.getUpFlow();totalDown=totalUp+value.getDownFlow();//封装outK,outVoutV.setUpFlow(totalUp);outV.setDownFlow(totalDown);outV.setSumFlow();//写出context.write(key,outV);}}
}

FlowDriver:

package com.atxiaoyu.xuliehua;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.kerby.config.Conf;import java.io.IOException;public class FlowDriver {public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {Configuration conf = new Configuration();//1 获取jobJob job = Job.getInstance(conf);//2 设置jar包路径job.setJarByClass(FlowDriver.class);// 3 管理mapper和reducerjob.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);// 4 设置map输出的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//5 设置最终输出的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//6 设置输入路径和输出路径FileInputFormat.setInputPaths(job, new Path("D:\\input"));FileOutputFormat.setOutputPath(job, new Path("D:\\output"));//7 提交jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

运行结果

在这里插入图片描述

在这里插入图片描述
与我们设想的输出结果一致。

这篇关于序列化案例实操(统计每一个手机号耗费的总上行流量、总下行流量、总流量)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/878692

相关文章

SQL Server跟踪自动统计信息更新实战指南

《SQLServer跟踪自动统计信息更新实战指南》本文详解SQLServer自动统计信息更新的跟踪方法,推荐使用扩展事件实时捕获更新操作及详细信息,同时结合系统视图快速检查统计信息状态,重点强调修... 目录SQL Server 如何跟踪自动统计信息更新:深入解析与实战指南 核心跟踪方法1️⃣ 利用系统目录

RabbitMQ消费端单线程与多线程案例讲解

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe... 目录 一、基础概念详细解释:举个例子:✅ 单消费者 + 单线程消费❌ 单消费者 + 多线程消费❌ 多

Olingo分析和实践之EDM 辅助序列化器详解(最佳实践)

《Olingo分析和实践之EDM辅助序列化器详解(最佳实践)》EDM辅助序列化器是ApacheOlingoOData框架中无需完整EDM模型的智能序列化工具,通过运行时类型推断实现灵活数据转换,适用... 目录概念与定义什么是 EDM 辅助序列化器?核心概念设计目标核心特点1. EDM 信息可选2. 智能类

MySql基本查询之表的增删查改+聚合函数案例详解

《MySql基本查询之表的增删查改+聚合函数案例详解》本文详解SQL的CURD操作INSERT用于数据插入(单行/多行及冲突处理),SELECT实现数据检索(列选择、条件过滤、排序分页),UPDATE... 目录一、Create1.1 单行数据 + 全列插入1.2 多行数据 + 指定列插入1.3 插入否则更

SpringSecurity整合redission序列化问题小结(最新整理)

《SpringSecurity整合redission序列化问题小结(最新整理)》文章详解SpringSecurity整合Redisson时的序列化问题,指出需排除官方Jackson依赖,通过自定义反序... 目录1. 前言2. Redission配置2.1 RedissonProperties2.2 Red

Python通用唯一标识符模块uuid使用案例详解

《Python通用唯一标识符模块uuid使用案例详解》Pythonuuid模块用于生成128位全局唯一标识符,支持UUID1-5版本,适用于分布式系统、数据库主键等场景,需注意隐私、碰撞概率及存储优... 目录简介核心功能1. UUID版本2. UUID属性3. 命名空间使用场景1. 生成唯一标识符2. 数

PostgreSQL的扩展dict_int应用案例解析

《PostgreSQL的扩展dict_int应用案例解析》dict_int扩展为PostgreSQL提供了专业的整数文本处理能力,特别适合需要精确处理数字内容的搜索场景,本文给大家介绍PostgreS... 目录PostgreSQL的扩展dict_int一、扩展概述二、核心功能三、安装与启用四、字典配置方法

Python中re模块结合正则表达式的实际应用案例

《Python中re模块结合正则表达式的实际应用案例》Python中的re模块是用于处理正则表达式的强大工具,正则表达式是一种用来匹配字符串的模式,它可以在文本中搜索和匹配特定的字符串模式,这篇文章主... 目录前言re模块常用函数一、查看文本中是否包含 A 或 B 字符串二、替换多个关键词为统一格式三、提

在Linux终端中统计非二进制文件行数的实现方法

《在Linux终端中统计非二进制文件行数的实现方法》在Linux系统中,有时需要统计非二进制文件(如CSV、TXT文件)的行数,而不希望手动打开文件进行查看,例如,在处理大型日志文件、数据文件时,了解... 目录在linux终端中统计非二进制文件的行数技术背景实现步骤1. 使用wc命令2. 使用grep命令

Python get()函数用法案例详解

《Pythonget()函数用法案例详解》在Python中,get()是字典(dict)类型的内置方法,用于安全地获取字典中指定键对应的值,它的核心作用是避免因访问不存在的键而引发KeyError错... 目录简介基本语法一、用法二、案例:安全访问未知键三、案例:配置参数默认值简介python是一种高级编