本文主要是介绍RxJava----Utility 辅助操作,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
这个页面列出了很多用于Observable的辅助操作符
materialize( )— 将Observable转换成一个通知列表convert an Observable into a list of Notificationsdematerialize( )— 将上面的结果逆转回一个Observabletimestamp( )— 给Observable发射的每个数据项添加一个时间戳serialize( )— 强制Observable按次序发射数据并且要求功能是完好的cache( )— 记住Observable发射的数据序列并发射相同的数据序列给后续的订阅者observeOn( )— 指定观察者观察Observable的调度器subscribeOn( )— 指定Observable执行任务的调度器doOnEach( )— 注册一个动作,对Observable发射的每个数据项使用doOnCompleted( )— 注册一个动作,对正常完成的Observable使用doOnError( )— 注册一个动作,对发生错误的Observable使用doOnTerminate( )— 注册一个动作,对完成的Observable使用,无论是否发生错误doOnSubscribe( )— 注册一个动作,在观察者订阅时使用doOnUnsubscribe( )— 注册一个动作,在观察者取消订阅时使用finallyDo( )— 注册一个动作,在Observable完成时使用delay( )— 延时发射Observable的结果delaySubscription( )— 延时处理订阅请求timeInterval( )— 定期发射数据using( )— 创建一个只在Observable生命周期存在的资源single( )— 强制返回单个数据,否则抛出异常singleOrDefault( )— 如果Observable完成时返回了单个数据,就返回它,否则返回默认数据toFuture( ),toIterable( ),toList( )— 将Observable转换为其它对象或数据结构
=========================================================
Materialize/Dematerialize
Materialize将数据项和事件通知都当做数据项发射,Dematerialize刚好相反。

一个合法的有限的Obversable将调用它的观察者的onNext方法零次或多次,然后调用观察者的onCompleted或onError正好一次。Materialize操作符将这一系列调用,包括原来的onNext通知和终止通知onCompleted或onError都转换为一个Observable发射的数据序列。
RxJava的materialize将来自原始Observable的通知转换为Notification对象,然后它返回的Observable会发射这些数据。
materialize默认不在任何特定的调度器 (Scheduler) 上执行。
- Javadoc: materialize()

Dematerialize操作符是Materialize的逆向过程,它将Materialize转换的结果还原成它原本的形式。
dematerialize反转这个过程,将原始Observable发射的Notification对象还原成Observable的通知。
dematerialize默认不在任何特定的调度器 (Scheduler) 上执行。
- Javadoc: dematerialize()
Timestamp
给Observable发射的数据项附加一个时间戳

RxJava中的实现为timestamp,它将一个发射T类型数据的Observable转换为一个发射类型为Timestamped<T>的数据的Observable,每一项都包含数据的原始发射时间。
timestamp默认在immediate调度器上执行,但是可以通过参数指定其它的调度器。
- Javadoc: timestamp()
- Javadoc: timestamp(Scheduler)
Serialize
强制一个Observable连续调用并保证行为正确

一个Observable可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让Observable行为不正确,它可能会在某一个onNext调用之前尝试调用onCompleted或onError方法,或者从两个不同的线程同时调用onNext方法。使用Serialize操作符,你可以纠正这个Observable的行为,保证它的行为是正确的且是同步的。
RxJava中的实现是serialize,它默认不在任何特定的调度器上执行。
- Javadoc: serialize()
Replay
保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅

可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。
如果在将一个Observable转换为可连接的Observable之前对它使用Replay操作符,产生的这个可连接Observable将总是发射完整的数据序列给任何未来的观察者,即使那些观察者在这个Observable开始给其它观察者发射数据之后才订阅。

RxJava的实现为replay,它有多个接受不同参数的变体,有的可以指定replay的最大缓存数量,有的还可以指定调度器。
- Javadoc: replay()
- Javadoc: replay(int)
- Javadoc: replay(long,TimeUnit)
- Javadoc: replay(int,long,TimeUnit)

有一种 replay返回一个普通的Observable。它可以接受一个变换函数为参数,这个函数接受原始Observable发射的数据项为参数,返回结果Observable要发射的一项数据。因此,这个操作符其实是replay变换之后的数据项。
- Javadoc: replay(Func1)
- Javadoc: replay(Func1,int)
- Javadoc: replay(Func1,long,TimeUnit)
- Javadoc: replay(Func1,int,long,TimeUnit)
ObserveOn
指定一个观察者在哪个调度器上观察这个Observable

