用RxJava实现Rxbus替换EventBus事件总线

2024-06-01 07:58

本文主要是介绍用RxJava实现Rxbus替换EventBus事件总线,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

     首先,Rxjava不必多说,可以说和Retrofit是年度最火框架,在GitHub上都已经超过两万star,Eventbus也不必多说,目前大多数开发者大多数项目一定会用到EventBus或者Otto作为事件总线通信库,对于RxJava使用者来说,RxJava也可以轻松实现事件总线,因为它们都依据于观察者模式。本文介绍Rxbus如何完美替换Eventbus,减少APP体积.

不多说,上代码


/*** 作者:JackyZ fenglizizhu* 说明:事件的订阅*/
public class RxBus {private static volatile RxBus mDefaultInstance;private final Subject<Object,Object> mBus;private final Map<Class<?>,Object> mStickEventMap;public RxBus() {//将Subject(PublishSubject)转换为SerializedSubjectmBus = new SerializedSubject<>(PublishSubject.create());mStickEventMap=new ConcurrentHashMap<>();}/*单例*/public static RxBus getDefault(){if(mDefaultInstance==null){synchronized (RxBus.class){if(mDefaultInstance==null){mDefaultInstance=new RxBus();}}}return mDefaultInstance;}/*发送事件*/public void post(Object event){if(mBus == null) {mDefaultInstance=new RxBus();}mBus.onNext(event);}/*订阅事件*/public <T> Observable<T> toObservable(Class<T> eventType){return  mBus.ofType(eventType);//ofType可以根据事件类型发送指定数据}/*发送一个新的Sticky事件*/public void postSticky(Object event){synchronized (mStickEventMap){//线程锁mStickEventMap.put(event.getClass(),event);//将事件类型保存到Map集合}post(event);}/*订阅Sticky事件*/public <T> Observable<T> tObservableStick(final Class<T> eventType){synchronized (mStickEventMap){final Observable<T> observable=mBus.ofType(eventType);//获取发送事件的Observablefinal Object event=mStickEventMap.get(eventType);//根据事件类型作为key查找value对应的valueif(null!=event){return  observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>(){//通过mergeWith合并两个Observable@Overridepublic void call(Subscriber<? super T> subscriber) {//订阅 eventType.cast(event)直接将eventType转换为 T发送subscriber.onNext(eventType.cast(event));}}));}else {return observable;//如果没有sticky 就返回observable}}}/*根据eventType获取事件*/public <T> T getStickEvent(Class<T> eventType){synchronized (mStickEventMap){return  eventType.cast(mStickEventMap.get(eventType));}}/*移除指定类型的eventType的Sticky事件*/public <T> T removeStickyEvent(Class<T> eventType){synchronized (mStickEventMap){return eventType.cast(mStickEventMap.remove(eventType));}}/*移除所有的Sticky事件*/public void removeAllStickyEvents(){synchronized (mStickEventMap){mStickEventMap.clear();}}

开始分析

注:
1、Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,需要将 Subject转换为一个 SerializedSubject ,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。

2、PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

3、ofType操作符只发射指定类型的数据,其内部就是filter+cast

public final <R> Observable<R> ofType(final Class<R> klass) {return filter(new Func1<T, Boolean>() {@Overridepublic final Boolean call(T t) {return klass.isInstance(t);}}).cast(klass);
}

filter操作符可以使你提供一个指定的测试数据项,只有通过测试的数据才会被“发射”。
cast操作符可以将一个Observable转换成指定类型的Observable。

分析:


RxBus工作流程图


1、首先创建一个可同时充当Observer和Observable的Subject;

2、在需要接收事件的地方,订阅该Subject(此时Subject是作为Observable),在这之后,一旦Subject接收到事件,立即发射给该订阅者;

3、在我们需要发送事件的地方,将事件post至Subject,此时Subject作为Observer接收到事件(onNext),然后会发射给所有订阅该Subject的订阅者。

对于RxBus的使用,就和普通的RxJava订阅事件很相似了。
先看发送事件的代码:

RxBus.getDefault().post(new UserEvent (1, "yoyo"));

userEvent是要发送的事件,如果你用过EventBus, 很容易理解,UserEvent的代码:

public class UserEvent {long id;String name;public UserEvent(long id,String name) {this.id= id;this.name= name;}public long getId() {return id;}public String getName() {return name;}
}

再看接收事件的代码:

// rxSubscription是一个Subscription的全局变量,这段代码可以在onCreate/onStart等生命周期内
rxSubscription = RxBus.getDefault().toObserverable(UserEvent.class).subscribe(new Action1<UserEvent>() {@Overridepublic void call(UserEvent userEvent) {long id = userEvent.getId();String name = userEvent.getName();...}},new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {// TODO: 处理异常}        });

最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

@Override
protected void onDestroy() {super.onDestroy();if(!rxSubscription.isUnsubscribed()) {rxSubscription.unsubscribe();}
}


如果你的项目已经开始使用RxJava,建议可以把EventBus或Otto替换成RxBus,减小项目体积。


支持Sticky事件

在Android开发中,Sticky事件只指事件消费者在事件发布之后才注册的也能接收到该事件的特殊类型。Android中就有这样的实例,也就是Sticky Broadcast,即粘性广播。正常情况下如果发送者发送了某个广播,而接收者在这个广播发送后才注册自己的Receiver,这时接收者便无法接收到刚才的广播,为此Android引入了StickyBroadcast,在广播发送结束后会保存刚刚发送的广播(Intent),这样当接收者注册完Receiver后就可以接收到刚才已经发布的广播。这就使得我们可以预先处理一些事件,让有消费者时再把这些事件投递给消费者。

Subject

我们在实现简单的RxBus时使用了PublishSubject,其实RxJava提供给开发者4种Subject:
PublishSubject,BehaviorSubject ,BehaviorSubject,AsyncSubject。

  1. PublishSubject只会给在订阅者订阅的时间点之后的数据发送给观察者。

  2. BehaviorSubject在订阅者订阅时,会发送其最近发送的数据(如果此时还没有收到任何数据,它会发送一个默认值)。

  3. ReplaySubject在订阅者订阅时,会发送所有的数据给订阅者,无论它们是何时订阅的。

  4. AsyncSubject只在原Observable事件序列完成后,发送最后一个数据,后续如果还有订阅者继续订阅该Subject, 则可以直接接收到最后一个值。


Subject

从上图来看,似乎BehaviorSubjectReplaySubject具备Sticky的特性。

BehaviorSubject方案

BehaviorSubject似乎完全符合Sticky的定义,但是你发现了它只能保存最近的那个事件。

有这样的场景:如果订阅者A订阅了Event1,订阅者B订阅了Event2,而此时BehaviorSubject事件队列里是[..., Event2, Event1],当订阅者订阅时,因为保存的是最近的事件:即Event1,所以订阅者B是接收不到Event2的。

解决办法就是:
每个Event类型都各自创建一个对应的BehaviorSubject,这样的话资源开销比较大,并且该Sticky事件总线和普通的RxBus事件总线不能共享,即:普通事件和Sticky事件是独立的,因为普通事件是基于PublishSubject, 暂时放弃该方案!

ReplaySubject方案

ReplaySubject可以保存发送过的所有数据事件。

因为保存了所有的数据事件,所以不管什么类型的Event,我们只要过滤该类型,并让其发送最近的那个Event即可满足Sticky事件了。但是获取最近的对应事件是个难点,因为最符合需求的操作符takeLast()仅在订阅事件结束时(即:onCompleted())才会发送最后的那个数据事件,而我们的RxBus正常情况下应该是尽量避免其订阅事件结束的。(我没能找到合适的操作符,如果你知道,请告知我)

所以BehaviorSubject也是比较难实现Sticky特性的。

并且,不管是BehaviorSubject还是ReplaySubject,它们还有一个棘手的问题:它们实现的EventBus和普通的RxBus(基于PublishSubject)之间的数据是相互独立的!

总结:BehaviorSubjectBehaviorSubject都不能天然适合Sticky事件......

使用Map实现Sticky

该方法思路是在原来PublishSubject实现的RxBus基础上,使用ConcurrentHashMap<事件类型,事件>保存每个事件的最近事件,不仅能实现Sticky特性,最重要的是可以和普通RxBus的事件数据共享,不独立

因为我们的RxBus是基于PublishSubject的,而RxJava又有4种Subject,而且其中的BehaviorSubjectReplaySubject看起来又符合Sticky特性,所以我们可能会钻这个牛角尖,理所当然的认为实现Sticky需要通过其他类型的Subject.... (好吧,我钻进去了...)

这个方案的思路我是根据EventBus的实现想到的,下面是大致流程:

  1. Map的初始化:

    private final Map<Class<?>, Object> mStickyEventMap;public RxBus() {mBus = new SerializedSubject<>(PublishSubject.create());mStickyEventMap = new ConcurrentHashMap<>();}

    ConcurrentHashMap是一个线程安全的HashMap, 采用stripping lock(分离锁),效率比HashTable高很多。

  2. 在我们postSticky(Event)时,存入Map中:

    public void postSticky(Object event) {synchronized (mStickyEventMap) {mStickyEventMap.put(event.getClass(), event);} post(event); 
    }
  3. 订阅时toObservableSticky(Class<T> eventType),先从Map中寻找是否包含该类型的事件,如果没有,则说明没有Sticky事件要发送,直接订阅Subject(此时作为被观察者Observable);如果有,则说明有Sticky事件需要发送,订阅merge(Subject 和 Sticky事件)

    public <T> Observable<T> toObservableSticky(final Class<T> eventType) {synchronized (mStickyEventMap) {Observable<T> observable = mBus.ofType(eventType);final Object event = mStickyEventMap.get(eventType);if (event != null) {return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() {@Overridepublic void call(Subscriber<? super T> subscriber) {subscriber.onNext(eventType.cast(event));}}));} else {return observable;}}}

