RxJava实例(二)
2016-04-02 16:02
435 查看
一、RxJava访问数据库实例
(From:http://www.devtf.cn/?p=734)// 用户表UserTable.java Callable<List<User>> getUsers(SQLiteDatabase db, String userId) { return new Callable<List<User>>() { @Override public List<User> call() { // select * from users where _id is userId } } } //SqliteOpenHelper类: MySqliteOpenHelper.java Observable<List<User>> getUsers(String userId) { return makeObservable(mUserTable.getUsers(getReadableDatabase(), userId)) .subscribeOn(Schedulers.computation()) // note: do not use Schedulers.io() } private static <T> Observable<T> makeObservable(final Callable<T> func) { return Observable.create( new Observable.OnSubscribe<T>() { @Override public void call(Subscriber<? super T> subscriber) { try { subscriber.onNext(func.call()); } catch(Exception ex) { Log.e(TAG, "Error reading from the database", ex); } } }); } // 界面调用DisplayUsersFragment.java @Inject MySqliteOpenHelper mDbHelper; // ... mDbHelper.getUsers(userId) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<List<User>>()) { @Override public void onNext(List<User> users) { // Update our UI with the users } } }
二.RxBus作为事件总线
(From http://nerds.weddingpartyapp.com/tech/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/)事件总线
// this is the middleman object public class RxBus { private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create()); public void send(Object o) { _bus.onNext(o); } public Observable<Object> toObserverable() { return _bus; } }
发送消息
@OnClick(R.id.btn_demo_rxbus_tap) public void onTapButtonClicked() { _rxBus.send(new TapEvent()); }
注册监听:
// note that it is important to subscribe to the exact same _rxBus instance that was used to post the events _rxBus.toObserverable() .subscribe(new Action1<Object>() { @Override public void call(Object event) { if(event instanceof TapEvent) { _showTapText(); }else if(event instanceof SomeOtherEvent) { _doSomethingElse(); } } });
解释:
Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,需要将 Subject转换为一个 SerializedSubject ,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。Subjects拥有hasObservers()方法,检测是否有订阅者,从而可以在无订阅者的时候取消消息或者做其他操作。
另外一个例子,这个使用了范型:
(From: http://www.jianshu.com/p/ca090f6e2fe2)
public class RxBus { private static volatile RxBus defaultInstance; private final Subject bus; // PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者 public RxBus() { bus = new SerializedSubject<>(PublishSubject.create()); } // 单例RxBus public static RxBus getDefault() { RxBus rxBus = defaultInstance; if (defaultInstance == null) { synchronized (RxBus.class) { rxBus = defaultInstance; if (defaultInstance == null) { rxBus = new RxBus(); defaultInstance = rxBus; } } } return rxBus; } // 提供了一个新的事件(这里可以用send的) public void post (Object o) { bus.onNext(o); } // 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者 public <T> Observable<T> toObserverable (Class<T> eventType) { return bus.ofType(eventType); } }
解释
ofType的实现:public final <R> Observable<R> ofType(final Class<R> klass) { return filter(new Func1<T, Boolean>() { @Override public final Boolean call(T t) { return klass.isInstance(t); } }).cast(klass); }
filter操作符可以使你提供一个指定的测试数据项,只有通过测试的数据才会被“发射”。
cast操作符可以将一个Observable转换成指定类型的Observable。
// 发送消息 RxBus.getDefault().post(new YourEvent ()); // 订阅消息 rxSubscription = RxBus.getDefault().toObserverable(UserEvent.class) .subscribe(new Action1<YourEvent>() { @Override public void call(UserEvent userEvent) { long id = userEvent.getId(); String name = userEvent.getName(); ... } }, // 其实没有必要处理错误 new Action1<Throwable>() { @Override public void call(Throwable throwable) { // TODO: 处理异常 } }); // 取消订阅 if(!rxSubscription.isUnsubscribed()) { rxSubscription.unsubscribe(); }
相关文章推荐
- java封装(二):基于抽象的类库设计人员与具体的类库设计人员的分离
- 使用 Arrays 类操作 Java 中的数组
- java Semaphore
- spring mvc 的Controller类默认Scope是单例(singleton)的
- Struts2与Servlet的关系
- Java中类方法与实例方法的区别
- ssm框架搭建二----环境搭建
- 跟着柴毛毛学Spring(4)——面向切面编程
- 如何使用 Java 中的数组
- 一些Java的常用定义
- Java泛型反射机制(二)
- Java泛型反射机制(一)
- 大数乘法[JAVA实现]-经典笔试题
- Java基础——成员变量、局部变量和静态变量的区别
- RxJava实例(一)
- 深入Spring之web.xml
- Java技术_Java千百问(0019)_java中如何循环执行
- 彻底解决Spring mvc乱码
- JVM版本引起的一个错误java.lang.UnsupportedClassVersionError
- 关于eclipse下开发android应用的几个常用插件