《十堂课学习 Flink SQL》第四章:Flink 应用 java 开发开始典型案例

本文主要是介绍《十堂课学习 Flink SQL》第四章:Flink 应用 java 开发开始典型案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

小伙伴们我们从本章开始将基于JAVA 进行Flink 应用开发,本章节主要介绍Maven开发环境搭建,日志配置,流计算案例以及批计算案例。一方面希望能借此规范化一下开发流程,另一方面也是简单案例入门,为接下来越来越复杂的案例分析打好基础。

4.1 基于 Maven 的 Flink 应用开发环境搭建

4.1.1 新建基于Maven的项目

在这里插入图片描述

4.1.2 添加 Maven 依赖

双击 pom.xml 文件,添加 dependencies 如下:

<!-- 替换或删除原来包含的 properties -->
<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><scala.binary.version>2.11</scala.binary.version><lombok.version>1.18.30</lombok.version><flink.version>1.14.6</flink.version><slf4j.version>2.0.9</slf4j.version><logback.version>1.3.11</logback.version><junit.version>4.13.2</junit.version>
</properties><dependencies><!-- flink 相关 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>${commons-lang3.version}</version><scope>provided</scope></dependency><!-- 编译工具 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version><scope>provided</scope></dependency><!-- log 相关 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId><version>${logback.version}</version><scope>provided</scope></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>${logback.version}</version><scope>provided</scope></dependency><!-- test 相关 --><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>${junit.version}</version><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-test-utils-junit</artifactId><version>${flink.version}</version><scope>test</scope></dependency></dependencies>

这里特别需要说明:

  1. 不需要添加 flink-java / flink-stream-java_*** / flink-core 等等。因为均包含在 flink-client 之中;
  2. 不需要添加 flink-runtime-web 。一般来说,flink-runtime-web 模块是与 Web UI 相关的,包括作业监控、图形化界面、日志查看等功能。它通常与 Flink 集群的 JobManager 一起运行,用于提供 Web 界面以监控和管理 Flink 作业。
  3. flink-client 仅仅用于本地调试,如果上生产环境不需要打包上去,因为生产环境提供相应的包(注意版本一致)。
  4. 日志相关工具依赖也仅仅用于本地调试,生产环境也有提供 Slf4j-api 以及对应log4j实现类。所以打包上 flink 客户端运行时不需要考虑日志的依赖问题。
  5. 测试类依赖的scope统一为 test,因为打包到flink客户端或生产环境flink集群均不需要这些。

4.1.3 添加 maven 插件

分别添加 maven-compiler-pluginmaven-shade-plugin 插件,注意其中的版本在前面已经提到。

<build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${maven.compiler.source}</source><target>${maven.compiler.target}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:flink-shaded-force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude><exclude>ch.qos.logback:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins>
</build>

确保maven下载安装好了相关依赖,即 查看 pom.xml 文件是否还有报错。

4.2 运行官方案例 WordCount

接下来新建一个 StreamWordCount 类,代码如下,注意新建是添加的包名:


