Flink构造宽表实时入库案例介绍

2024-01-10 14:28

本文主要是介绍Flink构造宽表实时入库案例介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 安装包准备

Flink 1.15.4 安装包

Flink cdc的mysql连接器

Flink sql的sdb连接器

MySQL驱动

SDB驱动

Flink jdbc的mysql连接器

 

2. 入库流程图

3. Flink安装部署

  1. 上传Flink压缩包到服务器,并解压

tar -zxvf  flink-1.14.5-bin-scala_2.11.tgz  -C /opt/

  1. 复制依赖至Flink中

cp sdb-flink-connector-3.4.8-jar-with-dependencies.jar /opt/flink-1.14.5/lib
cp sequoiadb-driver-3.4.8.jre8.jar /opt/flink-1.14.5/lib
cp flink-sql-connector-mysql-cdc-2.2.1.jar /opt/flink-1.14.5/lib
cp flink-connector-jdbc_2.11-1.14.6.jar /opt/flink-1.14.5/lib

  1. 修改flink-conf.yaml文件

vi conf/flink-conf.yaml

### 配置Master的机器名(IP地址)
jobmanager.rpc.address: sdb1
### 配置每个taskmanager 生成的临时文件夹
io.tmp.dirs: /opt/flink-1.14.5/tmp

  1. 修改master文件

vi conf/masters

#作为master的ip和端口号
upgrade1:8081

  1. 修改worker文件

vi conf/workers

#集群主机名
upgrade1
upgrade2
upgrade3

  1. 拷贝到集群其他机器

scp -r /opt/flink-1.14.5 sdbadmin@upgrade2:/opt/
scp -r /opt/flink-1.14.5 sdbadmin@upgrade3:/opt/

  1. 启动flink集群

[sdbadmin@upgrade1 flink-1.14.5]$ ./bin/start-cluster.sh

  1. 启动flink-SQL

[sdbadmin@upgrade1 flink-1.14.5]$ ./bin/sql-client.sh

4. 实时入库

编写造数程序进行造数

4.1 环境准备

4.1.1 开启mysql的binlog

  1. 创建binlog文件夹

[sdbadmin@upgrade1 mysql]$ mkdir /opt/sequoiasql/mysql/database/3306/binlog

  1. 开启binlog

vim /opt/sequoiasql/mysql/database/3306/auto.cnf

>>配置以下内容:
log_bin=/opt/sequoiasql/mysql/database/3306/binlog
binlog_format=ROW
expire_logs_days=1
server_id=1

配置完成之后,重启mysql

[sdbadmin@upgrade1 mysql]$ ./bin/sdb_mysql_ctl stop myinst
[sdbadmin@upgrade1 mysql]$ ./bin/sdb_mysql_ctl start myinst

4.1.2 创建mysql表

创建库

create database sbtest;
use sbtest;

创建表

CREATE TABLE sbtest1 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name1 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

CREATE TABLE sbtest2 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name2 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

CREATE TABLE sbtest3 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name3 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

创建flink入库表

CREATE TABLE sbtest4 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

4.1.3 创建flink映射表

需要用到flink-sql-connector-mysql-cdc-2.2.1.jar

CREATE TABLE sbtest1_mysql (
    id INT,
    uuid INT,
    name1 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest1'
);

CREATE TABLE sbtest2_mysql (
    id INT,
    uuid INT,
    name2 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest2'
);

CREATE TABLE sbtest3_mysql (
    id INT,
    uuid INT,
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest3'
);

创建flink -->  mysql入库映射表

需要用到flink-connector-jdbc_2.11-1.14.6.jar

CREATE TABLE sbtest4_mysql (
    id BIGINT,
    uuid INT,
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.223.135:3306/sbtest',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'sbtest4'
);

创建flink -->  mysql入库映射表

需要用到sdb-flink-connector-3.4.8-jar-with-dependencies.jar

CREATE TABLE sbtest_sdb (
    id BIGINT,
    uuid INT,
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'sequoiadb',
    'bulksize' = '1',
    'hosts' = '192.168.223.135:11810',
    'collectionspace' = 'sbtest',
    'collection' = 'sbtest4'
);

4.2 MySQL实时入库

4.2.1 Flink left join

