大数据项目之通话记录统计

2023-12-10 08:38

本文主要是介绍大数据项目之通话记录统计,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

架构图:
在这里插入图片描述

第一步:模拟生产数据

    public void produce() {try {// 读取通讯录数据List<Contact> contacts = in.read(Contact.class);while ( flg ) {// 从通讯录中随机查找2个电话号码(主叫,被叫)int call1Index = new Random().nextInt(contacts.size());int call2Index;while ( true ) {call2Index = new Random().nextInt(contacts.size());if ( call1Index != call2Index ) {break;}}Contact call1 = contacts.get(call1Index);Contact call2 = contacts.get(call2Index);// 生成随机的通话时间String startDate = "20180101000000";String endDate = "20190101000000";long startTime = DateUtil.parse(startDate, "yyyyMMddHHmmss").getTime();long endTime = DateUtil.parse(endDate, "yyyyMMddHHmmss").getTime();// 通话时间long calltime = startTime + (long)((endTime - startTime) * Math.random());// 通话时间字符串String callTimeString = DateUtil.format(new Date(calltime), "yyyyMMddHHmmss");// 生成随机的通话时长String duration = NumberUtil.format(new Random().nextInt(3000), 4);// 生成通话记录Calllog log = new Calllog(call1.getTel(), call2.getTel(), callTimeString, duration);System.out.println(log);// 将通话记录刷写到数据文件中out.write(log);}} catch ( Exception e ) {e.printStackTrace();}}

数据格式如下
在这里插入图片描述
第一列是主叫电话号码,第二列是被叫电话号码,第三列是通话开始时间,第四列是通话时长,单位秒。生成的数据放到一个文件中去。

第二步:flume收集日志并存放至kafka

启动flume来收集日志并发送到kafka(根据自己的安装目录自行修改)
flume-ng agent -c conf/ -n a1 -f conf/flume-2-kafka.conf

flume-2-kafka.conf文件内容如下

a1.sources = r1
a1.channels = c1
a1.sinks = k1a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /Users/liangjiepeng/Documents/tmpfile/bigdata/call.loga1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ct
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动kafka(启动参数根据自己配置文件位置自行修改)
kafka-server-start /usr/local/etc/kafka/server.properties

第三步:导入数据到hbase中去

启动hbase
从kafka中导出数据到hbase中去
hbase的表结构如下
在这里插入图片描述

    public void consume() {try {// 创建配置对象Properties prop = new Properties();prop.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer.properties"));// 获取flume采集的数据KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);// 关注主题consumer.subscribe(Arrays.asList(Names.TOPIC.getValue()));// Hbase数据访问对象HBaseDao dao = new HBaseDao();// 初始化dao.init();int i = 0;// 消费数据while ( true ) {ConsumerRecords<String, String> consumerRecords = consumer.poll(10);for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());// 插入数据dao.insertData(consumerRecord.value());//Calllog log = new Calllog(consumerRecord.value());//dao.insertData(log);System.out.println(i++);}}} catch ( Exception e ) {e.printStackTrace();}}

为了更快地统计数据,创建了两个列族,分别代表call1是主叫还是被叫。上方produce方法中产生的数据都是放到caller族,而callee族的数据由hbase的协处理器根据放到caller族中的数据生成。

协处理器的postPut方法

public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {// 获取表Table table = e.getEnvironment().getTable(TableName.valueOf(Names.TABLE.getValue()));// 主叫用户的rowkeyString rowkey = Bytes.toString(put.getRow());// 1_133_2019_144_1010_1String[] values = rowkey.split("_");CoprocessorDao dao = new CoprocessorDao();String call1 = values[1];String call2 = values[3];String calltime = values[2];String duration = values[4];String flg = values[5];if ( "1".equals(flg) ) {// 只有主叫用户保存后才需要触发被叫用户的保存String calleeRowkey = dao.getRegionNum(call2, calltime) + "_" + call2 + "_" + calltime + "_" + call1 + "_" + duration + "_0";// 保存数据Put calleePut = new Put(Bytes.toBytes(calleeRowkey));byte[] calleeFamily = Bytes.toBytes(Names.CF_CALLEE.getValue());calleePut.addColumn(calleeFamily, Bytes.toBytes("call1"), Bytes.toBytes(call2));calleePut.addColumn(calleeFamily, Bytes.toBytes("call2"), Bytes.toBytes(call1));calleePut.addColumn(calleeFamily, Bytes.toBytes("calltime"), Bytes.toBytes(calltime));calleePut.addColumn(calleeFamily, Bytes.toBytes("duration"), Bytes.toBytes(duration));calleePut.addColumn(calleeFamily, Bytes.toBytes("flg"), Bytes.toBytes("0"));table.put( calleePut );// 关闭表table.close();}}

使用协处理器可能会出现java.lang.OutOfMemoryError: Unable to create new native thread,这时需要增加系统进程可创建线程的最大数或者降低数据put到hbase中的速度。

第四步:分析统计并写入到mysql

目标:统计每个电话号码每天/每月/每年的通话次数和通话总时长,即mysql中一条记录要有电话号码、通话日期、通话次数、通话总时长,因为电话号码和通话日期有很多重复,把电话号码和通话日期作成外键关联到其它表中去。

格式变成如下
在这里插入图片描述
在这里插入图片描述
mapper如下
在这里插入图片描述
reducer
在这里插入图片描述
MySQLBeanOutputFormat的write方法
在这里插入图片描述
代码链接

这篇关于大数据项目之通话记录统计的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/476589

相关文章

springboot项目中整合高德地图的实践

《springboot项目中整合高德地图的实践》:本文主要介绍springboot项目中整合高德地图的实践,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一:高德开放平台的使用二:创建数据库(我是用的是mysql)三:Springboot所需的依赖(根据你的需求再

一文详解如何在idea中快速搭建一个Spring Boot项目

《一文详解如何在idea中快速搭建一个SpringBoot项目》IntelliJIDEA作为Java开发者的‌首选IDE‌,深度集成SpringBoot支持,可一键生成项目骨架、智能配置依赖,这篇文... 目录前言1、创建项目名称2、勾选需要的依赖3、在setting中检查maven4、编写数据源5、开启热

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

SpringBoot项目配置logback-spring.xml屏蔽特定路径的日志

《SpringBoot项目配置logback-spring.xml屏蔽特定路径的日志》在SpringBoot项目中,使用logback-spring.xml配置屏蔽特定路径的日志有两种常用方式,文中的... 目录方案一:基础配置(直接关闭目标路径日志)方案二:结合 Spring Profile 按环境屏蔽关

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

使用SpringBoot整合Sharding Sphere实现数据脱敏的示例

《使用SpringBoot整合ShardingSphere实现数据脱敏的示例》ApacheShardingSphere数据脱敏模块,通过SQL拦截与改写实现敏感信息加密存储,解决手动处理繁琐及系统改... 目录痛点一:痛点二:脱敏配置Quick Start——Spring 显示配置:1.引入依赖2.创建脱敏

MySQL版本问题导致项目无法启动问题的解决方案

《MySQL版本问题导致项目无法启动问题的解决方案》本文记录了一次因MySQL版本不一致导致项目启动失败的经历,详细解析了连接错误的原因,并提供了两种解决方案:调整连接字符串禁用SSL或统一MySQL... 目录本地项目启动报错报错原因:解决方案第一个:第二种:容器启动mysql的坑两种修改时区的方法:本地

详解如何使用Python构建从数据到文档的自动化工作流

《详解如何使用Python构建从数据到文档的自动化工作流》这篇文章将通过真实工作场景拆解,为大家展示如何用Python构建自动化工作流,让工具代替人力完成这些数字苦力活,感兴趣的小伙伴可以跟随小编一起... 目录一、Excel处理:从数据搬运工到智能分析师二、PDF处理:文档工厂的智能生产线三、邮件自动化:

springboot项目中使用JOSN解析库的方法

《springboot项目中使用JOSN解析库的方法》JSON,全程是JavaScriptObjectNotation,是一种轻量级的数据交换格式,本文给大家介绍springboot项目中使用JOSN... 目录一、jsON解析简介二、Spring Boot项目中使用JSON解析1、pom.XML文件引入依

Python数据分析与可视化的全面指南(从数据清洗到图表呈现)

《Python数据分析与可视化的全面指南(从数据清洗到图表呈现)》Python是数据分析与可视化领域中最受欢迎的编程语言之一,凭借其丰富的库和工具,Python能够帮助我们快速处理、分析数据并生成高质... 目录一、数据采集与初步探索二、数据清洗的七种武器1. 缺失值处理策略2. 异常值检测与修正3. 数据