java的反应式流

2023-11-07 20:52
文章标签 java 反应式

本文主要是介绍java的反应式流,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Java的反应式流是一种新的编程模型,它在异步和事件驱动的环境下工作。反应式流的目的是为了解决传统的单线程或者多线程编程模型在高并发和大流量情况下的性能瓶颈。

反应式流的核心是Observable和Observer,Observable表示一个数据流,而Observer则表示这个数据流的消费者。Observable在数据流上产生事件,而Observer则对这些事件进行响应。反应式流的数据流是一种推式的流,Observable发布事件时不需要等待Observer接收,Observable会把事件推送给Observer,而不是Observer去轮询Observable。

Java的反应式流通常基于Reactor或RxJava等库,这些库提供了丰富的函数式编程API和运算符,可以非常方便地处理异步事件。这些库都提供了类似于Observable和Observer的抽象概念,可以用来描述和处理异步数据流。同时还提供了常用的运算符,包括map、filter、reduce等,这些运算符可以方便地对数据流进行变换和过滤。

反应式流还有一个重要的概念是背压(backpressure),它是指在高并发和大流量情况下,消费者无法处理生产者产生的数据流,导致数据积压的情况。为了解决这个问题,反应式流引入了背压机制,生产者会在发送数据前先询问消费者的处理能力,如果消费者没有处理能力,生产者会等待一段时间或者缓存数据,等待消费者处理完数据后再继续发送。

反应式流已经被广泛应用于大规模的互联网应用中,包括机器学习、数据分析、网络爬虫等领域。它的优点在于处理高并发和大流量的数据流时,能够更加高效地利用系统资源,提高系统的性能和可扩展性。

总之,反应式流是Java编程中的一个重要概念,它可以帮助我们更好地处理异步和事件驱动的数据流,提高系统的性能和可扩展性。

不涉及任何库,就单纯用java的反应式流,完成发布订阅者模式:

package com.example.jdk9.react;import java.util.concurrent.Flow.*;public class PublisherSubscriberDemo {public static void main(String[] args) {SimplePublisher<String> publisher = new SimplePublisher<>();SimpleSubscriber<String> subscriber1 = new SimpleSubscriber<>();SimpleSubscriber<String> subscriber2 = new SimpleSubscriber<>();publisher.subscribe(subscriber1);publisher.subscribe(subscriber2);publisher.submit("hello");publisher.submit("world");publisher.close();}
}class SimplePublisher<T> implements Publisher<T> {private Subscription subscription;@Overridepublic void subscribe(Subscriber<? super T> subscriber) {subscriber.onSubscribe(new Subscription() {@Overridepublic void request(long n) {}@Overridepublic void cancel() {// nothing to do}});this.subscription = new Subscription() {private boolean cancelled = false;@Overridepublic void request(long n) {// nothing to do}@Overridepublic void cancel() {this.cancelled = true;}public boolean isCancelled() {return this.cancelled;}};subscriber.onSubscribe(this.subscription);}public void submit(T item) {subscriptionLimitedQueue.offer(item);subscription.request(1);}public void close() {while (!subscriptionLimitedQueue.isEmpty()) {subscriptionLimitedQueue.poll();}subscription.cancel();}private SubscriptionLimitedQueue<T> subscriptionLimitedQueue = new SubscriptionLimitedQueue<>(2);static class SubscriptionLimitedQueue<T> {private final int limit;private int size = 0;private Node<T> head;private Node<T> tail;public SubscriptionLimitedQueue(int limit) {this.limit = limit;}private static class Node<T> {final T item;Node<T> next;Node(T item, Node<T> next) {this.item = item;this.next = next;}}public void offer(T item) {Node<T> node = new Node<>(item, null);if (head == null) {head = node;tail = head;} else {tail.next = node;tail = tail.next;}size++;if (size > limit) {Node<T> newHead = head.next;head.next = null;head = newHead;size--;}}public boolean isEmpty() {return size == 0;}public T poll() {if (isEmpty()) {return null;}T item = head.item;Node<T> newHead = head.next;head.next = null;head = newHead;size--;return item;}}
}class SimpleSubscriber<T> implements Subscriber<T> {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {this.subscription = subscription;System.out.println("订阅成功");subscription.request(1);}@Overridepublic void onNext(T item) {System.out.println("Received item: " + item);subscription.request(1);}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onComplete() {System.out.println("Done");}
}

这段代码演示了使用Flow API来发布和订阅消息的过程,它包含以下类和接口:

  1. Publisher<T>:发布者接口,表示能够发布指定类型的消息给订阅者。
  2. Subscriber<T>:订阅者接口,表示能够接收指定类型的消息。
  3. Subscription:订阅接口,表示订阅关系,能够请求一定数量的消息和取消订阅。
  4. SubmissionPublisher<T>:继承自Publisher<T>接口,实现了异步发布消息的能力。
  5. Flow API:一组用于处理数据流和异步操作的接口和类。

具体解释:

  1. SimplePublisher类是一个实现了Publisher接口的简单发布者类,它能够发布指定类型的消息给订阅者。它内部维护了一个SubscriptionLimitedQueue类的对象,用于限制消息队列的长度。
  2. SubscriptionLimitedQueue类是一个维护队列长度的类,用于实现限制消息队列长度的功能。
  3. SimpleSubscriber类是一个实现了Subscriber接口的简单订阅者类,它能够接收指定类型的消息,并将其输出到控制台中。
  4. main方法创建了一个SimplePublisher类的实例和一个SimpleSubscriber类的实例,然后将它们关联起来,最后向SimplePublisher类的实例中发布了两个消息,随后关闭了发布者。

运行结果:

例子:

第一步,引入依赖:

<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.5.11</version></dependency>

第二步,编写代码:

package com.example.jdk9.react;import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class ReactiveStreamExample {public static void main(String[] args) {Flux<Integer> stream = Flux.range(1, 10);stream.map(i -> i * 2).filter(i -> i % 3 == 0).flatMap(i -> Mono.just(i).zipWith(Mono.just(i * 3))).subscribe(System.out::println);}
}

上面的代码首先创建了一个从1到10的数字列表,然后通过map操作符将每个元素乘以2,再使用filter操作符过滤掉不能被3整除的元素。接下来,使用flatMap操作符来创建一个新的流,该流将原始元素和该元素乘以3的结果合并在一起。最后,使用subscribe方法来订阅这个流并打印出每个元素的值。

这个例子展示了Reactor库中的一些常见操作符,包括mapfilterflatMap。通过这些操作符的链式调用,我们可以轻松地对数据流进行复杂的操作。在实际的应用中,我们可以根据具体的需求选择不同的操作符来实现所需的数据处理逻辑。

使用Reactor 库实现发布订阅者模式:

package com.example.jdk9.react;import reactor.core.publisher.Flux;public class PublisherSubscriberExample {public static void main(String[] args) {// 创建发布者Flux<Integer> publisher = Flux.just(1, 2, 3, 4, 5);// 订阅者1:打印每个元素publisher.subscribe(System.out::println);// 订阅者2:计算元素的总和并打印publisher.reduce(0, Integer::sum).subscribe(total -> System.out.println("Sum = " + total));}
}

这篇关于java的反应式流的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/366235

相关文章

Java实现在Word文档中添加文本水印和图片水印的操作指南

《Java实现在Word文档中添加文本水印和图片水印的操作指南》在当今数字时代,文档的自动化处理与安全防护变得尤为重要,无论是为了保护版权、推广品牌,还是为了在文档中加入特定的标识,为Word文档添加... 目录引言Spire.Doc for Java:高效Word文档处理的利器代码实战:使用Java为Wo

SpringBoot日志级别与日志分组详解

《SpringBoot日志级别与日志分组详解》文章介绍了日志级别(ALL至OFF)及其作用,说明SpringBoot默认日志级别为INFO,可通过application.properties调整全局或... 目录日志级别1、级别内容2、调整日志级别调整默认日志级别调整指定类的日志级别项目开发过程中,利用日志

Java中的抽象类与abstract 关键字使用详解

《Java中的抽象类与abstract关键字使用详解》:本文主要介绍Java中的抽象类与abstract关键字使用详解,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录一、抽象类的概念二、使用 abstract2.1 修饰类 => 抽象类2.2 修饰方法 => 抽象方法,没有

SpringBoot 多环境开发实战(从配置、管理与控制)

《SpringBoot多环境开发实战(从配置、管理与控制)》本文详解SpringBoot多环境配置,涵盖单文件YAML、多文件模式、MavenProfile分组及激活策略,通过优先级控制灵活切换环境... 目录一、多环境开发基础(单文件 YAML 版)(一)配置原理与优势(二)实操示例二、多环境开发多文件版

Spring 中的切面与事务结合使用完整示例

《Spring中的切面与事务结合使用完整示例》本文给大家介绍Spring中的切面与事务结合使用完整示例,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录 一、前置知识:Spring AOP 与 事务的关系 事务本质上就是一个“切面”二、核心组件三、完

Java实现远程执行Shell指令

《Java实现远程执行Shell指令》文章介绍使用JSch在SpringBoot项目中实现远程Shell操作,涵盖环境配置、依赖引入及工具类编写,详解分号和双与号执行多指令的区别... 目录软硬件环境说明编写执行Shell指令的工具类总结jsch(Java Secure Channel)是SSH2的一个纯J

JavaScript中比较两个数组是否有相同元素(交集)的三种常用方法

《JavaScript中比较两个数组是否有相同元素(交集)的三种常用方法》:本文主要介绍JavaScript中比较两个数组是否有相同元素(交集)的三种常用方法,每种方法结合实例代码给大家介绍的非常... 目录引言:为什么"相等"判断如此重要?方法1:使用some()+includes()(适合小数组)方法2

SpringBoot 获取请求参数的常用注解及用法

《SpringBoot获取请求参数的常用注解及用法》SpringBoot通过@RequestParam、@PathVariable等注解支持从HTTP请求中获取参数,涵盖查询、路径、请求体、头、C... 目录SpringBoot 提供了多种注解来方便地从 HTTP 请求中获取参数以下是主要的注解及其用法:1

HTTP 与 SpringBoot 参数提交与接收协议方式

《HTTP与SpringBoot参数提交与接收协议方式》HTTP参数提交方式包括URL查询、表单、JSON/XML、路径变量、头部、Cookie、GraphQL、WebSocket和SSE,依据... 目录HTTP 协议支持多种参数提交方式,主要取决于请求方法(Method)和内容类型(Content-Ty

深度解析Java @Serial 注解及常见错误案例

《深度解析Java@Serial注解及常见错误案例》Java14引入@Serial注解,用于编译时校验序列化成员,替代传统方式解决运行时错误,适用于Serializable类的方法/字段,需注意签... 目录Java @Serial 注解深度解析1. 注解本质2. 核心作用(1) 主要用途(2) 适用位置3