import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;/*** 单词统计简单案例* @author Smileyan*/
@Slf4j
public class StreamWordCount {/*** 默认的用于统计单词个数的字符串*/public static final String DEFAULT_WORDS = "Flink’s Table & SQL API makes it possible to work with queries written " +"in the SQL language, but these queries need to be embedded within a table program that is written in either Java or Scala. " +"Moreover, these programs need to be packaged with a build tool before being submitted to a cluster. " +"This more or less limits the usage of Flink to Java/Scala programmers" +"The SQL Client aims to provide an easy way of writing, debugging, and submitting table programs " +"to a Flink cluster without a single line of Java or Scala code. " +"The SQL Client CLI allows for retrieving and visualizing real-time results from the running distributed " +"application on the command line.";public static void main(String[] args) throws Exception {final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.getConfig().setParallelism(3);// 将全局参数传递给执行环境env.getConfig().setGlobalJobParameters(params);DataStream<String> text = null;// 根据输入参数判断是否指定了输入文件路径if (params.has("input")) {// 遍历所有输入文件路径,将它们的数据合并为一个数据流for (String input : params.getMultiParameterRequired("input")) {if (text == null) {text = env.readTextFile(input);} else {text = text.union(env.readTextFile(input));}}// 检查数据集是否为空Preconditions.checkNotNull(text, "Input DataStream should not be null.");} else {// 否则,使用默认的文本数据text = env.fromElements(DEFAULT_WORDS);}// 对文本数据进行分词并计数assert text != null;DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(value -> value.f0).sum(1);// 打印结果到标准输出log.info("Printing result to stdout. Use --output to specify output path.");counts.print();// 执行作业env.execute("Streaming WordCount");}/*** 分词函数,实现了 FlatMapFunction 接口。* 将输入的文本行分割为单词,并为每个单词生成一个键值对(单词,1)。*/public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {private static final long serialVersionUID = 8061659867139246041L;@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {// 将文本行转换为小写并按非单词字符分割String[] tokens = value.toLowerCase().split("\\W+");// 遍历分割后的单词数组,将每个单词生成键值对并输出到结果收集器for (String token : tokens) {if (!token.isEmpty()) {out.collect(Tuple2.of(token, 1));}}}}
}

接下来运行时请注意,我们需要运行时添加 provided 的依赖类型。即

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
选择OK后,即可点击右上角的执行按钮。如下录制视频所示:

4.3 添加日志输出配置文件

经过前面的这些步骤,已经完成了本地运行 flink job ,先不急着分析其中的结果,先优化一下输出日志的问题,即 DEBUG 级别日志太多;全部都是白色的字体看起来不够清晰。如图所示:
在这里插入图片描述
在项目的 resources 目录下新建一个文件,取名叫 logback.xml, 文件内容为:

<configuration><property name="CONSOLE_LOG_PATTERN"value="%cyan(%d{yyyy-MM-dd HH:mm:ss.SSS}) %blue([%thread]) %magenta(%-5level) %green(%logger{60}) %yellow(%file:%line) %X{sourceThread} - (%msg%n)"/><appender name="console" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>${CONSOLE_LOG_PATTERN}</pattern><charset>UTF-8</charset></encoder></appender><appender name="file" class="ch.qos.logback.core.FileAppender"><file>logs/${file.log}.log</file><append>false</append><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern></encoder></appender><!-- This affects logging for both user code and Flink --><root level="INFO"><appender-ref ref="console"/><appender-ref ref="file"/></root><!-- Uncomment this if you want to only change Flink's logging --><logger name="org.apache.flink" level="INFO"><appender-ref ref="file"/></logger><!-- The following lines keep the log level of common libraries/connectors onlog level INFO. The root logger does not override this. You have to manuallychange the log levels here. --><logger name="akka" level="INFO"><appender-ref ref="file"/></logger><logger name="org.apache.kafka" level="INFO"><appender-ref ref="file"/></logger><logger name="org.apache.hadoop" level="INFO"><appender-ref ref="file"/></logger><logger name="org.apache.zookeeper" level="INFO"><appender-ref ref="file"/></logger><!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler --><logger name="org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR"><appender-ref ref="file"/></logger><!-- 配置状态监听器,关闭版本信息输出 --><statusListener class="ch.qos.logback.core.status.NopStatusListener" />
</configuration>

在这里插入图片描述
执行后的效果可以参考如下视频:

4.4 批处理案例

类似地,我们添加批处理案例代码,新建 BatchWordCount类。

package cn.smileyan.demos;import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;/*** 说明:* 1. 代码中的 DEFAULT_WORDS 数组包含了一些默认的文本数据,用于 WordCount 示例。* 2. main 方法是程序的入口点,解析命令行参数,设置 Flink 执行环境,并执行 WordCount 示例。* 3. Tokenizer 类是一个 FlatMapFunction,用于将输入的文本进行切分和计数。* @author Smileyan*/
@Slf4j
public class BatchWordCount {/*** 默认的用于统计单词个数的字符串*/protected static final String[] DEFAULT_WORDS = {"Flink’s Table & SQL API makes it possible to work with queries written ","in the SQL language, but these queries need to be embedded within a table program that is written in either Java or Scala. ","Moreover, these programs need to be packaged with a build tool before being submitted to a cluster. ","This more or less limits the usage of Flink to Java/Scala programmers","The SQL Client aims to provide an easy way of writing, debugging, and submitting table programs ","to a Flink cluster without a single line of Java or Scala code. ","The SQL Client CLI allows for retrieving and visualizing real-time results from the running distributed ","application on the command line."};public static void main(String[] args) throws Exception {// 解析命令行参数final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);// 获取 Flink 执行环境final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 设置全局作业参数env.getConfig().setGlobalJobParameters(params);// 定义文本数据集DataSet<String> text = null;if (params.has("input")) {// 如果命令行参数包含输入路径,则从文件中读取文本数据for (String input : params.getMultiParameterRequired("input")) {if (text == null) {text = env.readTextFile(input);} else {text = text.union(env.readTextFile(input));}}Preconditions.checkNotNull(text, "Input DataSet should not be null.");} else {// 否则,使用默认的文本数据text = env.fromElements(DEFAULT_WORDS);}// 执行 WordCount 示例assert text != null;DataSet<Tuple2<String, Integer>> counts =text.flatMap(new Tokenizer()).groupBy(0).sum(1);// 打印结果counts.print();}/*** Tokenizer 类实现了 FlatMapFunction 接口,用于将输入文本切分并计数。*/public static final class Tokenizerimplements FlatMapFunction<String, Tuple2<String, Integer>> {/*** 切分并计数逻辑** @param value 输入文本* @param out   输出 Tuple2<String, Integer> 的 Collector*/@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {// 将文本转换为小写,并根据非单词字符切分String[] tokens = value.toLowerCase().split("\\W+");// 遍历切分后的单词数组,排除空单词,并将单词和计数为 1 的 Tuple 发送到 Collectorfor (String token : tokens) {if (!token.isEmpty()) {out.collect(new Tuple2<>(token, 1));}}}}
}

4.5 相关资料

欢迎访问本文对应的源码地址:https://gitee.com/smile-yan/quick-start-flink-java

Flink 官网 1.14.6 的在线文档:https://nightlies.apache.org/flink/flink-docs-release-1.14/

4.6 本章小结

俗话说,“万事开头难” 。但是很多开源项目作者团队都有一个很好的习惯 —— 提供quick-start 的简单项目。本文的目的也是如此,Flink 很强大,如果我们细究原理的话应该至少得读一些论文,做一些实验,读一读源码。

但事实上,很幸运对大多数小伙伴们而言,我们不需要这样做。Flink 就像一把斧头,我们需要学会如何把斧头打磨锋利、如何更好地使用斧头,而不用考虑怎么去制作它。

愿我们都能掌握 Flink 基础知识,并在今后的学习与工作中更好地打磨它,在接下来的开发道路上帮助我们不断披荆斩棘 ~

如果认为本章节写得还行,一定记得点击下方免费的赞 ~ 感谢 !
在这里插入图片描述

这篇关于《十堂课学习 Flink SQL》第四章:Flink 应用 java 开发开始典型案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/443743

相关文章

MySQL主从同步延迟问题的全面解决方案

《MySQL主从同步延迟问题的全面解决方案》MySQL主从同步延迟是分布式数据库系统中的常见问题,会导致从库读取到过期数据,影响业务一致性,下面我将深入分析延迟原因并提供多层次的解决方案,需要的朋友可... 目录一、同步延迟原因深度分析1.1 主从复制原理回顾1.2 延迟产生的关键环节二、实时监控与诊断方案

SpringBoot中四种AOP实战应用场景及代码实现

《SpringBoot中四种AOP实战应用场景及代码实现》面向切面编程(AOP)是Spring框架的核心功能之一,它通过预编译和运行期动态代理实现程序功能的统一维护,在SpringBoot应用中,AO... 目录引言场景一:日志记录与性能监控业务需求实现方案使用示例扩展:MDC实现请求跟踪场景二:权限控制与

Android开发环境配置避坑指南

《Android开发环境配置避坑指南》本文主要介绍了Android开发环境配置过程中遇到的问题及解决方案,包括VPN注意事项、工具版本统一、Gerrit邮箱配置、Git拉取和提交代码、MergevsR... 目录网络环境:VPN 注意事项工具版本统一:android Studio & JDKGerrit的邮

慢sql提前分析预警和动态sql替换-Mybatis-SQL

《慢sql提前分析预警和动态sql替换-Mybatis-SQL》为防止慢SQL问题而开发的MyBatis组件,该组件能够在开发、测试阶段自动分析SQL语句,并在出现慢SQL问题时通过Ducc配置实现动... 目录背景解决思路开源方案调研设计方案详细设计使用方法1、引入依赖jar包2、配置组件XML3、核心配

Python开发文字版随机事件游戏的项目实例

《Python开发文字版随机事件游戏的项目实例》随机事件游戏是一种通过生成不可预测的事件来增强游戏体验的类型,在这篇博文中,我们将使用Python开发一款文字版随机事件游戏,通过这个项目,读者不仅能够... 目录项目概述2.1 游戏概念2.2 游戏特色2.3 目标玩家群体技术选择与环境准备3.1 开发环境3

Java NoClassDefFoundError运行时错误分析解决

《JavaNoClassDefFoundError运行时错误分析解决》在Java开发中,NoClassDefFoundError是一种常见的运行时错误,它通常表明Java虚拟机在尝试加载一个类时未能... 目录前言一、问题分析二、报错原因三、解决思路检查类路径配置检查依赖库检查类文件调试类加载器问题四、常见

Java注解之超越Javadoc的元数据利器详解

《Java注解之超越Javadoc的元数据利器详解》本文将深入探讨Java注解的定义、类型、内置注解、自定义注解、保留策略、实际应用场景及最佳实践,无论是初学者还是资深开发者,都能通过本文了解如何利用... 目录什么是注解?注解的类型内置注编程解自定义注解注解的保留策略实际用例最佳实践总结在 Java 编程

MySQL数据库约束深入详解

《MySQL数据库约束深入详解》:本文主要介绍MySQL数据库约束,在MySQL数据库中,约束是用来限制进入表中的数据类型的一种技术,通过使用约束,可以确保数据的准确性、完整性和可靠性,需要的朋友... 目录一、数据库约束的概念二、约束类型三、NOT NULL 非空约束四、DEFAULT 默认值约束五、UN

MySQL 多表连接操作方法(INNER JOIN、LEFT JOIN、RIGHT JOIN、FULL OUTER JOIN)

《MySQL多表连接操作方法(INNERJOIN、LEFTJOIN、RIGHTJOIN、FULLOUTERJOIN)》多表连接是一种将两个或多个表中的数据组合在一起的SQL操作,通过连接,... 目录一、 什么是多表连接?二、 mysql 支持的连接类型三、 多表连接的语法四、实战示例 数据准备五、连接的性

MySQL中的分组和多表连接详解

《MySQL中的分组和多表连接详解》:本文主要介绍MySQL中的分组和多表连接的相关操作,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 目录mysql中的分组和多表连接一、MySQL的分组(group javascriptby )二、多表连接(表连接会产生大量的数据垃圾)MySQL中的