Blink SQL之创建消息队列Kafka源表

2023-11-29 15:32

本文主要是介绍Blink SQL之创建消息队列Kafka源表,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

创建消息队列Kafka源表

注意事项

  • 仅适用于Blink 2.0及以上版本。
  • 仅适用于独享模式。
  • Kafka源表支持读取自建Kafka集群,但需要注意版本对应关系,以及自建集群和Blink版集群的网络环境配置。
  • 二进制数据不支持本地调试,语法检查没有问题请进行线上调试。

从Kafka输出的数据为序列化后的VARBINARY(二进制)格式。对于输出的每条数据,需要您编写自定义表值函数(UDTF)将其解析为序列化前的数据结构。Kafka源表数据解析流程通常为:Kafka Source Table -> UDTF -> Realtime Compute for Apache Flink -> Sink。此外,Flink SQL中也支持通过CAST函数将VARBINARY解析为VARCHAR类型。

DDL定义

Kafka源表定义DDL部分必须与以下SQL完全一致,可以更改WITH参数中的设置。

create table kafka_stream(   --必须和Kafka源表中的5个字段的顺序和类型保持一致。messageKey VARBINARY,`message`    VARBINARY,topic      VARCHAR,`partition`  INT,`offset`     BIGINT        
) with (type ='kafka010',topic = '<yourTopicName>',`group.id` = '<yourGroupId>',...
);

WITH参数

  • 通用配置
参数注释说明是否必选备注
typeKafka对应版本Kafka版本需要是Kafka08、Kafka09、Kafka010或Kafka011。
topic读取的单个topic
topicPattern读取一批topic的表达式Topic用竖线(|)分隔。例如:topic1|topic2|topic3
startupMode启动位点启动位点取值如下:
GROUP_OFFSETS(默认值):根据Group读取。
EARLIEST:从Kafka最早分区开始读取。
LATEST:从Kafka最新位点开始读取。
TIMESTAMP:从指定的时间点读取。
partitionDiscoveryIntervalMS定时检查是否有新分区产生Kafka 08版本:系统默认开启该功能。
Kafka 09版本及以上版本:不支持partitionDiscoveryIntervalMS参数。
extraConfig额外的KafkaConsumer配置项目不在可选配置项中,但是期望额外增加的配置。
  • Kafka08配置

    • Kafka08必选配置

      参数注释说明是否必选
      group.id消费组ID
      zookeeper.connectzk链接地址
    • 可选配置Key

      • consumer.id
      • socket.timeout.ms
      • fetch.message.max.bytes
      • num.consumer.fetchers
      • auto.commit.enable
      • auto.commit.interval.ms
      • queued.max.message.chunks
      • rebalance.max.retries
      • fetch.min.bytes
      • fetch.wait.max.ms
      • rebalance.backoff.ms
      • refresh.leader.backoff.ms
      • auto.offset.reset
      • consumer.timeout.ms
      • exclude.internal.topics
      • partition.assignment.strategy
      • client.id
      • zookeeper.session.timeout.ms
      • zookeeper.connection.timeout.ms
      • zookeeper.sync.time.ms
      • offsets.storage
      • offsets.channel.backoff.ms
      • offsets.channel.socket.timeout.ms
      • offsets.commit.max.retries
      • dual.commit.enabled
      • partition.assignment.strategy
      • socket.receive.buffer.bytes
      • fetch.min.bytes
  • Kafka09/Kafka010/Kafka011配置

    • Kafka09/Kafka010/Kafka011必选配置

      参数注释说明
      group.id消费组ID
      bootstrap.serversKafka集群地址
    • Kafka09/Kafka010/Kafka011可选配置,请参Kafka官方文档进行配置。

      • Kafka09
      • Kafka010
      • Kafka011

    当需要配置某选项时,在DDL中的WITH部分增加对应的参数即可。例如,配置SASL登录,需增加security.protocolsasl.mechanismsasl.jaas.config3个参数,示例如下。

    create table kafka_stream(messageKey varbinary,`message` varbinary,topic varchar,`partition` int,`offset` bigint
    ) with (type ='kafka010',topic = '<yourTopicName>',`group.id` = '<yourGroupId>',...,`security.protocol`='SASL_PLAINTEXT',`sasl.mechanism`='PLAIN',`sasl.jaas.config`='org.apache.kafka.common.security.plain.PlainLoginModule required username="<yourUserName>" password="<yourPassword>";'
    );
    

Kafka版本对应关系

typeKafka版本
Kafka080.8.22
Kafka090.9.0.1
Kafka0100.10.2.1
Kafka0110.11.0.2及以上

