《从0到1学习Flink》—— 如何自定义 Data Sink ?

2024-05-02 08:32
文章标签 学习 自定义 data flink sink

本文主要是介绍《从0到1学习Flink》—— 如何自定义 Data Sink ?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

前篇文章 《从0到1学习Flink》—— Data Sink 介绍 介绍了 Flink Data Sink,也介绍了 Flink 自带的 Sink,那么如何自定义自己的 Sink 呢?这篇文章将写一个 demo 教大家将从 Kafka Source 的数据 Sink 到 MySQL 中去。

准备工作

我们先来看下 Flink 从 Kafka topic 中获取数据的 demo,首先你需要安装好了 FLink 和 Kafka 。

运行启动 Flink、Zookepeer、Kafka,

好了,都启动了!

数据库建表

DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (`id` int(11) unsigned NOT NULL AUTO_INCREMENT,`name` varchar(25) COLLATE utf8_bin DEFAULT NULL,`password` varchar(25) COLLATE utf8_bin DEFAULT NULL,`age` int(10) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

实体类

Student.java

package com.zhisheng.flink.model;/*** Desc:* weixin: zhisheng_tian* blog: http://www.54tianzhisheng.cn/*/
public class Student {public int id;public String name;public String password;public int age;public Student() {}public Student(int id, String name, String password, int age) {this.id = id;this.name = name;this.password = password;this.age = age;}@Overridepublic String toString() {return "Student{" +"id=" + id +", name='" + name + '\'' +", password='" + password + '\'' +", age=" + age +'}';}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}
}

工具类

工具类往 kafka topic student 发送数据

import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Metric;
import com.zhisheng.flink.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;/*** 往kafka中写数据* 可以使用这个main函数进行测试一下* weixin: zhisheng_tian* blog: http://www.54tianzhisheng.cn/*/
public class KafkaUtils2 {public static final String broker_list = "localhost:9092";public static final String topic = "student";  //kafka topic 需要和 flink 程序用同一个 topicpublic static void writeToKafka() throws InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", broker_list);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer producer = new KafkaProducer<String, String>(props);for (int i = 1; i <= 100; i++) {Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, JSON.toJSONString(student));producer.send(record);System.out.println("发送数据: " + JSON.toJSONString(student));}producer.flush();}public static void main(String[] args) throws InterruptedException {writeToKafka();}
}

SinkToMySQL

该类就是 Sink Function,继承了 RichSinkFunction ,然后重写了里面的方法。在 invoke 方法中将数据插入到 MySQL 中。

