RxJava详解

这篇文章大概是2017年时整理的,一直在OneNote中存放着,如今创建了个人博客,自然拿了出来。PS:从OneNote中复制出来后格式乱码,整理的我想吐…

介绍

RxJava是Java上一个灵活的、使用可观测序列组成的一个异步的、基于事件的库。

特点:

  • 作用:异步
  • 模式:观察者模式-本质上是基于回调
  • 结构:响应式编程
  • 逻辑简洁,可读性高,易维护
  • 链式结构的执行顺序

基本流程

  1. 创建事件资源,也就是被观察者。可以用Observable.create/just/from等方法来创建。
  2. 通过filter/debounce等操作符,进行自定义事件过滤。
  3. 通过Schedules进行事件发送和订阅的线程控制,也就是subscribeOn()和observeOn()
  4. 通过map/flatMap/compose等操作符,进行事件的变换
  5. 调用subscribe进行事件订阅。
  6. 最后,不要忘了对订阅者生命周期的控制,不用的时候,记得调用unsubscribe(),以免引发内存泄漏。

注意:未取消订阅而引起的内存泄漏。在Activtity.onDestroy()或不需要继续执行时取消订阅。

CompositeSubscription, 相当于一个Subscription集合,来取消所有订阅。

示例

1
2
3
4
5
6
CompositeSubscription list = new CompositeSubscription();
list.add(subscription1);
list.add(subscription2);
list.add(subscription3);
// 统一调用一次unsubscribe,就可以把所有的订阅都取消
list.unsubscribe();

基础知识

Observer和Subscriber的关系

  • Observer是观察者,Subscriber也是观察者。
  • Subscriber是一个实现了Observer接口的抽象类,对Observer进行了部分扩展,在使用上基本没有区别。
  • Subscriber多了发送之前调用的onStart()和解除订阅关系的unsubscribe()方法。
  • 在RxJava的subscribe过程中,Observer也总是会先被转换成一个Subscriber再使用。
  • RxJava开发过程中一般都使用Subscriber

RxJava的事件订阅回调

支持以下三种不完整定义的回调,我们可以根据当前需要,传入对应的Action,RxJava会相应的自动创建Subscriber。

  1. observable.subscribe(onNextAction);
  2. observable.subscribe(onNextAction, onErrorAction);
  3. observable.subscribe(onNextAction, onErrorAction, onCompleteAction);

响应式编程

  • Observable发出一系列事件,它是事件的产生者。
  • Subscriber负责处理事件,它是事件的消费者。
  • Operator是对Observable发出的事件进行修改和变换 。

    注意:若事件从产生到消费不需要其他处理,则可以省略掉中间的Operator,从而流程变为Obsevable -> Subscriber

  • Subscriber通常在主线程执行,所以原则上不要去处理太多的事务,而这些复杂的事务处理则交给Operator

知识点

Scheduler线程控制

默认情况下,RxJava事件产生和消费均在同一个线程中,例如在主线程中调用,那么事件的产生和消费都在主线程,但RxJava可以自由切换线程。

RxJava线程调度器

  • Schedulers.io();

    I/O操作(读写文件、数据库、网络请求等),与 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 效率比 newThread() 更高。值得注意的是,在 io() 下,不要进行大量的计算,以免产生不必要的线程。

  • Schedulers.newThread();

    开启新线程操作

  • Schedulers.immediate();

    默认指定的线程,也就是当前线程

  • Schedulers.computation();

    计算所使用的调度器。这个计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,例如图形的计算。这个Scheduler使用的固定的线程池,大小为CPU核数。值得注意的是不要把I/O操作放在computation()中否则I/O操作的等待时间会浪费CPU。

注意:

  • AndroidSchedulers.mainThread();是RxJava扩展的Android主线程。
  • 通过subscribeOn()observeOn()这两个方法来进行线程调度。

变换操作符(重点)

RxJava可以将发送的事件或事件序列,加工后转换成不用的事件或事件序列。

map操作符

  1. 是一对一的变换
  2. 返回的是变换后的对象
  3. 变换后的对象直接发到Subscriber回调中

flatMap操作符

  1. 可以适应一对多的变换
  2. 返回的是一个Observable被观察者对象
  3. 返回的Observable对象并不是直接发送到Subscriber的回调中,而是重新创建一个Observable对象,并激活这个Observable对象,使之开始发送事件
  4. flatMap变换后产生的每一个Observable对象发送的事件,最终都汇入同一个Observable,进而发送给Subscriber回调

注意:

  • map的返回类型与flatMap返回的Observable事件类型,可以与原来的事件类型一样
  • 可以对一个Observable多次使用map和flatMap
  • flatMap常常被用于嵌套的异步操作,例如:嵌套网络请求

map操作符代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
final ImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("https://ss2.baidu.com/-vo3dSag_xI4khGko9WTAnF6hhy/image/h%3D200/sign=4db5130a073b5bb5a1d727fe06d2d523/cf1b9d16fdfaaf51965f931e885494eef11f7ad6.jpg");
}
})
.map(new Func1<String, Drawable>() {
@Override
public Drawable call(String url) {
try {
Drawable drawable = Drawable.createFromStream(new URL(url).openStream(), "src");
return drawable;
} catch (IOException e) {
}
return null;
}
})
// 指定subscribe()所在的线程,也就是call()方法调用的线程
.subscribeOn(Schedulers.io())
// 指定Subscriber回调方法所在的线程,也就是onCompleted,onError,onNext回调的线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Log.e(TAG, e.toString());
}
@Override
public void onNext(Drawable drawable) {
if (drawable != null) {
ivLogo.setImageDrawable(drawable);
}
}
});