很多ReactiveX实现都使用调度器 “Scheduler”来管理多线程环境中Observable的转场。你可以使用ObserveOn操作符指定Observable在一个特定的调度器上发送通知给观察者 (调用观察者的onNext, onCompleted, onError方法)。

注意:当遇到一个异常时ObserveOn会立即向前传递这个onError终止通知,它不会等待慢速消费的Observable接受任何之前它已经收到但还没有发射的数据项。这可能意味着onError通知会跳到(并吞掉)原始Observable发射的数据项前面,正如图例上展示的。
SubscribeOn操作符的作用类似,但它是用于指定Observable本身在特定的调度器上执行,它同样会在那个调度器上给观察者发通知。
RxJava中,要指定Observable应该在哪个调度器上调用观察者的onNext, onCompleted, onError方法,你需要使用observeOn操作符,传递给它一个合适的Scheduler。
- Javadoc: observeOn(Scheduler)
SubscribeOn
指定Observable自身在哪个调度器上执行

很多ReactiveX实现都使用调度器 “Scheduler”来管理多线程环境中Observable的转场。你可以使用SubscribeOn操作符指定Observable在一个特定的调度器上运转。
ObserveOn操作符的作用类似,但是功能很有限,它指示Observable在一个指定的调度器上给观察者发通知。
在某些实现中还有一个UnsubscribeOn操作符。
- Javadoc: subscribeOn(Scheduler)
- Javadoc: unsubscribeOn(Scheduler)
Do
注册一个动作作为原始Observable生命周期事件的一种占位符

你可以注册回调,当Observable的某个事件发生时,Rx会在与Observable链关联的正常通知集合中调用它。Rx实现了多种操作符用于达到这个目的。
RxJava实现了很多Do操作符的变体。
doOnEach

doOnEach操作符让你可以注册一个回调,它产生的Observable每发射一项数据就会调用它一次。你可以以Action的形式传递参数给它,这个Action接受一个onNext的变体Notification作为它的唯一参数,你也可以传递一个Observable给doOnEach,这个Observable的onNext会被调用,就好像它订阅了原始的Observable一样。
- Javadoc: doOnEach(Action1)
- Javadoc: doOnEach(Observer)
doOnNext

doOnNext操作符类似于doOnEach(Action1),但是它的Action不是接受一个Notification参数,而是接受发射的数据项。
示例代码
Observable.just(1, 2, 3).doOnNext(new Action1<Integer>() {@Overridepublic void call(Integer item) {if( item > 1 ) {throw new RuntimeException( "Item exceeds maximum value" );}}}).subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});
输出
Next: 1
Error: Item exceeds maximum value
doOnSubscribe

doOnSubscribe操作符注册一个动作,当观察者订阅它生成的Observable它就会被调用。
- Javadoc: doOnSubscribe(Action0)
doOnUnsubscribe

doOnUnsubscribe操作符注册一个动作,当观察者取消订阅它生成的Observable它就会被调用。
- Javadoc: doOnUnsubscribe(Action0)
doOnCompleted

doOnCompleted 操作符注册一个动作,当它产生的Observable正常终止调用onCompleted时会被调用。
- Javadoc: doOnCompleted(Action0)
doOnError

doOnError 操作符注册一个动作,当它产生的Observable异常终止调用onError时会被调用。
- Javadoc: doOnError(Action0)
doOnTerminate

doOnTerminate 操作符注册一个动作,当它产生的Observable终止之前会被调用,无论是正常还是异常终止。
- Javadoc: doOnTerminate(Action0)
finallyDo

finallyDo 操作符注册一个动作,当它产生的Observable终止之后会被调用,无论是正常还是异常终止。
- Javadoc: finallyDo(Action0)
Delay
延迟一段指定的时间再发射来自Observable的发射物

Delay操作符让原始Observable在发射每项数据之前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前整体平移了一个增量。
RxJava的实现是 delay和delaySubscription。

第一种delay接受一个定义时长的参数(包括数量和单位)。每当原始Observable发射一项数据,delay就启动一个定时器,当定时器过了给定的时间段时,delay返回的Observable发射相同的数据项。
注意:delay不会平移onError通知,它会立即将这个通知传递给订阅者,同时丢弃任何待发射的onNext通知。然而它会平移一个onCompleted通知。
delay默认在computation调度器上执行,你可以通过参数指定使用其它的调度器。
- Javadoc: delay(long,TimeUnit)
- Javadoc: delay()

