0基础学习PyFlink——使用datagen生成流式数据

2023-11-03 18:30

本文主要是介绍0基础学习PyFlink——使用datagen生成流式数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大纲

  • 可控参数
    • 字段级规则
      • 生成方式
      • 数值控制
      • 时间戳控制
    • 表级规则
      • 生成速度
      • 生成总量
  • 结构
    • 生成环境
    • 定义行结构
      • 定义表信息
  • 案例
    • 随机Int型
    • 顺序Int型
    • 随机型Int数组
    • 带时间戳的多列数据
  • 完整代码
  • 参考资料

在研究Flink的水位线(WaterMark)技术之前,我们可能需要Flink接收到流式数据,比如接入Kafka等。这就要求引入其他组件,增加了学习的难度。而Flink自身提供了datagen连接器,它可以用于生成流式数据,让问题内聚在Flink代码内部,从而降低学习探索的难度。
本节我们就介绍如何使用datagen生成数据。

可控参数

我们可以使用option方法控制生成的一些规则,主要分为“字段级规则”和“表级规则”。

字段级规则

顾名思义,字段级规则是指该规则作用于具体哪个字段,这就需要指明字段的名称——fields.col_name

生成方式

字段的生成方式由下面的字符串形式来控制(#表示字段的名称,下同)

fields.#.kind

可选值有:

  • random:随机方式,比如5,2,1,4,6……。
  • sequence:顺序方式,比如1,2,3,4,5,6……。

数值控制

如果kind是sequence,则数值控制使用:

  • fields.#.start:区间的起始值。
  • fields.#.end:区间的结束值。

如果配置了这个两个参数,则会生成有限个数的数据。

如果kind是random,则数值控制使用:

  • fields.#.min:随机算法会选取的最小值。
  • fields.#.max:随机算法会选取的最大值。

时间戳控制

fields.#.max-past仅仅可以用于TIMESTAMP和TIMESTAMP_LTZ类型的数据。它表示离现在时间戳最大的时间差,这个默认值是0。TIMESTAMP和TIMESTAMP_LTZ只支持random模式生成,这就需要控制随机值的区间。如果区间太小,我们生成的时间可能非常集中。后面我们会做相关测试。

表级规则

生成速度

rows-per-second表示每秒可以生成几条数据。

生成总量

number-of-rows表示一共可以生成多少条数据。如果这个参数不设置,则表示可以生成无界流。

结构

生成环境

我们需要流式环境,而datagen是Table API的连接器,于是使用流式执行环境创建一个流式表环境。

    stream_execute_env = StreamExecutionEnvironment.get_execution_environment()stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)

定义行结构

    schame = Schema.new_builder().column('seed', DataTypes.INT()).build()

这个结构以及支持的生成模式是:

TypeSupported Generators
BOOLEANrandom
CHARrandom / sequence
VARCHARrandom / sequence
BINARYrandom / sequence
VARBINARYrandom / sequence
STRINGrandom / sequence
DECIMALrandom / sequence
TINYINTrandom / sequence
SMALLINTrandom / sequence
INTrandom / sequence
BIGINTrandom / sequence
FLOATrandom / sequence
DOUBLErandom / sequence
DATErandom
TIMErandom
TIMESTAMPrandom
TIMESTAMP_LTZrandom
INTERVAL YEAR TO MONTHrandom
INTERVAL DAY TO MONTHrandom
ROWrandom
ARRAYrandom
MAPrandom
MULTISETrandom

定义表信息

下面这个例子就是给seed字段按随机模式,生成seed_min和seed_max之间的数值,并且每秒生成rows_per_second行。

    table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.seed.kind', 'random') \.option('fields.seed.min', str(seed_min)) \.option('fields.seed.max', str(seed_max)) \.option('rows-per-second', str(rows_per_second)) \.build()

案例

随机Int型

每秒生成5行数据,每行数据中seed字段值随机在最小值0和最大值100之间。由于没有指定number-of-rows,生成的是无界流。

def gen_random_int():stream_execute_env = StreamExecutionEnvironment.get_execution_environment()stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)seed_min = 0seed_max = 100rows_per_second = 5schame = Schema.new_builder().column('seed', DataTypes.INT()).build()table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.seed.kind', 'random') \.option('fields.seed.min', str(seed_min)) \.option('fields.seed.max', str(seed_max)) \.option('rows-per-second', str(rows_per_second)) \.build()stream_table_env.create_temporary_table('source', table_descriptor)table = stream_table_env.from_path('source')table.execute().print()
+----+-------------+
| op |        seed |
+----+-------------+
| +I |          25 |
| +I |          28 |
| +I |          73 |
| +I |          68 |
| +I |          40 |
| +I |          55 |
| +I |           6 |
| +I |          41 |
| +I |          16 |
| +I |          19 |
……

