手把手教你使用-RxJava-2-0(二)

本篇文章主要介绍线程调度器,通过对线程调度器的了解,方便我们更好的处理异步操作,在合适的场景选择合适的线程。同时,结合上篇文章,我们就初步掌握了 RxJava 2.x的基本操作并可以应用在我们的项目中。在本篇文章的后半部分,会具体展示RxJava 2.x的使用。

Scheduler简介

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。
在RxJava 中,Scheduler,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景。

Scheduler 的 API

● Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

●Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

●Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

●Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

● Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。subscribeOn(): 指定Observable(被观察者)所在的线程,或者叫做事件产生的线程。 observeOn(): 指定 Observer(观察者)所运行在的线程,或者叫做事件消费的线程。
下面用代码展示下线程调度的使用:

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d("所在的线程:",Thread.currentThread().getName());
Log.d("发送的数据:", 1+"");
e.onNext(1);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) /
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("所在的线程:",Thread.currentThread().getName());
Log.d("接收到的数据:", "integer:" + integer);
}
});

01-19 10:06:38.275 27734-27783/? D/所在的线程:: RxCachedThreadScheduler-1
01-19 10:06:38.275 27734-27783/? D/发送的数据:: 1
01-19 10:06:38.285 27734-27734/? D/所在的线程:: main
01-19 10:06:38.285 27734-27734/? D/接收到的数据:: integer:1

可以看到,Observable(被观察者)发送事件的线程的确改变了, 是在一个叫 RxCachedThreadScheduler-1的线程中发送的事件, 而Observer(观察者)仍然在主线程中接收事件。由此我们实现了线程调度的操作,可以在此基础上尽情的进行异步操作。

下面来介绍一个具体的使用场景。

RxJava 2.x 网络请求使用

Android中有多种网络请求库, Retrofit便是其中的佼佼者,它的优势之一便是它支持RxJava的方式来调用。我们便以Retrofit进行网络请求,RxJava进行异步处理,两者结合来讲解RxJava在网络请求中的具体使用。

本例中 我们使用聚合数据中的全国天气数据,获得城市信息。
接口url:http://v.juhe.cn/weather/citys?key=.... 其中key是你申请时聚合数据给你的密钥。
具体请求的返回数据形式如下:

下面以上述数据简单讲解一下Retrofit的基本用法。

要使用Retrofit,先在Gradle中添加配置:

//Retrofit
compile 'com.squareup.retrofit2:retrofit:2.1.0'
//Gson converter
compile 'com.squareup.retrofit2:converter-gson:2.1.0'
//Okhttp
compile 'com.squareup.okhttp3:okhttp:3.5.0'
//RxJava adapter
compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'

定义Api接口:

public interface Api {
@GET("citys")
Observable<AllCity> getAllCity(@Query("key") String key);
}

创建一个Retrofit客户端:

private static Retrofit create() {
OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
builder.readTimeout(10, TimeUnit.SECONDS);
builder.connectTimeout(9, TimeUnit.SECONDS);

return new Retrofit.Builder().baseUrl(baseUrl)
.client(builder.build())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
}

接下来就可以进行网络请求:

Retrofit retrofit = create();
Api api = retrofit.create(Api.class);
Observable<AllCity> observable = api.getAllCity(appkey);
observable.subscribeOn(Schedulers.io())
.flatMap(new Function<AllCity, ObservableSource<City>>() {
@Override
public ObservableSource<City> apply(AllCity city) throws Exception {
ArrayList<City> result = city.getResult();
return Observable.fromIterable(result);
}
})
.filter(new Predicate<City>() {
@Override
public boolean test(City city) throws Exception {
String id = city.getId();
if(Integer.parseInt(id)<5){
return true;
}
return false;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<City>() {
@Override
public void accept(City city) throws Exception {
System.out.println(city);
}
});

01-19 13:28:56.952 13218-13218/com.lvr.rxjavalearning I/System.out: City{id=’1’, province=’北京’, city=’北京’, district=’北京’}
01-19 13:28:56.952 13218-13218/com.lvr.rxjavalearning I/System.out: City{id=’2’, province=’北京’, city=’北京’, district=’海淀’}
01-19 13:28:56.952 13218-13218/com.lvr.rxjavalearning I/System.out: City{id=’3’, province=’北京’, city=’北京’, district=’朝阳’}
01-19 13:28:56.952 13218-13218/com.lvr.rxjavalearning I/System.out: City{id=’4’, province=’北京’, city=’北京’, district=’顺义’}

调用Api接口方法,返回一个Observable(被观察者)对象,然后当subscribe()订阅后,就可以在IO线程中执行网络 请求操作,然后进行转换过滤,最终Observer(观察者)对象在UI线程中获得城市id在1-4之间的城市信息。
其中请求返回的数据是json形式,AllCity类包含所有的返回数据,具体代码如下:

public class AllCity {
private String error_code;
private String reason;
private String resultcode;
private ArrayList<City> result;
//省略getter,setter方法
}

ArrayList集合中封装了所有城市的信息,City类包含城市详细信息,具体代码如下:


public class City {

/**
* id : 1
* province : 北京
* city : 北京
* district : 北京
*/

private String id;
private String province;
private String city;
private String district;
//省略getter,setter,toString方法
}

本例中,我们假设Observer(观察者)需要id号在1-4之间的城市信息,我们就可以先使用flatMap()操作符先将封装所有信息的AllCity中提取出城市信息集合,然后转换成一个新的Observable(被观察者)进行传递,然后使用filter()进行过滤,过滤出符合要求的城市信息,最终传递给Observer(观察者),让其在UI线程接收数据,然后更新UI。整个过程完成了网络请求,同时进行异步操作,防止阻塞UI线程。
以上仅仅以实例介绍RxJava的基础使用,RxJava的功能远不止于此。不过掌握了以上的技能,我们已经可以在我们的项目中应用RxJava进行异步操作了。关于一些RxJava中的细节及其他相关技术还需要慢慢积累。

下面我们另一个重要的概念Disposable。当Observer(观察者)与Observable(被观察者)通过subscribe()建立连接后,事件可以进行传递。当发生一些其他情况,不得不断开两者之间的连接时,该怎么操作?这个时候就该Disposable上场了。

Disposable简介及使用

Disposable简介

Disposable, 这个单词的字面意思是一次性用品,用完即可丢弃的。在RxJava中,用它来切断Observer(观察者)与Observable(被观察者)之间的连接,当调用它的dispose()方法时, 它就会将Observer(观察者)与Observable(被观察者)之间的连接切断, 从而导致Observer(观察者)收不到事件。
下面我们就该考虑如何来获得Disposable对象?
Disposable的作用是切断连接,确切地讲是将Observer(观察者)切断,不再接收来自被观察者的事件,而被观察者的事件却仍在继续执行。
因此Disposable的对象通过观察者获得,具体分为两种方式。

Disposable对象的获得

1.Observer接口

Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String s) {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
};

通过创建Observer接口,当订阅后,建立与Observable的联系,onSubscribe(Disposable d)中便可以获得Disposable对象。
2.Consumer等其他函数式接口

Disposable disposable = Observable.just("你好").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {

}
});

当subscribe()后直接返回一个Disposable 对象
获得了Disposable对象后,我们便可以调用dispose()方法,在恰当的时机,断开连接,停止接收Observable(被观察者)发送的事件。

注意:当切断被观察者与观察者之间的联系,Observable(被观察者)的事件却仍在继续执行。

另外,补充一下onNext()、onComplete()和onError()事件的发送规则。
具体规则:

Observable(被观察者)可以发送无限个onNext, Observer(观察者)也可以接收无限个onNext.

当Observable(被观察者)发送了一个onComplete后, Observable(被观察者)中onComplete之后的事件将会继续发送, 而Observer(观察者)收到onComplete事件之后将不再继续接收事件.

当Observable(被观察者)发送了一个onError后, Observable(被观察者)中onError之后的事件将继续发送, 而Observer(观察者)收到onError事件之后将不再继续接收事件.

Observable(被观察者)可以不发送onComplete或onError.

最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然

注: 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, 并不一定会导致程序崩溃. 比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃。

以上就是本篇文章的全部内容,结合上一篇文章,已经可以灵活使用RxJava了。在下篇文章中,将会介绍RxJava中新增加的内容:Flowable及backpressure。