(一)基于Spring Reactor框架响应式异步编程|道法术器

2023-10-23 07:59

本文主要是介绍(一)基于Spring Reactor框架响应式异步编程|道法术器,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

                       

 

特注意: Spring Boot 的版本大于等于 2.3.0,在此版本之后才开始支持 MYSQL 的响应式驱动  

在执行程序时: 通常为了提供性能,处理器和编译器常常会对指令进行重排序。 从排序分为编译器重排序和处理器重排序两种 :

    (1)编译器重排序: 编译器保证不改变单线程执行结构的前提下,可以调整多线程语句执行顺序;        (2)处理器重排序: 如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序;

     若要实现快速响应,就得把程序执行指令的方式换一换,将同步方式改成异步方法,方法执行改为消息发送,因此诞生了响应式编程模型; 

特注意的是: 在响应式编程技术栈中,有一点需要注意,即响应式编程并不是针对系统中的某一部分组件,而是需要使用于"调用链路"上的所有组件。无论是Web层(Controller), 服务层(Service) ,还是处于下游的数据访问层(Dao或者Repository), 只要有一个环节不是响应式的, 那么这个环节就会出现“同步阻塞”, 从而导致"背压机制"失效 , 这就是所谓的全栈式响应编程的设计理念。


Spring WebFlux 响应式异步编程|道法术器(一)

 Spring WeFlux响应式编程整合另一种方案|道法术器(二)


R2DBC简介

Spring data R2DBC是更大的Spring data 系列的一部分,它使得实现基于R2DBC的存储库变得容易。R2DBC代表反应式关系数据库连接,这是一种使用反应式驱动程序集成SQL数据库的规范。Spring Data R2DBC使用属性的Spring抽象和Repository支持应用于R2DBC。它使得在反应式应用程序堆栈中使用关系数据访问技术构建Spring驱动的应用程序变得更加容易。

Spring Data R2DBC的目标是在概念上变得简单。为了实现这一点,它不提供缓存、延迟加载、写后处理或ORM框架的许多其他特性。这使得Spring Data R2DBC成为一个简单、有限、固执己见的对象映射器。

Spring Data R2DBC允许一种 functional 方法与数据库交互,提供R2dbcEntityTemplate作为应用程序的入口点。

首先选择数据库驱动程序并创建R2dbcEntityTemplate实例


使用传统 web 框架,比如 SpringMVC,这些基于 Servlet 容器。
Webflux 是一种异步非阻塞的框架,异步非阻塞的框架在 Servlet3.1 以后才支持,核心是基于 Reactor 的相关 API 实现的。
什么是异步非阻塞
异步和同步
异步和同步针对调用者,调用者发送请求,如果等着对方回应之后才去做其他事情就是同步,如果发送请求之后不等着对方回应就去做其他事情就是异步。
非阻塞和阻塞
阻塞和非阻塞针对被调用者,被调用者收到请求之后,做完请求任务之后才给出反馈就是阻塞,收到请求之后马上给出反馈然后再去做事情就是非阻塞。
Webflux 特点:
第一 非阻塞式:在有限资源下,提高系统吞吐量和伸缩性,以 Reactor 为基础实现响应式编程。
第二 函数式编程:Spring5 框架基于 java8,Webflux 使用 Java8 函数式编程方式实现路由请求。

 


Spring Data R2DBC可以与Spring Data JPA结合使用,其实R2DBC与原来的JPA使用方式差别不大,使用非常简单。
只是Spring Data JPA中方法返回的是真实的值,而R2DBC中,返回的是数据流Mono,Flux。

简单介绍一个Spring Data JPA。Spring Data JPA是Spring基于ORM框架、JPA规范的基础上封装的一套 JPA (Java Persistence API) 应用框架,简单说,就是类似Mybatis,Hibernate的框架(Spring Data JPA底层通过Hibernate操作数据库)。

Repository是Spring Data R2DBC中的重要概念,封装了对一个实体的操作,相当于一个dao(Data Access Object,数据访问对象)
 


官网连接:Spring Data R2DBC - Reference Documentation


5. Requirements

The Spring Data R2DBC 1.x binaries require:

  • JDK level 8.0 and above

  • Spring Framework 5.3.8 and above

  • R2DBC Arabba-SR10 and above

  • 这是官网对搭建非阻塞响应式编程的环境要求:


 一,本节将从简单的搭建开,体验下响应式非阻塞编程的大致概况:

    1.1 搭建环境:

    

