【MapReduce】MapReduce清洗共享单车数据

2024-03-06 18:59

本文主要是介绍【MapReduce】MapReduce清洗共享单车数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

MapReduce清洗共享单车数据

  • 数据
  • 代码实现
    • 自定义类
    • Mapper阶段
    • 自定义outputFormat
    • 自定义RecordWriter
    • Driver阶段
  • 结果

数据

点击下载数据
在这里插入图片描述
所对应的字段分别是:结束时间、车俩id、出发地、目的地、所在城市、开始经度,开始纬度、结束经度,结束维度

  • 需求
    去掉空数据或者NA的
    将时间格式转换成2017年7月1日 00:45
    计算所跨越的经纬度
    按照所在城市将数据进行分类存储,再同一类数据中,按照车俩的id进行升序排序

代码实现

自定义类

import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class JavaBean implements WritableComparable<JavaBean> {private String startTime;private String endTime;private int id;private String start_loc;private String end_loc;private String city;private double longitude;private double latitiude;public int compareTo(JavaBean o) {return -(o.id - this.id);}public void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(startTime);dataOutput.writeUTF(endTime);dataOutput.writeInt(id);dataOutput.writeUTF(start_loc);dataOutput.writeUTF(end_loc);dataOutput.writeUTF(city);dataOutput.writeDouble(longitude);dataOutput.writeDouble(latitiude);}public void readFields(DataInput dataInput) throws IOException {startTime = dataInput.readUTF();endTime = dataInput.readUTF();id = dataInput.readInt();start_loc = dataInput.readUTF();end_loc = dataInput.readUTF();city = dataInput.readUTF();longitude = dataInput.readDouble();latitiude = dataInput.readDouble();}public void set(String startTime, String endTime, int id, String start_loc, String end_loc, String city, double longitude, double latitiude) {this.startTime = startTime;this.endTime = endTime;this.id = id;this.start_loc = start_loc;this.end_loc = end_loc;this.city = city;this.longitude = longitude;this.latitiude = latitiude;}@Overridepublic String toString() {return startTime + '\t' +endTime + '\t' +id + "\t" +start_loc + '\t' +end_loc + '\t' +city + '\t' +longitude + "\t" +latitiude;}public String getStartTime() {return startTime;}public void setStartTime(String startTime) {this.startTime = startTime;}public String getEndTime() {return endTime;}public void setEndTime(String endTime) {this.endTime = endTime;}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getStart_loc() {return start_loc;}public void setStart_loc(String start_loc) {this.start_loc = start_loc;}public String getEnd_loc() {return end_loc;}public void setEnd_loc(String end_loc) {this.end_loc = end_loc;}public String getCity() {return city;}public void setCity(String city) {this.city = city;}public double getLongitude() {return longitude;}public void setLongitude(double longitude) {this.longitude = longitude;}public double getLatitiude() {return latitiude;}public void setLatitiude(double latitiude) {this.latitiude = latitiude;}
}

Mapper阶段

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;public class MapTest extends Mapper<LongWritable, Text, JavaBean, NullWritable> {JavaBean k = new JavaBean();SimpleDateFormat simpleDateFormat1 = new SimpleDateFormat("MM/dd/yyyy HH:mm");SimpleDateFormat simpleDateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm");Date date1, date2;String time1 = null;String time2 = null;Double longitude, latitiude;@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String datas[] = value.toString().split("\t", -1);for (String str : datas) {if ("".equals(str) || str == null || "NA".equalsIgnoreCase(str)) return;}try {date1 = simpleDateFormat1.parse(datas[1]);time1 = simpleDateFormat2.format(date1);date2 = simpleDateFormat1.parse(datas[2]);time2 = simpleDateFormat2.format(date2);} catch (ParseException e) {e.printStackTrace();}longitude = Double.parseDouble(datas[8]) - Double.parseDouble(datas[7]);latitiude = Double.parseDouble(datas[10]) - Double.parseDouble(datas[9]);k.set(time1, time2, Integer.parseInt(datas[3]), datas[4], datas[5], datas[6], longitude, latitiude);context.write(k, NullWritable.get());}
}

自定义outputFormat

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class MyOutputFormat extends FileOutputFormat<JavaBean, NullWritable> {public RecordWriter<JavaBean, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {return new MyRecordWriter(job);}
}

自定义RecordWriter

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;public class MyRecordWriter extends RecordWriter<JavaBean, NullWritable> {BufferedWriter bw;public MyRecordWriter(TaskAttemptContext taskAttemptContext) {}public void write(JavaBean key, NullWritable value) throws IOException, InterruptedException {String city = key.getCity();String path = "D:\\MP\\共享单车\\output1\\" + city + ".txt";bw = new BufferedWriter(new FileWriter(path, true));bw.write(key.toString());bw.write("\n");bw.flush();}public void close(TaskAttemptContext context) throws IOException, InterruptedException {bw.close();}
}

