【MapReduce】MapReduce清洗共享单车数据

2024-03-06 18:59

本文主要是介绍【MapReduce】MapReduce清洗共享单车数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

MapReduce清洗共享单车数据

  • 数据
  • 代码实现
    • 自定义类
    • Mapper阶段
    • 自定义outputFormat
    • 自定义RecordWriter
    • Driver阶段
  • 结果

数据

点击下载数据
在这里插入图片描述
所对应的字段分别是:结束时间、车俩id、出发地、目的地、所在城市、开始经度,开始纬度、结束经度,结束维度

  • 需求
    去掉空数据或者NA的
    将时间格式转换成2017年7月1日 00:45
    计算所跨越的经纬度
    按照所在城市将数据进行分类存储,再同一类数据中,按照车俩的id进行升序排序

代码实现

自定义类

import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class JavaBean implements WritableComparable<JavaBean> {private String startTime;private String endTime;private int id;private String start_loc;private String end_loc;private String city;private double longitude;private double latitiude;public int compareTo(JavaBean o) {return -(o.id - this.id);}public void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(startTime);dataOutput.writeUTF(endTime);dataOutput.writeInt(id);dataOutput.writeUTF(start_loc);dataOutput.writeUTF(end_loc);dataOutput.writeUTF(city);dataOutput.writeDouble(longitude);dataOutput.writeDouble(latitiude);}public void readFields(DataInput dataInput) throws IOException {startTime = dataInput.readUTF();endTime = dataInput.readUTF();id = dataInput.readInt();start_loc = dataInput.readUTF();end_loc = dataInput.readUTF();city = dataInput.readUTF();longitude = dataInput.readDouble();latitiude = dataInput.readDouble();}public void set(String startTime, String endTime, int id, String start_loc, String end_loc, String city, double longitude, double latitiude) {this.startTime = startTime;this.endTime = endTime;this.id = id;this.start_loc = start_loc;this.end_loc = end_loc;this.city = city;this.longitude = longitude;this.latitiude = latitiude;}@Overridepublic String toString() {return startTime + '\t' +endTime + '\t' +id + "\t" +start_loc + '\t' +end_loc + '\t' +city + '\t' +longitude + "\t" +latitiude;}public String getStartTime() {return startTime;}public void setStartTime(String startTime) {this.startTime = startTime;}public String getEndTime() {return endTime;}public void setEndTime(String endTime) {this.endTime = endTime;}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getStart_loc() {return start_loc;}public void setStart_loc(String start_loc) {this.start_loc = start_loc;}public String getEnd_loc() {return end_loc;}public void setEnd_loc(String end_loc) {this.end_loc = end_loc;}public String getCity() {return city;}public void setCity(String city) {this.city = city;}public double getLongitude() {return longitude;}public void setLongitude(double longitude) {this.longitude = longitude;}public double getLatitiude() {return latitiude;}public void setLatitiude(double latitiude) {this.latitiude = latitiude;}
}

Mapper阶段

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;public class MapTest extends Mapper<LongWritable, Text, JavaBean, NullWritable> {JavaBean k = new JavaBean();SimpleDateFormat simpleDateFormat1 = new SimpleDateFormat("MM/dd/yyyy HH:mm");SimpleDateFormat simpleDateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm");Date date1, date2;String time1 = null;String time2 = null;Double longitude, latitiude;@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String datas[] = value.toString().split("\t", -1);for (String str : datas) {if ("".equals(str) || str == null || "NA".equalsIgnoreCase(str)) return;}try {date1 = simpleDateFormat1.parse(datas[1]);time1 = simpleDateFormat2.format(date1);date2 = simpleDateFormat1.parse(datas[2]);time2 = simpleDateFormat2.format(date2);} catch (ParseException e) {e.printStackTrace();}longitude = Double.parseDouble(datas[8]) - Double.parseDouble(datas[7]);latitiude = Double.parseDouble(datas[10]) - Double.parseDouble(datas[9]);k.set(time1, time2, Integer.parseInt(datas[3]), datas[4], datas[5], datas[6], longitude, latitiude);context.write(k, NullWritable.get());}
}

自定义outputFormat

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class MyOutputFormat extends FileOutputFormat<JavaBean, NullWritable> {public RecordWriter<JavaBean, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {return new MyRecordWriter(job);}
}

自定义RecordWriter

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;public class MyRecordWriter extends RecordWriter<JavaBean, NullWritable> {BufferedWriter bw;public MyRecordWriter(TaskAttemptContext taskAttemptContext) {}public void write(JavaBean key, NullWritable value) throws IOException, InterruptedException {String city = key.getCity();String path = "D:\\MP\\共享单车\\output1\\" + city + ".txt";bw = new BufferedWriter(new FileWriter(path, true));bw.write(key.toString());bw.write("\n");bw.flush();}public void close(TaskAttemptContext context) throws IOException, InterruptedException {bw.close();}
}

Driver阶段

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.File;public class DriTest {public static void main(String[] args) throws Exception {java.io.File file = new java.io.File("D:\\MP\\共享单车\\output2");if (file.exists()) {delFile(file);driver();} else {driver();}}public static void delFile(java.io.File file) {File[] files = file.listFiles();if (files != null && files.length != 0) {for (int i = 0; i < files.length; i++) {delFile(files[i]);}}file.delete();}public static void driver() throws Exception {Configuration conf = new Configuration();
//        conf.set("fs.default","hdfs://192.168.0.155:9000");Job job = Job.getInstance(conf);job.setJarByClass(DriTest.class);job.setMapperClass(MapTest.class);job.setMapOutputKeyClass(JavaBean.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputFormatClass(MyOutputFormat.class);FileInputFormat.setInputPaths(job, "D:\\MP\\共享单车\\input\\dataResources.txt");FileOutputFormat.setOutputPath(job, new Path("D:\\MP\\共享单车\\output2"));boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}

结果

分类成功
在这里插入图片描述
id升序
在这里插入图片描述

这篇关于【MapReduce】MapReduce清洗共享单车数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/781016

相关文章

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

MyBatis-Plus通用中等、大量数据分批查询和处理方法

《MyBatis-Plus通用中等、大量数据分批查询和处理方法》文章介绍MyBatis-Plus分页查询处理,通过函数式接口与Lambda表达式实现通用逻辑,方法抽象但功能强大,建议扩展分批处理及流式... 目录函数式接口获取分页数据接口数据处理接口通用逻辑工具类使用方法简单查询自定义查询方法总结函数式接口

SQL中如何添加数据(常见方法及示例)

《SQL中如何添加数据(常见方法及示例)》SQL全称为StructuredQueryLanguage,是一种用于管理关系数据库的标准编程语言,下面给大家介绍SQL中如何添加数据,感兴趣的朋友一起看看吧... 目录在mysql中,有多种方法可以添加数据。以下是一些常见的方法及其示例。1. 使用INSERT I

Python使用vllm处理多模态数据的预处理技巧

《Python使用vllm处理多模态数据的预处理技巧》本文深入探讨了在Python环境下使用vLLM处理多模态数据的预处理技巧,我们将从基础概念出发,详细讲解文本、图像、音频等多模态数据的预处理方法,... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语