大数据项目之通话记录统计

2023-12-10 08:38

本文主要是介绍大数据项目之通话记录统计,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

架构图:
在这里插入图片描述

第一步:模拟生产数据

    public void produce() {try {// 读取通讯录数据List<Contact> contacts = in.read(Contact.class);while ( flg ) {// 从通讯录中随机查找2个电话号码(主叫,被叫)int call1Index = new Random().nextInt(contacts.size());int call2Index;while ( true ) {call2Index = new Random().nextInt(contacts.size());if ( call1Index != call2Index ) {break;}}Contact call1 = contacts.get(call1Index);Contact call2 = contacts.get(call2Index);// 生成随机的通话时间String startDate = "20180101000000";String endDate = "20190101000000";long startTime = DateUtil.parse(startDate, "yyyyMMddHHmmss").getTime();long endTime = DateUtil.parse(endDate, "yyyyMMddHHmmss").getTime();// 通话时间long calltime = startTime + (long)((endTime - startTime) * Math.random());// 通话时间字符串String callTimeString = DateUtil.format(new Date(calltime), "yyyyMMddHHmmss");// 生成随机的通话时长String duration = NumberUtil.format(new Random().nextInt(3000), 4);// 生成通话记录Calllog log = new Calllog(call1.getTel(), call2.getTel(), callTimeString, duration);System.out.println(log);// 将通话记录刷写到数据文件中out.write(log);}} catch ( Exception e ) {e.printStackTrace();}}

数据格式如下
在这里插入图片描述
第一列是主叫电话号码,第二列是被叫电话号码,第三列是通话开始时间,第四列是通话时长,单位秒。生成的数据放到一个文件中去。

第二步:flume收集日志并存放至kafka

启动flume来收集日志并发送到kafka(根据自己的安装目录自行修改)
flume-ng agent -c conf/ -n a1 -f conf/flume-2-kafka.conf

flume-2-kafka.conf文件内容如下

a1.sources = r1
a1.channels = c1
a1.sinks = k1a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /Users/liangjiepeng/Documents/tmpfile/bigdata/call.loga1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ct
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动kafka(启动参数根据自己配置文件位置自行修改)
kafka-server-start /usr/local/etc/kafka/server.properties

第三步:导入数据到hbase中去

启动hbase
从kafka中导出数据到hbase中去
hbase的表结构如下
在这里插入图片描述

    public void consume() {try {// 创建配置对象Properties prop = new Properties();prop.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer.properties"));// 获取flume采集的数据KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);// 关注主题consumer.subscribe(Arrays.asList(Names.TOPIC.getValue()));// Hbase数据访问对象HBaseDao dao = new HBaseDao();// 初始化dao.init();int i = 0;// 消费数据while ( true ) {ConsumerRecords<String, String> consumerRecords = consumer.poll(10);for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());// 插入数据dao.insertData(consumerRecord.value());//Calllog log = new Calllog(consumerRecord.value());//dao.insertData(log);System.out.println(i++);}}} catch ( Exception e ) {e.printStackTrace();}}

为了更快地统计数据,创建了两个列族,分别代表call1是主叫还是被叫。上方produce方法中产生的数据都是放到caller族,而callee族的数据由hbase的协处理器根据放到caller族中的数据生成。

协处理器的postPut方法

public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {// 获取表Table table = e.getEnvironment().getTable(TableName.valueOf(Names.TABLE.getValue()));// 主叫用户的rowkeyString rowkey = Bytes.toString(put.getRow());// 1_133_2019_144_1010_1String[] values = rowkey.split("_");CoprocessorDao dao = new CoprocessorDao();String call1 = values[1];String call2 = values[3];String calltime = values[2];String duration = values[4];String flg = values[5];if ( "1".equals(flg) ) {// 只有主叫用户保存后才需要触发被叫用户的保存String calleeRowkey = dao.getRegionNum(call2, calltime) + "_" + call2 + "_" + calltime + "_" + call1 + "_" + duration + "_0";// 保存数据Put calleePut = new Put(Bytes.toBytes(calleeRowkey));byte[] calleeFamily = Bytes.toBytes(Names.CF_CALLEE.getValue());calleePut.addColumn(calleeFamily, Bytes.toBytes("call1"), Bytes.toBytes(call2));calleePut.addColumn(calleeFamily, Bytes.toBytes("call2"), Bytes.toBytes(call1));calleePut.addColumn(calleeFamily, Bytes.toBytes("calltime"), Bytes.toBytes(calltime));calleePut.addColumn(calleeFamily, Bytes.toBytes("duration"), Bytes.toBytes(duration));calleePut.addColumn(calleeFamily, Bytes.toBytes("flg"), Bytes.toBytes("0"));table.put( calleePut );// 关闭表table.close();}}

使用协处理器可能会出现java.lang.OutOfMemoryError: Unable to create new native thread,这时需要增加系统进程可创建线程的最大数或者降低数据put到hbase中的速度。

第四步:分析统计并写入到mysql

目标:统计每个电话号码每天/每月/每年的通话次数和通话总时长,即mysql中一条记录要有电话号码、通话日期、通话次数、通话总时长,因为电话号码和通话日期有很多重复,把电话号码和通话日期作成外键关联到其它表中去。

格式变成如下
在这里插入图片描述
在这里插入图片描述
mapper如下
在这里插入图片描述
reducer
在这里插入图片描述
MySQLBeanOutputFormat的write方法
在这里插入图片描述
代码链接

这篇关于大数据项目之通话记录统计的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/476589

相关文章

批量导入txt数据到的redis过程

《批量导入txt数据到的redis过程》用户通过将Redis命令逐行写入txt文件,利用管道模式运行客户端,成功执行批量删除以Product*匹配的Key操作,提高了数据清理效率... 目录批量导入txt数据到Redisjs把redis命令按一条 一行写到txt中管道命令运行redis客户端成功了批量删除k

精选20个好玩又实用的的Python实战项目(有图文代码)

《精选20个好玩又实用的的Python实战项目(有图文代码)》文章介绍了20个实用Python项目,涵盖游戏开发、工具应用、图像处理、机器学习等,使用Tkinter、PIL、OpenCV、Kivy等库... 目录① 猜字游戏② 闹钟③ 骰子模拟器④ 二维码⑤ 语言检测⑥ 加密和解密⑦ URL缩短⑧ 音乐播放

Springboot项目启动失败提示找不到dao类的解决

《Springboot项目启动失败提示找不到dao类的解决》SpringBoot启动失败,因ProductServiceImpl未正确注入ProductDao,原因:Dao未注册为Bean,解决:在启... 目录错误描述原因解决方法总结***************************APPLICA编

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

SQL Server跟踪自动统计信息更新实战指南

《SQLServer跟踪自动统计信息更新实战指南》本文详解SQLServer自动统计信息更新的跟踪方法,推荐使用扩展事件实时捕获更新操作及详细信息,同时结合系统视图快速检查统计信息状态,重点强调修... 目录SQL Server 如何跟踪自动统计信息更新:深入解析与实战指南 核心跟踪方法1️⃣ 利用系统目录

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

在IntelliJ IDEA中高效运行与调试Spring Boot项目的实战步骤

《在IntelliJIDEA中高效运行与调试SpringBoot项目的实战步骤》本章详解SpringBoot项目导入IntelliJIDEA的流程,教授运行与调试技巧,包括断点设置与变量查看,奠定... 目录引言:为良驹配上好鞍一、为何选择IntelliJ IDEA?二、实战:导入并运行你的第一个项目步骤1

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分