另一种delay不实用常数延时参数,它使用一个函数针对原始Observable的每一项数据返回一个Observable,它监视返回的这个Observable,当任何那样的Observable终止时,delay返回的Observable就发射关联的那项数据。
这种delay默认不在任何特定的调度器上执行。
- Javadoc: delay(Func1)

这个版本的delay对每一项数据使用一个Observable作为原始Observable的延时定时器。
这种delay默认不在任何特定的调度器上执行。
- Javadoc: delay(Func0,Func1)

还有一个操作符delaySubscription让你你可以延迟订阅原始Observable。它结合搜一个定义延时的参数。
delaySubscription默认在computation调度器上执行,你可以通过参数指定使用其它的调度器。
- Javadoc: delaySubscription(long,TimeUnit)
- Javadoc: delaySubscription(long,TimeUnit,Scheduler)

还有一个版本的delaySubscription使用一个Obseable而不是一个固定的时长来设置订阅延时。
这种delaySubscription默认不在任何特定的调度器上执行。
- Javadoc: delaySubscription(Func0)
TimeInterval
将一个发射数据的Observable转换为发射那些数据发射时间间隔的Observable

TimeInterval操作符拦截原始Observable发射的数据项,替换为发射表示相邻发射物时间间隔的对象。
RxJava中的实现为timeInterval,这个操作符将原始Observable转换为另一个Obserervable,后者发射一个标志替换前者的数据项,这个标志表示前者的两个连续发射物之间流逝的时间长度。新的Observable的第一个发射物表示的是在观察者订阅原始Observable到原始Observable发射它的第一项数据之间流逝的时间长度。不存在与原始Observable发射最后一项数据和发射onCompleted通知之间时长对应的发射物。
timeInterval默认在immediate调度器上执行,你可以通过传参数修改。
- Javadoc: timeInterval()
- Javadoc: timeInterval(Scheduler)
Using
创建一个只在Observable生命周期内存在的一次性资源

Using操作符让你可以指示Observable创建一个只在它的生命周期内存在的资源,当Observable终止时这个资源会被自动释放。

using操作符接受三个参数:
- 一个用户创建一次性资源的工厂函数
- 一个用于创建Observable的工厂函数
- 一个用于释放资源的函数
当一个观察者订阅using返回的Observable时,using将会使用Observable工厂函数创建观察者要观察的Observable,同时使用资源工厂函数创建一个你想要创建的资源。当观察者取消订阅这个Observable时,或者当观察者终止时(无论是正常终止还是因错误而终止),using使用第三个函数释放它创建的资源。
using默认不在任何特定的调度器上执行。
- Javadoc: using(Func0,Func1,Action1)
First
只发射第一项(或者满足某个条件的第一项)数据

如果你只对Observable发射的第一项数据,或者满足某个条件的第一项数据感兴趣,你可以使用First操作符。
在某些实现中,First没有实现为一个返回Observable的过滤操作符,而是实现为一个在当时就发射原始Observable指定数据项的阻塞函数。在这些实现中,如果你想要的是一个过滤操作符,最好使用Take(1)或者ElementAt(0)。
在一些实现中还有一个Single操作符。它的行为与First类似,但为了确保只发射单个值,它会等待原始Observable终止(否则,不是发射那个值,而是以一个错误通知终止)。你可以使用它从原始Observable获取第一项数据,而且也确保只发射一项数据。
在RxJava中,这个操作符被实现为first,firstOrDefault和takeFirst。
可能容易混淆,BlockingObservable也有名叫first和firstOrDefault的操作符,它们会阻塞并返回值,不是立即返回一个Observable。
还有几个其它的操作符执行类似的功能。
过滤操作符

只发射第一个数据,使用没有参数的first操作符。
示例代码
Observable.just(1, 2, 3).first().subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});
输出
Next: 1
Sequence complete.
- Javadoc: first()
first(Func1)

传递一个谓词函数给first,然后发射这个函数判定为true的第一项数据。
- Javadoc: first(Func1)
firstOrDefault

firstOrDefault与first类似,但是在Observagle没有发射任何数据时发射一个你在参数中指定的默认值。
- Javadoc: firstOrDefault(T)
firstOrDefault(Func1)

传递一个谓词函数给firstOrDefault,然后发射这个函数判定为true的第一项数据,如果没有数据通过了谓词测试就发射一个默认值。
- Javadoc firstOrDefault(T, Func1)
takeFirst

takeFirst与first类似,除了这一点:如果原始Observable没有发射任何满足条件的数据,first会抛出一个NoSuchElementException,takeFist会返回一个空的Observable(不调用onNext()但是会调用onCompleted)。
- Javadoc: takeFirst(Func1)
single

