java9反应式编程 SubmissionPublisher实现发布订阅

2023-10-28 14:20

本文主要是介绍java9反应式编程 SubmissionPublisher实现发布订阅,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一 准备工作

JAVA9 中新增了Ract Stream API 支持反应编程,今天就来自己玩一下JAVA的原生反应API。
首先需要下载jdk13 , 编译版本需要是12,因为spring5 不支持12以上的版本编译。如图所示,另外IEDA中modules 的编译也要改成12,还有setting中的也要同步改成12 。

 IEDA的编译配置

二 反应模型简介 && 代码示例

任务订阅的过程
 订阅的过程
1- new 一个SubmissionPublisher
2- 新建订阅者 Subscriberss ,编写订阅者代码
3- 提交任务

Subscription 的作用
只有 发布者 和 订阅者 是不行的,就好比订阅报纸,是谁把报纸给你送到家的呢,这里就是Subscription
,他发起请求,请求从发布者那里获取几条消息。几个item。如果发布者发布了10条,但是请求只要3条,最终也只会消费3条。 demo代码如下

  public static void main(String[] args) throws InterruptedException {// 错误处理器,可以不写BiConsumer<Subscriberss, Exception> handler = (a,b) -> {System.out.println(Throwables.getStackTraceAsString(b));a.onError(b);};//线程池也不用,默认自带FJ 的线程池ExecutorService executorService = Executors.newFixedThreadPool(20);//可以直接new 不用构造方法。2 是buffer数组的容量大小,最大32,此处抛出一个问题,如果32个item都占满了,会怎么样?SubmissionPublisher<String> sp  = new SubmissionPublisher(executorService,2,handler);//添加一个订阅者(消费者),这个方法里会初始化BufferedSubscription 也是实际上消费者消费的类sp.subscribe(new Subscriberss());//发送4笔数据sp.submit("1");sp.submit("2");sp.submit("3");sp.submit("4");//关闭stream sp.close();//此处代码无关while (true){Thread.sleep(100000);}}//实现一个消费者static class Subscriberss<String> implements Flow.Subscriber{@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("订阅消息");//拉取10个元素(消息)subscription.request(10);}@Overridepublic void onNext(Object item) {System.out.println( item.toString());}@Overridepublic void onError(Throwable throwable) {System.out.println("error");}@Overridepublic void onComplete() {System.out.println("牛逼");}}

执行结果如下
结果

代码跟踪

onSubscribe方法的描述 ,翻译成人话就是 onSubscribe在订阅者执行任何方法之前执行。所以首先打印
“订阅消息” 。

  Method invoked prior to invoking any other Subscriber methods for the given Subscription. If this method throws  an exception, resulting behavior is not guaranteed, but may  cause the Subscription not to be established or to be cancelled.

由于是异步的,不能跟着main线程,需要跟着订阅者的异步线程,BufferedSubscription默认使用的是FJ线程池,当我们提交完成后submit 方法结束后,任务就给到了 FJ线程池 或者 自己的线程池 里执行。
FORKJOIN 线程池
从BufferedSubscription 的 consume方法我们可以看到依次从 subscribeOnOpen(onSubscribe), takeItems (onNext),closeOnComplete(onComplete) 逐步调用。在等待执行的数组里,看到了我们的消息, array就是存放item的地方。

消息
注:这里不是demo里的 1,2,3,4, 这只之前的代码的跟踪

重点看一下:takeItems方法

//s 是订阅者,d是需求的数量,h是 头数组的下标final int takeItems(Subscriber<? super T> s, long d, int h) {Object[] a;int k = 0, cap;if ((a = array) != null && (cap = a.length) > 0) {//  这里提交任务的数量/8 +1 不清楚int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8)  // 如果需求数量 <  这里提交任务的数量/8 +1  就用需求数量否则就用 /8的结果int n = (d < (long)b) ? (int)d : b;//从计算的下标开始逐个执行onNext 方法 for (; k < n; ++h, ++k) { Object x = QA.getAndSet(a, h & m, null);if (waiting != 0)signalWaiter();  //线程uppark if (x == null)break;else if (!consumeNext(s, x)) // 执行onNext方法break;}}return k;}

如果request请求了3次,但是submit了4次 ,最终会消费几次? demand 也就是我们的需求次数,takeitem里是取小的那个数就是3次,为什么 除以8 就不知道 了。。

如何实现 “反应”

如果是数组把item都存储下来逐个调用,请求次数就不能写死,所以参考HTTP里的写法改造无论发多少次都可以消费完。

			static class Subscriberss<String> implements Flow.Subscriber{//保存订阅者的引用private Flow.Subscription ss;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("订阅消息");ss=subscription;subscription.request(1);}@Overridepublic void onNext(Object item) {System.out.println( item.toString());//消费完成后再次请求ss.request(1);}}

接下来我们来研究java9 是如何用反应式API 改造HTTP模块的

这篇关于java9反应式编程 SubmissionPublisher实现发布订阅的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/293802

相关文章

Python实现精确小数计算的完全指南

《Python实现精确小数计算的完全指南》在金融计算、科学实验和工程领域,浮点数精度问题一直是开发者面临的重大挑战,本文将深入解析Python精确小数计算技术体系,感兴趣的小伙伴可以了解一下... 目录引言:小数精度问题的核心挑战一、浮点数精度问题分析1.1 浮点数精度陷阱1.2 浮点数误差来源二、基础解决

Java实现在Word文档中添加文本水印和图片水印的操作指南

《Java实现在Word文档中添加文本水印和图片水印的操作指南》在当今数字时代,文档的自动化处理与安全防护变得尤为重要,无论是为了保护版权、推广品牌,还是为了在文档中加入特定的标识,为Word文档添加... 目录引言Spire.Doc for Java:高效Word文档处理的利器代码实战:使用Java为Wo

SpringBoot日志级别与日志分组详解

《SpringBoot日志级别与日志分组详解》文章介绍了日志级别(ALL至OFF)及其作用,说明SpringBoot默认日志级别为INFO,可通过application.properties调整全局或... 目录日志级别1、级别内容2、调整日志级别调整默认日志级别调整指定类的日志级别项目开发过程中,利用日志

Java中的抽象类与abstract 关键字使用详解

《Java中的抽象类与abstract关键字使用详解》:本文主要介绍Java中的抽象类与abstract关键字使用详解,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录一、抽象类的概念二、使用 abstract2.1 修饰类 => 抽象类2.2 修饰方法 => 抽象方法,没有

SpringBoot 多环境开发实战(从配置、管理与控制)

《SpringBoot多环境开发实战(从配置、管理与控制)》本文详解SpringBoot多环境配置,涵盖单文件YAML、多文件模式、MavenProfile分组及激活策略,通过优先级控制灵活切换环境... 目录一、多环境开发基础(单文件 YAML 版)(一)配置原理与优势(二)实操示例二、多环境开发多文件版

Spring 中的切面与事务结合使用完整示例

《Spring中的切面与事务结合使用完整示例》本文给大家介绍Spring中的切面与事务结合使用完整示例,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录 一、前置知识:Spring AOP 与 事务的关系 事务本质上就是一个“切面”二、核心组件三、完

Java实现远程执行Shell指令

《Java实现远程执行Shell指令》文章介绍使用JSch在SpringBoot项目中实现远程Shell操作,涵盖环境配置、依赖引入及工具类编写,详解分号和双与号执行多指令的区别... 目录软硬件环境说明编写执行Shell指令的工具类总结jsch(Java Secure Channel)是SSH2的一个纯J

使用Python实现Word文档的自动化对比方案

《使用Python实现Word文档的自动化对比方案》我们经常需要比较两个Word文档的版本差异,无论是合同修订、论文修改还是代码文档更新,人工比对不仅效率低下,还容易遗漏关键改动,下面通过一个实际案例... 目录引言一、使用python-docx库解析文档结构二、使用difflib进行差异比对三、高级对比方

深度解析Python中递归下降解析器的原理与实现

《深度解析Python中递归下降解析器的原理与实现》在编译器设计、配置文件处理和数据转换领域,递归下降解析器是最常用且最直观的解析技术,本文将详细介绍递归下降解析器的原理与实现,感兴趣的小伙伴可以跟随... 目录引言:解析器的核心价值一、递归下降解析器基础1.1 核心概念解析1.2 基本架构二、简单算术表达

JavaScript中比较两个数组是否有相同元素(交集)的三种常用方法

《JavaScript中比较两个数组是否有相同元素(交集)的三种常用方法》:本文主要介绍JavaScript中比较两个数组是否有相同元素(交集)的三种常用方法,每种方法结合实例代码给大家介绍的非常... 目录引言:为什么"相等"判断如此重要?方法1:使用some()+includes()(适合小数组)方法2