本文主要是介绍RxJava(11-线程调度Scheduler),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
版权声明:本文为openXu原创文章【openXu的博客】,未经博主允许不得以任何形式转载
目录:
文章目录
- 1. 使用示例
- 2. subscribeOn()原理
- 多次subscribeOn()的情况
- 3. observeOn原理
- 4. 调度器的种类
RxJava中 使用
observeOn
和
subscribeOn
操作符,你可以让
Observable
在一个特定的调度器上执行,
observeOn
指示一个
Observable
在一个特定的调度器上调用观察者的
onNext
,
onError
和
onCompleted
方法,
subscribeOn
则指示
Observable
将全部的处理过程(包括发射数据和通知)放在特定的调度器上执行。
1. 使用示例
先看看下面的例子,体验一下在RxJava中 如何使用线程的切换:
private void logThread(Object obj, Thread thread){Log.v(TAG, "onNext:"+obj+" -"+Thread.currentThread().getName());
}
Observable.OnSubscribe onSub = new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {Log.v(TAG, "OnSubscribe -"+Thread.currentThread());subscriber.onNext(1);subscriber.onCompleted();}
};
Log.v(TAG, "--------------①-------------");
Observable.create(onSub).subscribe(integer->logThread(integer, Thread.currentThread()));
Log.v(TAG, "--------------②-------------");
Observable.create(onSub).subscribeOn(Schedulers.io()).subscribe(integer->logThread(integer, Thread.currentThread()));
Log.v(TAG, "--------------③-------------");
Observable.create(onSub).subscribeOn(Schedulers.newThread()).subscribe(integer->logThread(integer, Thread.currentThread()));
Log.v(TAG, "--------------④-------------");
Observable.create(onSub).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(integer->logThread(integer, Thread.currentThread()));
Log.v(TAG, "--------------⑤-------------");
Observable.create(onSub).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).subscribe(integer->logThread(integer, Thread.currentThread()));
Log.v(TAG, "--------------⑥-------------");
Observable.interval(100, TimeUnit.MILLISECONDS).take(1).subscribe(integer->logThread(integer, Thread.currentThread()));
/*输出:--------------①-------------OnSubscribe -Thread[main,5,main]onNext:1 -Thread[main,5,main]--------------②-------------OnSubscribe -Thread[RxIoScheduler-2,5,main]onNext:1 -Thread[RxIoScheduler-2,5,main]--------------③-------------OnSubscribe -Thread[RxNewThreadScheduler-1,5,main]onNext:1 -Thread[RxNewThreadScheduler-1,5,main]--------------④-------------OnSubscribe -Thread[RxNewThreadScheduler-2,5,main]onNext:1 -Thread[main,5,main]--------------⑤-------------OnSubscribe -Thread[RxNewThreadScheduler-4,5,main]onNext:1 -Thread[RxNewThreadScheduler-3,5,main]--------------⑥-------------onNext:0 -RxComputationScheduler-3*/
从上面的输出结果中,我们大概知道了下面几点:
①. RxJava中已经封装了多种调度器,不同的调度器可以指定在不同的线程中执行和观察
②. create创建的Observable默认在当前线程(主线程)中执行任务流,并在当前线程观察
③. interval创建的Observable会在一个叫Computation的线程中执行任务流和观察任务流
④. 除了observeOn和subscribeOn ,使用其他创建或者变换操作符也有可能造成线程的切换
2. subscribeOn()原理
subscribeOn()
用来指定Observable
在哪个线程中执行事件流,也就是指定Observable
中OnSubscribe
(计划表)的call
方法在那个线程发射数据。下面通过源码分析subscribeOn
是怎样实现线程的切换的。
下面看看subscribeOn
方法:
public final Observable<T> subscribeOn(Scheduler scheduler) {if (this instanceof ScalarSynchronousObservable) {return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);}return create(new OperatorSubscribeOn<T>(this, scheduler));
}
我们看到他创建了一个新的Observable
,并为新的Observable
创建了新的计划表OperatorSubscribeOn
对象,新的计划表保存了原始Observable
对象和调度器scheduler
。接着我们看看OperatorSubscribeOn
:
public final class OperatorSubscribeOn<T> implements Observable.OnSubscribe<T> {final Scheduler scheduler; //调度器final Observable<T> source; //原始Observable//①.原始观察者订阅了新的Observable后,将执行此call方法@Overridepublic void call(final Subscriber<? super T> subscriber) {final Scheduler.Worker inner = scheduler.createWorker();subscriber.add(inner);//②. call方法中使用传入的调度器创建的Worker对象的schedule方法切换线程inner.schedule(new Action0() {@Overridepublic void call() {final Thread t = Thread.currentThread();//③ .创建了一个新的观察者Subscriber<T> s = new Subscriber<T>(subscriber) {@Overridepublic void onNext(T t) {//⑤. 新的观察者收到数据后直接发送给原始观察者subscriber.onNext(t);}...};//④. 在切换的线程中,新的观察者订阅原始Observable,用来接收数据source.unsafeSubscribe(s);}});}
}
上面源码中注释已经写的很清楚了,OperatorSubscribeOn
其实就是一个普通的任务表,用于为新的Observable
发射数据,只是不是真正的发射,它创建了一个新的观察者订阅原始Observable
,这样就可以接受原始Observable
发射的数据,然后直接发送给原始观察者。
在call
方法中通过scheduler.createWorker().schedule()
完成线程的切换,这里就牵扯到两个对象了,Scheduler
和Worker
,不要着急,一个个的看,先看Scheduler
,由于RxJava中有多种调度器,我们就看一个简单的Schedulers.newThread()
,其他调度器的思路是一样的,下面一步一步看源码:
public final class Schedulers {//各种调度器对象private final Scheduler computationScheduler;private final Scheduler ioScheduler;private final Scheduler newThreadScheduler;//单例,Schedulers被加载的时候,上面的各种调度器对象已经初始化private static final Schedulers INSTANCE = new Schedulers();//构造方法private Schedulers() {RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();...Scheduler nt = hook.getNewThreadScheduler();if (nt != null) {newThreadScheduler = nt;} else {//①.创建newThreadScheduler对象newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();}}//②. 获取NewThreadScheduler对象public static Scheduler newThread() {return INSTANCE.newThreadScheduler;}...
}
Schedulers
中保存了多种调度器对象,在Schedulers
被加载的时候,他们就被初始化了,Schedulers
就像是一个调度器的管理器,接着跟踪RxJavaSchedulersHook.createNewScheduler()
,最终会找到一个叫NewThreadScheduler
的类:
public final class NewThreadScheduler extends Scheduler {private final ThreadFactory threadFactory;public NewThreadScheduler(ThreadFactory threadFactory) {this.threadFactory = threadFactory;}@Overridepublic Worker createWorker() {return new NewThreadWorker(threadFactory);}
}
NewThreadScheduler
就是我们调用subscribeOn(Schedulers.newThread() )
传入的调度器对象,每个调度器对象都有一个createWorker
方法用于创建一个Worker
对象,而NewThreadScheduler
对应创建的Worker
是一个叫NewThreadWorker
的对象,在新产生的OperatorSubscribeOn
计划表中就是通过NewThreadWorker.schedule(Action0)
实现线程的切换,下面我们跟踪schedule(Action0)
方法:
public class NewThreadWorker extends Scheduler.Worker implements Subscription {private final ScheduledExecutorService executor; //public NewThreadWorker(ThreadFactory threadFactory) {//创建一个线程池ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);executor = exec;}@Overridepublic Subscription schedule(final Action0 action) {return schedule(action, 0, null);}@Overridepublic Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {return scheduleActual(action, delayTime, unit);}//重要:worker.schedule()最终调用的是这个方法public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {//return action;Action0 decoratedAction = schedulersHook.onSchedule(action);//ScheduledAction就是一个Runnable对象,在run()方法中调用了Action0.call()ScheduledAction run = new ScheduledAction(decoratedAction);Future<?> f;if (delayTime <= 0) {f = executor.submit(run); //将Runnable对象放入线程池中} else {f = executor.schedule(run, delayTime, unit); //延迟执行}run.add(f);return run;}...
}
我们发现OperatorSubscribeOn
计划表中通过NewThreadWorker.schedule(Action0)
,将Action0
放入到一个线程池中执行,这样就实现了线程的切换。
通过上面的分析,我们知道subscribeOn
是怎样将任务表放入线程池中执行的,感觉还是有点绕,看下图:
多次subscribeOn()的情况
我们发现,每次使用subscribeOn
都会产生一个新的Observable
,并产生一个新的计划表OnSubscribe
,目标Subscriber最后订阅的将是最后一次subscribeOn
产生的新的Observable
。在每个新的OnSubscribe
的call
方法中都会有一个产生一个新的线程,在这个新线程中订阅上一级Observable
,并创建一个新的Subscriber
接受数据,最终原始Observable
将在第一个新线程中发射数据,然后传送给给下一个新的观察者,直到传送到目标观察者,所以多次调用subscribeOn
只有第一个起作用(这只是表面现象,其实每个subscribeOn
都切换了线程,只是最终目标Observable
是在第一个subscribeOn
产生的线程中发射数据的)。看下图:
多次subscribeOn()
只有第一个会起作用,后面的只是在第一个的基础上在外面套了一层壳,就像下面的伪代码,最后执行是在第一个新线程中执行:
...
//第3个subscribeOn产生的新线程
new Thread(){@Overridepublic void run() {Subscriber s1 = new Subscriber();//第2个subscribeOn产生的新线程new Thread(){@Overridepublic void run() {Subscriber s2 = new Subscriber();//第1个subscribeOn产生的新线程new Thread(){@Overridepublic void run() {Subscriber<T> s3 = new Subscriber<T>(subscriber) {@Overridepublic void onNext(T t) {subscriber.onNext(t);}...};//①. 最后一个新观察者订阅原始Observable原始Observable.subscribe(s3);//②. 原始Observable将在此线程中发射数据//③. 最后一个新的观察者s3接受数据//④. s3收到数据后,直接发送给s2,s2收到数据后传给s1,...最后目标观察者收到数据} }.start();}}.start();}
}.start();
3. observeOn原理
observeOn
调用的是lift
操作符,lift
操作符在上一篇博客中讲过。lift
操作符创建了一个代理的Observable
,用于接收原始Observable
发射的数据,然后在Operator
中对数据做一些处理后传递给目标Subscriber
。
observeOn
一样创建了一个代理的Observable
,并创建一个代理观察者接受上一级Observable
的数据,代理观察者收到数据之后会开启一个线程,在新的线程中,调用下一级观察者的onNext
、onCompete
、onError
方法。
我们看看observeOn
操作符的源码:
public final class OperatorObserveOn<T> implements Observable.Operator<T, T> {private final Scheduler scheduler;//创建代理观察者,用于接收上一级Observable发射的数据@Overridepublic Subscriber<? super T> call(Subscriber<? super T> child) {if (scheduler instanceof ImmediateScheduler) {return child;} else if (scheduler instanceof TrampolineScheduler) {return child;} else {ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);parent.init();return parent;}}//代理观察者private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {final Subscriber<? super T> child;final Scheduler.Worker recursiveScheduler;final NotificationLite<T> on;final Queue<Object> queue;//接受上一级Observable发射的数据@Overridepublic void onNext(final T t) {if (isUnsubscribed() || finished) {return;}if (!queue.offer(on.next(t))) {onError(new MissingBackpressureException());return;}schedule();}@Overridepublic void onCompleted() {...schedule();}@Overridepublic void onError(final Throwable e) {...schedule();}//开启新线程处理数据protected void schedule() {if (counter.getAndIncrement() == 0) {recursiveScheduler.schedule(this);}}// only execute this from schedule()//在新线程中将数据发送给目标观察者@Overridepublic void call() {long missed = 1L;long currentEmission = emitted;final Queue<Object> q = this.queue;final Subscriber<? super T> localChild = this.child;final NotificationLite<T> localOn = this.on;for (;;) {while (requestAmount != currentEmission) {...localChild.onNext(localOn.getValue(v));}}}}
}
可以发现,observeOn操作符对它后面的操作产生影响,比如下面一段代码:
Observable.just(100).subscribeOn(Schedulers.computation()) //Computation线程中发射数据.map(integer -> {return "map1-"+integer;}) //Computation线程中接受数据.observeOn(Schedulers.io()) //②. 切换.map(integer -> {return "map2-"+integer;}) //io线程中接受数据,由②决定.observeOn(Schedulers.newThread()) //③. 切换.map(integer -> {return "map3-"+integer;}) //newThread线程中接受数据,由③决定.observeOn(AndroidSchedulers.mainThread()) //④. 切换.delay(1000, TimeUnit.MILLISECONDS) //主线程中接受数据,由④决定.subscribe(str -> logThread(str, Thread.currentThread())); //Computation线程中接受数据,由④决定/*说明:最后目标观察者将在Computation线程中接受数据,这取决于delay操作符,delay操作符是在Computation线程中执行的,执行完后就会将数据发送给目标观察者。而他上面的observeOn将决定于delay产生的代理观察者在主线程中接受数据*//*输出:onNext:map3-map2-map1-100 -RxComputationScheduler-3*/
只要涉及到lift
操作符,其实就是生成了一套代理的Subscriber
(观察者)、Observable
(被观察者)和OnSubscribe
(计划表)。Observable
最典型的特征就是链式调用,我们暂且将每一步操作称为一级。代理的OnSubscribe
中的call
方法就是让代理Subscriber
订阅上一级Observable
,直到订阅到原始Observable
发射数据,代理Subscriber
收到数据后,可能对数据做一些操作也有可能切换线程,然后将数据传送给下一级Subscriber
,直到目标观察者接收到数据,目标观察者在那个线程接受数据取决于上一个Subscriber
在哪一个线程调用目标观察者的方法。示意图如下:
4. 调度器的种类
RxJava中可用的调度器有下面几种:
调度器类型 | 效果 |
---|---|
Schedulers.computation( ) | 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量 |
Schedulers.from(executor) | 使用指定的Executor作为调度器 |
Schedulers.immediate( ) | 在当前线程立即开始执行任务 |
Schedulers.io( ) | 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个任务创建一个新线程 |
Schedulers.trampoline( ) | 当其它排队的任务完成后,在当前线程排队开始执行 |
在**RxAndroid**中新增了一个: | 调度器类型| 效果 | | ------------- |-------------| |AndroidSchedulers.mainThread( )|主线程,UI线程,可以用于更新界面|
#5. 各种操作符的默认调度器
在之前学习各种操作符的时候,都会介绍xx操作符默认在xxx调度器上执行,当时可能不太注意这是什么意思,下面总结了一些操作符默认的调度器:
操作符 | 调度器 |
---|---|
buffer(timespan) | computation |
buffer(timespan, count) | computation |
buffer(timespan, timeshift) | computation |
debounce(timeout, unit) | computation |
delay(delay, unit) | computation |
delaySubscription(delay, unit) | computation |
interval | computation |
repeat | trampoline |
replay(time, unit) | computation |
replay(buffersize, time, unit) | computation |
replay(selector, time, unit) | computation |
replay(selector, buffersize, time, unit) | computation |
retrytrampolinesample(period, unit) | computation |
skip(time, unit) | computation |
skipLast(time, unit) | computation |
take(time, unit) | computation |
takeLast(time, unit) | computation |
takeLast(count, time, unit) | computation |
takeLastBuffer(time, unit) | computation |
takeLastBuffer(count, time, unit) | computation |
throttleFirst | computation |
throttleLast | computation |
throttleWithTimeout | computation |
timeInterval | immediate |
timeout(timeoutSelector) | immediate |
timeout(firstTimeoutSelector, timeoutSelector) | immediate |
timeout(timeoutSelector, other) | immediate |
timeout(timeout, timeUnit) | computation |
timeout(firstTimeoutSelector, timeoutSelector, other) | immediate |
timeout(timeout, timeUnit, other) | computation |
timer | computation |
timestamp | immediate |
window(timespan) | computation |
window(timespan, count) | computation |
window(timespan, timeshift) | computation |
#源码下载:
https://github.com/openXu/RxJavaTest
这篇关于RxJava(11-线程调度Scheduler)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!