顺序Int型

每秒生成5行数据,每行数据中seed字段值从1开始递增,一直自增到10。由于设置了最大和最小值,生成的是有界流。

def gen_sequence_int():stream_execute_env = StreamExecutionEnvironment.get_execution_environment()stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)seed_min = 1seed_max = 10rows_per_second = 5schame = Schema.new_builder().column('seed', DataTypes.INT()).build()table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.seed.kind', 'sequence') \.option('fields.seed.start', str(seed_min)) \.option('fields.seed.end', str(seed_max)) \.option('rows-per-second', str(rows_per_second)) \.build()stream_table_env.create_temporary_table('source', table_descriptor)table = stream_table_env.from_path('source')table.execute().print()
+----+-------------+
| op |        seed |
+----+-------------+
| +I |           1 |
| +I |           2 |
| +I |           3 |
| +I |           4 |
| +I |           5 |
| +I |           6 |
| +I |           7 |
| +I |           8 |
| +I |           9 |
| +I |          10 |
+----+-------------+
10 rows in set

随机型Int数组

每秒生成5行数据,每行数据中seed字段是一个Int型数组,数组里面的每个元素也是随机的。

def gen_random_int_array():stream_execute_env = StreamExecutionEnvironment.get_execution_environment()stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)rows_per_second = 5schame = Schema.new_builder().column('seed', DataTypes.ARRAY(DataTypes.INT())) \.build()table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.seed.kind', 'random') \.option('rows-per-second', str(rows_per_second)) \.build()stream_table_env.create_temporary_table('source', table_descriptor)table = stream_table_env.from_path('source')table.execute().print()
+----+--------------------------------+
| op |                           seed |
+----+--------------------------------+
| +I | [625785630, -933999461, -48... |
| +I | [2087310154, 1602723641, 19... |
| +I | [1299442620, -613376781, -8... |
| +I | [2051511574, 246258035, -16... |
| +I | [2029482070, -1496468635, -... |
| +I | [1230213175, -1506525784, 7... |
| +I | [501476712, 1901967363, -56... |
……

带时间戳的多列数据

每秒生成5行数据,每行数据中seed字段值随机在最小值0和最大值100之间;timestamp字段随机在当前时间戳和“当前时间戳+max-past”之间。

def gen_random_int_and_timestamp():stream_execute_env = StreamExecutionEnvironment.get_execution_environment()stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)seed_min = 0seed_max = 100rows_per_second = 5schame = Schema.new_builder().column('seed', DataTypes.INT()) \.column('timestamp', DataTypes.TIMESTAMP()) \.build()table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.seed.kind', 'random') \.option('fields.seed.min', str(seed_min)) \.option('fields.seed.max', str(seed_max)) \.option('fields.timestamp.kind', 'random') \.option('fields.timestamp.max-past', '0') \.option('rows-per-second', str(rows_per_second)) \.build()stream_table_env.create_temporary_table('source', table_descriptor)table = stream_table_env.from_path('source')table.execute().print()

由于max-past值为0,所以我们看到上例中每秒生成的timestamp 都极接近。

+----+-------------+----------------------------+
| op |        seed |                  timestamp |
+----+-------------+----------------------------+
| +I |          66 | 2023-11-02 13:53:29.082000 |
| +I |           9 | 2023-11-02 13:53:29.146000 |
| +I |          12 | 2023-11-02 13:53:29.146000 |
| +I |          52 | 2023-11-02 13:53:29.146000 |
| +I |          29 | 2023-11-02 13:53:29.146000 |
| +I |          63 | 2023-11-02 13:53:30.066000 |
| +I |          25 | 2023-11-02 13:53:30.066000 |
| +I |          21 | 2023-11-02 13:53:30.066000 |
| +I |          24 | 2023-11-02 13:53:30.066000 |
| +I |           6 | 2023-11-02 13:53:30.066000 |
| +I |          62 | 2023-11-02 13:53:31.067000 |
| +I |          57 | 2023-11-02 13:53:31.067000 |
| +I |          44 | 2023-11-02 13:53:31.067000 |
| +I |           6 | 2023-11-02 13:53:31.067000 |
| +I |          16 | 2023-11-02 13:53:31.067000 |
……

如果我们把max-past放大到比较大的数值,timestamp也将大幅度变化。

.option('fields.timestamp.max-past', '10000')
+----+-------------+----------------------------+
| op |        seed |                  timestamp |
+----+-------------+----------------------------+
| +I |          89 | 2023-11-02 13:57:17.342000 |
| +I |          35 | 2023-11-02 13:57:10.915000 |
| +I |          32 | 2023-11-02 13:57:11.045000 |
| +I |          74 | 2023-11-02 13:57:18.407000 |
| +I |          24 | 2023-11-02 13:57:13.603000 |
| +I |          82 | 2023-11-02 13:57:12.139000 |
| +I |          41 | 2023-11-02 13:57:16.129000 |
| +I |          95 | 2023-11-02 13:57:16.592000 |
| +I |          80 | 2023-11-02 13:57:14.364000 |
| +I |          60 | 2023-11-02 13:57:18.994000 |
| +I |          56 | 2023-11-02 13:57:19.330000 |
| +I |          10 | 2023-11-02 13:57:18.876000 |
| +I |          43 | 2023-11-02 13:57:12.449000 |
| +I |          73 | 2023-11-02 13:57:13.183000 |
| +I |          17 | 2023-11-02 13:57:18.736000 |
| +I |          46 | 2023-11-02 13:57:21.368000 |
……

完整代码


from pyflink.datastream import StreamExecutionEnvironment,RuntimeExecutionMode
from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypesdef gen_random_int():stream_execute_env = StreamExecutionEnvironment.get_execution_environment()stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)seed_min = 0seed_max = 100rows_per_second = 5schame = Schema.new_builder().column('seed', DataTypes.INT()).build()table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.seed.kind', 'random') \.option('fields.seed.min', str(seed_min)) \.option('fields.seed.max', str(seed_max)) \.option('rows-per-second', str(rows_per_second)) \.build()stream_table_env.create_temporary_table('source', table_descriptor)table = stream_table_env.from_path('source')table.execute().print()def gen_sequence_int():stream_execute_env = StreamExecutionEnvironment.get_execution_environment()stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)seed_min = 1seed_max = 10rows_per_second = 5schame = Schema.new_builder().column('seed', DataTypes.INT()).build()table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.seed.kind', 'sequence') \.option('fields.seed.start', str(seed_min)) \.option('fields.seed.end', str(seed_max)) \.option('rows-per-second', str(rows_per_second)) \.build()stream_table_env.create_temporary_table('source', table_descriptor)table = stream_table_env.from_path('source')table.execute().print()def gen_sequence_string():stream_execute_env = StreamExecutionEnvironment.get_execution_environment()stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)seed_min = 0seed_max = 100rows_per_second = 5schame = Schema.new_builder().column('seed', DataTypes.STRING()).build()table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.seed.kind', 'sequence') \.option('fields.seed.start', str(seed_min)) \.option('fields.seed.end', str(seed_max)) \.option('rows-per-second', str(rows_per_second)) \.build()stream_table_env.create_temporary_table('source', table_descriptor)table = stream_table_env.from_path('source')table.execute().print()def gen_random_char():stream_execute_env = StreamExecutionEnvironment.get_execution_environment()stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)rows_per_second = 5schame = Schema.new_builder().column('seed', DataTypes.CHAR(4)).build()table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.seed.kind', 'random') \.option('rows-per-second', str(rows_per_second)) \.build()stream_table_env.create_temporary_table('source', table_descriptor)table = stream_table_env.from_path('source')table.execute().print()def gen_random_int_and_timestamp():stream_execute_env = StreamExecutionEnvironment.get_execution_environment()stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)seed_min = 0seed_max = 100rows_per_second = 5schame = Schema.new_builder().column('seed', DataTypes.INT()) \.column('timestamp', DataTypes.TIMESTAMP()) \.build()table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.seed.kind', 'random') \.option('fields.seed.min', str(seed_min)) \.option('fields.seed.max', str(seed_max)) \.option('fields.timestamp.kind', 'random') \.option('fields.timestamp.max-past', '10000') \.option('rows-per-second', str(rows_per_second)) \.build()stream_table_env.create_temporary_table('source', table_descriptor)table = stream_table_env.from_path('source')table.execute().print()def gen_random_int_array():stream_execute_env = StreamExecutionEnvironment.get_execution_environment()stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)rows_per_second = 5schame = Schema.new_builder().column('seed', DataTypes.ARRAY(DataTypes.INT())) \.build()table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.seed.kind', 'random') \.option('rows-per-second', str(rows_per_second)) \.build()stream_table_env.create_temporary_table('source', table_descriptor)table = stream_table_env.from_path('source')table.execute().print()def gen_random_map():stream_execute_env = StreamExecutionEnvironment.get_execution_environment()stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)rows_per_second = 5schame = Schema.new_builder().column('seed', DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())) \.build()table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.seed.kind', 'random') \.option('rows-per-second', str(rows_per_second)) \.build()stream_table_env.create_temporary_table('source', table_descriptor)table = stream_table_env.from_path('source')table.execute().print()def gen_random_multiset():stream_execute_env = StreamExecutionEnvironment.get_execution_environment()stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)rows_per_second = 5schame = Schema.new_builder().column('seed', DataTypes.MULTISET(DataTypes.STRING())) \.build()table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.seed.kind', 'random') \.option('rows-per-second', str(rows_per_second)) \.build()stream_table_env.create_temporary_table('source', table_descriptor)table = stream_table_env.from_path('source')table.execute().print()def gen_random_row():stream_execute_env = StreamExecutionEnvironment.get_execution_environment()stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)rows_per_second = 5schame = Schema.new_builder().column('seed', DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()), DataTypes.FIELD("data", DataTypes.STRING())])) \.build()table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.seed.kind', 'random') \.option('rows-per-second', str(rows_per_second)) \.build()stream_table_env.create_temporary_table('source', table_descriptor)table = stream_table_env.from_path('source')table.execute().print()if __name__ == '__main__':gen_random_int_and_timestamp()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/datagen/

