您的位置:首页 > 编程语言 > Java开发

RxJava实战演示1------基本代码使用

2016-07-26 11:24 429 查看
声明:本文适合有一点RxJava语法基础的同学阅读,如果基础为零,建议先看下仍物线大神的此篇文章:http://gank.io/post/560e15be2dca930e00da1083

1.应用程序配置,如果使用java8的lambda表达式的话,需要额外添加如下配置

在全局build.gradle的 dependencies中添加:classpath 'me.tatarka:gradle-retrolambda:2.5.0'

在app的build.gradle中添加如下内容:apply plugin: 'me.tatarka.retrolambda',android空间下添加

compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
正常的Rxjava配置,在dependencies中添加
compile 'io.reactivex:rxjava:1.1.0'
compile 'io.reactivex:rxandroid:1.1.0'
本文将添加,以下配置

compile fileTree(dir: 'libs', include: ['*.jar'])
testCompile 'junit:junit:4.12'
compile 'com.android.support:appcompat-v7:21.0.3'
compile 'com.android.support:cardview-v7:21.0.3'
compile 'com.android.support:recyclerview-v7:21.0.3'
compile 'io.reactivex:rxjava:1.1.0'
compile 'io.reactivex:rxandroid:1.1.0'
compile 'com.google.code.gson:gson:2.4'
compile 'com.jakewharton:butterknife:7.0.1'
compile 'com.squareup.picasso:picasso:2.5.2'
compile 'com.squareup.okhttp3:okhttp:3.+'


2.实例演示,基本用法

/**
* 使用create方式
*/
public static void createObserable() {
//定义被观察者,
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext("hello");
subscriber.onNext("hi");
subscriber.onNext(downLoadJson());
subscriber.onNext("world");
subscriber.onCompleted();
}
}
});

Subscriber<String> showsub = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {
Log.i(TAG, e.getMessage());
}

@Override
public void onNext(String s) {
Log.i(TAG, "result-->>" + s);
}
};
observable.subscribe(showsub);//关联被观察者

}

/**
* 调用下载方法
*
* @return
*/
public static String downLoadJson() {
return "json data";
}

/**
* create 第二种方式
*/
public static void createPrint() {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (!subscriber.isUnsubscribed()) {
for (int i = 1; i <= 10; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {
Log.i(TAG, e.getMessage());
}

@Override
public void onNext(Integer integer) {
Log.i(TAG, "result-->>" + integer);
}
});
}

/***
* 使用在被观察者,返回的对象一般都是数值类型
*/
public static void from() {
Integer[] items = {1, 2, 3, 4, 5, 6, 7, 8, 9};
Observable observable = Observable.from(items);
observable.subscribe(new Action1() {
@Override
public void call(Object o) {
Log.i(TAG, o.toString());
}
});
}

/**
* 指定某一时刻进行数据发送,轮询发射,注意与timer的区别
*/
public static void interval() {
Integer[] items = {1, 2, 3, 4, 5};
Observable observable = Observable.interval(1, 1, TimeUnit.SECONDS);//每过一秒发送1个数据
observable.subscribe(new Action1() {
@Override
public void call(Object o) {
Log.i(TAG, o.toString());
}
});
}

/**
* 处理数组集合,将会按顺序输出1,2,3,4,5,6,3,5......
*/
public static void just() {
Integer[] items1 = {1, 2, 3, 4, 5, 6};
Integer[] items2 = {3, 5, 6, 8, 3, 8};
Observable observable = Observable.just(items1, items2);
observable.subscribe(new Subscriber<Integer[]>() {
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}

@Override
public void onError(Throwable e) {
Log.i(TAG, e.getMessage());
}

@Override
public void onNext(Integer[] o) {
for(int i=0;i<o.length;i++){
Log.i(TAG,"next:"+o[i]);
}
}
});
}
//Observable.just(items1, items2).repeat(3).subscribe() 重复进行3次发射


<pre name="code" class="java">/** * 使用范围数据,指定输出数据的范围 */ 
public static void range(){ 
Observable observable = Observable.range(1, 40);//第一个是起始点,第二个是想发射数字的个数 
observable.subscribe(new Subscriber<Integer>()
{@Override 
public void onCompleted() 
{ Log.i(TAG, "onCompleted"); 
@Override
public void onError(Throwable e) 
{ Log.i(TAG, e.getMessage());}
@Override
public void onNext(Integer o) 
{ Log.i(TAG,"next:"+o);}
});
}
/** * 使用过滤功能,将只会输出1,2,3,4 */ 
public static void filter(){ 
Observable observable = Observable.just(1,2,3,4,5,7,8); 
observable.filter(new Func1<Integer,Boolean>() 
{@Override 
public Boolean call(Integer o) 
{ return o<5;}}).observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() { 
@Override public void onCompleted() {}
@Override public void onError(Throwable e) {} 
@Overridepublic void onNext(Integer o) 
{ Log.i(TAG,o.toString());} 
}); 
}


Observable.timer(3,TimeUnit.SECONDS)//3秒后再进行发射.subscribe(new Observable<Long>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(Long number) {Log.d("RXJAVA", "I say " + number);}});

                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: