Flink 流转表,表转流,watermark设置

2024-08-23 08:44

本文主要是介绍Flink 流转表,表转流,watermark设置,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

流转表

首先创建一个流


@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Nan {private String xing;private String name;private Long ts;
}StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);DataStreamSource<String> sourceNan = env.socketTextStream("hdp01", 1111);
DataStreamSource<String> sourceNv = env.socketTextStream("hdp01", 2222);System.setProperty("java.net.preferIPv4Stack", "true");SingleOutputStreamOperator<Nan> beanNan = sourceNan.map(new MapFunction<String, Nan>() {@Overridepublic Nan map(String s) throws Exception {try {String[] split = s.split(",");return new Nan(split[0].substring(0, 1), split[1], Long.parseLong(split[2]));} catch (Exception e) {return null;}}
}).filter(Objects::nonNull).assignTimestampsAndWatermarks(WatermarkStrategy.<Nan>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Nan>() {@Overridepublic long extractTimestamp(Nan nan, long l) {return nan.getTs();}
})).returns(TypeInformation.of(Nan.class));

创建watermark

流转表的时候有一个点要注意,watermark必须要重新指定,否则会丢失,常用的方式如下
创建watermark,有两步,
第一步:必须要依据一个字段来创建watermark,这个字段必须是timestamp_ltz(3)的类型。
第二步:根据时间戳字段生成watermark
时间戳字段有两种获取方式
1、根据一个bigint字段进行转换
2、在流转表,且流上设置了watermark的情况下,根据内置属性rowtime创建,这个rowtime是流转表时暴露出来的事件时间
watermark也有两种获取方式
1、根据时间戳字段重新创建watermark
2、在流转表,且流上设置了watermark的情况下,沿用流上的watermark

下面是两种场景,只要记住第一种就行了,其实第二种没什么用。

1、 根据一个bigint字段进行创建时间戳字段,然后重新创建watermark

