java9反应式编程 SubmissionPublisher实现发布订阅

2023-10-28 14:20

本文主要是介绍java9反应式编程 SubmissionPublisher实现发布订阅,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一 准备工作

JAVA9 中新增了Ract Stream API 支持反应编程,今天就来自己玩一下JAVA的原生反应API。
首先需要下载jdk13 , 编译版本需要是12,因为spring5 不支持12以上的版本编译。如图所示,另外IEDA中modules 的编译也要改成12,还有setting中的也要同步改成12 。

 IEDA的编译配置

二 反应模型简介 && 代码示例

任务订阅的过程
 订阅的过程
1- new 一个SubmissionPublisher
2- 新建订阅者 Subscriberss ,编写订阅者代码
3- 提交任务

Subscription 的作用
只有 发布者 和 订阅者 是不行的,就好比订阅报纸,是谁把报纸给你送到家的呢,这里就是Subscription
,他发起请求,请求从发布者那里获取几条消息。几个item。如果发布者发布了10条,但是请求只要3条,最终也只会消费3条。 demo代码如下

  public static void main(String[] args) throws InterruptedException {// 错误处理器,可以不写BiConsumer<Subscriberss, Exception> handler = (a,b) -> {System.out.println(Throwables.getStackTraceAsString(b));a.onError(b);};//线程池也不用,默认自带FJ 的线程池ExecutorService executorService = Executors.newFixedThreadPool(20);//可以直接new 不用构造方法。2 是buffer数组的容量大小,最大32,此处抛出一个问题,如果32个item都占满了,会怎么样?SubmissionPublisher<String> sp  = new SubmissionPublisher(executorService,2,handler);//添加一个订阅者(消费者),这个方法里会初始化BufferedSubscription 也是实际上消费者消费的类sp.subscribe(new Subscriberss());//发送4笔数据sp.submit("1");sp.submit("2");sp.submit("3");sp.submit("4");//关闭stream sp.close();//此处代码无关while (true){Thread.sleep(100000);}}//实现一个消费者static class Subscriberss<String> implements Flow.Subscriber{@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("订阅消息");//拉取10个元素(消息)subscription.request(10);}@Overridepublic void onNext(Object item) {System.out.println( item.toString());}@Overridepublic void onError(Throwable throwable) {System.out.println("error");}@Overridepublic void onComplete() {System.out.println("牛逼");}}

执行结果如下
结果

代码跟踪

onSubscribe方法的描述 ,翻译成人话就是 onSubscribe在订阅者执行任何方法之前执行。所以首先打印
“订阅消息” 。

  Method invoked prior to invoking any other Subscriber methods for the given Subscription. If this method throws  an exception, resulting behavior is not guaranteed, but may  cause the Subscription not to be established or to be cancelled.

由于是异步的,不能跟着main线程,需要跟着订阅者的异步线程,BufferedSubscription默认使用的是FJ线程池,当我们提交完成后submit 方法结束后,任务就给到了 FJ线程池 或者 自己的线程池 里执行。
FORKJOIN 线程池
从BufferedSubscription 的 consume方法我们可以看到依次从 subscribeOnOpen(onSubscribe), takeItems (onNext),closeOnComplete(onComplete) 逐步调用。在等待执行的数组里,看到了我们的消息, array就是存放item的地方。

消息
注:这里不是demo里的 1,2,3,4, 这只之前的代码的跟踪

重点看一下:takeItems方法

//s 是订阅者,d是需求的数量,h是 头数组的下标final int takeItems(Subscriber<? super T> s, long d, int h) {Object[] a;int k = 0, cap;if ((a = array) != null && (cap = a.length) > 0) {//  这里提交任务的数量/8 +1 不清楚int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8)  // 如果需求数量 <  这里提交任务的数量/8 +1  就用需求数量否则就用 /8的结果int n = (d < (long)b) ? (int)d : b;//从计算的下标开始逐个执行onNext 方法 for (; k < n; ++h, ++k) { Object x = QA.getAndSet(a, h & m, null);if (waiting != 0)signalWaiter();  //线程uppark if (x == null)break;else if (!consumeNext(s, x)) // 执行onNext方法break;}}return k;}

如果request请求了3次,但是submit了4次 ,最终会消费几次? demand 也就是我们的需求次数,takeitem里是取小的那个数就是3次,为什么 除以8 就不知道 了。。

如何实现 “反应”

如果是数组把item都存储下来逐个调用,请求次数就不能写死,所以参考HTTP里的写法改造无论发多少次都可以消费完。

			static class Subscriberss<String> implements Flow.Subscriber{//保存订阅者的引用private Flow.Subscription ss;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("订阅消息");ss=subscription;subscription.request(1);}@Overridepublic void onNext(Object item) {System.out.println( item.toString());//消费完成后再次请求ss.request(1);}}

接下来我们来研究java9 是如何用反应式API 改造HTTP模块的

这篇关于java9反应式编程 SubmissionPublisher实现发布订阅的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/293802

相关文章

SpringBoot整合liteflow的详细过程

《SpringBoot整合liteflow的详细过程》:本文主要介绍SpringBoot整合liteflow的详细过程,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋...  liteflow 是什么? 能做什么?总之一句话:能帮你规范写代码逻辑 ,编排并解耦业务逻辑,代码

JavaSE正则表达式用法总结大全

《JavaSE正则表达式用法总结大全》正则表达式就是由一些特定的字符组成,代表的是一个规则,:本文主要介绍JavaSE正则表达式用法的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录常用的正则表达式匹配符正则表China编程达式常用的类Pattern类Matcher类PatternSynta

Spring Security中用户名和密码的验证完整流程

《SpringSecurity中用户名和密码的验证完整流程》本文给大家介绍SpringSecurity中用户名和密码的验证完整流程,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定... 首先创建了一个UsernamePasswordAuthenticationTChina编程oken对象,这是S

java实现docker镜像上传到harbor仓库的方式

《java实现docker镜像上传到harbor仓库的方式》:本文主要介绍java实现docker镜像上传到harbor仓库的方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 前 言2. 编写工具类2.1 引入依赖包2.2 使用当前服务器的docker环境推送镜像2.2

C++20管道运算符的实现示例

《C++20管道运算符的实现示例》本文简要介绍C++20管道运算符的使用与实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录标准库的管道运算符使用自己实现类似的管道运算符我们不打算介绍太多,因为它实际属于c++20最为重要的

Java easyExcel实现导入多sheet的Excel

《JavaeasyExcel实现导入多sheet的Excel》这篇文章主要为大家详细介绍了如何使用JavaeasyExcel实现导入多sheet的Excel,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录1.官网2.Excel样式3.代码1.官网easyExcel官网2.Excel样式3.代码

Java MQTT实战应用

《JavaMQTT实战应用》本文详解MQTT协议,涵盖其发布/订阅机制、低功耗高效特性、三种服务质量等级(QoS0/1/2),以及客户端、代理、主题的核心概念,最后提供Linux部署教程、Sprin... 目录一、MQTT协议二、MQTT优点三、三种服务质量等级四、客户端、代理、主题1. 客户端(Clien

Java中调用数据库存储过程的示例代码

《Java中调用数据库存储过程的示例代码》本文介绍Java通过JDBC调用数据库存储过程的方法,涵盖参数类型、执行步骤及数据库差异,需注意异常处理与资源管理,以优化性能并实现复杂业务逻辑,感兴趣的朋友... 目录一、存储过程概述二、Java调用存储过程的基本javascript步骤三、Java调用存储过程示

Go语言数据库编程GORM 的基本使用详解

《Go语言数据库编程GORM的基本使用详解》GORM是Go语言流行的ORM框架,封装database/sql,支持自动迁移、关联、事务等,提供CRUD、条件查询、钩子函数、日志等功能,简化数据库操作... 目录一、安装与初始化1. 安装 GORM 及数据库驱动2. 建立数据库连接二、定义模型结构体三、自动迁

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文