1.有什么使用了rxjava或rxandroid的源码开源项目?
2.舒服了!Kotlin协程优雅的源码与Retrofit缠绵
3.RxJava基本原理分析
4.我的RxJava源码解读笔记
5.深入理解 RxJava2:Scheduler(2)
有什么使用了rxjava或rxandroid的开源项目?
在探索使用了 RxJava 或 RxAndroid 的开源项目时,我们首先可以回顾 GitHub 上的源码官方资源:ReactiveX/RxJava。这个项目作为 RxJava 的源码源头,提供了核心库和文档,源码是源码wingy 源码学习 RxJava 的重要起点。值得一提的源码是,中国在 RxJava 领域有着优秀的源码贡献者,如@hi大头鬼hi,源码他的源码教程以其精准性和实用性,对众多学习者提供了巨大帮助。源码国内的源码开发者常常将翻译或撰写的资料先请大头鬼审校,可见其权威性之高。源码
接下来,源码我们聚焦到 Flipoard 的源码扔物线,他的开源库 MaterialEditText 和对 Dagger 源码的解析,都是深入 Android 开发领域的经典之作。虽然扔物线的教程现在可能不在公开博客中发布,但感兴趣的开发者依然可以通过搜索找到相关信息。
此外,yongjhih 这位台湾开发者同样值得推荐。作为 RxJava 的狂热爱好者,yongjhih 的 GitHub 上积累了丰富的 Examples,为学习者提供了实际操作的参考和灵感。
在寻找使用了 RxJava 或 RxAndroid 的项目时,上述提到的资源和开发者无疑是很好的起点。然而,阅读这些资料仅是学习的开始,更重要的是实践。动手编写 Demo,将 RxJava 与传统 Android 组件(如 Handler、AsyncTask、BroadcastReceiver 等)结合使用,可以显著加深理解。不断练习,相信自己能够掌握,是学习过程中的关键。
在这个领域,持续探索、实践和分享是推动技术进步的重要力量。无论是从官方文档开始,还是追随这些知名开发者的学习路径,最终的目标是将理论知识转化为实际能力,解决实际问题。在这个过程中,ws源码不断尝试、总结和反思,将带来最大的成长。通过实践和交流,我们可以更加深入地理解 RxJava 或 RxAndroid 的应用场景,从而在项目中发挥它们的独特优势。
舒服了!Kotlin协程优雅的与Retrofit缠绵
Kotlin已成为Android开发的Google推荐语言,项目已长时间使用Kotlin。Kotlin 1.3发布后,Kotlin协程已稳定,引发了一些思考。
对于网络请求功能,我们一直在思考如何编写优雅、简洁、快速、安全的代码。这也是许多开发者持续思考的问题。由于我们项目使用Retrofit作为网络库,因此所有思考都基于Retrofit展开。
本文将从我的思考历程开始,涉及Kotlin协程、扩展方法、DSL等。没有基础的小伙伴,请先了解这三样东西。关于DSL,可以参考我写的简介。
网络请求中,我们需要关注页面生命周期的绑定,关闭页面时需关闭未完成的网络请求。前辈们为此各显神通。我也是从学习、模仿前辈,到自我理解的转变。
最初使用Callback异步方法是Retrofit最基本的使用方式。在关闭网络请求时,需要在onDestroy中调用cancel方法。这种方式容易导致忘记调用cancel方法,且网络操作和关闭请求的操作分开,不利于管理。
随着Rx的火爆,我们项目的网络请求方式也逐渐转为Rx。在Rx的kx源码使用中,我们尝试了各种封装方式,如自定义Subscriber,将onNext、onCompleted、onError进行拆分组合,满足不同需求。
在Retrofit中添加Rx转换器RxJava2CallAdapterFactory.create(),将接口的Call改为Observable。使用方式如下,配合RxAndroid绑定声明周期。这种使用方式方便了许多,响应式编程的思想也很优秀,一切皆为事件流。通过RxAndroid切换UI线程和绑定页面生命周期,页面关闭时自动切断向下传递的事件流。
RxJava最大的风险在于内存泄露,而RxAndroid确实规避了一定的泄露风险。通过查看RxJava2CallAdapterFactory的源码,发现确实调用了cancel方法,貌似不错。但总觉得RxJava过于庞大,有些大材小用。
随着项目推进和Google全家桶发布,轻量化版本的RxJava进入我们的视线,那就是LiveData。LiveData借鉴了很多RxJava的设计思想,属于响应式编程范畴。LiveData的最大优势在于响应Activity的生命周期,无需像RxJava那样绑定声明周期。
首先需要添加LiveDataCallAdapterFactory,用于将Retrofit的Callback转换为LiveData。接口改为,调用方法如下。在项目中使用时,通常会自定义Observer,用于区分各种数据。
在调用observe方法时,传递this,指的是声明周期。一般我们在AppCompatActivity中使用时,直接传递其本身即可。通过查看源码可以发现,this本身是传递的LifecycleOwner。在层层跳转AppCompatActivity时,商会源码会发现AppCompatActivity继承于SupportActivity的父类,实现了LifecycleOwner接口。一般只需传递其本身即可。LiveData会自动处理数据流的监听和解除绑定。
在onCreate中对数据进行一次性的绑定,后面就不需要再次绑定了。当生命周期走到onStart和onResume时,LiveData会自动接收事件流;当页面处于不活动时,会暂停接收事件流,页面恢复时恢复数据接收。当页面onDestroy时,会自动删除观察者,从而中断事件流。
可以看出LiveData作为官方套件,使用简单,生命周期的响应也很智能,一般都不需要额外处理了。更高级的用法可以参考官方Demo,可以对数据库缓存等待进行一整套响应式封装,非常不错。建议学习官方的封装思想,就算不用,也是对自己大有裨益。
上面说了那么多,这里步入正题。大家仔细观察会发现,上面均是使用Retrofit的enqueue异步方法,再使用Callback进行网络回调,就算是RxJava和LiveData转换器,内部其实也是使用的Callback。在此之前,Retrofit的作者也写了一个协程的转换器,但内部依然使用的是Callback,本质相同。Retrofit有同步和异步两种调用方式。上面这就是异步调用方式,传入一个Callback,这也是我们最常用到的方式。
上面这种是同步调用方法,会阻塞线程,返回的直接就是网络数据Response,很少使用。后来我就在思考,能不能结合Kotlin的报刊源码协程,抛弃Callback,直接使用Retrofit的同步方法,把异步当同步写,代码顺序书写,逻辑清晰,效率高,同步的写法更方便对象的管理。
首先写一个协程的扩展方法,上面就是核心代码,主要的意思都写了注释。整个工作流程是出于UI协程中,所以可以随意操作UI控件,接着在IO线程中去同步调用网络请求,并等待IO线程执行完毕,接着再拿到结果进行处理,整个流程都是基于同步代码的书写方式,一步一个流程,没有回掉导致的代码割裂感。那么继续,我们想办法把获取的数据返回出去。
这里我们采用DSL方法,首先自定义一个类,此类对外暴露了三个方法:onSuccess、onComplete、onFailed,用于分类返回数据。接着,我们对核心代码进行改造,将方法进行传递。这里使用DSL传递方法,可以更具需要传递的,例如只需要onSuccess,那就只传递这一个方法,不必三个都传递,按需使用。
使用方式如下,首先需要按照Kotlin的官方文档改造下Activity,Activity实现CoroutineScope接口,就能直接根据当前的context获取协程使用。接下来就是真正的使用,在任意位置即可调用此扩展方法。在有的时候,我们只需要处理onSuccess的情况,并不关心其他两个。那么直接写:需要哪个写哪个,代码非常整洁。
可以看出,我们不需要单独再对网络请求进行生命周期的绑定,在页面被销毁的时候,job也就被关闭了,当协程被关闭后,会执行调用Retrofit的cancel方法关闭网络。
协程的开销小于Thread多线程,响应速度很快,非常适合轻量化的工作流程。对于协程的使用,还有更多深入的思考和学习。协程并不是Thread的替代品,而是多异步任务的一个补充,我们不能按照惯性思维去理解协程,而是要多从其本身特性入手,开发出它更安逸的使用方式。而且随着Retrofit 2.6.0的发布,自带了新的协程方案,增加了suspend挂起函数的支持,可见协程的应用会越来越受欢迎。
上面所说的所有网络处理方法,不论是Rx还是LiveData,都是很好的封装方式,技术没有好坏之分。我的协程封装方式也许不是最好的,但是我们不能缺乏思考、探索、实践三要素,去想去做。
RxJava基本原理分析
RxJava作为近年来流行的异步开发框架,因其易用性和强大功能备受青睐,但深入理解其源码逻辑却可能让人感到困惑。尽管网上资源众多,初学者往往还是难以清晰掌握其基本原理。
核心角色包括Observable,它是事件流的描述者,通过链式调用形成,每一步操作都会创建一个新的Observable。ObservableOnSubscribe,即事件源,由用户创建并重写subscribe()方法,操作符如map()会重写call(),在订阅者和事件源之间建立联系。Observer作为订阅者,接收并处理事件流,仅关注onNext()方法。
最简单的例子是,不使用操作符和线程切换,通过create()创建Observable,然后调用subscribe()与Observer建立订阅关系,业务代码在subscribe()调用后执行。
当加入map()操作符时,RxJava会在事件流中创建新的Observable,替换原始的,并在第三层回调中执行类型转换。subscribeOn()切换线程在第二层回调中完成,observeOn()则在第三层执行,影响后续的业务代码。
总结来说,RxJava的执行分为三层:第一层创建Observable,第二层建立订阅关系,第三层执行业务代码。操作符的转换和线程切换分别在第三层和第二层进行。深入源码理解,还需进一步学习框架的复杂设计。
我的RxJava源码解读笔记
RxJava是一个用于处理异步任务的库,其主要功能包括观察者模式、数据发送与接收、切换线程、数据变换等。在学习RxJava源码时,梳理了其工作流程,包括创建Observable、创建观察者(使用Subscriber)、订阅(使用subscribe方法)、变换操作(如map、compose)、线程切换(通过subscribeOn和observeOn方法)等关键步骤。从源码角度深入理解了RxJava的工作原理,如Observable的创建、Subscriber的实现、OnSubscribe的作用、Subscription的生命周期管理、变换操作的具体实现以及线程控制机制。通过分析RxJava的源码,不仅加强了记忆,也为实际应用提供了清晰的指导。RxJava通过观察者模式实现了数据的高效异步处理,支持在线程间灵活切换,通过变换操作符实现了数据的转换,是处理异步编程和事件流的理想工具。
深入理解 RxJava2:Scheduler(2)
欢迎来到深入理解 RxJava2 系列第二篇,本文基于 RxJava 2.2.0 正式版源码,将探讨 Scheduler 与 Worker 的概念及其实现原理。
Scheduler 与 Worker 在 RxJava2 中扮演着至关重要的角色,它们是线程调度的核心与基石。虽然 Scheduler 的作用较为熟悉,但 Worker 的概念了解的人可能较少。为何在已有 Scheduler 的情况下,还要引入 Worker 的概念呢?让我们继续探讨。
首先,Scheduler 的核心定义是调度 Runnable,支持立即、延时和周期性调用。而 Worker 是任务的最小单元的载体。在 RxJava2 内部实现中,通常一个或多个 Worker 对应一个 ScheduledThreadPoolExecutor 对象,这里暂不深入探讨。
在 RxJava 1.x 中,Scheduler 没有 scheduleDirect/schedulePeriodicallyDirect 方法,只能先创建 Worker,再通过 Worker 来调度任务。这些方法是对 Worker 调度的简化,可以理解为创建一个只能调度一次任务的 Worker 并立即调度该任务。在 Scheduler 基类的源码中,默认实现是直接创建 Worker 并创建对应的 Task(虽然在部分 Scheduler 的覆盖实现上并没有创建 Worker,但可以认为存在虚拟的 Worker)。
一个 Scheduler 可以创建多个 Worker,这两者是一对多的关系,而 Worker 与 Task 也是一对多的关系。Worker 的存在旨在确保两件事:统一调度 Runnable 和统一取消任务。例如,在 observeOn 操作符中,可以通过 Worker 来统一调度和取消一系列的 Runnable。
RxJava2 默认内置了多种 Scheduler 实现,适用于不同场景,这些 Scheduler 都可以在 Schedulers 类中直接获得。以下是两个常用 Scheduler 的源码分析:computation 和 io。
NewThreadWorker 在 computation、io 和 newThread 中都有涉及,下面简单了解一下这个类。NewThreadWorker 与 ScheduledThreadPoolExecutor 之间是一对一的关系,在构造函数中通过工厂方法创建一个 corePoolSize 为 1 的 ScheduledThreadPoolExecutor 对象并持有。
ScheduledThreadPoolExecutor 从 JDK1.5 开始存在,这个类继承于 ThreadPoolExecutor,支持立即、延时和周期性任务。但是注意,在 ScheduledThreadPoolExecutor 中,maximumPoolSize 参数是无效的,corePoolSize 表示最大线程数,且它的队列是无界的。这里不再深入探讨该类,否则会涉及太多内容。
有了这个类,RxJava2 在实现 Worker 时就站在了巨人的肩膀上,线程调度可以直接使用该类解决,唯一的麻烦之处就是封装一层 Disposable 的逻辑。
ComputationScheduler 是计算密集型的 Scheduler,其线程数与 CPU 核心数密切相关。当线程数远超过 CPU 核心数目时,CPU 的时间更多地损耗在了线程的上下文切换。因此,保持最大线程数与 CPU 核心数一致是比较通用的方式。
FixedSchedulerPool 可以看作是固定数量的真正 Worker 的缓存池。确定了 MAX_THREADS 后,在 ComputationScheduler 的构造函数中会创建 FixedSchedulerPool 对象,FixedSchedulerPool 内部会直接创建一个长度为 MAX_THREADS 的 PoolWorker 数组。PoolWorker 继承自 NewThreadWorker,但没有任何额外的代码。
PoolWorker 的使用方法是从池子里取一个 PoolWorker 并返回。但是需要注意,每个 Worker 是独立的,每个 Worker 内部的任务是绑定在这个 Worker 中的。如果按照上述方法暴露 PoolWorker,会出现两个问题:
为了解决上述问题,需要在 PoolWorker 外再包一层 EventLoopWorker。EventLoopWorker 是一个代理对象,它会将 Runnable 代理给 FixedSchedulerPool 中取到的 PoolWorker 来调度,并负责管理通过它创建的任务。当自身被取消时,会将创建的任务全部取消。
与 ComputationScheduler 恰恰相反,IoScheduler 的线程数是无上限的。这是因为 IO 设备的速度远低于 CPU 速度,在等待 IO 操作时,CPU 往往是闲置的。因此,应该创建更多的线程让 CPU 尽可能地利用。当然,并不是线程越多越好,线程数目膨胀到一定程度会影响 CPU 的效率,也会消耗大量的内存。在 IoScheduler 中,每个 Worker 在空置一段时间后就会被清除以控制线程的数目。
CachedWorkerPool 是一个变长并定期清理的 ThreadWorker 的缓存池,内部通过一个 ConcurrentLinkedQueue 维护。和 PoolWorker 类似,ThreadWorker 也是继承自 NewThreadWorker。仅仅是增加了一个 expirationTime 字段,用来标识这个 ThreadWorker 的超时时间。
在 CachedWorkerPool 初始化时,会传入 Worker 的超时时间,目前是写死的 秒。这个超时时间表示 ThreadWorker 闲置后最大存活时间(实际中不保证 秒时被回收)。
IoScheduler 中也存在一个 EventLoopWorker 类,它和 ComputationScheduler 中的作用类似。因为 CachedWorkerPool 是每隔 秒清理一次队列的,所以 ThreadWorker 的存活时间取决于入队的时机。如果一直没有被再次取出,其被实际清理的延迟在 - 秒之间。
熟悉线程的读者会发现,ComputationScheduler 与 IoScheduler 很像某些参数下的 ThreadPoolExecutor。它们对线程的控制外在表现很相似,但实际的线程执行对象不一样。这两者的对比有助于我们更深刻地理解 Scheduler 设计的内在逻辑。
Scheduler 是 RxJava 线程的核心概念,RxJava 基于此屏蔽了 Thread 相关的概念,只与 Scheduler/Worker/Runnable 打交道。
本来计划继续基于 Scheduler 和大家一起探讨 subscribeOn 与 observeOn,但考虑到篇幅问题,这些留待下篇分享。
感谢大家的阅读,欢迎关注笔者的公众号,可以第一时间获取更新,同时欢迎留言沟通。