Android的RxJava笔记

rvjava 一个令人欲罢不能的东西呐~说明一下rxjava是基于观察者模式做的东西,至于观察着模式是啥,可以上网搜,下面回顾的第一张图就是标准的观察者模式。
另外其实这个文章是在rxjava1的时候就写了一点点,现在都用rxjava2了那就基于rxjava2吧~

回顾

观察者模式在Java里有两种实现,一种是完全手动写的一种是调用Java里面的api,先回顾一下传统Java的做法
完全手动写的

api版

RxJava2

配置

需要在build.gradle引入rxjava和rxandroid,rxandroid主要是负责Android的线程切换的。
https://github.com/ReactiveX/RxAndroid
https://github.com/ReactiveX/RxJava
例如

1
2
implementation 'io.reactivex.rxjava2:rxjava:2.1.2'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'

背压什么的先忽略吧,上流发送速度远快于下流处理速度的情景一般项目遇不到。

概念

Observable ( 被观察者 ) / Observer ( 观察者 )
说起被观察者 观察者,因为这两个词好接近,反正我一开始没记住哪个是哪个,这点还是需要下功夫背一背。Observable(被观察者)是属于上流发送指令的,而Observer ( 观察者 ) 是负责接收指令的。
subscribeOn() 指定的就是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程。

使用

最简单的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Observable observable=Observable.create(new ObservableOnSubscribe<String>() {
//ObservableEmitter<String> String可以改成你喜欢的对象 指定发送的事件类型
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("haha");//会在 observer的onNext收到
e.onError(new Throwable("error"));//会在observer的onError收到 这个一旦执行后,剩下发送的事件下流无法接收
e.onComplete();////会在observer的onComplete收到 这个一旦执行后,剩下发送的事件下流无法接收
}
});
Observer observer=new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
d.dispose();//执行后不再接收上游事件
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
observable.subscribe(observer);

rxjava2支持链式操作所以还能这样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("haha");//会在 observer的onNext收到
e.onError(new Throwable("error"));//会在observer的onError收到 这个一旦执行后,剩下要发送的事件不再发送
e.onComplete();////会在observer的onComplete收到 这个一旦执行后,剩下要发送的事件不再发送 不写情况下onNext走完也会自动调用onComplete
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});

Creat

很简单 就是生成一个 Obserable对象

Just

为了缩减下面的实例代码先讲这个,相当于一个简单的发射器调用onNext()的简写
最多10个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.just(1,2,3).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});

Observer的简化写法

也是为了缩减下面的实例代码先讲这个
有时候不需要所有的就能这样写
例如我只要onNext的 相当省事有木有

1
2
3
4
5
6
Observable.just(1).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
//onNext
}
});

其它的可参考

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.just(1).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
//onNext
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
//onError
}
}, new Action() {
@Override
public void run() throws Exception {
//onComplete
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
//onSubscribe
}
});

Map 数据转换

map就是数据转换咯,先把上流的数据转换合适的对象再传给下流。

1
2
3
4
5
6
7
8
9
10
11
Observable.just(1).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "这里转为字符串"+integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
//下流接收的就是字符串
}
});

Zip 配对合并

就是每次两个上流各取出一个再转换为下流,两个上流一定有配对关系,上流a取一个,上流b就取一个,两个上流发送的数量要相同,多出来的不会发送到下流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//BiFunctionz 最后一个参数是你要发送给下流的类型
Observable.zip(Observable.just(1, 2, 3), Observable.just("1", "2"), new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer+"---"+s;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
//运行结果为
//1---4
// 2---5

Concat & Merge连接合并

把两个上流连起来发送 concat是保证上流a发完才到上流b 而merge不保证

1
2
3
4
5
6
Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5)).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});

运行结果

1
2
3
4
5
1
2
3
4
5

ConcatMap&FlatMap 发送的每个数据转为单独的Observable

