共计 6558 个字符,预计需要花费 17 分钟才能阅读完成。
导读 | 本文主要讲解 Kotlin 语言处理 RxJava 多数据源的实现。阅读本文最好有 Kotlin 基础,若没有基础,可参考之前文章 Kotlin 初探,使用 Kotlin 优雅的开发 Android 应用,以及 RxJava 基础(本文基于 RxJava2),当然我也会尽可能详细解释让你顺利阅读本文。 |
源码传送门
最近几天回过头,看了之前的总结 RxJava 操作符系列,感觉对 Rxjava 多数据源的处理不是很理解,所以在总结学习一波。大家都知道,最近 Kotlin 语言一直占据热搜榜,褒贬不一,但我想说,不管有什么想法都要抛在脑后,毕竟 Google 爸爸出手,你不情愿也要跟随它的步伐。鉴于此,本篇对 RxJava 多数据源的总结是基于 Kotlin 语言,也让大家明白,使用 Kotlin 开发应用并不是不能使用 Java 库,现在有一部分人担心,Kotlin 第三方库那么少,如果使用 Kotlin 开发那不是给自己找罪受,其实你完全错了,当你说这话的时候,我敢断定你都还没有接触 Kotlin,因为 Koltin 有一个最重要的优势就是和 Java 绝对兼容。
在 RxJava 中多数据源处理的操作符很多,但是最经典的就要数 merge,contact,zip 了。如果对这三个操作符不是很熟悉的话,可以去查看它的使用,当然如果你懒得去看,我也会简单提一下。merge 操作符可以处理多个 Observable 发送的数据,它是一个异步操作,不保证数据发送的顺序,即有可能出现数据交叉,当一个 Observable 发送了 onError 后,未执行的 Observable 不在继续执行,直接执行 merge 的 onError 方法。
contact 操作符执行时一个同步操作,严格按照 contact 中传入 Observable 先后执行,即前面的先执行后面的后执行,并且最终发送的数据也是有序的,即第一个 Observable 的数据发送完毕再发送第二个,依次类推。
zip 操作符和 contact 和 merge 有了本质的区别,它会将每个 Observable 个数据项分布对应返回一个 Observable 再发送,最终发送的数据量与最小数据长度相同。
假如现在我们有三种商品,有一个查询商品信息的接口,根据接口可以查询该商品的价格以及出售地点。商品实体类
data class Goods(var id:Int,var price: Int, var address: String)
在 Kotlin 语言中,实体类创建用 data class 关键词,我们不需要和 Java 一样创建 get/set 方法,只需一行代码搞定。
创建模拟网络请求
object NetRequest {
// 模拟网络请求
fun getGoodsObservable(id: Int): Observable<Goods> {fun getGoodsObservable(id: Int): Observable<Goods> {
return Observable.create {
source ->
Thread.sleep(Random().nextInt(1000).toLong())
var data = Goods(id, Random().nextInt(20), "地址 ${id}")
source.onNext(data)
source.onComplete()
Log.e("getGoodsObservable:", "${id}")
}
}
}
在上面我们创建了一个单例类,在 Kotlin 中使用 object 修饰类时即给我们自动创建了一个单例对象。在每一句代码结尾我们不需要再和 Java 一样写一个分号“;”来结束,什么也不用写。
Observable.create 使用的是 lambda 表达式,在 Kotlin 语言中是支持 lambda 表达式的。source 就是 ObservableEmitter<Goods>,所以我们可以调用 onNext 发送数据。为了更准确的模拟网络请求,使用 Thread.sleep 随机的延迟,模拟网络请求的时间。
fun getGoodsObservable(id: Int): Observable<Goods> {
return Observable.create {
source ->
Thread.sleep(Random().nextInt(1000).toLong())
var data = Goods(id, Random().nextInt(20), "地址 ${id}")
source.onNext(data)
source.onComplete()
Log.e("getGoodsObservable:", "${id}")
}
当然由于 subscribe 只有一个参数,所以我们也可以这样写。也就是省略了 source ->,此时 it 就表示该参数数据。
return Observable.create {Thread.sleep(Random().nextInt(1000).toLong())
var data = Goods(id, Random().nextInt(20), "地址 ${id}")
it.onNext(data)
it.onComplete()
Log.e("getGoodsObservable:", "${id}")
}
在 java 中实现如下
return Observable.create(new ObservableOnSubscribe<Goods>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Goods> e) throws Exception {// 处理逻辑}
});
准备好了请求操作,开始使用 merge 看看执行的效果。
fun executeMerge() {Observable.merge(getGoodsObservable(1).subscribeOn(Schedulers.newThread()),
getGoodsObservable(2).subscribeOn(Schedulers.newThread()),
getGoodsObservable(3).subscribeOn(Schedulers.newThread()))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.toList()
.subscribe({Log.e(TAG, it.toString())
}, {Log.e(TAG, it.toString())
})
}
merge 中有三个网络请求操作,并通过 subscribeOn(Schedulers.newThread()) 将网络请求切换到线程中执行,数据都请求成功后,再通过 observeOn(AndroidSchedulers.mainThread()) 切换到主线程请求数据。为了三请求都成功后,我们在更新 UI,所以通过 toList() 将请求的数据转换成 List 一块发送。在上面的 subscribe 依然使用的 lambda 表达式,subscribe({},{})中第一个括号是 onSuccess 回调,里面的 it 是接收到的 List< Goods > 数据,第二个括号是 onError 回调,it 表示异常 Throwable 对象。
subscribe 部分 Java 代码
.subscribe(new Consumer<List<Goods>>() {
@Override
public void accept(@NonNull List<Goods> goodses) throws Exception {}}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {}});
当然如果你想使用 RxJava2 中 onSubscribe(@NonNull Disposable d),你可以这样使用 subscribe
.subscribe(object : SingleObserver<List<Goods>> {override fun onSubscribe(d: Disposable?) { }
override fun onError(e: Throwable?) { }
override fun onSuccess(t: List<Goods>?) {}})
为了观察,我们将请求成功的数据显示在界面上,我们创建一个 Button,TextView。
class MainActivity : AppCompatActivity(), View.OnClickListener {
val TAG = "MainActivity"
override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
setSupportActionBar(toolbar)
// 加入这句 import kotlinx.android.synthetic.main.activity_main.*
// 不用再 findViewById, 可直接使用
merge.setOnClickListener(this)
}
override fun onClick(v: View) {when (v.id) {
R.id.merge -> {executeMerge()
}
}
//when 关键字和 Java 中的 Switch 关键词是类似的,// 只不过它比 Java 中的 Switch 强大的多,可以接收任何参数,// 然后判断使用, 也可以如下使用
when (v) {merge -> {}
}
}
}
我们点击执行几次发现,返回的 List 的数据并不是按照 merge 参数的先后顺序执行的,它是并发的,最终的顺序,是由网络请求的快慢决定的,请求返回数据越快也就表示该数据最早发送,即在 List 中最靠前。那么此时出现一个问题,如果我想返回数据的 List 顺序严格按照位置的先后顺序呢?那此时使用 merge 的话,是不太现实了。当然前面我们提到 contact 可以使用。那么直接将 merge 更改为 contact 执行以下试试,
fun executeContact() {Observable.concat(getGoodsObservable(1).subscribeOn(Schedulers.newThread()),
getGoodsObservable(2).subscribeOn(Schedulers.newThread()),
getGoodsObservable(3).subscribeOn(Schedulers.newThread()))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.toList()
.subscribe({Log.e(TAG, it.toString())
}, {Log.e(TAG, it.toString())
})
}
的确,发现无论执行多少次 List 的数据都能按照 contact 中 Observable 顺序发送,我们想要的效果可以实现了,不过你会发现,效率太差了,这是同步执行啊,只有第一个请求成功,才会去请求第二个,然后第三个,假如一次请求需要一秒,那三次请求至少三秒啊,不能忍。
鉴于上面两种方式的利弊,如果我们既想如 merge 一样并发执行,又想和 contact 一样保证顺序,是不是有点强迫症的意思,当然强大的 zip 就能实现我们想要的效果。如下实现。
fun executeZip() {Observable.zip(getGoodsObservable(1),
getGoodsObservable(2),
getGoodsObservable(3),
Function3<Goods, Goods, Goods, List<Goods>>
{ goods0, goods1, goods2 ->
val list = ArrayList<Goods>()
list.add(goods0)
list.add(goods1)
list.add(goods2)
list
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({Log.e(TAG, it.toString())
}, {Log.e(TAG, it.toString())
})
}
既然实现了,那我们运行几次,发现完美的实现了我们想要的效果,即并发的执行了,也保证了我们请求数据的顺序性。
在上面我们的单个网络请求是一个同步的请求,如果我们的网络请求封装了,在线程中请求,请求成功后在主线程中回调,那我们又该如何创建呢使用呢?
先来模拟一个子线程请求网络,请求成功回调数据给主线程。
fun getGoods(ctx:Context,id: Int,callbacks:(goods:Goods)->Unit): Unit {
ctx.doAsync {Thread.sleep(Random().nextInt(1000).toLong())
var data = Goods(id, Random().nextInt(20), "地址 ${id}")
ctx.runOnUiThread {callbacks(data)
}
}
}
getGoods 传了三个参数,第一个 Context 对象,第二个是商品 ID, 第三个参数是一个函数,(goods:Goods)->Unit 表示第三个参数的类型是一个参数为 Goods 类型并且返回 Unit 的函数。使用 doAsync 模拟异步请求,请求成功后 runOnUiThread 切换到 UI 线程。然后 callbacks(data)将数据回调。这种使用方式比 Java 中回调优美好用太多了。
接下来就开始在回调成功后创建 Observable
fun getGoodsCallBack(id: Int): Observable<Goods> {
var subscrbe: ObservableEmitter<Goods>? = null
var o = Observable.create<Goods> {subscrbe = it}
//Kotlin 特性
getGoods(this@MainActivity, id) {subscrbe?.onNext(it)
}
return o
}
fun executeZipCallBack() {Observable.zip(getGoodsCallBack(1).subscribeOn(Schedulers.newThread()),
getGoodsCallBack(2).subscribeOn(Schedulers.newThread()),
getGoodsCallBack(3).subscribeOn(Schedulers.newThread()),
Function3<Goods, Goods, Goods, List<Goods>>
{ goods0, goods1, goods2 ->
val list = ArrayList<Goods>()
list.add(goods0)
list.add(goods1)
list.add(goods2)
list
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({Log.e(TAG, it.toString())
}, {Log.e(TAG, it.toString())
})
}
ok, 到这里回调情况下创建使用 RxJava 也介绍完毕,到此本篇文章就结束了,有问题欢迎指出,内容杂乱,多多担待,Hava a wonderful day.