select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

4.2.2 mysql实时入库

insert into sbtest4_mysql select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

查看Flink任务

查看可以成功入库

4.3 SDB实时入库

4.3.1 Flink left join

select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

4.3.2 sdb实时入库

insert into sbtest_sdb select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

查看Flink任务

显示已经成功入库

这篇关于Flink构造宽表实时入库案例介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/591107

相关文章

深度解析Java @Serial 注解及常见错误案例

《深度解析Java@Serial注解及常见错误案例》Java14引入@Serial注解,用于编译时校验序列化成员,替代传统方式解决运行时错误,适用于Serializable类的方法/字段,需注意签... 目录Java @Serial 注解深度解析1. 注解本质2. 核心作用(1) 主要用途(2) 适用位置3

Python与MySQL实现数据库实时同步的详细步骤

《Python与MySQL实现数据库实时同步的详细步骤》在日常开发中,数据同步是一项常见的需求,本篇文章将使用Python和MySQL来实现数据库实时同步,我们将围绕数据变更捕获、数据处理和数据写入这... 目录前言摘要概述:数据同步方案1. 基本思路2. mysql Binlog 简介实现步骤与代码示例1

Java中HashMap的用法详细介绍

《Java中HashMap的用法详细介绍》JavaHashMap是一种高效的数据结构,用于存储键值对,它是基于哈希表实现的,提供快速的插入、删除和查找操作,:本文主要介绍Java中HashMap... 目录一.HashMap1.基本概念2.底层数据结构:3.HashCode和equals方法为什么重写Has

Java 正则表达式的使用实战案例

《Java正则表达式的使用实战案例》本文详细介绍了Java正则表达式的使用方法,涵盖语法细节、核心类方法、高级特性及实战案例,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要... 目录一、正则表达式语法详解1. 基础字符匹配2. 字符类([]定义)3. 量词(控制匹配次数)4. 边

Python Counter 函数使用案例

《PythonCounter函数使用案例》Counter是collections模块中的一个类,专门用于对可迭代对象中的元素进行计数,接下来通过本文给大家介绍PythonCounter函数使用案例... 目录一、Counter函数概述二、基本使用案例(一)列表元素计数(二)字符串字符计数(三)元组计数三、C

Springboot项目构建时各种依赖详细介绍与依赖关系说明详解

《Springboot项目构建时各种依赖详细介绍与依赖关系说明详解》SpringBoot通过spring-boot-dependencies统一依赖版本管理,spring-boot-starter-w... 目录一、spring-boot-dependencies1.简介2. 内容概览3.核心内容结构4.

Spring Boot 整合 SSE(Server-Sent Events)实战案例(全网最全)

《SpringBoot整合SSE(Server-SentEvents)实战案例(全网最全)》本文通过实战案例讲解SpringBoot整合SSE技术,涵盖实现原理、代码配置、异常处理及前端交互,... 目录Spring Boot 整合 SSE(Server-Sent Events)1、简述SSE与其他技术的对

MySQL 临时表与复制表操作全流程案例

《MySQL临时表与复制表操作全流程案例》本文介绍MySQL临时表与复制表的区别与使用,涵盖生命周期、存储机制、操作限制、创建方法及常见问题,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随小... 目录一、mysql 临时表(一)核心特性拓展(二)操作全流程案例1. 复杂查询中的临时表应用2. 临时

MySQL 数据库表与查询操作实战案例

《MySQL数据库表与查询操作实战案例》本文将通过实际案例,详细介绍MySQL中数据库表的设计、数据插入以及常用的查询操作,帮助初学者快速上手,感兴趣的朋友跟随小编一起看看吧... 目录mysql 数据库表操作与查询实战案例项目一:产品相关数据库设计与创建一、数据库及表结构设计二、数据库与表的创建项目二:员

C#中的Drawing 类案例详解

《C#中的Drawing类案例详解》文章解析WPF与WinForms的Drawing类差异,涵盖命名空间、继承链、常用类及应用场景,通过案例展示如何创建带阴影圆角矩形按钮,强调WPF的轻量、可动画特... 目录一、Drawing 是什么?二、典型用法三、案例:画一个“带阴影的圆角矩形按钮”四、WinForm