flatMap操作符代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
final List<Employee> list = new ArrayList<Employee>() {
{
add(new Employee("jackson", mission_list1));
add(new Employee("sunny", mission_list2));
}
};

Observable.from(list)
.flatMap(new Func1<Employee, Observable<Employee.Mission>>() {
@Override
public Observable<Employee.Mission> call(Employee employee) {
return Observable.from(employee.missions);
}
})
.subscribe(new Subscriber<Employee.Mission>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Employee.Mission mission) {
Log.i(TAG, mission.desc);
}
});

from操作符

接收一个集合作为输入,每次输出一个元素给subscriber

注意:若需要执行耗时操作,即使在from中使用subscribeOn(Schedulers.io()),仍然是在主线程执行,会造成界面卡顿甚至崩溃。

from操作符代码示例

1
2
3
4
5
6
7
8
// 格式:Observable.from(T[] params)
Observable.from(new Integer[]{1, 2, 3, 4, 5})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.i(TAG, "number:" + number);
}
});

just操作符

接收一个可变参数作为输入,最终也是生成数组,调用from(),每次输出一个元素给subscriber

just操作符代码示例

1
2
3
4
5
6
7
8
// Observable.just(T... params),params的个数为1 ~ 10
Observable.just(1, 2, 3, 4, 5)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.i(TAG, "number:" + number);
}
});

filter操作符

条件过滤,用于去除不符合条件的事件

filter操作符代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.from(new Integer[]{1, 2, 3, 4, 5})
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer number) {
// 偶数返回true,则表示剔除奇数,留下偶数
return number % 2 == 0;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.i(TAG, "number:" + number);
}
});

take操作符

最多保留的事件数

doOnNext操作符

在处理下一个事件前要做的事

take和doOnNext操作符代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12})
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer number) {
// 偶数返回true,则表示剔除奇数
return number % 2 == 0;
}
})
// 最多保留三个,也就是最后剩三个偶数
.take(3)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer number) {
// 在输出偶数之前输出它的hashCode
Log.i(TAG, "hahcode = " + number.hashCode() + "");
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.i(TAG, "number = " + number);
}
});

输出结果:

1
2
3
4
5
6
7
> hahcode = 2
> number = 2
> hahcode = 4
> number = 4
> hahcode = 6
> number = 6
>

debounce操作符

过滤在指定的时间间隔之间的事件,接收一个事件后将在指定时间间隔后开始接收事件

debounce操作符代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
int i = 0;
int[] times = new int[]{100, 1000};
while (true) {
i++;
if (i >= 100) break;
subscriber.onNext(i);
try {
/** 注意:
当i为奇数时,休眠1000ms,然后才发送i+1,这时i不会被过滤掉。
当i为偶数时,只休眠100ms,便发送i+1,这时i会被过滤掉
*/
Thread.sleep(times[i % 2]);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onCompleted();
}
})
// 间隔400ms以内的事件将被丢弃
.debounce(400, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.i(TAG, "complete");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, e.toString());
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "integer = " + integer);
}
});

merge操作符

用于合并两个Observable为一个Observable
格式:

1
2
Observable.merge(Observable1, Observable2)
.subscribe(subscriber);

concat操作符

顺序的执行多个Ovservable,个数为1—9(示例见first操作符)

compose操作符

类似flatMap,都是进行变换,返回Observable对象,激活并发送事件

和flatmap区别

  1. compose是唯一一个能从数据流中得到原始Observable的操作符,需要对整个数据流产生作用的操作需使用compose来实现。如subscribeOn()和observeOn(),在flatMap中使用的话,仅对在flatMap中创建的Observable起作用,不会对剩下的流产生影响.
  2. compose是对Observable整体的变换。flatMap转换Observable里的每一个事件,compose转换的是整个Observable数据流。
  3. flatMap每发送一个事件都创建一个Observable,效率低。compose只在主干数据流上执行操作。
  4. 建议使用compose代替flatMap。

first操作符

只发送符合条件的第一个事件。如:可以结合contact做网络三级缓存

first操作符代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 从缓存获取
Observable<BookList> fromDisk = Observable.create(new Observable.OnSubscribe<BookList>() {
@Override
public void call(Subscriber<? super BookList> subscriber) {
BookList list = getFromDisk();
if (list != null) {
subscriber.onNext(list);
} else {
subscriber.onCompleted();
}
}
});
// 从网络获取
Observable<BookList> fromNetWork = bookApi.getBookDetailDisscussionList();
Observable.concat(fromDisk, fromNetWork)
// 如果缓存不为null,则不再进行网络请求。
.first()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<BookList>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(BookList discussionList) {
}
});

timer操作符

定时器,可以做定时操作或延迟操作

timer操作符代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
Log.i(TAG, "Hello World!");
}
});

interval操作符

定时的周期性操作,与timer操作符的区别是可以重复操作

throttleFirst操作符

类似debounce操作符,时间间隔太短就会丢弃事件。可用于防抖操作,如防止双击

throttleFirst操作符代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
RxView.clicks(button)
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
Log.i(TAG, "do clicked!");
}
});

Single操作符

相当于Observable的精简版。观察者回调的不是onNext/onError/onCompleted,而是回调onSuccess/onError

subject操作符

既是事件的生产者,又是事件的消费者

subject操作符代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Subject subject = PublishSubject.create();

subject.debounce(400, TimeUnit.MILLISECONDS)
.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
// request
}
});

edittext.addTextChangedListener(new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) { }
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
subject.onNext(s.toString());
}
@Override
public void afterTextChanged(Editable s) { }
});

参考资料->RxJava详解-由浅入深

这是当时的文章名称,如今去看作者已经进行了更新RxJava 从入门到出轨,也是骚的不行~