Spring Boot 整合 Apache Flink 的详细过程

2025-06-07 03:50

本文主要是介绍Spring Boot 整合 Apache Flink 的详细过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri...

Spring Boot 整合 Apache Flink 教程

一、背景与目标

Apache Flink 是一个高性能的分布式流处理框架,而Spring Boot提供了快速构建企业级应用的能力。整合二者可实现:

  • 利用Spring Boot的依赖注入、配置管理等功能简化Flink作业开发
  • 构建完整的微服务架构,将流处理嵌入Spring生态
  • 实现动态作业提交与管理

二、环境准备

  • JDK 17+
  • Maven 3.8+
  • Spring Boot 3.1.5
  • Flink 1.17.2

三、创建项目 & 添加依赖

1. 创建Spring Boot项目

使用Spring Initializr生成基础项目,选择:

  • Maven
  • Spring Web(可选,用于创建REST接口)

2. 添加Flink依赖

<!-- pom.XML -->
<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Flink核心依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-Java</artifactId>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
    <!-- 本地执行时需添加 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime</artifactId>
        <version>1.17.2</version>
        <scope>test</scope>
    </dependency>
</dependencies>

四、基础整合示例

1. 编写Flink流处理作业

// src/main/java/com/example/demo/flink/WordCountJob.java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCountJob {
    public static void ex编程ecute() throws Exception {
        final StreamExecutionEnvironment env = 
            StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.fromElements(
   android         "Spring Boot整合Flink",
            "Flink实时流处理",
            "Spring生态集成"
        );
        DataStrChina编程eam<WordCount> counts = text
            .flatMap(new FlatMapFunction<String, WordCount>() {
                @Override
                public void flatMap(String value, Collector<WordCount>China编程 out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordCount(word, 1L));
                    }
                }
            })
            .keyBy(value -> value.word)
            .sum("count");
        counts.print();
        env.execute("Spring Boot Flink Job");
    }
    public static class WordCount {
        public String word;
        public long count;
        public WordCount() {}
        public WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

2. 在Spring Boot中启动作业

// src/main/java/com/example/demo/DemoApplication.java
@SpringBootApplication
public class DemoApplication implements CommandLineRunner {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
    @Override
    public void run(String... args) throws Exception {
        WordCountJob.execute(); // 启动Flink作业
    }
}

五、进阶整合 - 通过REST API动态提交作业

1. 创建Job提交服务

// src/main/java/com/example/demo/service/FlinkJobService.java
@Service
public class FlinkJobService {
    public String submitWordCountJob(List<String> inputLines) {
        try {
            final StreamExecutionEnvironment env = 
                StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> texjavascriptt = env.fromCollection(inputLines);
            // ...(同上WordCount逻辑)
            JobExecutionResult result = env.execute();
            return "JobID: " + result.getJobID();
        } catch (Exception e) {
            return "Job Failed: " + e.getMessage();
        }
    }
}

2. 创建REST控制器

// src/main/java/com/example/demo/controller/JobController.java
@RestController
@RequestMapping("/jobs")
public class JobController {
    @Autowired
    private FlinkJobService flinkJobService;
    @PostMapping("/wordcount")
    public String submitWordCount(@RequestBody List<String> inputs) {
        return flinkJobService.submitWordCountJob(inputs);
    }
}

六、关键配置说明

1. application.properties

# 设置Flink本地执行环境
spring.flink.local.enabled=true
spring.flink.job.name=SpringBootFlinkJob
# 调整并行度(根据CPU核心数)
spring.flink.parallelism=4

2. 解决依赖冲突

在pom.xml中排除冲突依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.17.2</version>
    <exclusions>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>

七、运行与验证

启动Spring Boot应用:

mvn spring-boot:run

调用API提交作业:

curl -X POST -H "Content-Type: application/json" \
-d '["Hello Flink", "Spring Boot Integration"]' \
http://localhost:8080/jobs/wordcount

查看控制台输出:

Flink> Spring : 1
Flink> Boot : 1
Flink> Integration : 1
...

八、生产环境注意事项

集群部署:将打包后的jar提交到Flink集群

flink run -c com.example.demo.DemoApplication your-application.jar

状态管理:集成Flink State Backend(如RocksDB)

监控集成:通过Micrometer接入Spring Boot Actuator

资源隔离:使用YarnKubernetes部署模式

九、完整项目结构

src/
├── main/
│   ├── java/
│   │   ├── com/example/demo/
│   │   │   ├── DemoApplication.java
│   │   │   ├── flink/
│   │   │   │   └── WordCountJob.java
│   │   │   ├── controller/
│   │   │   ├── service/
│   ├── resources/
│   │   └── application.properties
pom.xml

通过以上步骤,即可实现Spring Boot与Apache Flink的深度整合。这种架构特别适合需要将实时流处理能力嵌入微服务体系的场景,如实时风控系统、IoT数据处理平台等。后续可扩展集成Kafka、HBase等大数据组件。

到此这篇关于Spring Boot 整合 Apache Flink 教程的文章就介绍到这了,更多相关Spring Boot 整合 Apache Flink内容请搜索编程China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于Spring Boot 整合 Apache Flink 的详细过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1154948

相关文章

Java实现将HTML文件与字符串转换为图片

《Java实现将HTML文件与字符串转换为图片》在Java开发中,我们经常会遇到将HTML内容转换为图片的需求,本文小编就来和大家详细讲讲如何使用FreeSpire.DocforJava库来实现这一功... 目录前言核心实现:html 转图片完整代码场景 1:转换本地 HTML 文件为图片场景 2:转换 H

Java使用jar命令配置服务器端口的完整指南

《Java使用jar命令配置服务器端口的完整指南》本文将详细介绍如何使用java-jar命令启动应用,并重点讲解如何配置服务器端口,同时提供一个实用的Web工具来简化这一过程,希望对大家有所帮助... 目录1. Java Jar文件简介1.1 什么是Jar文件1.2 创建可执行Jar文件2. 使用java

SpringBoot实现不同接口指定上传文件大小的具体步骤

《SpringBoot实现不同接口指定上传文件大小的具体步骤》:本文主要介绍在SpringBoot中通过自定义注解、AOP拦截和配置文件实现不同接口上传文件大小限制的方法,强调需设置全局阈值远大于... 目录一  springboot实现不同接口指定文件大小1.1 思路说明1.2 工程启动说明二 具体实施2

Java实现在Word文档中添加文本水印和图片水印的操作指南

《Java实现在Word文档中添加文本水印和图片水印的操作指南》在当今数字时代,文档的自动化处理与安全防护变得尤为重要,无论是为了保护版权、推广品牌,还是为了在文档中加入特定的标识,为Word文档添加... 目录引言Spire.Doc for Java:高效Word文档处理的利器代码实战:使用Java为Wo

SpringBoot日志级别与日志分组详解

《SpringBoot日志级别与日志分组详解》文章介绍了日志级别(ALL至OFF)及其作用,说明SpringBoot默认日志级别为INFO,可通过application.properties调整全局或... 目录日志级别1、级别内容2、调整日志级别调整默认日志级别调整指定类的日志级别项目开发过程中,利用日志

Java中的抽象类与abstract 关键字使用详解

《Java中的抽象类与abstract关键字使用详解》:本文主要介绍Java中的抽象类与abstract关键字使用详解,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录一、抽象类的概念二、使用 abstract2.1 修饰类 => 抽象类2.2 修饰方法 => 抽象方法,没有

SpringBoot 多环境开发实战(从配置、管理与控制)

《SpringBoot多环境开发实战(从配置、管理与控制)》本文详解SpringBoot多环境配置,涵盖单文件YAML、多文件模式、MavenProfile分组及激活策略,通过优先级控制灵活切换环境... 目录一、多环境开发基础(单文件 YAML 版)(一)配置原理与优势(二)实操示例二、多环境开发多文件版

Spring 中的切面与事务结合使用完整示例

《Spring中的切面与事务结合使用完整示例》本文给大家介绍Spring中的切面与事务结合使用完整示例,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录 一、前置知识:Spring AOP 与 事务的关系 事务本质上就是一个“切面”二、核心组件三、完

Java实现远程执行Shell指令

《Java实现远程执行Shell指令》文章介绍使用JSch在SpringBoot项目中实现远程Shell操作,涵盖环境配置、依赖引入及工具类编写,详解分号和双与号执行多指令的区别... 目录软硬件环境说明编写执行Shell指令的工具类总结jsch(Java Secure Channel)是SSH2的一个纯J

ShardingProxy读写分离之原理、配置与实践过程

《ShardingProxy读写分离之原理、配置与实践过程》ShardingProxy是ApacheShardingSphere的数据库中间件,通过三层架构实现读写分离,解决高并发场景下数据库性能瓶... 目录一、ShardingProxy技术定位与读写分离核心价值1.1 技术定位1.2 读写分离核心价值二