支持RxJava2的RxBus简单封装

介绍基于RxJava2的RxBus封装示例

网络上能搜到的RxBus文章大多只支持RxJava1,而RxJava2已经在2016年底正式发布,因此本文给出一个支持RxJava2的RxBus封装。

引入

dependencies { 
    // RxJava 2 core and the RxAndroid 2 artifacts.
    // NOTE(review): `compile` is the legacy Gradle configuration name; newer
    // Gradle versions expect `implementation`/`api` instead - confirm against
    // the Gradle version this project builds with.
    compile 'io.reactivex.rxjava2:rxjava:2.0.4' 
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}

RxBus类

package com.eggsy.framework.bus;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.operators.flowable.FlowableOnBackpressureError;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

/**
 * A minimal event bus built on RxJava 2.
 *
 * <p>Each posted event is pushed into two serialized hot streams:
 * <ul>
 *   <li>a {@link PublishSubject} that backs the {@link Observable}-based
 *       {@code register} overloads (no backpressure handling), and</li>
 *   <li>a {@link PublishProcessor} that backs the {@link Flowable}-based
 *       {@code register} overloads, which take a {@link BackpressureStrategy}.</li>
 * </ul>
 * Subscribers only receive events posted after they registered, filtered by the
 * event class they registered for.
 *
 * <p>Created by eggsy on 17-1-6.
 */
public class RxBus {

    /** Lazily created shared instance; volatile so the double-checked init in {@link #getDefault()} publishes it safely. */
    private static volatile RxBus instance;

    /** Observable side of the bus; serialized so {@link #post(Object)} may be called from several threads. */
    private final Subject<Object> subjectBus = PublishSubject.<Object>create().toSerialized();

    /** Flowable side of the bus; serialized for the same reason as {@link #subjectBus}. */
    private final FlowableProcessor<Object> processorBus = PublishProcessor.<Object>create().toSerialized();

    /**
     * Returns the process-wide bus, creating it on first use.
     *
     * @return the shared {@code RxBus} instance, never {@code null}
     */
    public static RxBus getDefault() {
        RxBus bus = instance;
        if (bus == null) {
            synchronized (RxBus.class) {
                bus = instance;
                if (bus == null) {
                    bus = new RxBus();
                    instance = bus;
                }
            }
        }
        return bus;
    }

    /**
     * Subscribes {@code observer} to events of type {@code eventType} on the
     * Observable side of the bus. Events are delivered on the posting thread.
     *
     * @param eventType class of the events to receive
     * @param observer  callback invoked for every matching event
     * @return handle to pass to {@link #unRegister(Disposable)}
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw signature kept for source compatibility
    public Disposable register(Class eventType, Consumer observer) {
        return toObserverable(eventType).subscribe(observer);
    }

    /**
     * Same as {@link #register(Class, Consumer)}, but delivers events on {@code scheduler}.
     *
     * @param eventType class of the events to receive
     * @param observer  callback invoked for every matching event
     * @param scheduler scheduler the callback is invoked on
     * @return handle to pass to {@link #unRegister(Disposable)}
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw signature kept for source compatibility
    public Disposable register(Class eventType, Consumer observer, Scheduler scheduler) {
        return toObserverable(eventType).observeOn(scheduler).subscribe(observer);
    }

    /**
     * Subscribes {@code observer} to events of type {@code eventType} on the
     * Flowable side of the bus, applying exactly one backpressure strategy.
     *
     * @param eventType class of the events to receive
     * @param observer  callback invoked for every matching event
     * @param scheduler scheduler the callback is invoked on, or {@code null} to
     *                  stay on the posting thread
     * @param strategy  how to react when the subscriber cannot keep up; must not be {@code null}
     * @return handle to pass to {@link #unRegister(Disposable)}
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw signature kept for source compatibility
    public Disposable register(Class eventType, Consumer observer, Scheduler scheduler, BackpressureStrategy strategy) {
        Flowable o = toFlowable(eventType);
        // Every case must break: falling through would stack several
        // backpressure operators and always end in onBackpressureBuffer().
        switch (strategy) {
            case DROP:
                o = o.onBackpressureDrop();
                break;
            case LATEST:
                o = o.onBackpressureLatest();
                break;
            case MISSING:
                // Leave the stream untouched; downstream deals with overflow itself.
                break;
            case ERROR:
                o = RxJavaPlugins.onAssembly(new FlowableOnBackpressureError<>(o));
                break;
            case BUFFER:
            default:
                o = o.onBackpressureBuffer();
                break;
        }
        if (scheduler != null) {
            // Operators return a new Flowable; the result has to be reassigned.
            o = o.observeOn(scheduler);
        }
        return o.subscribe(observer);
    }

    /**
     * Same as {@link #register(Class, Consumer, Scheduler, BackpressureStrategy)}
     * without switching threads.
     *
     * @param eventType class of the events to receive
     * @param observer  callback invoked for every matching event
     * @param strategy  how to react when the subscriber cannot keep up; must not be {@code null}
     * @return handle to pass to {@link #unRegister(Disposable)}
     */
    @SuppressWarnings("rawtypes") // raw signature kept for source compatibility
    public Disposable register(Class eventType, Consumer observer, BackpressureStrategy strategy) {
        return register(eventType, observer, null, strategy);
    }

    /**
     * Cancels a single registration. A {@code null} or already disposed handle is ignored.
     *
     * @param disposable handle returned by one of the {@code register} methods
     */
    public void unRegister(Disposable disposable) {
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
        }
    }

    /**
     * Cancels every registration held by {@code compositeDisposable}. A {@code null} argument is ignored.
     *
     * @param compositeDisposable container of registration handles
     */
    public void unRegister(CompositeDisposable compositeDisposable) {
        if (compositeDisposable != null) {
            compositeDisposable.dispose();
        }
    }

    /**
     * Publishes {@code event} to both the Observable and the Flowable side of the bus.
     *
     * @param event the event to deliver; must not be {@code null}
     * @throws NullPointerException if {@code event} is {@code null}
     */
    public void post(final Object event) {
        if (event == null) {
            // RxJava 2 streams do not accept null items; reject it here so the
            // failure surfaces at the call site instead of inside the bus.
            throw new NullPointerException("event must not be null");
        }
        subjectBus.onNext(event);
        processorBus.onNext(event);
    }

    /** Returns the Observable side of the bus filtered to instances of {@code cls}. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Observable toObserverable(Class cls) {
        return subjectBus.ofType(cls);
    }

    /** Returns the Flowable side of the bus filtered to instances of {@code cls}. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Flowable toFlowable(Class cls) {
        return processorBus.ofType(cls);
    }

    /**
     * @return {@code true} if the Observable side currently has at least one observer
     */
    public boolean hasObservers() {
        return subjectBus.hasObservers();
    }

    /**
     * @return {@code true} if the Flowable side currently has at least one subscriber
     */
    public boolean hasSubscribers() {
        return processorBus.hasSubscribers();
    }

}

以上是基于RxJava2的RxBus的简单封装。在此基础上可以使用APT对类进行进一步的封装,请参考我的另外一篇文章《基于APT的RxBus库使用》。最后,我将它放到GitHub上并且补充了相应的示例,欢迎fork和star。写得不好或者不对的地方,欢迎留言评论交流~~~