您的位置:首页 > 移动开发 > Android开发

RxJava与RxAndroid 接收消息通知

2015-12-25 13:52 429 查看
RxJava与RxAndroid 接收消息通知 主要通过两个类rx.Observerrx.Subscriber这两个类的关系:继承关系rx.Observer被rx.Subscriber继承作用
Provides a mechanism for receiving push-based notifications from Observables, and permits manual
* unsubscribing from these Observables.
翻译:提供了一种机制,用于接收推送的观察者为基础的通知,并允许手动取消订阅这些观测值Subscriber在Observer上面进行了扩展:如:取消订阅.unsubscribe(),判断是否可订阅等增加onStart()回掉方法,好像一定都执行,源码中的解释:// do nothing by default
//接收
private Observer<String> stringObserver = new Observer<String>() {

@Override
public void onCompleted() {
LogUtils.d("onCompleted():" + Thread.currentThread().getName());
}

@Override
public void onError(Throwable e) {
LogUtils.d("onError():" + Thread.currentThread().getName());
}

@Override
public void onNext(String o) {
LogUtils.d("onNext():data:" + o + "  线程" + Thread.currentThread().getName());
}
};

//接收   stringSubscriber.unsubscribe();
private Subscriber<String> stringSubscriber = new Subscriber<String>() {
@Override
public void onStart() {
super.onStart();
LogUtils.d("onStart():" + Thread.currentThread().getName());
}

@Override
public void onCompleted() {
LogUtils.d("onCompleted():" + Thread.currentThread().getName());
}

@Override
public void onError(Throwable e) {
LogUtils.d("onError():" + Thread.currentThread().getName());
}

@Override
public void onNext(String s) {
LogUtils.d("onNext():data:" + s + Thread.currentThread().getName());
}
};
rx.Subscriber在实现rx.Observer的同时,也实现了rx.Subscription
这个类是个接口,主要体现在控制取消订阅和是否不可订阅的判断:源码:
/*** Copyright 2014 Netflix, Inc.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0 ** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package rx;import rx.subscriptions.Subscriptions;/*** Subscription returns from {@link Observable#subscribe(Subscriber)} to allow unsubscribing.* <p>* See the utilities in {@link Subscriptions} and the implementations in the {@code rx.subscriptions} package.* <p>* This interface is the RxJava equivalent of {@code IDisposable} in Microsoft's Rx implementation.*/public interface Subscription {/*** Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription* was received.* <p>* This allows unregistering an {@link Subscriber} before it has finished receiving all events (i.e. before* onCompleted is called).*/void unsubscribe();/*** Indicates whether this {@code Subscription} is currently unsubscribed.** @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise*/boolean isUnsubscribed();}
关于事件接收或者说事件订阅的方法介绍:onStart();最开始执行,一定执行相对于传统观察者模式的不同,RxJava的时间回调方法除了普通时间onNext()( 相等于android的onClick/onEvent)之外,还定义了两个特殊的事件onCompleted(0和onError().onCompleted():事件队列完结,RxJava不仅把每个事件单独处理,还会把他们看作一个队列,RxJAVA规定,当不会再有新的onNext() 发出时,是触发onCompletd()的标志onError() 事件队列异常,在事件处理过程中出现异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出.onCompleted和onError()是互斥的欢迎入群:476899320探讨RxJava等新技术

                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: