RxJava 2.x 之图解创建、订阅、发射流程

2024-02-19 06:10

本文主要是介绍RxJava 2.x 之图解创建、订阅、发射流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  • 从一个例子开始
  • 创建过程
  • 订阅过程
  • 发射过程
  • 小结
从一个例子开始
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {for (int i = 0; i < 3; i++) {emitter.onNext(i);}emitter.onComplete();Log.d(TAG, "subscribe " + Thread.currentThread().getName());}}).subscribeOn(Schedulers.newThread()).map(new Function<Integer, String>() {@Overridepublic String apply(Integer value) throws Exception {Log.d(TAG, "apply " + Thread.currentThread().getName());return "apply " + value;}}).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new ResourceObserver<String>() {@Overridepublic void onNext(String value) {Log.d(TAG, "onNext " + value);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete " + Thread.currentThread().getName());}});

来看看输出:

10-26 16:55:17.418 32696-561/com.onzhou.study D/MainActivity: apply RxNewThreadScheduler-1
10-26 16:55:17.418 32696-561/com.onzhou.study D/MainActivity: apply RxNewThreadScheduler-1
10-26 16:55:17.418 32696-561/com.onzhou.study D/MainActivity: create RxNewThreadScheduler-1
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onNext apply 0
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onNext apply 1
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onNext apply 2
10-26 16:55:17.427 32696-32696/com.onzhou.study D/MainActivity: onComplete main

可以看到创建发送转换过程都在子线程中,而最后的回调是在主线程中

整个过程笔者整理成一张图,一步一步来跟进分析

创建过程
  • 第一步:通过create操作符创建了一个ObservableCreate类型的Observable,由于是基于匿名内部类创建的,因此持有的是实现了ObservableOnSubscribe接口的HomeActivity实例

  • 第二步:通过subscribeOn操作符创建了一个ObservableSubscribeOn类型的Observable,且其内部的source持有上个步骤的ObservableCreate实例

  • 第三步:通过map操作符创建了一个ObservableMap类型的Observable,且其内部持有上个步骤传入的ObservableSubscribeOn实例

  • 第四步:通过observeOn操作符创建了一个ObservableObserveOn类型的Observable,且其内部持有上个步骤的ObservableMap实例

  • 第五步:通过subscribeWith方法完成订阅,由于是基于匿名内部类创建的,因此传入的实际上是实现了ResourceObserverHomeActivity实例

订阅过程

上述的几个步骤其实已经完成的基本的创建过程了,最后我们拿到的实际是ObservableObserveOn的实例,下面开始订阅流程。

  • 第一步:subscribeWith方法,传入的observer是实现了ResourceObserver接口HomeActivity实例,通过subscribeActual发起订阅,内部实际调用的是source.subscribe方法,由于source持有的是上面传入的ObservableMap实例,因此这一步骤实际调用的是,ObservableMap实例中的subscribe方法,传入的参数就是ObserveOnObserver实例(构造参数主要是实现了ResourceObserver的实例即:HomeActivity)

  • 第二步:进入ObservableMap实例subscribe方法中,通过subscribeActual发起订阅,实际调用的是source.subscribe方法,传入的是MapObserver实例(构造参数为之前传递的ObserveOnObserver实例),由于source持有的是ObservableSubscribeOn的实例,因此最终调用的其实是ObservableSubscribeOn实例中的subscribe方法

  • 第三步:进入ObservableSubscribeOn实例subscribe方法中,通过subscribeActual发起订阅,完成MapObserver实例对SubscribeOnObserver的订阅,接着由NewThreadScheduler线程调度器完成对应的任务(该任务的执行是在线程中执行的),SubscribeTask实现了Runnable接口,最终会回调run方法,执行source.subscribe方法,这里的source持有的就是最开始的ObservableCreate实例

@Overridepublic void subscribeActual(final Observer<? super T> s) {final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);//这里的s就是上个步骤的MapObserver实例s.onSubscribe(parent);//这里的scheduler就是我们最开始指定的Schedulers.newThread 即NewThreadScheduler线程调度器parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

  • 第四步:进入ObservableCreate实例subscribe方法中,通过subscribeActual发起订阅,这里的source持有的是HomeActivity实例,直接调用subscribe方法,传入参数是构建的最顶层的发射器CreateEmitter实例

  • 第五步:上述的几个过程实际已经完成了订阅的过程,最后经过层层传递,持有的最顶层的是CreateEmitter实例,即我们最终的被观察者
发射过程

上述的过程已经完成了订阅过程,在最后订阅完成之后,最终会通过source.subscribe方法,其实就是调用HomeActivity实例的subscribe方法,完成元素发射

@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {for (int i = 0; i < 3; i++) {emitter.onNext(i);}emitter.onComplete();Log.d(TAG, "subscribe " + Thread.currentThread().getName());
}

我们在最顶层的被观察者里通过ObservableEmitter实例onNext方法完成元素的发射,最终又会通过一层一层的Observer转发到最原始的实现了ResourceObserver接口观察者中来

注意:

  • 这里的被观察者里的所有发射过程实际上都是在NewThreadScheduler线程调度器分配的线程里完成的
  • 发射的元素被传递到下层的ObservableObserveOn类中的ObserveOnObserver实例onNext方法,实际执行的是HandlerScheduler.HandlerWorkerschedule方法,最终就是通过我们持有的主线程的handler切换到主线程中

小结

整个创建过程订阅过程发射过程看起来山路十八弯,但是如果你一步一步跟进查看,会发现整个流程实际上是很清晰的,整个过程起点终点很明确,
而中间产生的一系列ObservableObserver你都可以看作是代理类,用来转发订阅以及最终的元素发射

这篇关于RxJava 2.x 之图解创建、订阅、发射流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/723731

相关文章

Java NoClassDefFoundError运行时错误分析解决

《JavaNoClassDefFoundError运行时错误分析解决》在Java开发中,NoClassDefFoundError是一种常见的运行时错误,它通常表明Java虚拟机在尝试加载一个类时未能... 目录前言一、问题分析二、报错原因三、解决思路检查类路径配置检查依赖库检查类文件调试类加载器问题四、常见

Java注解之超越Javadoc的元数据利器详解

《Java注解之超越Javadoc的元数据利器详解》本文将深入探讨Java注解的定义、类型、内置注解、自定义注解、保留策略、实际应用场景及最佳实践,无论是初学者还是资深开发者,都能通过本文了解如何利用... 目录什么是注解?注解的类型内置注编程解自定义注解注解的保留策略实际用例最佳实践总结在 Java 编程

CentOS和Ubuntu系统使用shell脚本创建用户和设置密码

《CentOS和Ubuntu系统使用shell脚本创建用户和设置密码》在Linux系统中,你可以使用useradd命令来创建新用户,使用echo和chpasswd命令来设置密码,本文写了一个shell... 在linux系统中,你可以使用useradd命令来创建新用户,使用echo和chpasswd命令来设

Java 实用工具类Spring 的 AnnotationUtils详解

《Java实用工具类Spring的AnnotationUtils详解》Spring框架提供了一个强大的注解工具类org.springframework.core.annotation.Annot... 目录前言一、AnnotationUtils 的常用方法二、常见应用场景三、与 JDK 原生注解 API 的

Java controller接口出入参时间序列化转换操作方法(两种)

《Javacontroller接口出入参时间序列化转换操作方法(两种)》:本文主要介绍Javacontroller接口出入参时间序列化转换操作方法,本文给大家列举两种简单方法,感兴趣的朋友一起看... 目录方式一、使用注解方式二、统一配置场景:在controller编写的接口,在前后端交互过程中一般都会涉及

Java中的StringBuilder之如何高效构建字符串

《Java中的StringBuilder之如何高效构建字符串》本文将深入浅出地介绍StringBuilder的使用方法、性能优势以及相关字符串处理技术,结合代码示例帮助读者更好地理解和应用,希望对大家... 目录关键点什么是 StringBuilder?为什么需要 StringBuilder?如何使用 St

使用Java将各种数据写入Excel表格的操作示例

《使用Java将各种数据写入Excel表格的操作示例》在数据处理与管理领域,Excel凭借其强大的功能和广泛的应用,成为了数据存储与展示的重要工具,在Java开发过程中,常常需要将不同类型的数据,本文... 目录前言安装免费Java库1. 写入文本、或数值到 Excel单元格2. 写入数组到 Excel表格

Java并发编程之如何优雅关闭钩子Shutdown Hook

《Java并发编程之如何优雅关闭钩子ShutdownHook》这篇文章主要为大家详细介绍了Java如何实现优雅关闭钩子ShutdownHook,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起... 目录关闭钩子简介关闭钩子应用场景数据库连接实战演示使用关闭钩子的注意事项开源框架中的关闭钩子机制1.

Maven中引入 springboot 相关依赖的方式(最新推荐)

《Maven中引入springboot相关依赖的方式(最新推荐)》:本文主要介绍Maven中引入springboot相关依赖的方式(最新推荐),本文给大家介绍的非常详细,对大家的学习或工作具有... 目录Maven中引入 springboot 相关依赖的方式1. 不使用版本管理(不推荐)2、使用版本管理(推

Java 中的 @SneakyThrows 注解使用方法(简化异常处理的利与弊)

《Java中的@SneakyThrows注解使用方法(简化异常处理的利与弊)》为了简化异常处理,Lombok提供了一个强大的注解@SneakyThrows,本文将详细介绍@SneakyThro... 目录1. @SneakyThrows 简介 1.1 什么是 Lombok?2. @SneakyThrows