tenv.createTemporaryView("nan", beanNan, Schema.newBuilder().column("xing", DataTypes.STRING()).column("name", DataTypes.STRING()).column("ts", DataTypes.BIGINT()).columnByExpression("rt", "to_timestamp_ltz(ts,3)") // 根据一个bigint字段进行转换.watermark("rt", "rt - interval '1' second ") // 重新创建watermark.build());

2、根据内置属性rowtime创建时间戳字段,然后沿用流上的watermark

tenv.createTemporaryView("nan1", beanNan, Schema.newBuilder().column("xing", DataTypes.STRING()).column("name", DataTypes.STRING()).column("ts", DataTypes.BIGINT()).columnByMetadata("rt", DataTypes.TIMESTAMP_LTZ(3),"rowtime") // 根据内置属性rowtime创建.watermark("rt", "source_watermark()") // 沿用流的watermark “source_watermark 等于 rt - interval '1' second”.build());
TableResult tableResult = tenv.executeSql("select *,current_watermark(rt) from nan");
tableResult.print();

表转流

首先创建一个表

 String source = "CREATE TABLE person (  " +"  xing STRING,  " +"  name STRING,  " +"  ts BIGINT,  " +"  rt as to_timestamp_ltz(ts,3),  " +"  watermark for rt as rt - interval '1' second  " +") WITH (  " +" 'connector' = 'kafka',  " +" 'topic' = 'flink_topic',  " +" 'properties.bootstrap.servers' = '172.16.10.139:9092',  " +" 'properties.group.id' = 'testGroup',  " +" 'scan.startup.mode' = 'latest-offset', " +" 'format' = 'json'  " +")";tenv.executeSql(source);

创建watermark

表转流,可以沿用流上的watermark,不需要额外声明

DataStream<Row> dataStream = tenv.toDataStream(table);dataStream.process(new ProcessFunction<Row, Object>() {@Overridepublic void processElement(Row value, ProcessFunction<Row, Object>.Context ctx, Collector<Object> out) throws Exception {System.out.println(value+" watermark=>"+ctx.timerService().currentWatermark());}
});
env.execute();

这篇关于Flink 流转表,表转流,watermark设置的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1098903

相关文章

python设置环境变量路径实现过程

《python设置环境变量路径实现过程》本文介绍设置Python路径的多种方法:临时设置(Windows用`set`,Linux/macOS用`export`)、永久设置(系统属性或shell配置文件... 目录设置python路径的方法临时设置环境变量(适用于当前会话)永久设置环境变量(Windows系统

Go语言编译环境设置教程

《Go语言编译环境设置教程》Go语言支持高并发(goroutine)、自动垃圾回收,编译为跨平台二进制文件,云原生兼容且社区活跃,开发便捷,内置测试与vet工具辅助检测错误,依赖模块化管理,提升开发效... 目录Go语言优势下载 Go  配置编译环境配置 GOPROXYIDE 设置(VS Code)一些基本

小白也能轻松上手! 路由器设置优化指南

《小白也能轻松上手!路由器设置优化指南》在日常生活中,我们常常会遇到WiFi网速慢的问题,这主要受到三个方面的影响,首要原因是WiFi产品的配置优化不合理,其次是硬件性能的不足,以及宽带线路本身的质... 在数字化时代,网络已成为生活必需品,追剧、游戏、办公、学习都离不开稳定高速的网络。但很多人面对新路由器

linux hostname设置全过程

《linuxhostname设置全过程》:本文主要介绍linuxhostname设置全过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录查询hostname设置步骤其它相关点hostid/etc/hostsEDChina编程A工具license破解注意事项总结以RHE

Python设置Cookie永不超时的详细指南

《Python设置Cookie永不超时的详细指南》Cookie是一种存储在用户浏览器中的小型数据片段,用于记录用户的登录状态、偏好设置等信息,下面小编就来和大家详细讲讲Python如何设置Cookie... 目录一、Cookie的作用与重要性二、Cookie过期的原因三、实现Cookie永不超时的方法(一)

Qt 设置软件版本信息的实现

《Qt设置软件版本信息的实现》本文介绍了Qt项目中设置版本信息的三种常用方法,包括.pro文件和version.rc配置、CMakeLists.txt与version.h.in结合,具有一定的参考... 目录在运行程序期间设置版本信息可以参考VS在 QT 中设置软件版本信息的几种方法方法一:通过 .pro

PostgreSQL 默认隔离级别的设置

《PostgreSQL默认隔离级别的设置》PostgreSQL的默认事务隔离级别是读已提交,这是其事务处理系统的基础行为模式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价... 目录一 默认隔离级别概述1.1 默认设置1.2 各版本一致性二 读已提交的特性2.1 行为特征2.2

一文详解MySQL如何设置自动备份任务

《一文详解MySQL如何设置自动备份任务》设置自动备份任务可以确保你的数据库定期备份,防止数据丢失,下面我们就来详细介绍一下如何使用Bash脚本和Cron任务在Linux系统上设置MySQL数据库的自... 目录1. 编写备份脚本1.1 创建并编辑备份脚本1.2 给予脚本执行权限2. 设置 Cron 任务2

mtu设置多少网速最快? 路由器MTU设置最佳网速的技巧

《mtu设置多少网速最快?路由器MTU设置最佳网速的技巧》mtu设置多少网速最快?想要通过设置路由器mtu获得最佳网速,该怎么设置呢?下面我们就来看看路由器MTU设置最佳网速的技巧... 答:1500 MTU值指的是在网络传输中数据包的最大值,合理的设置MTU 值可以让网络更快!mtu设置可以优化不同的网

MySQL 设置AUTO_INCREMENT 无效的问题解决

《MySQL设置AUTO_INCREMENT无效的问题解决》本文主要介绍了MySQL设置AUTO_INCREMENT无效的问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参... 目录快速设置mysql的auto_increment参数一、修改 AUTO_INCREMENT 的值。