    merge操作符:可以将多个Observables合并,就好像它们是单个的Observable一样。

这样,Sticky的核心功能就完成了,使用上和普通RxBus一样,通过postSticky()发送事件,toObservableSticky()订阅事件。

除此之外,我还提供了getStickyEvent(Class<T> eventType),removeStickyEvent(Class<T> eventType),removeAllStickyEvents()方法,供查找、移除对应事件类型的事件、移除全部Sticky事件。

在使用Sticky特性时,在不需要某Sticky事件时, 通过removeStickyEvent(Class<T> eventType)移除它,最保险的做法是:在主Activity的onDestroyremoveAllStickyEvents()
因为我们的RxBus是个单例静态对象,再正常退出app时,该对象依然会存在于JVM,除非进程被杀死,这样的话导致StickyMap里的数据依然存在,为了避免该问题,需要在app退出时,清理StickyMap。

// 主Activity(一般是栈底Activity)
@Override
protected void onDestroy() {super.onDestroy();// 移除所有Sticky事件RxBus.getDefault().removeAllStickyEvents();
}



 


异常处理

在使用RxBus过程中,你会发现你订阅了某个事件后,在后续接收到该事件时,如果处理的过程中发生了异常,你会发现后续的事件再也接收不到了,除非你重新订阅!

原因在于RxJava的事件序列机制,一个订阅事件是以onCompleted()或者onError()作为结束的,即:一旦订阅者的onCompleted()onError()被调用,订阅者和被订阅者的订阅关系就解除了。

这里说下RxJava的异常传递机制onError()在Observable序列传递过程中出现任何异常时被调用,然后终止Observable事件序列的传递,以此通知所有的订阅者发生了一个不可恢复的错误,即:异常总会传递到订阅者。

这本是RxJava的一个优点,反而在事件总线的场景下,成了让人头疼的问题!

所以我们的RxBus的订阅者在处理订阅事件时,一旦发生了异常,而又没Catch,那么最终都会调用到onError(),而一旦走到onError(),就意味着这个订阅者和该Subject解除了订阅关系,因此再也收不到后续发出的事件了

我目前想到2种方案

解决方案1:自动重新订阅

即在onError(e)或onCompleted()发生时,立即重新订阅,保证订阅事件在解决时可以立即恢复。

