阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Kotlin 下 的RxJava 多数据源处理

88次阅读
没有评论

共计 6558 个字符,预计需要花费 17 分钟才能阅读完成。

导读 本文主要讲解 Kotlin 语言处理 RxJava 多数据源的实现。阅读本文最好有 Kotlin 基础,若没有基础,可参考之前文章 Kotlin 初探,使用 Kotlin 优雅的开发 Android 应用,以及 RxJava 基础(本文基于 RxJava2),当然我也会尽可能详细解释让你顺利阅读本文。

源码传送门

写在前面

最近几天回过头,看了之前的总结 RxJava 操作符系列,感觉对 Rxjava 多数据源的处理不是很理解,所以在总结学习一波。大家都知道,最近 Kotlin 语言一直占据热搜榜,褒贬不一,但我想说,不管有什么想法都要抛在脑后,毕竟 Google 爸爸出手,你不情愿也要跟随它的步伐。鉴于此,本篇对 RxJava 多数据源的总结是基于 Kotlin 语言,也让大家明白,使用 Kotlin 开发应用并不是不能使用 Java 库,现在有一部分人担心,Kotlin 第三方库那么少,如果使用 Kotlin 开发那不是给自己找罪受,其实你完全错了,当你说这话的时候,我敢断定你都还没有接触 Kotlin,因为 Koltin 有一个最重要的优势就是和 Java 绝对兼容。

多数据源处理操作符

在 RxJava 中多数据源处理的操作符很多,但是最经典的就要数 merge,contact,zip 了。如果对这三个操作符不是很熟悉的话,可以去查看它的使用,当然如果你懒得去看,我也会简单提一下。merge 操作符可以处理多个 Observable 发送的数据,它是一个异步操作,不保证数据发送的顺序,即有可能出现数据交叉,当一个 Observable 发送了 onError 后,未执行的 Observable 不在继续执行,直接执行 merge 的 onError 方法。

contact 操作符执行时一个同步操作,严格按照 contact 中传入 Observable 先后执行,即前面的先执行后面的后执行,并且最终发送的数据也是有序的,即第一个 Observable 的数据发送完毕再发送第二个,依次类推。

zip 操作符和 contact 和 merge 有了本质的区别,它会将每个 Observable 个数据项分布对应返回一个 Observable 再发送,最终发送的数据量与最小数据长度相同。

使用场景分析

假如现在我们有三种商品,有一个查询商品信息的接口,根据接口可以查询该商品的价格以及出售地点。商品实体类

data class Goods(var id:Int,var price: Int, var address: String)

在 Kotlin 语言中,实体类创建用 data class 关键词,我们不需要和 Java 一样创建 get/set 方法,只需一行代码搞定。

创建模拟网络请求

object NetRequest {
    // 模拟网络请求
    fun getGoodsObservable(id: Int): Observable<Goods> {fun getGoodsObservable(id: Int): Observable<Goods> {
        return Observable.create {
            source ->
            Thread.sleep(Random().nextInt(1000).toLong())
            var data = Goods(id, Random().nextInt(20), "地址 ${id}")
            source.onNext(data)
            source.onComplete()
            Log.e("getGoodsObservable:", "${id}")
        }
    }
}

在上面我们创建了一个单例类,在 Kotlin 中使用 object 修饰类时即给我们自动创建了一个单例对象。在每一句代码结尾我们不需要再和 Java 一样写一个分号“;”来结束,什么也不用写。

Observable.create 使用的是 lambda 表达式,在 Kotlin 语言中是支持 lambda 表达式的。source 就是 ObservableEmitter<Goods>,所以我们可以调用 onNext 发送数据。为了更准确的模拟网络请求,使用 Thread.sleep 随机的延迟,模拟网络请求的时间。

fun getGoodsObservable(id: Int): Observable<Goods> {
        return Observable.create {
            source ->
            Thread.sleep(Random().nextInt(1000).toLong())
            var data = Goods(id, Random().nextInt(20), "地址 ${id}")
            source.onNext(data)
            source.onComplete()
            Log.e("getGoodsObservable:", "${id}")
        }

当然由于 subscribe 只有一个参数,所以我们也可以这样写。也就是省略了 source ->,此时 it 就表示该参数数据。

return Observable.create {Thread.sleep(Random().nextInt(1000).toLong())
            var data = Goods(id, Random().nextInt(20), "地址 ${id}")
            it.onNext(data)
            it.onComplete()
            Log.e("getGoodsObservable:", "${id}")
        }

在 java 中实现如下

return Observable.create(new ObservableOnSubscribe<Goods>() {

            @Override
            public void subscribe(@NonNull ObservableEmitter<Goods> e) throws Exception {// 处理逻辑}
        });
merge

准备好了请求操作,开始使用 merge 看看执行的效果。

fun executeMerge() {Observable.merge(getGoodsObservable(1).subscribeOn(Schedulers.newThread()),
                getGoodsObservable(2).subscribeOn(Schedulers.newThread()),
                getGoodsObservable(3).subscribeOn(Schedulers.newThread()))
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .toList()
                .subscribe({Log.e(TAG, it.toString())
                }, {Log.e(TAG, it.toString())
                })
    }

merge 中有三个网络请求操作,并通过 subscribeOn(Schedulers.newThread()) 将网络请求切换到线程中执行,数据都请求成功后,再通过 observeOn(AndroidSchedulers.mainThread()) 切换到主线程请求数据。为了三请求都成功后,我们在更新 UI,所以通过 toList() 将请求的数据转换成 List 一块发送。在上面的 subscribe 依然使用的 lambda 表达式,subscribe({},{})中第一个括号是 onSuccess 回调,里面的 it 是接收到的 List< Goods > 数据,第二个括号是 onError 回调,it 表示异常 Throwable 对象。

subscribe 部分 Java 代码

.subscribe(new Consumer<List<Goods>>() {
                    @Override
                    public void accept(@NonNull List<Goods> goodses) throws Exception {}}, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {}});

当然如果你想使用 RxJava2 中 onSubscribe(@NonNull Disposable d),你可以这样使用 subscribe

.subscribe(object : SingleObserver<List<Goods>> {override fun onSubscribe(d: Disposable?) { }
                    override fun onError(e: Throwable?) { }
                    override fun onSuccess(t: List<Goods>?) {}})

为了观察,我们将请求成功的数据显示在界面上,我们创建一个 Button,TextView。

class MainActivity : AppCompatActivity(), View.OnClickListener {

    val TAG = "MainActivity"
    override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        setSupportActionBar(toolbar)
        // 加入这句 import kotlinx.android.synthetic.main.activity_main.*
        // 不用再 findViewById, 可直接使用
        merge.setOnClickListener(this)

    }
    override fun onClick(v: View) {when (v.id) {
            R.id.merge -> {executeMerge()
            }
        }
        //when 关键字和 Java 中的 Switch 关键词是类似的,// 只不过它比 Java 中的 Switch 强大的多,可以接收任何参数,// 然后判断使用, 也可以如下使用
        when (v) {merge -> {}
        }
    }
}
contact

我们点击执行几次发现,返回的 List 的数据并不是按照 merge 参数的先后顺序执行的,它是并发的,最终的顺序,是由网络请求的快慢决定的,请求返回数据越快也就表示该数据最早发送,即在 List 中最靠前。那么此时出现一个问题,如果我想返回数据的 List 顺序严格按照位置的先后顺序呢?那此时使用 merge 的话,是不太现实了。当然前面我们提到 contact 可以使用。那么直接将 merge 更改为 contact 执行以下试试,

fun executeContact() {Observable.concat(getGoodsObservable(1).subscribeOn(Schedulers.newThread()),
                getGoodsObservable(2).subscribeOn(Schedulers.newThread()),
                getGoodsObservable(3).subscribeOn(Schedulers.newThread()))
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .toList()
                .subscribe({Log.e(TAG, it.toString())
                }, {Log.e(TAG, it.toString())
                })
    }

的确,发现无论执行多少次 List 的数据都能按照 contact 中 Observable 顺序发送,我们想要的效果可以实现了,不过你会发现,效率太差了,这是同步执行啊,只有第一个请求成功,才会去请求第二个,然后第三个,假如一次请求需要一秒,那三次请求至少三秒啊,不能忍。

zip

鉴于上面两种方式的利弊,如果我们既想如 merge 一样并发执行,又想和 contact 一样保证顺序,是不是有点强迫症的意思,当然强大的 zip 就能实现我们想要的效果。如下实现。

fun executeZip() {Observable.zip(getGoodsObservable(1),
                getGoodsObservable(2),
                getGoodsObservable(3),
                Function3<Goods, Goods, Goods, List<Goods>>
                { goods0, goods1, goods2 ->
                    val list = ArrayList<Goods>()
                    list.add(goods0)
                    list.add(goods1)
                    list.add(goods2)
                    list
                }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({Log.e(TAG, it.toString())
                }, {Log.e(TAG, it.toString())
                })
    }

既然实现了,那我们运行几次,发现完美的实现了我们想要的效果,即并发的执行了,也保证了我们请求数据的顺序性。

在回调中运用 RxJava

在上面我们的单个网络请求是一个同步的请求,如果我们的网络请求封装了,在线程中请求,请求成功后在主线程中回调,那我们又该如何创建呢使用呢?

先来模拟一个子线程请求网络,请求成功回调数据给主线程。

fun getGoods(ctx:Context,id: Int,callbacks:(goods:Goods)->Unit): Unit {
        ctx.doAsync {Thread.sleep(Random().nextInt(1000).toLong())
            var data = Goods(id, Random().nextInt(20), "地址 ${id}")
            ctx.runOnUiThread {callbacks(data)
            }
        }
    }

getGoods 传了三个参数,第一个 Context 对象,第二个是商品 ID, 第三个参数是一个函数,(goods:Goods)->Unit 表示第三个参数的类型是一个参数为 Goods 类型并且返回 Unit 的函数。使用 doAsync 模拟异步请求,请求成功后 runOnUiThread 切换到 UI 线程。然后 callbacks(data)将数据回调。这种使用方式比 Java 中回调优美好用太多了。

接下来就开始在回调成功后创建 Observable

fun getGoodsCallBack(id: Int): Observable<Goods> {
        var subscrbe: ObservableEmitter<Goods>? = null
        var o = Observable.create<Goods> {subscrbe = it}
        //Kotlin 特性
        getGoods(this@MainActivity, id) {subscrbe?.onNext(it)
        }
        return o
    }
    fun executeZipCallBack() {Observable.zip(getGoodsCallBack(1).subscribeOn(Schedulers.newThread()),
                getGoodsCallBack(2).subscribeOn(Schedulers.newThread()),
                getGoodsCallBack(3).subscribeOn(Schedulers.newThread()),
                Function3<Goods, Goods, Goods, List<Goods>>
                { goods0, goods1, goods2 ->
                    val list = ArrayList<Goods>()
                    list.add(goods0)
                    list.add(goods1)
                    list.add(goods2)
                    list
                }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({Log.e(TAG, it.toString())
                }, {Log.e(TAG, it.toString())
                })
    }

ok, 到这里回调情况下创建使用 RxJava 也介绍完毕,到此本篇文章就结束了,有问题欢迎指出,内容杂乱,多多担待,Hava a wonderful day.

阿里云 2 核 2G 服务器 3M 带宽 61 元 1 年,有高配

腾讯云新客低至 82 元 / 年,老客户 99 元 / 年

代金券:在阿里云专用满减优惠券

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2024-07-24发表,共计6558字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中