single操作符也与first类似,但是如果原始Observable在完成之前不是正好发射一次数据,它会抛出一个NoSuchElementException。
- Javadoc: single()
single(Func1)

single的变体接受一个谓词函数,发射满足条件的单个值,如果不是正好只有一个数据项满足条件,会以错误通知终止。
- Javadoc: single(Func1)
singleOrDefault

和firstOrDefault类似,但是如果原始Observable发射超过一个的数据,会以错误通知终止。
- Javadoc: singleOrDefault(T)
singleOrDefault(T,Func1)

和firstOrDefault(T, Func1)类似,如果没有数据满足条件,返回默认值;如果有多个数据满足条件,以错误通知终止。
- Javadoc: singleOrDefault(Func1,T)
first系列的这几个操作符默认不在任何特定的调度器上执行。
To
将Observable转换为另一个对象或数据结构

ReactiveX的很多语言特定实现都有一种操作符让你可以将Observable或者Observable发射的数据序列转换为另一个对象或数据结构。它们中的一些会阻塞直到Observable终止,然后生成一个等价的对象或数据结构;另一些返回一个发射那个对象或数据结构的Observable。
在某些ReactiveX实现中,还有一个操作符用于将Observable转换成阻塞式的。一个阻塞式的Ogbservable在普通的Observable的基础上增加了几个方法,用于操作Observable发射的数据项。
getIterator

getIterator操作符只能用于BlockingObservable的子类,要使用它,你首先必须把原始的Observable转换为一个BlockingObservable。可以使用这两个操作符:BlockingObservable.from或the Observable.toBlocking。
这个操作符将Observable转换为一个Iterator,你可以通过它迭代原始Observable发射的数据集。
- Javadoc: BlockingObservable.getIterator()
toFuture

toFuture操作符也是只能用于BlockingObservable。这个操作符将Observable转换为一个返回单个数据项的Future,如果原始Observable发射多个数据项,Future会收到一个IllegalArgumentException;如果原始Observable没有发射任何数据,Future会收到一个NoSuchElementException。
如果你想将发射多个数据项的Observable转换为Future,可以这样用:myObservable.toList().toBlocking().toFuture()。
- Javadoc: BlockingObservable.toFuture()
toIterable

toFuture操作符也是只能用于BlockingObservable。这个操作符将Observable转换为一个Iterable,你可以通过它迭代原始Observable发射的数据集。
- Javadoc: BlockingObservable.toIterable()
toList

通常,发射多项数据的Observable会为每一项数据调用onNext方法。你可以用toList操作符改变这个行为,让Observable将多项数据组合成一个List,然后调用一次onNext方法传递整个列表。
如果原始Observable没有发射任何数据就调用了onCompleted,toList返回的Observable会在调用onCompleted之前发射一个空列表。如果原始Observable调用了onError,toList返回的Observable会立即调用它的观察者的onError方法。
toList默认不在任何特定的调度器上执行。
- Javadoc: toList()
toMap

toMap收集原始Observable发射的所有数据项到一个Map(默认是HashMap)然后发射这个Map。你可以提供一个用于生成Map的Key的函数,还可以提供一个函数转换数据项到Map存储的值(默认数据项本身就是值)。
toMap默认不在任何特定的调度器上执行。
- Javadoc: toMap(Func1)
- Javadoc: toMap(Func1,Func1)
- Javadoc: toMap(Func1,Func1,Func0)
toMultiMap

toMultiMap类似于toMap,不同的是,它生成的这个Map同时还是一个ArrayList(默认是这样,你可以传递一个可选的工厂方法修改这个行为)。
toMultiMap默认不在任何特定的调度器上执行。
- Javadoc: toMultiMap(Func1)
- Javadoc: toMultiMap(Func1,Func1)
- Javadoc: toMultiMap(Func1,Func1,Func0)
- Javadoc: toMultiMap(Func1,Func1,Func0,Func1)
toSortedList

toSortedList类似于toList,不同的是,它会对产生的列表排序,默认是自然升序,如果发射的数据项没有实现Comparable接口,会抛出一个异常。然而,你也可以传递一个函数作为用于比较两个数据项,这是toSortedList不会使用Comparable接口。
toSortedList默认不在任何特定的调度器上执行。
- Javadoc: toSortedList()
- Javadoc: toSortedList(Func2)
nest

nest操作符有一个特殊的用途:将一个Observable转换为一个发射这个Observable的Observable。
这篇关于RxJava----Utility 辅助操作的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!