您的位置:首页 > 编程语言 > Java开发

使用RxJava实现EventBus

2016-03-16 11:51 489 查看

1.EventBus简介

EventBus主要功能是替代Intent、Handler、BroadCast在Fragment、Activity、Service、线程之间传递消息的通信库。优点是开销小,代码简洁,解耦代码。EventBus是一个观察者模式的实现,所以它具有以下三个要素:

- Event:事件,可以使任意类型对象。

- Subscriber:事件订阅者,接收特定的事件。

- Publisher:事件发布者,用于通知Subscriber有事件发生。

官方的图可以很好的说明EventBus的架构:



2.用RxJava实现EventBus

public class RxBus {
public static synchronized RxBus get() {
return RxBusGenerator.instance;
}

private static class RxBusGenerator {
private static RxBus instance = new RxBus();
}

private RxBus() {
}

private ConcurrentHashMap<Object, Vector<Subject>> subjectMapper = new ConcurrentHashMap<>();

public <T> Observable<T> register(@NonNull Class<T> clazz) {
return register(clazz.getName(), clazz);
}

public <T> Observable<T> register(@NonNull Object tag, @NonNull Class<T> clazz) {
Vector<Subject> subjects = subjectMapper.get(tag);
if (null == subjects) {
subjects = new Vector<>();
subjectMapper.put(tag, subjects);
}
Subject<T, T> subject;
subjects.add(subject = PublishSubject.create());
return subject;
}

public void unregister(@NonNull Object tag, @NonNull Observable observable) {
Vector<Subject> subjects = subjectMapper.get(tag);
if (null != subjects) {
subjects.remove((Subject) observable);
if (subjects.isEmpty()) {
subjectMapper.remove(tag);
}
}
}

public void post(@NonNull String content) {
post(content, content);
}

public void post(@NonNull Object content) {
post(content.getClass().getName(), content);
}

public void post(@NonNull Object tag, @NonNull Object content) {
Vector<Subject> subjectList = subjectMapper.get(tag);
if (!(null == subjectList || subjectList.isEmpty())) {
for (Subject subject : subjectList) {
subject.onNext(content);
}
}

}
}


如上述代码,RxBus只提供了register、unregister、post三个方法。

这里又加入了一个tag的概念,也可以理解为channel,注册Subject、反注册Subject和post事件的时候都需要这个tag,只有tag一致才能正常接收到事件。

比如有一个事件类HelloEvent,这个事件的作用是接收到后toast一个提示“hello”,如果两个Activity都注册了这个HelloEvent事件,但是没有tag去限制,一旦post了一个helloEvent事件后,两个Activity都会收到这个事件,导致两个Activity都会toast。如果使用tag,post这个HelloEvent的时候可以设置这个tag,只有register时也使用了这个tag才会接收到这个event。

对上述代码还有几点补充说明:

①RxBus通过单例模式的形式获取对象。

② ConcurrentHashMap使用分段锁的技术解决了HashMap的线程不安全问题和HashTable在全局锁竞争激烈的情况下效率低下的问题。用于存储tag和tag对应的订阅者列表。

③tag对应的订阅者列表使用Vector存储,保证了线程安全问题。

④ PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

⑤ 对于String类型,我们单独定义了
post(@NonNull String content)
方法,为了防止订阅
String
的订阅者都发送通知,即使它们订阅的字符串内容不一致。所以建议发送
string
的时候带上
tag


下面是我们的使用方法:

① RxJavaActivity.java

public class RxJavaActivity extends Activity {

private Observable<Course> courseObservable;
private Observable<String> stringObservable;

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxjava);

courseObservable = RxBus.get().register(Course.class);
courseObservable.observeOn(AndroidSchedulers.mainThread())
.subscribe(course -> Toast.makeText(this, course.name, Toast.LENGTH_SHORT).show());

stringObservable = RxBus.get().register("tag", String.class);
stringObservable.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> Toast.makeText(this, s, Toast.LENGTH_SHORT).show());
}

public void doClick(View view) {
startActivity(new Intent(this, RxJavaSecondActivity.class));
}

@Override
protected void onDestroy() {
super.onDestroy();
RxBus.get().unregister("", courseObservable);
RxBus.get().unregister("tag", stringObservable);
}

}


② RxJavaSecondActivity.java

public class RxJavaSecondActivity extends Activity {

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_second_rxjava);
}

public void doClick(View view) {
RxBus.get().post(new Course("chinese"));

RxBus.get().post("tag", "post a string");
}

}


效果:当你点击RxJavaSecondActivity中的按钮,会依次弹出“chinese”“post a string”的提示。同样我们可以在Service、Fragment等地方进行处理。

以上我们就实现了EventBus的功能。谢谢大家给的宝贵意见。

参考:http://www.cnblogs.com/tiantianbyconan/p/4578699.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  RxJava EventBus