Kafka消息解析示例

  • 场景1:将Kafka中的数据进行计算,并将计算结果输出到RDS。

    Kafka中保存了JSON格式数据,需要使用实时计算Flink版进行计算,消息格式示例如下。

    {"name":"Alice","age":13,"grade":"A"
    }                
    
    • 方法1:Kafka SOURCE->Realtime Compute for Apache Flink->RDS SINK

      Blink 2.2.7及以上版本支持将VARBINARY类型通过CAST函数转换为VARCHAR类型,再通过JSON_VALUE函数对Kafka数据进行解析,示例如下。

      CREATE TABLE kafka_src (messageKey  VARBINARY,`message`   VARBINARY,topic       VARCHAR,`partition` INT,`offset`    BIGINT
      ) WITH (type = 'kafka010'   --请参见Kafka版本对应关系。
      );CREATE TABLE rds_sink (`name`       VARCHAR,age         VARCHAR,grade       VARCHAR
      ) WITH(type='rds'
      );CREATE VIEW input_view AS SELECT CAST(`message` as VARCHAR ) as `message`
      FROM kafka_src;INSERT INTO rds_sink
      SELECT JSON_VALUE(`message`,'$.name'),JSON_VALUE(`message`,'$.age'),JSON_VALUE(`message`,'$.grade')
      FROM input_view;
      
    • 方法2:Kafka Source->UDTF->Realtime Compute for Apache Flink->RDS Sink

      针对不规则数据、复杂JSON数据,需要您自行编写UDTF代码进行解析,示例如下。

      • SQL

        -- 定义解析Kafka message的UDTF。
        CREATE FUNCTION kafkaparser AS 'com.alibaba.kafkaUDTF';-- 定义源表。注意:Kafka源表DDL字段必须与以下示例完全一致。WITH中参数可以修改。
        CREATE TABLE kafka_src (messageKey  VARBINARY,`message`   VARBINARY,topic       VARCHAR,`partition` INT,`offset`    BIGINT
        ) WITH (type = 

这篇关于Blink SQL之创建消息队列Kafka源表的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/433364

相关文章

MySQL MCP 服务器安装配置最佳实践

《MySQLMCP服务器安装配置最佳实践》本文介绍MySQLMCP服务器的安装配置方法,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下... 目录mysql MCP 服务器安装配置指南简介功能特点安装方法数据库配置使用MCP Inspector进行调试开发指

mysql中insert into的基本用法和一些示例

《mysql中insertinto的基本用法和一些示例》INSERTINTO用于向MySQL表插入新行,支持单行/多行及部分列插入,下面给大家介绍mysql中insertinto的基本用法和一些示例... 目录基本语法插入单行数据插入多行数据插入部分列的数据插入默认值注意事项在mysql中,INSERT I

一文详解MySQL如何设置自动备份任务

《一文详解MySQL如何设置自动备份任务》设置自动备份任务可以确保你的数据库定期备份,防止数据丢失,下面我们就来详细介绍一下如何使用Bash脚本和Cron任务在Linux系统上设置MySQL数据库的自... 目录1. 编写备份脚本1.1 创建并编辑备份脚本1.2 给予脚本执行权限2. 设置 Cron 任务2

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

SQL Server数据库死锁处理超详细攻略

《SQLServer数据库死锁处理超详细攻略》SQLServer作为主流数据库管理系统,在高并发场景下可能面临死锁问题,影响系统性能和稳定性,这篇文章主要给大家介绍了关于SQLServer数据库死... 目录一、引言二、查询 Sqlserver 中造成死锁的 SPID三、用内置函数查询执行信息1. sp_w

Python中使用uv创建环境及原理举例详解

《Python中使用uv创建环境及原理举例详解》uv是Astral团队开发的高性能Python工具,整合包管理、虚拟环境、Python版本控制等功能,:本文主要介绍Python中使用uv创建环境及... 目录一、uv工具简介核心特点:二、安装uv1. 通过pip安装2. 通过脚本安装验证安装:配置镜像源(可

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

SQL中JOIN操作的条件使用总结与实践

《SQL中JOIN操作的条件使用总结与实践》在SQL查询中,JOIN操作是多表关联的核心工具,本文将从原理,场景和最佳实践三个方面总结JOIN条件的使用规则,希望可以帮助开发者精准控制查询逻辑... 目录一、ON与WHERE的本质区别二、场景化条件使用规则三、最佳实践建议1.优先使用ON条件2.WHERE用

MySQL存储过程之循环遍历查询的结果集详解

《MySQL存储过程之循环遍历查询的结果集详解》:本文主要介绍MySQL存储过程之循环遍历查询的结果集,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言1. 表结构2. 存储过程3. 关于存储过程的SQL补充总结前言近来碰到这样一个问题:在生产上导入的数据发现

MySQL 衍生表(Derived Tables)的使用

《MySQL衍生表(DerivedTables)的使用》本文主要介绍了MySQL衍生表(DerivedTables)的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学... 目录一、衍生表简介1.1 衍生表基本用法1.2 自定义列名1.3 衍生表的局限在SQL的查询语句select