package com.zhisheng.flink.sink;import com.zhisheng.flink.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;/*** Desc:* weixin: zhisheng_tian* blog: http://www.54tianzhisheng.cn/*/
public class SinkToMySQL extends RichSinkFunction<Student> {PreparedStatement ps;private Connection connection;/*** open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接** @param parameters* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);connection = getConnection();String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";ps = this.connection.prepareStatement(sql);}@Overridepublic void close() throws Exception {super.close();//关闭连接和释放资源if (connection != null) {connection.close();}if (ps != null) {ps.close();}}/*** 每条数据的插入都要调用一次 invoke() 方法** @param value* @param context* @throws Exception*/@Overridepublic void invoke(Student value, Context context) throws Exception {//组装数据,执行插入操作ps.setInt(1, value.getId());ps.setString(2, value.getName());ps.setString(3, value.getPassword());ps.setInt(4, value.getAge());ps.executeUpdate();}private static Connection getConnection() {Connection con = null;try {Class.forName("com.mysql.jdbc.Driver");con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root123456");} catch (Exception e) {System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());}return con;}
}

Flink 程序

这里的 source 是从 kafka 读取数据的,然后 Flink 从 Kafka 读取到数据(JSON)后用阿里 fastjson 来解析成 student 对象,然后在 addSink 中使用我们创建的 SinkToMySQL,这样就可以把数据存储到 MySQL 了。

package com.zhisheng.flink;import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Student;
import com.zhisheng.flink.sink.SinkToMySQL;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;import java.util.Properties;/*** Desc:* weixin: zhisheng_tian* blog: http://www.54tianzhisheng.cn/*/
public class Main3 {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("zookeeper.connect", "localhost:2181");props.put("group.id", "metric-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>("student",   //这个 kafka topic 需要和上面的工具类的 topic 一致new SimpleStringSchema(),props)).setParallelism(1).map(string -> JSON.parseObject(string, Student.class)); //Fastjson 解析字符串成 student 对象student.addSink(new SinkToMySQL()); //数据 sink 到 mysqlenv.execute("Flink add sink");}
}

结果

运行 Flink 程序,然后再运行 KafkaUtils2.java 工具类,这样就可以了。

如果数据插入成功了,那么我们查看下我们的数据库:

数据库中已经插入了 100 条我们从 Kafka 发送的数据了。证明我们的 SinkToMySQL 起作用了。是不是很简单?

项目结构

怕大家不知道我的项目结构,这里发个截图看下:

最后

本文主要利用一个 demo,告诉大家如何自定义 Sink Function,将从 Kafka 的数据 Sink 到 MySQL 中,如果你项目中有其他的数据来源,你也可以换成对应的 Source,也有可能你的 Sink 是到其他的地方或者其他不同的方式,那么依旧是这个套路:继承 RichSinkFunction 抽象类,重写 invoke 方法。

关注我

转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/10/31/flink-create-sink/

另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以加我的微信:zhisheng_tian,然后回复关键字:Flink 即可无条件获取到。

相关文章

1、《从0到1学习Flink》—— Apache Flink 介绍

2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

3、《从0到1学习Flink》—— Flink 配置文件详解

4、《从0到1学习Flink》—— Data Source 介绍

5、《从0到1学习Flink》—— 如何自定义 Data Source ?

6、《从0到1学习Flink》—— Data Sink 介绍

7、《从0到1学习Flink》—— 如何自定义 Data Sink ?

这篇关于《从0到1学习Flink》—— 如何自定义 Data Sink ?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/953826

相关文章

C#中通过Response.Headers设置自定义参数的代码示例

《C#中通过Response.Headers设置自定义参数的代码示例》:本文主要介绍C#中通过Response.Headers设置自定义响应头的方法,涵盖基础添加、安全校验、生产实践及调试技巧,强... 目录一、基础设置方法1. 直接添加自定义头2. 批量设置模式二、高级配置技巧1. 安全校验机制2. 类型

SpringBoot AspectJ切面配合自定义注解实现权限校验的示例详解

《SpringBootAspectJ切面配合自定义注解实现权限校验的示例详解》本文章介绍了如何通过创建自定义的权限校验注解,配合AspectJ切面拦截注解实现权限校验,本文结合实例代码给大家介绍的非... 目录1. 创建权限校验注解2. 创建ASPectJ切面拦截注解校验权限3. 用法示例A. 参考文章本文

Vite 打包目录结构自定义配置小结

《Vite打包目录结构自定义配置小结》在Vite工程开发中,默认打包后的dist目录资源常集中在asset目录下,不利于资源管理,本文基于Rollup配置原理,本文就来介绍一下通过Vite配置自定义... 目录一、实现原理二、具体配置步骤1. 基础配置文件2. 配置说明(1)js 资源分离(2)非 JS 资

Unity新手入门学习殿堂级知识详细讲解(图文)

《Unity新手入门学习殿堂级知识详细讲解(图文)》Unity是一款跨平台游戏引擎,支持2D/3D及VR/AR开发,核心功能模块包括图形、音频、物理等,通过可视化编辑器与脚本扩展实现开发,项目结构含A... 目录入门概述什么是 UnityUnity引擎基础认知编辑器核心操作Unity 编辑器项目模式分类工程

聊聊springboot中如何自定义消息转换器

《聊聊springboot中如何自定义消息转换器》SpringBoot通过HttpMessageConverter处理HTTP数据转换,支持多种媒体类型,接下来通过本文给大家介绍springboot中... 目录核心接口springboot默认提供的转换器如何自定义消息转换器Spring Boot 中的消息

Python学习笔记之getattr和hasattr用法示例详解

《Python学习笔记之getattr和hasattr用法示例详解》在Python中,hasattr()、getattr()和setattr()是一组内置函数,用于对对象的属性进行操作和查询,这篇文章... 目录1.getattr用法详解1.1 基本作用1.2 示例1.3 原理2.hasattr用法详解2.

Python自定义异常的全面指南(入门到实践)

《Python自定义异常的全面指南(入门到实践)》想象你正在开发一个银行系统,用户转账时余额不足,如果直接抛出ValueError,调用方很难区分是金额格式错误还是余额不足,这正是Python自定义异... 目录引言:为什么需要自定义异常一、异常基础:先搞懂python的异常体系1.1 异常是什么?1.2

Linux中的自定义协议+序列反序列化用法

《Linux中的自定义协议+序列反序列化用法》文章探讨网络程序在应用层的实现,涉及TCP协议的数据传输机制、结构化数据的序列化与反序列化方法,以及通过JSON和自定义协议构建网络计算器的思路,强调分层... 目录一,再次理解协议二,序列化和反序列化三,实现网络计算器3.1 日志文件3.2Socket.hpp

C语言自定义类型之联合和枚举解读

《C语言自定义类型之联合和枚举解读》联合体共享内存,大小由最大成员决定,遵循对齐规则;枚举类型列举可能值,提升可读性和类型安全性,两者在C语言中用于优化内存和程序效率... 目录一、联合体1.1 联合体类型的声明1.2 联合体的特点1.2.1 特点11.2.2 特点21.2.3 特点31.3 联合体的大小1

springboot自定义注解RateLimiter限流注解技术文档详解

《springboot自定义注解RateLimiter限流注解技术文档详解》文章介绍了限流技术的概念、作用及实现方式,通过SpringAOP拦截方法、缓存存储计数器,结合注解、枚举、异常类等核心组件,... 目录什么是限流系统架构核心组件详解1. 限流注解 (@RateLimiter)2. 限流类型枚举 (