本文的主角RxJava算是Android几个框架里面较为复杂的一个,所以这篇博客花的时间、精力都会比较多。希望读者能通过此文更好的认识和使用RxJava。
相关框架源码地址:
RxJava源码地址https://github.com/ReactiveX/RxJava
RxAndroid源码地址:https://github.com/ReactiveX/RxAndroid
概念
RxJava是什么?相信很多读者一听到这个东西的时候都可能跟我开始的时候一样纳闷,只知道它是个响应式编程的框架,那么具体它到底是什么呢?
有一定Android开发经验的人肯定都知道AsyncTask、Handler等等。这些东西都是为了实现软件的异步操作,而RxJava跟它们的实质作用是差不多的,但是具体来讲,RxJava更加简洁方便!
RxJava作为一个工具库,使用的就是通用形式的观察者模式。这里关于观察者模式并不细讲,但在理解RxJava中对观察者模式的理解很重要,所以有需要的读者可以去查阅一下相关的资料。
在观察者模式中,RxJava的核心组成部分是Observables
和Subscribers
(Observer
是RxJava最小的构建块,实践中使用最多的是Subscriber
)。两者通过subscribe()
方法实现订阅关系。Observable
发送消息,而Subscriber
则用于消费消息。
配置
添加依赖库:
compile 'io.reactivex:rxjava:1.2.0'
compile 'io.reactivex:rxandroid:1.2.1'
使用
步骤一:创建Subscriber
Subscriber
对Observer
接口进行了一些扩展,基本使用方式是完全一样的:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
如果你只想使用基本功能,选择Observer
和Subscriber
是完全一样的。它们的区别对于使用者来说主要有两点:
onStart()
:这是Subscriber
增加的方法。它会在事件发送前被调用,用于做准备工作。需要注意的是,如果对准备工作的线程有要求(例如弹出对话框,这必须在主线程执行),onStart()
就不适用了,因为它总是在subscribe
所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用doOnSubscribe()
方法。unsubscribe()
:这是Subscriber
所实现的另一个接口Subscription
的方法,用于取消订阅。一般在这个方法调用前,可以使用isUnsubscribed()
先判断一下状态。unsubscribe()
很重要,因为在subscribe()
之后,Observable
会持有Subscriber
的引用,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如onPause()
、onStop()
等方法中)调用unsubscribe()
来解除引用关系。
步骤二:创建 Observable
Observable
即被观察者,它决定什么时候触发事件以及触发怎样的事件。RxJava使用create()
方法来创建一个 Observable
,并为它定义事件触发规则:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
create()
方法是RxJava最基本的创造事件序列的方法。基于这个方法,RxJava还提供了一些方法用来快捷创建事件队列,例如:
just(T...)
: 将传入的参数依次发送出来。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
from(T[])
/from(Iterable)
: 将传入的数组或Iterable
拆分成具体对象后,依次发送出来。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
上面 just(T...)
的例子和 from(T[])
的例子,都和之前的 create(OnSubscribe)
的例子是等价的。
步骤三:Subscribe (订阅)
创建了 Observable
和 Observer
之后,再用 subscribe()
方法将它们联结起来,整条链子就可以工作了。代码形式很简单:
observable.subscribe(subscriber);
Observable.subscribe(Subscriber)
的内部实现是这样的(仅核心代码):
// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
可以看到,subscriber()
做了3件事:
- 调用
Subscriber.onStart()
。这个方法在前面已经介绍过,是一个可选的准备方法。 - 调用
Observable
中的OnSubscribe.call(Subscriber)
。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中,Observable
并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当subscribe()
方法执行的时候。 - 将传入的
Subscriber
作为Subscription
返回。这是为了方便unsubscribe()
.
除了 subscribe(Observer)
和 subscribe(Subscriber)
,subscribe()
还支持不完整定义的回调,RxJava 会自动根据定义创建出Subscriber
。形式如下:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
步骤四:操作符
如果你只是想了解基础的部分,那以上的内容可以帮你解决了。不过如果真正要用到开发中,我们还需要了解很多东西,比如RxJava操作符。
操作符就是为了解决对Observable对象的变换的问题,操作符用于在Observable和最终的Subscriber之间修改Observable发出的事件。RxJava提供了很多很有用的操作符。比如map操作符,就是用来把把一个事件转换为另一个事件的。在这里给大家推荐一个学习操作符比较好的地方Operaters。当然,如果你直接点进去这个网站,你可能会一头雾水。那么,就让我们用一个例子来讲解一下。
// 得到多个Student对象中的name,保存到nameList中
Observable.just(student1, student2, student2)
//使用map进行转换,参数1:转换前的类型,参数2:转换后的类型
.map(new Func1<Student, String>() {
@Override
public String call(Student i) {
String name = i.getName();//获取Student对象中的name
return name;//返回name
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
nameList.add(s);
}
});
此外,flatMap也是经常用到的一个操作符,它的使用和理解相对复杂一些:
List<Student> students = new ArrayList<Student>();
students.add...
...
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCoursesList());
}
})
.subscribe(new Action1<Course>() {
@Override
public void call(Course course) {
Log.i(TAG, course.getName());
}
});
常见的操作符还有
- filter:集合进行过滤
- each:遍历集合
- take:取出集合中的前几个
- skip:跳过前几个元素
- unique:相当于按照数学上的集合处理,去重
步骤五:线程操作
事件的发起和消费默认都是在同一个线程中执行,也就是说之前我们使用的RxJava是同步的。使用RxJava,你可以使用subscribeOn()
指定观察者代码运行的线程,使用observerOn()
指定订阅者运行的线程。代码如下:
myObservableServices.retrieveImage(url)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(bitmap -> myImageView.setImageBitmap(bitmap));
调度器Schedulers实际上有很多种,具体如下:
- Schedulers.computation( ):用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
- Schedulers.from(executor):使用指定的Executor作为调度器
- Schedulers.immediate( ):在当前线程立即开始执行任务
- Schedulers.io( ):用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
- Schedulers.newThread( ):为每个任务创建一个新线程
- Schedulers.trampoline( ):当其它排队的任务完成后,在当前线程排队开始执行
步骤六:Subscriptions
当调用Observable.subscribe(),会返回一个Subscription对象。这个对象代表了被观察者和订阅者之间的联系。
ubscription subscription = Observable.just("Hello, World!")
.subscribe(s -> System.out.println(s));1212
你可以在后面使用这个Subscription对象来操作被观察者和订阅者之间的联系.
subscription.unsubscribe();
System.out.println("Unsubscribed=" + subscription.isUnsubscribed());
// Outputs "Unsubscribed=true"
步骤七:Android生命周期
如何处理Activity的生命周期?主要就是两个问题:
-
在configuration改变(比如转屏)之后继续之前的Subscription
解决方案是使用RxJava的缓存机制,这样就可以对同个
Observable
对象执行unsubscribe
或resubscribe
,却不用重复运行得到Observable
的代码。cache()
或replay()
会继续执行网络请求(甚至你调用了unsubscribe
也不会停止)。这就是说你可以在Activity重新创建的时候从cache()
的返回值中创建一个新的Observable
对象。Observable<Photo> request = service.getUserPhoto(id).cache(); Subscription sub = request.subscribe(photo -> handleUserPhoto(photo)); // ...When the Activity is being recreated... sub.unsubscribe(); // ...Once the Activity is recreated... request.subscribe(photo -> handleUserPhoto(photo));
注意,两次sub是使用的同一个缓存的请求。当然在哪里去存储请求的结果还是要自己做,必须在生命周期外的某个地方存储(retained fragment或者单例等等)。
-
Observable持有Context导致的内存泄露
解决方案就是在生命周期的某个时刻取消订阅。一个很常见的模式就是使用
CompositeSubscription
来持有所有的Subscriptions
,然后在onDestroy()
或者onDestroyView()
里取消所有的订阅。private CompositeSubscription mCompositeSubscription = new CompositeSubscription(); private void doSomething() { mCompositeSubscription.add( AndroidObservable.bindActivity(this, Observable.just("Hello, World!")) .subscribe(s -> System.out.println(s))); } @Override protected void onDestroy() { super.onDestroy(); mCompositeSubscription.unsubscribe(); }
你可以在Activity/Fragment的基类里创建一个
CompositeSubscription
对象,在子类中使用它。注意:一旦你调用了
CompositeSubscription.unsubscribe()
,这个CompositeSubscription
对象就不可用了, 如果你还想使用CompositeSubscription
,就必须在创建一个新的对象了。
RxJava虽然写成博客后看起来很深奥很复杂,但只要掌握了它,你会在Android很多开发场景中感受到它的方便。