这篇关于0基础学习PyFlink——使用datagen生成流式数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/340065

相关文章

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建

PHP轻松处理千万行数据的方法详解

《PHP轻松处理千万行数据的方法详解》说到处理大数据集,PHP通常不是第一个想到的语言,但如果你曾经需要处理数百万行数据而不让服务器崩溃或内存耗尽,你就会知道PHP用对了工具有多强大,下面小编就... 目录问题的本质php 中的数据流处理:为什么必不可少生成器:内存高效的迭代方式流量控制:避免系统过载一次性

Python使用FastAPI实现大文件分片上传与断点续传功能

《Python使用FastAPI实现大文件分片上传与断点续传功能》大文件直传常遇到超时、网络抖动失败、失败后只能重传的问题,分片上传+断点续传可以把大文件拆成若干小块逐个上传,并在中断后从已完成分片继... 目录一、接口设计二、服务端实现(FastAPI)2.1 运行环境2.2 目录结构建议2.3 serv

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

Spring Security简介、使用与最佳实践

《SpringSecurity简介、使用与最佳实践》SpringSecurity是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架,本文给大家介绍SpringSec... 目录一、如何理解 Spring Security?—— 核心思想二、如何在 Java 项目中使用?——

springboot中使用okhttp3的小结

《springboot中使用okhttp3的小结》OkHttp3是一个JavaHTTP客户端,可以处理各种请求类型,比如GET、POST、PUT等,并且支持高效的HTTP连接池、请求和响应缓存、以及异... 在 Spring Boot 项目中使用 OkHttp3 进行 HTTP 请求是一个高效且流行的方式。

Java使用Javassist动态生成HelloWorld类

《Java使用Javassist动态生成HelloWorld类》Javassist是一个非常强大的字节码操作和定义库,它允许开发者在运行时创建新的类或者修改现有的类,本文将简单介绍如何使用Javass... 目录1. Javassist简介2. 环境准备3. 动态生成HelloWorld类3.1 创建CtC

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

Java使用jar命令配置服务器端口的完整指南

《Java使用jar命令配置服务器端口的完整指南》本文将详细介绍如何使用java-jar命令启动应用,并重点讲解如何配置服务器端口,同时提供一个实用的Web工具来简化这一过程,希望对大家有所帮助... 目录1. Java Jar文件简介1.1 什么是Jar文件1.2 创建可执行Jar文件2. 使用java

C#使用Spire.Doc for .NET实现HTML转Word的高效方案

《C#使用Spire.Docfor.NET实现HTML转Word的高效方案》在Web开发中,HTML内容的生成与处理是高频需求,然而,当用户需要将HTML页面或动态生成的HTML字符串转换为Wor... 目录引言一、html转Word的典型场景与挑战二、用 Spire.Doc 实现 HTML 转 Word1