您的位置:首页 > 编程语言 > Java开发

RxJava实例(二)

2016-04-02 16:02 435 查看

一、RxJava访问数据库实例

(From:http://www.devtf.cn/?p=734

// 用户表UserTable.java
Callable<List<User>> getUsers(SQLiteDatabase db, String userId) {
return new Callable<List<User>>() {
@Override
public List<User> call() {

// select * from users where _id is userId
}
}
}

//SqliteOpenHelper类: MySqliteOpenHelper.java
Observable<List<User>> getUsers(String userId) {
return makeObservable(mUserTable.getUsers(getReadableDatabase(), userId))
.subscribeOn(Schedulers.computation()) // note: do not use Schedulers.io()
}

private static <T> Observable<T> makeObservable(final Callable<T> func) {
return Observable.create(
new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
try {
subscriber.onNext(func.call());
} catch(Exception ex) {
Log.e(TAG, "Error reading from the database", ex);
}
}
});
}

// 界面调用DisplayUsersFragment.java

@Inject
MySqliteOpenHelper mDbHelper;

// ...

mDbHelper.getUsers(userId)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<List<User>>()) {
@Override
public void onNext(List<User> users) {
// Update our UI with the users
}
}
}


二.RxBus作为事件总线

(From http://nerds.weddingpartyapp.com/tech/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/

事件总线

// this is the middleman object
public class RxBus {

private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());

public void send(Object o) {
_bus.onNext(o);
}

public Observable<Object> toObserverable() {
return _bus;
}
}


发送消息

@OnClick(R.id.btn_demo_rxbus_tap)
public void onTapButtonClicked() {
_rxBus.send(new TapEvent());
}


注册监听:

// note that it is important to subscribe to the exact same _rxBus instance that was used to post the events
_rxBus.toObserverable()
.subscribe(new Action1<Object>() {
@Override
public void call(Object event) {

if(event instanceof TapEvent) {
_showTapText();

}else if(event instanceof SomeOtherEvent) {
_doSomethingElse();
}
}
});


解释:

Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,需要将 Subject转换为一个 SerializedSubject ,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。

Subjects拥有hasObservers()方法,检测是否有订阅者,从而可以在无订阅者的时候取消消息或者做其他操作。

另外一个例子,这个使用了范型:

(From: http://www.jianshu.com/p/ca090f6e2fe2)

public class RxBus {
private static volatile RxBus defaultInstance;
private final Subject bus;
// PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
public RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
// 单例RxBus
public static RxBus getDefault() {
RxBus rxBus = defaultInstance;
if (defaultInstance == null) {
synchronized (RxBus.class) {
rxBus = defaultInstance;
if (defaultInstance == null) {
rxBus = new RxBus();
defaultInstance = rxBus;
}
}
}
return rxBus;
}
// 提供了一个新的事件(这里可以用send的)
public void post (Object o) {
bus.onNext(o);
}
// 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者

public <T> Observable<T> toObserverable (Class<T> eventType) {
return bus.ofType(eventType);
}
}


解释

ofType的实现:

public final <R> Observable<R> ofType(final Class<R> klass) {
return filter(new Func1<T, Boolean>() {
@Override
public final Boolean call(T t) {
return klass.isInstance(t);
}
}).cast(klass);
}


filter
操作符可以使你提供一个指定的测试数据项,只有通过测试的数据才会被“发射”。
cast
操作符可以将一个Observable转换成指定类型的Observable。

// 发送消息
RxBus.getDefault().post(new YourEvent ());

// 订阅消息
rxSubscription = RxBus.getDefault().toObserverable(UserEvent.class)
.subscribe(new Action1<YourEvent>() {
@Override
public void call(UserEvent userEvent) {
long id = userEvent.getId();
String name = userEvent.getName();
...
}
},

// 其实没有必要处理错误
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
// TODO: 处理异常
}
});

// 取消订阅
if(!rxSubscription.isUnsubscribed()) {
rxSubscription.unsubscribe();
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: