批量生产千万级数据 推送到kafka代码

2024-06-19 04:52

本文主要是介绍批量生产千万级数据 推送到kafka代码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、批量规则生成代码

1、随机IP生成代码
2、指定时间范围内随机日期生成代码
3、随机中文名生成代码。

package com.wfg.flink.connector.utils;import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;/*** @author wfg*/
public class RandomGeneratorUtils {/*** 生成随机的IP地址* @return ip*/public static String generateRandomIp() {Random random = new Random();// 生成IP地址的四个部分 0-255int part1 = random.nextInt(256);int part2 = random.nextInt(256);int part3 = random.nextInt(256);int part4 = random.nextInt(256);// 将每个部分转换为字符串,并用点连接return part1 + "." + part2 + "." + part3 + "." + part4;}/*** 在指定的开始和结束日期之间生成一个随机的日期时间。** @param startDate 开始日期* @param endDate 结束日期* @return 随机生成的日期时间*/public static LocalDateTime generateRandomDateTime(LocalDate startDate, LocalDate endDate) {long startEpochDay = startDate.toEpochDay();long endEpochDay = endDate.toEpochDay();Random random = new Random();LocalDate randomDate = startDate;if (startEpochDay != endEpochDay) {long randomDay = startEpochDay + random.nextLong(endEpochDay - startEpochDay);randomDate = LocalDate.ofEpochDay(randomDay);}LocalTime randomTime = LocalTime.of(// 小时random.nextInt(24),// 分钟random.nextInt(60),// 秒random.nextInt(60));return LocalDateTime.of(randomDate, randomTime);}private static final List<String> FIRST_NAMES = new ArrayList<>();private static final List<String> LAST_NAMES = new ArrayList<>();static {// 初始化一些常见的名字和姓氏FIRST_NAMES.addAll(Arrays.asList("王","李","赵","刘","张", "钱", "孙", "周", "吴", "郑", "王", "冯", "陈", "褚", "卫", "蒋", "沈", "韩", "杨", "朱", "秦", "尤", "许", "何", "吕", "施","张", "孔", "曹", "严", "华", "金", "魏", "陶", "姜", "戚", "谢", "邹", "喻", "柏", "水", "窦", "章", "云", "苏", "潘", "葛", "奚", "范", "彭", "郎","鲁","韦", "昌", "马", "苗", "凤", " 刚", "文", "张", "桂", "富", "生", "龙", "功", "夫", "周", "建", "余", "冲", "程", "沙", "阳", "江", "潘"));LAST_NAMES.addAll(Arrays.asList("伟","芳","丽","强","明","风","阳","海","天","藏","霞","刚", "夫", "勇", "毅", "俊", "峰", "强", "军", "平", "保", "东", "文", "辉", "力", "明", "永", "健", "世", "广", "志", "义","义", "兴", "良", "海", "山", "仁", "波", "宁", "贵", "福", "生", "龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春","明", "文", "辉", "建", "永", "强", "军", "平", "保", "东", "文", "辉", "力", "明", "永", "健", "世", "广", "志", "义", "义", "兴", "良", "海", "山","仁", "波", "宁", "贵", "福", "生", "龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春", "明", "文", "辉", "建","永", "强", "军", "平", "保", "东", "文", "辉", "力", "明", "永", "健", "世", "广", "志", "义", "义", "兴", "良", "海", "山", "仁", "波", "宁","贵", "福", "生", "龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春", "明", "文", "辉", "建", "永", "强", "军","平", "保", "东", "文", "辉", "力", "明", "永", "健", "世", "广", "志", "义", "义", "兴", "良", "海", "山", "仁", "波", "宁", "贵", "福", "生","龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春", "明", "文", "辉", "建", "永", "强", "军", "平", "保", "东"));}/*** 生成一个随机的全名,由随机的姓和名组成。** @return 随机全名*/public static String generateRandomFullName() {Random random = new Random();String firstName = FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size()));String lastName = LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));return firstName + lastName;}
}

2. 生成对象体

package com.wfg.flink.connector.dto;import lombok.Data;/*** @author wfg*/
@Data
public class KafkaPvDto {// uuidprivate String uuid;// 用户名private String userName;// 访问时间private String visitTime;// 访问地址private String visitIp;// 访问服务IPprivate String visitServiceIp;
}

3. 批量数据生成推送代码

package com.wfg.flink.test.kafka;import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.connector.dto.KafkaPvDto;
import com.wfg.flink.connector.utils.RandomGeneratorUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.time.LocalDate;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;import static com.wfg.flink.connector.constants.Constants.KAFKA_BROKERS;
import static com.wfg.flink.connector.constants.Constants.TEST_TOPIC_PV;public class KafkaTestProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", KAFKA_BROKERS);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");try (Producer<String, String> producer = new KafkaProducer<>(props)){int times = 100000;for (int i = 0; i < times; i++) {System.out.println("Send No. :" + i );CompletableFuture.allOf(CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer))).join();producer.flush();}}}private static void sendKafkaMsg (Producer < String, String > producer){String msg = createMsg();System.out.println(msg);producer.send(new ProducerRecord<>(TEST_TOPIC_PV, UUID.randomUUID().toString().replaceAll("-", ""), msg));}private static String createMsg () {KafkaPvDto dto = new KafkaPvDto();dto.setUuid(UUID.randomUUID().toString().replaceAll("-", ""));dto.setUserName(RandomGeneratorUtils.generateRandomFullName());dto.setVisitIp(RandomGeneratorUtils.generateRandomIp());DateTime begin = DateUtil.beginOfDay(new Date());String timeStr = DateUtil.format(RandomGeneratorUtils.generateRandomDateTime(LocalDateTimeUtil.of(begin).toLocalDate(), LocalDate.now()), "yyyy-MM-dd HH:mm:ss");dto.setVisitTime(timeStr);dto.setVisitServiceIp(RandomGeneratorUtils.generateRandomIp());return JSONObject.toJSONString(dto);}
}