ConcatMap&FlatMap把上流发送的每个数据独立再包装为一个新的上流,
假如上流数据为 s1 s2 s3 ,包装后的新上流为k1 k2 k3,ConcatMap保证执行顺序为 s1 ->s1的k1 k2 k3 ->s2->s2的k1 k2 k3 而FlatMap不保证。

1
2
3
4
5
6
7
8
9
10
11
Observable.just(1,2,3).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just(integer+"--");//concatMap保证这里的Observable是按顺序的,假如这里Observable有多个onNext肯定1的onNext发完才到2
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
}
});

Filter 过滤器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.just(1,8,2,40,3).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer>3;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
//输出结果
//8
//40

Distinct 去掉重复项

1
2
3
4
5
6
7
8
9
10
11
Observable.just(1,2,2,3,4,3).distinct().subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
//输出结果
//1
//2
//3
//4

Buffer 下流一次取多值

1
2
3
4
5
6
7
8
9
10
11
12
//第一个参数是一次取多少值,第二个参数下一次是从上一次开头偏移多少个开始
Observable.just(1,2,3,4,5,6,7,8,9).buffer(2,3).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
System.out.println(integers);
}
});
//输出结果
//[1, 2]
//[4, 5]
//[7, 8]
// 2是每次去的值个个数,3 就是开头的 1 4 7的间隔。

FromIterable发射列表

1
2
3
4
5
6
7
8
9
10
11
12
List<Integer> list=new ArrayList<>();
list.add(1);
list.add(2);
Observable.fromIterable(list).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
//输出结果
//1
//2

Timer 延迟执行

此处是延迟2秒执行,注意timer默认在新线程执行

1
2
3
4
5
6
7
Observable.timer(2, TimeUnit.SECONDS).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
}
});

Interval 定时循环执行

1
2
3
4
5
6
7
8
9
10
11
12
13
//第一次延迟了 3 秒后接收到,后面每次间隔了 2 秒
Disposable mDisposable = Observable.interval(3,2, TimeUnit.SECONDS).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("interval", "accept: "+aLong );
}
});
//取消
if (mDisposable != null && !mDisposable.isDisposed()) {
mDisposable.dispose();
}

Skip 跳过某次数执行

1
2
3
4
5
6
7
8
9
//跳过前两次内容发送
Observable.just(1,2,3,4,5)
.skip(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
}
});

Take 只执行前几次

//只发送前两次

1
2
3
4
5
6
7
8
Flowable.fromArray(1,2,3,4,5)
.take(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
}
});

Debounce 过滤频率过快

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("a");
Thread.sleep(400);
e.onNext("b");
Thread.sleep(605);
e.onNext("c");
}
}).debounce(500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("rxjava", "accept: "+s);
}
});
//输出 b c

Defer 为每个观察者创建一个新的Observable

直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable,Defer 操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个 Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个 Observable,事实上每个订阅者获取的是它们自己的单独的数据序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
//一般订阅
Observable<Integer> observable = Observable.just(1, 2, 3);
observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.print(integer);
}
});
try {
Thread.sleep(3000);//模拟延迟订阅
} catch (InterruptedException e) {
e.printStackTrace();
}
observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.print(integer);
}
});
}
//输出123
// defer
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> call() throws Exception {
return Observable.just(1, 2, 3);
}
});
observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.print(integer);
}
});
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.print(integer);
}
});
//输出123123

Single只发送一个参数

1
2
3
4
5
6
Single.just(4).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});

Last取最后一项

1
2
3
4
5
6
7
8
Observable.just(1, 2, 3,4)
.last(4)//源观测资源为空,则发出默认项
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
System.out.print(integer);
}
});

Scan&Reduce把上一项的结果传给第下次操作

scan:每次操作之后先把数据输出,然后在调用scan的回调函数进行第二次操作
reduce:把所有的操作都操作完成之后在调用一次观察者,把数据一次性输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Observable.just(1, 2, 3,4).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
System.out.println("===========================");
Observable.just(1, 2, 3,4).reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});

输出结果