手把手教你使用-RxJava-2-0(一)

网上有很多关于RxJava的技术文章,大多数是关于1.x版本的。随着 RxJava 2.0 的推出,有些文章也介绍了2.x版本新增的内容以及与1.x版本的对比。有些同学如果刚刚接触RxJava,仅仅看RxJava 1.x 的一些技术文章,有时候会有些出入。因此本篇文章基于RxJava 2.0 进行由浅入深的学习,逐步掌握RxJava。

1.作用

RxJava的目的就是异步
RxJava的特点就是可以非常简便的实现异步调用,可以在逻辑复杂的代码逻辑中以比较轻易的方式实现异步调用。随着逻辑的复杂,需求的更改,代码可依然能保持极强的阅读性,在深入的使用过程中一定对这点深有体会。

2.工程引用

要应用RxJava,需要在项目中引入依赖:

io.reactivex.rxjava2:rxjava:2.0.4
io.reactivex.rxjava2:rxjava:2.0.4

3.概念

要想理解好RxJava,首先要理解清楚其中的几个关键概念。由于RxJava是利用观察者模式来实现一些列的操作,所以对于观察者模式中的观察者,被观察者,以及订阅、事件需要有一个了解。如果不理解观察者模式,不要紧,下面会详细介绍。

Observable:在观察者模式中称为“被观察者”;
Observer:观察者模式中的“观察者”,可接收Observable发送的数据;
subscribe:订阅,观察者与被观察者,通过subscribe()方法进行订阅;
Subscriber:也是一种观察者,在2.0中 它与Observer没什么实质的区别,不同的是 Subscriber要与Flowable(也是一种被观察者)联合使用,该部分内容是2.0新增的,后续文章再介绍。Obsesrver用于订阅Observable,而Subscriber用于订阅Flowable

4.RxJava中的观察者模式

观察者模式的概念很好理解,具体可以解释为:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。
在程序的观察者模式,观察者不需要时刻盯着被观察者(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。

下面具体讲RxJava 的观察者模式

RxJava 有四个基本概念:Observable (被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在完成某些操作,获得一些结果后,回调触发事件,即发出事件来通知 Observer。

关于回调,如果理解则可以跳过这一段,如果不理解,在RxJava中可以简单的理解为:为了方便Observable和Observer交互,在Observable中,将Observer对象传入,在完成某些操作后调用Observer对象的方法,此时将触发Observer中具体实现的对应方法。
注意:Observer是个接口,Observable是个类。

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() 之外,还定义了三个特殊的事件:onComplete() 和 onError(),onSubscribe()。

onComplete(): 事件队列完结时调用该方法。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。
onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
onSubscribe():RxJava 2.0 中新增的,传递参数为Disposable ,Disposable 相当于RxJava1.x中的Subscription,用于解除订阅。
注意:onComplete() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

讲了这么多,大家会疑惑:这些都跟异步有什么关系?
其实这都是在为异步进行铺垫。当大家理解了观察者模式之后,就会很容易理解RxJava的异步实现方式。让Observable (被观察者)开启子线程执行耗操作,完成耗时操作后,触发回调,通知Observer (观察者)进行主线程UI更新。如此轻松便可以实现Android中的异步,且代码简洁明了,集中分布。RxJava中默认Observer (观察者)和Observer (观察者)都在同一线程执行任务。本文主要介绍RxJava中的一些基本使用,关于线程调度问题下篇文章再进行介绍。即本文中的所有操作都默认在同一线程进行。
好了,下面我们就开始了解RxJava的一些基本使用。

5.基本的用法

RxJava用法多种多样,其多样性体现在Obserable(被观察者)的创建上。
我们先以最基础的Obserable(被观察者)的创建为例介绍RxJava的使用:
Observable的创建:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//执行一些其他操作
//.............
//执行完毕,触发回调,通知观察者
e.onNext("我来发射数据");
}
});

Observer的创建:

Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
//观察者接收到通知,进行相关操作
public void onNext(String aLong) {
System.out.println("我接收到数据了");
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
};

订阅:

observable.subscribe(observer);

使用create( )创建Observable最基本的创建方式。可以看到,这里传入了一个 ObservableOnSubscribe对象作为参数,它的作用相当于一个计划表,当 Observable被订阅的时候,ObservableOnSubscribe的subscribe()方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Observer 将会被调用一次 onNext())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。
Observable的其他创建方式:
just()方式
Observable<String> observable = Observable.just("Hello");
使用just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据。通过just( )方式 直接触发onNext(),just中传递的参数将直接在Observer的onNext()方法中接收到。
fromIterable()方式

List<String> list = new ArrayList<String>();
for(int i =0;i<10;i++){
list.add("Hello"+i);
}
Observable<String> observable = Observable.fromIterable((Iterable<String>) list);