注意:
kafka本机运行请参考: kafka本地部署链接

这篇关于批量生产千万级数据 推送到kafka代码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1074059

相关文章

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

Java集合之Iterator迭代器实现代码解析

《Java集合之Iterator迭代器实现代码解析》迭代器Iterator是Java集合框架中的一个核心接口,位于java.util包下,它定义了一种标准的元素访问机制,为各种集合类型提供了一种统一的... 目录一、什么是Iterator二、Iterator的核心方法三、基本使用示例四、Iterator的工

Java 线程池+分布式实现代码

《Java线程池+分布式实现代码》在Java开发中,池通过预先创建并管理一定数量的资源,避免频繁创建和销毁资源带来的性能开销,从而提高系统效率,:本文主要介绍Java线程池+分布式实现代码,需要... 目录1. 线程池1.1 自定义线程池实现1.1.1 线程池核心1.1.2 代码示例1.2 总结流程2. J

JS纯前端实现浏览器语音播报、朗读功能的完整代码

《JS纯前端实现浏览器语音播报、朗读功能的完整代码》在现代互联网的发展中,语音技术正逐渐成为改变用户体验的重要一环,下面:本文主要介绍JS纯前端实现浏览器语音播报、朗读功能的相关资料,文中通过代码... 目录一、朗读单条文本:① 语音自选参数,按钮控制语音:② 效果图:二、朗读多条文本:① 语音有默认值:②

Vue实现路由守卫的示例代码

《Vue实现路由守卫的示例代码》Vue路由守卫是控制页面导航的钩子函数,主要用于鉴权、数据预加载等场景,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着... 目录一、概念二、类型三、实战一、概念路由守卫(Navigation Guards)本质上就是 在路

uni-app小程序项目中实现前端图片压缩实现方式(附详细代码)

《uni-app小程序项目中实现前端图片压缩实现方式(附详细代码)》在uni-app开发中,文件上传和图片处理是很常见的需求,但也经常会遇到各种问题,下面:本文主要介绍uni-app小程序项目中实... 目录方式一:使用<canvas>实现图片压缩(推荐,兼容性好)示例代码(小程序平台):方式二:使用uni

JAVA实现Token自动续期机制的示例代码

《JAVA实现Token自动续期机制的示例代码》本文主要介绍了JAVA实现Token自动续期机制的示例代码,通过动态调整会话生命周期平衡安全性与用户体验,解决固定有效期Token带来的风险与不便,感兴... 目录1. 固定有效期Token的内在局限性2. 自动续期机制:兼顾安全与体验的解决方案3. 总结PS

C#中通过Response.Headers设置自定义参数的代码示例

《C#中通过Response.Headers设置自定义参数的代码示例》:本文主要介绍C#中通过Response.Headers设置自定义响应头的方法,涵盖基础添加、安全校验、生产实践及调试技巧,强... 目录一、基础设置方法1. 直接添加自定义头2. 批量设置模式二、高级配置技巧1. 安全校验机制2. 类型

Python屏幕抓取和录制的详细代码示例

《Python屏幕抓取和录制的详细代码示例》随着现代计算机性能的提高和网络速度的加快,越来越多的用户需要对他们的屏幕进行录制,:本文主要介绍Python屏幕抓取和录制的相关资料,需要的朋友可以参考... 目录一、常用 python 屏幕抓取库二、pyautogui 截屏示例三、mss 高性能截图四、Pill

使用MapStruct实现Java对象映射的示例代码

《使用MapStruct实现Java对象映射的示例代码》本文主要介绍了使用MapStruct实现Java对象映射的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、什么是 MapStruct?二、实战演练:三步集成 MapStruct第一步:添加 Mave