【MapReduce】MapReduce清洗共享单车数据

2024-03-06 18:59

本文主要是介绍【MapReduce】MapReduce清洗共享单车数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

MapReduce清洗共享单车数据

  • 数据
  • 代码实现
    • 自定义类
    • Mapper阶段
    • 自定义outputFormat
    • 自定义RecordWriter
    • Driver阶段
  • 结果

数据

点击下载数据
在这里插入图片描述
所对应的字段分别是:结束时间、车俩id、出发地、目的地、所在城市、开始经度,开始纬度、结束经度,结束维度

  • 需求
    去掉空数据或者NA的
    将时间格式转换成2017年7月1日 00:45
    计算所跨越的经纬度
    按照所在城市将数据进行分类存储,再同一类数据中,按照车俩的id进行升序排序

代码实现

自定义类

import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class JavaBean implements WritableComparable<JavaBean> {private String startTime;private String endTime;private int id;private String start_loc;private String end_loc;private String city;private double longitude;private double latitiude;public int compareTo(JavaBean o) {return -(o.id - this.id);}public void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(startTime);dataOutput.writeUTF(endTime);dataOutput.writeInt(id);dataOutput.writeUTF(start_loc);dataOutput.writeUTF(end_loc);dataOutput.writeUTF(city);dataOutput.writeDouble(longitude);dataOutput.writeDouble(latitiude);}public void readFields(DataInput dataInput) throws IOException {startTime = dataInput.readUTF();endTime = dataInput.readUTF();id = dataInput.readInt();start_loc = dataInput.readUTF();end_loc = dataInput.readUTF();city = dataInput.readUTF();longitude = dataInput.readDouble();latitiude = dataInput.readDouble();}public void set(String startTime, String endTime, int id, String start_loc, String end_loc, String city, double longitude, double latitiude) {this.startTime = startTime;this.endTime = endTime;this.id = id;this.start_loc = start_loc;this.end_loc = end_loc;this.city = city;this.longitude = longitude;this.latitiude = latitiude;}@Overridepublic String toString() {return startTime + '\t' +endTime + '\t' +id + "\t" +start_loc + '\t' +end_loc + '\t' +city + '\t' +longitude + "\t" +latitiude;}public String getStartTime() {return startTime;}public void setStartTime(String startTime) {this.startTime = startTime;}public String getEndTime() {return endTime;}public void setEndTime(String endTime) {this.endTime = endTime;}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getStart_loc() {return start_loc;}public void setStart_loc(String start_loc) {this.start_loc = start_loc;}public String getEnd_loc() {return end_loc;}public void setEnd_loc(String end_loc) {this.end_loc = end_loc;}public String getCity() {return city;}public void setCity(String city) {this.city = city;}public double getLongitude() {return longitude;}public void setLongitude(double longitude) {this.longitude = longitude;}public double getLatitiude() {return latitiude;}public void setLatitiude(double latitiude) {this.latitiude = latitiude;}
}

Mapper阶段

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;public class MapTest extends Mapper<LongWritable, Text, JavaBean, NullWritable> {JavaBean k = new JavaBean();SimpleDateFormat simpleDateFormat1 = new SimpleDateFormat("MM/dd/yyyy HH:mm");SimpleDateFormat simpleDateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm");Date date1, date2;String time1 = null;String time2 = null;Double longitude, latitiude;@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String datas[] = value.toString().split("\t", -1);for (String str : datas) {if ("".equals(str) || str == null || "NA".equalsIgnoreCase(str)) return;}try {date1 = simpleDateFormat1.parse(datas[1]);time1 = simpleDateFormat2.format(date1);date2 = simpleDateFormat1.parse(datas[2]);time2 = simpleDateFormat2.format(date2);} catch (ParseException e) {e.printStackTrace();}longitude = Double.parseDouble(datas[8]) - Double.parseDouble(datas[7]);latitiude = Double.parseDouble(datas[10]) - Double.parseDouble(datas[9]);k.set(time1, time2, Integer.parseInt(datas[3]), datas[4], datas[5], datas[6], longitude, latitiude);context.write(k, NullWritable.get());}
}

自定义outputFormat

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class MyOutputFormat extends FileOutputFormat<JavaBean, NullWritable> {public RecordWriter<JavaBean, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {return new MyRecordWriter(job);}
}

自定义RecordWriter

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;public class MyRecordWriter extends RecordWriter<JavaBean, NullWritable> {BufferedWriter bw;public MyRecordWriter(TaskAttemptContext taskAttemptContext) {}public void write(JavaBean key, NullWritable value) throws IOException, InterruptedException {String city = key.getCity();String path = "D:\\MP\\共享单车\\output1\\" + city + ".txt";bw = new BufferedWriter(new FileWriter(path, true));bw.write(key.toString());bw.write("\n");bw.flush();}public void close(TaskAttemptContext context) throws IOException, InterruptedException {bw.close();}
}

Driver阶段

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.File;public class DriTest {public static void main(String[] args) throws Exception {java.io.File file = new java.io.File("D:\\MP\\共享单车\\output2");if (file.exists()) {delFile(file);driver();} else {driver();}}public static void delFile(java.io.File file) {File[] files = file.listFiles();if (files != null && files.length != 0) {for (int i = 0; i < files.length; i++) {delFile(files[i]);}}file.delete();}public static void driver() throws Exception {Configuration conf = new Configuration();
//        conf.set("fs.default","hdfs://192.168.0.155:9000");Job job = Job.getInstance(conf);job.setJarByClass(DriTest.class);job.setMapperClass(MapTest.class);job.setMapOutputKeyClass(JavaBean.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputFormatClass(MyOutputFormat.class);FileInputFormat.setInputPaths(job, "D:\\MP\\共享单车\\input\\dataResources.txt");FileOutputFormat.setOutputPath(job, new Path("D:\\MP\\共享单车\\output2"));boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}

结果

分类成功
在这里插入图片描述
id升序
在这里插入图片描述

这篇关于【MapReduce】MapReduce清洗共享单车数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/781016

相关文章

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

Linux挂载linux/Windows共享目录实现方式

《Linux挂载linux/Windows共享目录实现方式》:本文主要介绍Linux挂载linux/Windows共享目录实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录文件共享协议linux环境作为服务端(NFS)在服务器端安装 NFS创建要共享的目录修改 NFS 配

C#使用iText获取PDF的trailer数据的代码示例

《C#使用iText获取PDF的trailer数据的代码示例》开发程序debug的时候,看到了PDF有个trailer数据,挺有意思,于是考虑用代码把它读出来,那么就用到我们常用的iText框架了,所... 目录引言iText 核心概念C# 代码示例步骤 1: 确保已安装 iText步骤 2: C# 代码程

Pandas处理缺失数据的方式汇总

《Pandas处理缺失数据的方式汇总》许多教程中的数据与现实世界中的数据有很大不同,现实世界中的数据很少是干净且同质的,本文我们将讨论处理缺失数据的一些常规注意事项,了解Pandas如何表示缺失数据,... 目录缺失数据约定的权衡Pandas 中的缺失数据None 作为哨兵值NaN:缺失的数值数据Panda

C++中处理文本数据char与string的终极对比指南

《C++中处理文本数据char与string的终极对比指南》在C++编程中char和string是两种用于处理字符数据的类型,但它们在使用方式和功能上有显著的不同,:本文主要介绍C++中处理文本数... 目录1. 基本定义与本质2. 内存管理3. 操作与功能4. 性能特点5. 使用场景6. 相互转换核心区别

k8s搭建nfs共享存储实践

《k8s搭建nfs共享存储实践》本文介绍NFS服务端搭建与客户端配置,涵盖安装工具、目录设置及服务启动,随后讲解K8S中NFS动态存储部署,包括创建命名空间、ServiceAccount、RBAC权限... 目录1. NFS搭建1.1 部署NFS服务端1.1.1 下载nfs-utils和rpcbind1.1

python库pydantic数据验证和设置管理库的用途

《python库pydantic数据验证和设置管理库的用途》pydantic是一个用于数据验证和设置管理的Python库,它主要利用Python类型注解来定义数据模型的结构和验证规则,本文给大家介绍p... 目录主要特点和用途:Field数值验证参数总结pydantic 是一个让你能够 confidentl

JAVA实现亿级千万级数据顺序导出的示例代码

《JAVA实现亿级千万级数据顺序导出的示例代码》本文主要介绍了JAVA实现亿级千万级数据顺序导出的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 前提:主要考虑控制内存占用空间,避免出现同时导出,导致主程序OOM问题。实现思路:A.启用线程池

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建

PHP轻松处理千万行数据的方法详解

《PHP轻松处理千万行数据的方法详解》说到处理大数据集,PHP通常不是第一个想到的语言,但如果你曾经需要处理数百万行数据而不让服务器崩溃或内存耗尽,你就会知道PHP用对了工具有多强大,下面小编就... 目录问题的本质php 中的数据流处理:为什么必不可少生成器:内存高效的迭代方式流量控制:避免系统过载一次性