Batch-Generating Tens of Millions of Records and Pushing Them to Kafka

2024-06-19 04:52

This article walks through code for batch-generating tens of millions of records and pushing them to Kafka. We hope it offers a useful reference for developers tackling similar problems.

1. Data generator utilities

The utility class below covers three kinds of test data:

1. Random IP address generation
2. Random date-time generation within a given date range
3. Random Chinese name generation

package com.wfg.flink.connector.utils;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
 * @author wfg
 */
public class RandomGeneratorUtils {

    /**
     * Generates a random IP address.
     *
     * @return ip
     */
    public static String generateRandomIp() {
        Random random = new Random();
        // Generate the four octets of the IP address (0-255)
        int part1 = random.nextInt(256);
        int part2 = random.nextInt(256);
        int part3 = random.nextInt(256);
        int part4 = random.nextInt(256);
        // Convert each octet to a string and join them with dots
        return part1 + "." + part2 + "." + part3 + "." + part4;
    }

    /**
     * Generates a random date-time between the given start and end dates.
     *
     * @param startDate start date
     * @param endDate   end date
     * @return randomly generated date-time
     */
    public static LocalDateTime generateRandomDateTime(LocalDate startDate, LocalDate endDate) {
        long startEpochDay = startDate.toEpochDay();
        long endEpochDay = endDate.toEpochDay();
        Random random = new Random();
        LocalDate randomDate = startDate;
        if (startEpochDay != endEpochDay) {
            // Note: Random.nextLong(bound) requires Java 17+
            long randomDay = startEpochDay + random.nextLong(endEpochDay - startEpochDay);
            randomDate = LocalDate.ofEpochDay(randomDay);
        }
        LocalTime randomTime = LocalTime.of(
                // hour
                random.nextInt(24),
                // minute
                random.nextInt(60),
                // second
                random.nextInt(60));
        return LocalDateTime.of(randomDate, randomTime);
    }

    // Surnames (a generated name is surname + given name)
    private static final List<String> FIRST_NAMES = new ArrayList<>();
    // Given-name characters
    private static final List<String> LAST_NAMES = new ArrayList<>();

    static {
        // Initialize some common surnames and given-name characters
        FIRST_NAMES.addAll(Arrays.asList(
                "王", "李", "赵", "刘", "张", "钱", "孙", "周", "吴", "郑", "冯", "陈", "褚", "卫", "蒋", "沈", "韩", "杨",
                "朱", "秦", "尤", "许", "何", "吕", "施", "孔", "曹", "严", "华", "金", "魏", "陶", "姜", "戚", "谢", "邹",
                "喻", "柏", "水", "窦", "章", "云", "苏", "潘", "葛", "奚", "范", "彭", "郎", "鲁", "韦", "昌", "马", "苗",
                "凤", "刚", "文", "桂", "富", "生", "龙", "功", "夫", "建", "余", "冲", "程", "沙", "阳", "江"));
        LAST_NAMES.addAll(Arrays.asList(
                "伟", "芳", "丽", "强", "明", "风", "阳", "海", "天", "藏", "霞", "刚", "夫", "勇", "毅", "俊", "峰", "军",
                "平", "保", "东", "文", "辉", "力", "永", "健", "世", "广", "志", "义", "兴", "良", "山", "仁", "波", "宁",
                "贵", "福", "生", "龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春"));
    }

    /**
     * Generates a random full name composed of a random surname and given name.
     *
     * @return random full name
     */
    public static String generateRandomFullName() {
        Random random = new Random();
        String firstName = FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size()));
        String lastName = LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));
        return firstName + lastName;
    }
}
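The generators can be smoke-tested with a small main method. This is a minimal sketch; the class name is mine and not part of the original post:

package com.wfg.flink.connector.utils;

import java.time.LocalDate;

// Hypothetical smoke test for the three generators (not in the original post)
public class GeneratorSmokeTest {
    public static void main(String[] args) {
        System.out.println(RandomGeneratorUtils.generateRandomIp());
        System.out.println(RandomGeneratorUtils.generateRandomDateTime(
                LocalDate.now().minusDays(7), LocalDate.now()));
        System.out.println(RandomGeneratorUtils.generateRandomFullName());
    }
}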

2. Message DTO

package com.wfg.flink.connector.dto;

import lombok.Data;

/**
 * @author wfg
 */
@Data
public class KafkaPvDto {
    // UUID
    private String uuid;
    // User name
    private String userName;
    // Visit time
    private String visitTime;
    // Visitor IP
    private String visitIp;
    // Visited service IP
    private String visitServiceIp;
}
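@Data is Lombok's annotation; it generates the getters and setters that the producer's createMsg() method relies on, so the Lombok dependency must be on the classpath. After fastjson2 serialization, each message is a flat JSON object along these lines (the values below are made up for illustration, and field order may vary):

{"userName":"王伟","uuid":"3f2b8c1d9e4a47f0a6b5c2d1e0f3a4b7","visitIp":"103.24.67.190","visitServiceIp":"10.6.88.12","visitTime":"2024-06-19 01:23:45"}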

3. Batch generation and push code

package com.wfg.flink.test.kafka;

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.connector.dto.KafkaPvDto;
import com.wfg.flink.connector.utils.RandomGeneratorUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.LocalDate;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import static com.wfg.flink.connector.constants.Constants.KAFKA_BROKERS;
import static com.wfg.flink.connector.constants.Constants.TEST_TOPIC_PV;

public class KafkaTestProducer {
    // Number of async sends fired per outer-loop iteration
    // (the original code repeated the same runAsync call 50 times)
    private static final int PARALLELISM = 50;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_BROKERS);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // KafkaProducer is thread-safe, so one instance is shared by all async tasks
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            int times = 100000;
            for (int i = 0; i < times; i++) {
                System.out.println("Send No. :" + i);
                // Fire PARALLELISM sends concurrently and wait for all of them
                CompletableFuture<?>[] futures = new CompletableFuture<?>[PARALLELISM];
                for (int j = 0; j < PARALLELISM; j++) {
                    futures[j] = CompletableFuture.runAsync(() -> sendKafkaMsg(producer));
                }
                CompletableFuture.allOf(futures).join();
                producer.flush();
            }
        }
    }

    private static void sendKafkaMsg(Producer<String, String> producer) {
        String msg = createMsg();
        System.out.println(msg);
        producer.send(new ProducerRecord<>(TEST_TOPIC_PV, UUID.randomUUID().toString().replaceAll("-", ""), msg));
    }

    private static String createMsg() {
        KafkaPvDto dto = new KafkaPvDto();
        dto.setUuid(UUID.randomUUID().toString().replaceAll("-", ""));
        dto.setUserName(RandomGeneratorUtils.generateRandomFullName());
        dto.setVisitIp(RandomGeneratorUtils.generateRandomIp());
        // Random visit time on the current day
        DateTime begin = DateUtil.beginOfDay(new Date());
        String timeStr = DateUtil.format(
                RandomGeneratorUtils.generateRandomDateTime(
                        LocalDateTimeUtil.of(begin).toLocalDate(), LocalDate.now()),
                "yyyy-MM-dd HH:mm:ss");
        dto.setVisitTime(timeStr);
        dto.setVisitServiceIp(RandomGeneratorUtils.generateRandomIp());
        return JSONObject.toJSONString(dto);
    }
}
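The Constants class behind the two static imports is not shown in the post. A minimal sketch of what it presumably contains; the broker address and topic name below are placeholders, not values from the original:

package com.wfg.flink.connector.constants;

// Hypothetical reconstruction: the original post does not show this class.
// Adjust the broker address and topic name to your environment.
public class Constants {
    public static final String KAFKA_BROKERS = "localhost:9092";
    public static final String TEST_TOPIC_PV = "test_topic_pv";
}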

Note:
To run Kafka on your local machine, see the Kafka local deployment guide.
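At the tens-of-millions scale, producer batching usually matters more than application-side parallelism. These are standard Kafka producer configs that could be added to the props above; the values are illustrative starting points, not tuned recommendations:

// Optional throughput-oriented settings (values are starting points only)
props.put("batch.size", "65536");       // larger batches per partition
props.put("linger.ms", "10");           // wait briefly so batches can fill
props.put("compression.type", "lz4");   // compress batches on the wire
props.put("acks", "1");                 // trade durability for throughput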

That concludes this article on batch-generating tens of millions of records and pushing them to Kafka. We hope it helps.



http://www.chinasem.cn/article/1074059