    private void subscribeEvent(){RxBus.getDefault().toObservable(Event.class)// 使用操作符过程中,无需try,catch,直接使用.subscribe(new Subscriber<Event>() {@Overridepublic void onCompleted() {subscribeEvent();}@Overridepublic void onError(Throwable e) {e.printStackTrace();subscribeEvent();}@Overridepublic void onNext(Event event) {// 直接处理接收的事件}});}

注意:这个方案仅可以用于普通RxBus,绝不能用于Sticky事件! 
原因在于Sticky的粘性特性,引起error的事件如果重新订阅的话,该事件很可能继续导致error,从而引起死循环!

解决方案2:捕捉

不管是普通RxBus还是使用Sticky的RxBus,通用的解决方案就是捕捉异常:在任何操作符内的ActionX,FuncX方法以及onNext(T t)内使用try,catch处理。

 RxBus.getDefault().toObservableSticky(EventSticky.class)        // 建议在Sticky时,在操作符内主动try,catch        .map(new Func1<EventSticky, EventSticky>() {@Overridepublic EventSticky call(EventSticky eventSticky) {try {// 变换操作} catch (Exception e) {e.printStackTrace();}return eventSticky;}}).subscribe(new Action1<EventSticky>() {@Overridepublic void call(EventSticky eventSticky) {try {// 处理接收的事件} catch (Exception e) {e.printStackTrace();}}
});

这篇关于用RxJava实现Rxbus替换EventBus事件总线的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1020428

相关文章

Nginx 配置跨域的实现及常见问题解决

《Nginx配置跨域的实现及常见问题解决》本文主要介绍了Nginx配置跨域的实现及常见问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来... 目录1. 跨域1.1 同源策略1.2 跨域资源共享(CORS)2. Nginx 配置跨域的场景2.1

Python中提取文件名扩展名的多种方法实现

《Python中提取文件名扩展名的多种方法实现》在Python编程中,经常会遇到需要从文件名中提取扩展名的场景,Python提供了多种方法来实现这一功能,不同方法适用于不同的场景和需求,包括os.pa... 目录技术背景实现步骤方法一:使用os.path.splitext方法二:使用pathlib模块方法三

javax.net.ssl.SSLHandshakeException:异常原因及解决方案

《javax.net.ssl.SSLHandshakeException:异常原因及解决方案》javax.net.ssl.SSLHandshakeException是一个SSL握手异常,通常在建立SS... 目录报错原因在程序中绕过服务器的安全验证注意点最后多说一句报错原因一般出现这种问题是因为目标服务器

CSS实现元素撑满剩余空间的五种方法

《CSS实现元素撑满剩余空间的五种方法》在日常开发中,我们经常需要让某个元素占据容器的剩余空间,本文将介绍5种不同的方法来实现这个需求,并分析各种方法的优缺点,感兴趣的朋友一起看看吧... css实现元素撑满剩余空间的5种方法 在日常开发中,我们经常需要让某个元素占据容器的剩余空间。这是一个常见的布局需求

HTML5 getUserMedia API网页录音实现指南示例小结

《HTML5getUserMediaAPI网页录音实现指南示例小结》本教程将指导你如何利用这一API,结合WebAudioAPI,实现网页录音功能,从获取音频流到处理和保存录音,整个过程将逐步... 目录1. html5 getUserMedia API简介1.1 API概念与历史1.2 功能与优势1.3

Java实现删除文件中的指定内容

《Java实现删除文件中的指定内容》在日常开发中,经常需要对文本文件进行批量处理,其中,删除文件中指定内容是最常见的需求之一,下面我们就来看看如何使用java实现删除文件中的指定内容吧... 目录1. 项目背景详细介绍2. 项目需求详细介绍2.1 功能需求2.2 非功能需求3. 相关技术详细介绍3.1 Ja

springboot项目中整合高德地图的实践

《springboot项目中整合高德地图的实践》:本文主要介绍springboot项目中整合高德地图的实践,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一:高德开放平台的使用二:创建数据库(我是用的是mysql)三:Springboot所需的依赖(根据你的需求再

spring中的ImportSelector接口示例详解

《spring中的ImportSelector接口示例详解》Spring的ImportSelector接口用于动态选择配置类,实现条件化和模块化配置,关键方法selectImports根据注解信息返回... 目录一、核心作用二、关键方法三、扩展功能四、使用示例五、工作原理六、应用场景七、自定义实现Impor

SpringBoot3应用中集成和使用Spring Retry的实践记录

《SpringBoot3应用中集成和使用SpringRetry的实践记录》SpringRetry为SpringBoot3提供重试机制,支持注解和编程式两种方式,可配置重试策略与监听器,适用于临时性故... 目录1. 简介2. 环境准备3. 使用方式3.1 注解方式 基础使用自定义重试策略失败恢复机制注意事项

使用Python和OpenCV库实现实时颜色识别系统

《使用Python和OpenCV库实现实时颜色识别系统》:本文主要介绍使用Python和OpenCV库实现的实时颜色识别系统,这个系统能够通过摄像头捕捉视频流,并在视频中指定区域内识别主要颜色(红... 目录一、引言二、系统概述三、代码解析1. 导入库2. 颜色识别函数3. 主程序循环四、HSV色彩空间详解