使用fromIterable(),遍历集合,发送每个item。相当于多次回调onNext()方法,每次传入一个item。
注意:Collection接口是Iterable接口的子接口,所以所有Collection接口的实现类都可以作为Iterable对象直接传入fromIterable()方法。
defer()方式

Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
return Observable.just("hello");
}
});

当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。以何种方式创建这个Observable对象,当满足回调条件后,就会进行相应的回调。
interval( )方式

Observable<String> observable = Observable.interval(2, TimeUnit.SECONDS);

创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。即按照固定2秒一次调用onNext()方法。
range( )方式

Observable<Integer> observable = Observable.range(1,20);

创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常。上述表示发射1到20的数。即调用20次nNext()方法,依次传入1-20数字。
timer( )方式

Observable<Integer> observable = Observable.timer(2, TimeUnit.SECONDS);

创建一个Observable,它在一个给定的延迟后发射一个特殊的值,即表示延迟2秒后,调用onNext()方法。
repeat( )方式

Observable<Integer> observable = Observable.just(123).repeat();

创建一个Observable,该Observable的事件可以重复调用。
除了Observable(被观察者)的创建之外,RxJava 2.x 还提供了多个函数式接口 ,用于实现简便式的观察者模式。具体的函数式接口包括以下:

以Consumer为例,我们可以实现简便式的观察者模式:

Observable.just("hello").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});

其中Consumer中的accept()方法接收一个来自Observable的单个值。Consumer就是一个观察者。其他函数式接口可以类似应用。
注意:Observable (被观察者)只有在被Observer (观察者)订阅后才能执行其内部的相关逻辑,下面代码证实了这一点:

Observable<Long> observable = Observable.interval(2, TimeUnit.SECONDS);
Observer<Long> observer = new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Long aLong) {
System.out.println(aLong);
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
};
SystemClock.sleep(10000);//睡眠10秒后,才进行订阅 仍然从0开始,表示Observable内部逻辑刚开始执行
observable.subscribe(observer);

01-18 16:09:20.874 12535-12927/com.lvr.rxjavalearning I/System.out: 0
01-18 16:09:22.864 12535-12927/com.lvr.rxjavalearning I/System.out: 1
01-18 16:09:24.864 12535-12927/com.lvr.rxjavalearning I/System.out: 2
01-18 16:09:26.864 12535-12927/com.lvr.rxjavalearning I/System.out: 3

除此之外,RxJava中还有许多操作符。操作符就是用于在Observable和最终的Observer之间,通过转换Observable为其他观察者对象的过程,修改发出的事件,最终将最简洁的数据传递给Observer对象。下面我们介绍一些比较常用的操作符。

6.RxJava中的操作符

map()操作符

Observable<Integer> observable = Observable.just("hello").map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return s.length();
}
});

map()操作符,就是把原来的Observable对象转换成另一个Observable对象,同时将传输的数据进行一些灵活的操作,方便Observer获得想要的数据形式。
flatMap()操作符

Observable<Object> observable = Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
});

flatMap()对于数据的转换比map()更加彻底,如果发送的数据是集合,flatmap()重新生成一个Observable对象,并把数据转换成Observer想要的数据形式。它可以返回任何它想返回的Observable对象。
filter()操作符

Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).filter(new Predicate<Object>() {
@Override
public boolean test(Object s) throws Exception {
String newStr = (String) s;
if (newStr.charAt(5) - '0' > 5) {
return true;
}
return false;
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println((String)o);
}
});

filter()操作符根据test()方法中,根据自己想过滤的数据加入相应的逻辑判断,返回true则表示数据满足条件,返回false则表示数据需要被过滤。最后过滤出的数据将加入到新的Observable对象中,方便传递给Observer想要的数据形式。
take()操作符

Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).take(5).subscribe(new Consumer<Object>() {
@Override
public void accept(Object s) throws Exception {
System.out.println((String)s);
}
});

take()操作符:输出最多指定数量的结果。
doOnNext()

Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).take(5).doOnNext(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println("准备工作");
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object s) throws Exception {
System.out.println((String)s);
}
});

doOnNext()允许我们在每次输出一个元素之前做一些额外的事情。

以上就是一些常用的操作符,通过操作符的使用。我们每次调用一次操作符,就进行一次观察者对象的改变,同时将需要传递的数据进行转变,最终Observer对象获得想要的数据。
以网络加载为例,我们通过Observable开启子线程,进行一些网络请求获取数据的操作,获得到网络数据后,然后通过操作符进行转换,获得我们想要的形式的数据,然后传递给Observer对象。

以上仅仅是介绍RxJava的观察者模式以及RxJava的简单操作与使用。通过本篇文章,可以对RxJava有个简单的了解。后面我会继续介绍RxJava中线程调度的内容,以及RxJava 2.x 中新增的功能。如果大家喜欢这部分内容,可以持续关注,后面会继续更新。