<!--设置spring-boot依赖的版本 -->
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.5</version> <!--2.4.11--><relativePath/> <!-- lookup parent from repository -->
</parent>

  

<!-- 响应式编程集成-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql-connector-java.version}</version>
</dependency>
<!--R2DBC是基于Reactive Streams标准来设计的。通过使用R2DBC,你可以使用reactive API来操作数据。同时R2DBC只是一个开放的标准,而各个具体的数据库连接实现,需要实现这个标准。-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency><groupId>com.github.jasync-sql</groupId><artifactId>jasync-r2dbc-mysql</artifactId><version>1.2.3</version>
</dependency>

额外可有可无
<!--reactor-test测试相关类-->
<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId>
</dependency>

 第二: 基础配置application.yml文件

server:port: 9999servlet:context-path: /
spring:#连接数据库的url,前缀不再是jdbc而是换成r2dbc#这里可以配置连接池相关的其它属性,这里为了简洁不配置r2dbc:url: mysql://localhost:3306/tope-pay-user?&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8&useSSL=falseusername: rootpassword: 123456logging:level:org.springframework.r2dbc: INFO  #输出执行的sqlorg.springframework.cloud.web.reactive: inforeactor.ipc.netty: info

第三: javaConfig文件编写,读取初始化化R2dbc连接的相关参数

package org.jd.websocket.auth.data.reactor.config;import com.github.jasync.r2dbc.mysql.JasyncConnectionFactory;
import com.github.jasync.sql.db.mysql.pool.MySQLConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.r2dbc.R2dbcProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.core.R2dbcEntityOperations;
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.springframework.r2dbc.connection.R2dbcTransactionManager;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;import java.net.URI;
import java.net.URISyntaxException;/*** R2dbcProperties 看源代码中,数据库连接池的相关配置*/
@Configuration
@EnableTransactionManagement // 开启事务的支持
public class DatabaseConfiguration {@Bean@Qualifier("mysqlConnectionFactory")public ConnectionFactory connectionFactory(R2dbcProperties properties) throws URISyntaxException {// 从R2dbcProperties中,解析出 host、port、databaseURI uri = new URI(properties.getUrl());String host = uri.getHost();int port = uri.getPort();String database = uri.getPath().substring(1); // 去掉首位的 / 斜杠// 创建 Configuration 配置配置对象com.github.jasync.sql.db.Configuration configuration = new com.github.jasync.sql.db.Configuration(properties.getUsername(), host, port, properties.getPassword(), database);// 创建 ConnectionFactory 对象JasyncConnectionFactory jasyncConnectionFactory = new JasyncConnectionFactory(new MySQLConnectionFactory(configuration));return jasyncConnectionFactory;}@Beanpublic R2dbcEntityOperations mysqlR2dbcEntityOperations(@Qualifier("mysqlConnectionFactory") ConnectionFactory connectionFactory) {return new R2dbcEntityTemplate(connectionFactory);}@Beanpublic ReactiveTransactionManager transactionManager(R2dbcProperties properties) throws URISyntaxException {return new R2dbcTransactionManager(this.connectionFactory(properties));}
}

四:数据持久层: 响应式非阻塞编程 

package org.jd.websocket.auth.data.reactor.repository;import org.jd.websocket.auth.data.reactor.entity.RSysSystem;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.data.repository.reactive.ReactiveSortingRepository;
import org.springframework.stereotype.Repository;/*** 持久层:非阻塞异步访问*/
@Repository
public interface RSysSystemReactiveRepository extends ReactiveCrudRepository<RSysSystem, Long>, ReactiveSortingRepository<RSysSystem, Long> {
}

五:业务层:

package org.jd.websocket.auth.data.reactor.service;import org.jd.websocket.auth.data.reactor.entity.RSysSystem;
import reactor.core.publisher.Mono;public interface RSysSystemService {/*** 通过ID查找单条记录** @param systemId 系统服务ID* @return {@link Mono<RSysSystem>}*/Mono<RSysSystem> findById(Long systemId);/*** 插入记录信息** @param system* @return {@link Mono<RSysSystem>)*/Mono<RSysSystem> insert(RSysSystem system);/*** 通过ID查询是否存在记录** @param systemId 系统ID* @return {@link Mono<Boolean>}*/Mono<Boolean> exists(Long systemId);/*** 查询记录数** @return {@link Mono<Long>}*/Mono<Long> count();
}

package org.jd.websocket.auth.data.reactor.service.impl;fimport lombok.extern.slf4j.Slf4j;
import org.jd.websocket.auth.data.reactor.entity.RSysSystem;
import org.jd.websocket.auth.data.reactor.repository.RSysSystemReactiveRepository;
import org.jd.websocket.auth.data.reactor.service.RSysSystemService;
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;/*** 构建全调用链路异步响应式编程* 系统响应式查询服务*/
@Slf4j
@Service
public class RSysSystemServiceImpl implements RSysSystemService {@Resourceprivate RSysSystemReactiveRepository sysSystemReactiveRepository;@Overridepublic Mono<RSysSystem> findById(Long systemId) {return sysSystemReactiveRepository.findById(systemId);}@Transactional(rollbackFor = Exception.class)@Overridepublic Mono<RSysSystem> insert(RSysSystem system) {return sysSystemReactiveRepository.save(system);}@Overridepublic Mono<Boolean> exists(Long systemId) {return sysSystemReactiveRepository.existsById(systemId);}@Overridepublic Mono<Long> count() {return sysSystemReactiveRepository.count();}
}

 六:服务器访问层

基于注解方式编程

package org.jd.websocket.auth.data.reactor.controller;import org.jd.websocket.auth.data.reactor.entity.RSysSystem;
import org.jd.websocket.auth.data.reactor.service.RSysSystemService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;import javax.annotation.Resource;@RestController
@RequestMapping("/system")
public class SysSystemController {@Resourceprivate RSysSystemService rSysSystemService;@GetMapping("/getSysSystemById/{systemId}")public Mono<RSysSystem> getSySystem(@PathVariable("systemId") Long systemId) {Mono<RSysSystem> result = rSysSystemService.findById(systemId);System.out.println("result:" + result.toString());return result;}}

七: 领域模型类

package org.jd.websocket.auth.data.reactor.entity;import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;
import org.springframework.data.annotation.Version;
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.relational.core.mapping.Table;
import org.springframework.format.annotation.DateTimeFormat;/*** 属性上的注解使用Spring-data中的相关注解*/
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List;@Data
@RequiredArgsConstructor
@Table(value = "sys_system")
public class RSysSystem implements Serializable {@Transientprivate static final long serialVersionUID = 7481799808203597699L;// 主键自增@Id@Column(value = "system_id")private Long systemId;/*** 系统名称* 字段映射和约束条件* //对应数据库表中哪个列字段及对该字段的自定义*/@Column(value = "system_name")private String systemName;/*** 详细功能描述: 描述该系统主要包含那些那些模块,每个模块的大致功能*/@Column(value = "system_detail_desc")private String systemDetailDesc;/*** 系统跳转到功能版块路径*/@Column(value = "path_function_url")private String pathFunctionUrl;/*** 系统包含那些模块* 该字段不参与数据库映射*/@Transientprivate List<RSysModule> sysModules;/**** 创建时间*/@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")@Column(value = "create_time")private LocalDateTime createTime;/*** 更新时间*/@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")@Column(value = "update_time")private LocalDateTime updateTime;/*** 版本号(用于乐观锁, 默认为 1)* 使用 @Version 注解标注对应的实体类。* 可以通过 @TableField 进行数据自动填充。*/@Versionprivate Integer version;
}

测试脚本:


CREATE TABLE `sys_system` (
  `system_id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '系统主键',
  `system_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '系统短名称',
  `system_detail_desc` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '系统简介',
  `path_function_url` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT '' COMMENT '跳转到版块页路径',
  `create_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `version` int(11) NOT NULL DEFAULT '0' COMMENT '版本号',
  PRIMARY KEY (`system_id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

 

运行测试结果:

http://localhost:9999/system/getSysSystemById/1


 可能会遇到时间字段(LocalDateTime)转换的问题:使用下面的配置转换类即可

package org.jd.websocket.auth.data.reactor.config;import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import org.springframework.boot.autoconfigure.jackson.Jackson2ObjectMapperBuilderCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.io.IOException;
import java.time.*;
import java.time.format.DateTimeFormatter;@Configuration
public class LocalDateTimeSerializerConfig {@Beanpublic Jackson2ObjectMapperBuilderCustomizer jackson2ObjectMapperBuilderCustomizer() {return builder -> {//序列化builder.serializerByType(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));//反序列化builder.deserializerByType(LocalDateTime.class, new LocalDateTimeDeserializer());};}// 反序列化public static class LocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> {@Overridepublic LocalDateTime deserialize(JsonParser p, DeserializationContext deserializationContext)throws IOException {long timestamp = p.getValueAsLong();if (timestamp > 0) {return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());} else {return null;}}}
}

至此,基础搭建完成,后续会持续系列多篇讲解,撸下源代码及相关知识........待续..... 

参考序列:

* 官方文档
* https://github.com/spring-projects/spring-data-examples/tree/master/r2dbc/example
* https://www.reactiveprinciples.org/  中文官网

这篇关于(一)基于Spring Reactor框架响应式异步编程|道法术器的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/266590

相关文章

SpringBoot实现虚拟线程的方案

《SpringBoot实现虚拟线程的方案》Java19引入虚拟线程,本文就来介绍一下SpringBoot实现虚拟线程的方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录什么是虚拟线程虚拟线程和普通线程的区别SpringBoot使用虚拟线程配置@Async性能对比H

javaSE类和对象进阶用法举例详解

《javaSE类和对象进阶用法举例详解》JavaSE的面向对象编程是软件开发中的基石,它通过类和对象的概念,实现了代码的模块化、可复用性和灵活性,:本文主要介绍javaSE类和对象进阶用法的相关资... 目录前言一、封装1.访问限定符2.包2.1包的概念2.2导入包2.3自定义包2.4常见的包二、stati

SpringBoot结合Knife4j进行API分组授权管理配置详解

《SpringBoot结合Knife4j进行API分组授权管理配置详解》在现代的微服务架构中,API文档和授权管理是不可或缺的一部分,本文将介绍如何在SpringBoot应用中集成Knife4j,并进... 目录环境准备配置 Swagger配置 Swagger OpenAPI自定义 Swagger UI 底

解决hive启动时java.net.ConnectException:拒绝连接的问题

《解决hive启动时java.net.ConnectException:拒绝连接的问题》Hadoop集群连接被拒,需检查集群是否启动、关闭防火墙/SELinux、确认安全模式退出,若问题仍存,查看日志... 目录错误发生原因解决方式1.关闭防火墙2.关闭selinux3.启动集群4.检查集群是否正常启动5.

SpringBoot集成EasyExcel实现百万级别的数据导入导出实践指南

《SpringBoot集成EasyExcel实现百万级别的数据导入导出实践指南》本文将基于开源项目springboot-easyexcel-batch进行解析与扩展,手把手教大家如何在SpringBo... 目录项目结构概览核心依赖百万级导出实战场景核心代码效果百万级导入实战场景监听器和Service(核心

idea Maven Springboot多模块项目打包时90%的问题及解决方案

《ideaMavenSpringboot多模块项目打包时90%的问题及解决方案》:本文主要介绍ideaMavenSpringboot多模块项目打包时90%的问题及解决方案,具有很好的参考价值,... 目录1. 前言2. 问题3. 解决办法4. jar 包冲突总结1. 前言之所以写这篇文章是因为在使用Mav

C# async await 异步编程实现机制详解

《C#asyncawait异步编程实现机制详解》async/await是C#5.0引入的语法糖,它基于**状态机(StateMachine)**模式实现,将异步方法转换为编译器生成的状态机类,本... 目录一、async/await 异步编程实现机制1.1 核心概念1.2 编译器转换过程1.3 关键组件解析

Spring Security6.3.x的使用指南与注意事项

《SpringSecurity6.3.x的使用指南与注意事项》SpringSecurity6.3.1基于现代化架构,提供简洁配置、增强默认安全性和OAuth2.1/OIDC支持,采用Lambda... 目录介绍基础配置 (Servlet 应用 - 使用 Lambda DSL)关键配置详解(Lambda DS

Java Stream 的 Collectors.toMap高级应用与最佳实践

《JavaStream的Collectors.toMap高级应用与最佳实践》文章讲解JavaStreamAPI中Collectors.toMap的使用,涵盖基础语法、键冲突处理、自定义Map... 目录一、基础用法回顾二、处理键冲突三、自定义 Map 实现类型四、处理 null 值五、复杂值类型转换六、处理

SpringBoot实现RSA+AES自动接口解密的实战指南

《SpringBoot实现RSA+AES自动接口解密的实战指南》在当今数据泄露频发的网络环境中,接口安全已成为开发者不可忽视的核心议题,RSA+AES混合加密方案因其安全性高、性能优越而被广泛采用,本... 目录一、项目依赖与环境准备1.1 Maven依赖配置1.2 密钥生成与配置二、加密工具类实现2.1