Driver阶段

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.File;public class DriTest {public static void main(String[] args) throws Exception {java.io.File file = new java.io.File("D:\\MP\\共享单车\\output2");if (file.exists()) {delFile(file);driver();} else {driver();}}public static void delFile(java.io.File file) {File[] files = file.listFiles();if (files != null && files.length != 0) {for (int i = 0; i < files.length; i++) {delFile(files[i]);}}file.delete();}public static void driver() throws Exception {Configuration conf = new Configuration();
//        conf.set("fs.default","hdfs://192.168.0.155:9000");Job job = Job.getInstance(conf);job.setJarByClass(DriTest.class);job.setMapperClass(MapTest.class);job.setMapOutputKeyClass(JavaBean.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputFormatClass(MyOutputFormat.class);FileInputFormat.setInputPaths(job, "D:\\MP\\共享单车\\input\\dataResources.txt");FileOutputFormat.setOutputPath(job, new Path("D:\\MP\\共享单车\\output2"));boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}

结果

分类成功
在这里插入图片描述
id升序
在这里插入图片描述

这篇关于【MapReduce】MapReduce清洗共享单车数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:https://blog.csdn.net/heiren_a/article/details/114838370
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/781016

相关文章

使用Python获取JS加载的数据的多种实现方法

《使用Python获取JS加载的数据的多种实现方法》在当今的互联网时代,网页数据的动态加载已经成为一种常见的技术手段,许多现代网站通过JavaScript(JS)动态加载内容,这使得传统的静态网页爬取... 目录引言一、动态 网页与js加载数据的原理二、python爬取JS加载数据的方法(一)分析网络请求1

8种快速易用的Python Matplotlib数据可视化方法汇总(附源码)

《8种快速易用的PythonMatplotlib数据可视化方法汇总(附源码)》你是否曾经面对一堆复杂的数据,却不知道如何让它们变得直观易懂?别慌,Python的Matplotlib库是你数据可视化的... 目录引言1. 折线图(Line Plot)——趋势分析2. 柱状图(Bar Chart)——对比分析3

Spring Boot 整合 Redis 实现数据缓存案例详解

《SpringBoot整合Redis实现数据缓存案例详解》Springboot缓存,默认使用的是ConcurrentMap的方式来实现的,然而我们在项目中并不会这么使用,本文介绍SpringB... 目录1.添加 Maven 依赖2.配置Redis属性3.创建 redisCacheManager4.使用Sp

Python Pandas高效处理Excel数据完整指南

《PythonPandas高效处理Excel数据完整指南》在数据驱动的时代,Excel仍是大量企业存储核心数据的工具,Python的Pandas库凭借其向量化计算、内存优化和丰富的数据处理接口,成为... 目录一、环境搭建与数据读取1.1 基础环境配置1.2 数据高效载入技巧二、数据清洗核心战术2.1 缺失

Python处理超大规模数据的4大方法详解

《Python处理超大规模数据的4大方法详解》在数据的奇妙世界里,数据量就像滚雪球一样,越变越大,从最初的GB级别的小数据堆,逐渐演变成TB级别的数据大山,所以本文我们就来看看Python处理... 目录1. Mars:数据处理界的 “变形金刚”2. Dask:分布式计算的 “指挥家”3. CuPy:GPU

使用Vue-ECharts实现数据可视化图表功能

《使用Vue-ECharts实现数据可视化图表功能》在前端开发中,经常会遇到需要展示数据可视化的需求,比如柱状图、折线图、饼图等,这类需求不仅要求我们准确地将数据呈现出来,还需要兼顾美观与交互体验,所... 目录前言为什么选择 vue-ECharts?1. 基于 ECharts,功能强大2. 更符合 Vue

Java如何根据word模板导出数据

《Java如何根据word模板导出数据》这篇文章主要为大家详细介绍了Java如何实现根据word模板导出数据,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... pom.XML文件导入依赖 <dependency> <groupId>cn.afterturn</groupId>

Python实现获取带合并单元格的表格数据

《Python实现获取带合并单元格的表格数据》由于在日常运维中经常出现一些合并单元格的表格,如果要获取数据比较麻烦,所以本文我们就来聊聊如何使用Python实现获取带合并单元格的表格数据吧... 由于在日常运维中经常出现一些合并单元格的表格,如果要获取数据比较麻烦,现将将封装成类,并通过调用list_exc

Mysql数据库中数据的操作CRUD详解

《Mysql数据库中数据的操作CRUD详解》:本文主要介绍Mysql数据库中数据的操作(CRUD),详细描述对Mysql数据库中数据的操作(CRUD),包括插入、修改、删除数据,还有查询数据,包括... 目录一、插入数据(insert)1.插入数据的语法2.注意事项二、修改数据(update)1.语法2.有

SpringBoot实现接口数据加解密的三种实战方案

《SpringBoot实现接口数据加解密的三种实战方案》在金融支付、用户隐私信息传输等场景中,接口数据若以明文传输,极易被中间人攻击窃取,SpringBoot提供了多种优雅的加解密实现方案,本文将从原... 目录一、为什么需要接口数据加解密?二、核心加解密算法选择1. 对称加密(AES)2. 非对称加密(R