RxJava的订阅过程

news/2024/3/29 17:10:46/文章来源:https://blog.csdn.net/yuxuehandong/article/details/129162112

要使用Rxjava首先要导入两个包,其中rxandroid是rxjava在android中的扩展

 	 implementation 'io.reactivex:rxandroid:1.2.1'implementation 'io.reactivex:rxjava:1.2.0'

首先从最基本的Observable的创建到订阅开始分析

    Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("observable call onNext0");subscriber.onStart();subscriber.onNext("observable call onNext");subscriber.onCompleted();subscriber.onNext("observable call onNext1");}}).subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriber onCompleted");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError");}@Overridepublic void onNext(String s) {ILog.LogDebug("subscriber onNext");}});

Observable.create()需要一个OnSubscribe,OnSubscribe又是什么呢

    public static <T> Observable<T> create(OnSubscribe<T> f) {return new Observable<T>(RxJavaHooks.onCreate(f));}

OnSubscribe是一个接口,继承自Action1,Action1继承自Action,Action继承自Function,Function就是所有的action和fun的基类,于是有
OnSubscribe > Action1 > Action > Function , 由于Action1 接口有一个call方法,OnSubscribe接口也拥有了一个call方法。call方法的参数是一个Subscriber

    /*** Invoked when Observable.subscribe is called.* @param <T> the output value type*/public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {// cover for generics insanity}/*** A one-argument action.* @param <T> the first argument type*/public interface Action1<T> extends Action {void call(T t);}/*** All Action interfaces extend from this.* <p>* Marker interface to allow instanceof checks.*/public interface Action extends Function {}/*** All Func and Action interfaces extend from this.* <p>* Marker interface to allow instanceof checks.*/public interface Function {}

接着继续看RxJavaHooks.onCreate(f)做了什么 , 由于RxJavaHooks 源码较多,这里只贴了关键的一部分,onObservableCreate为RxJavaHooks 初始化后在static区自动执行赋值的,Func1类型,RxJavaHooks.onCreate最后也就是调用了f.call(onSubscribe),参数是我们传进去的onSubscribe实例

public final class RxJavaHooks {........@SuppressWarnings("rawtypes")static volatile Func1<Observable.OnSubscribe, Observable.OnSubscribe> onObservableCreate;@SuppressWarnings("rawtypes")static volatile Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> onObservableStart;static volatile Func1<Subscription, Subscription> onObservableReturn;
......../*** Hook to call when an Observable is created.* @param <T> the value type* @param onSubscribe the original OnSubscribe logic* @return the original or replacement OnSubscribe instance*/@SuppressWarnings({ "rawtypes", "unchecked" })public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;if (f != null) {return f.call(onSubscribe);}return onSubscribe;}/*** Hook to call before the Observable.subscribe() method is about to return a Subscription.* @param subscription the original subscription* @return the original or alternative subscription that will be returned*/public static Subscription onObservableReturn(Subscription subscription) {Func1<Subscription, Subscription> f = onObservableReturn;if (f != null) {return f.call(subscription);}return subscription;}/*** Hook to call before the child subscriber is subscribed to the OnSubscribe action.* @param <T> the value type* @param instance the parent Observable instance* @param onSubscribe the original OnSubscribe action* @return the original or alternative action that will be subscribed to*/@SuppressWarnings({ "rawtypes", "unchecked" })public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;if (f != null) {return f.call(instance, onSubscribe);}return onSubscribe;}.......@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })static void initCreate() {onObservableCreate = new Func1<Observable.OnSubscribe, Observable.OnSubscribe>() {@Overridepublic Observable.OnSubscribe call(Observable.OnSubscribe f) {return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);}};onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() {@Overridepublic Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) {return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);}};onObservableReturn = new Func1<Subscription, Subscription>() {@Overridepublic Subscription call(Subscription f) {return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(f);}};onSingleCreate = new Func1<rx.Single.OnSubscribe, rx.Single.OnSubscribe>() {@Overridepublic rx.Single.OnSubscribe call(rx.Single.OnSubscribe f) {return RxJavaPlugins.getInstance().getSingleExecutionHook().onCreate(f);}};onCompletableCreate = new Func1<Completable.OnSubscribe, Completable.OnSubscribe>() {@Overridepublic Completable.OnSubscribe call(Completable.OnSubscribe f) {return RxJavaPlugins.getInstance().getCompletableExecutionHook().onCreate(f);}};}....}

在 f.call中 又调用了
RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f); 最后返回的就是我们传入的onSubscribe

public abstract class RxJavaObservableExecutionHook { ......@Deprecatedpublic <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {return f;}@Deprecatedpublic <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {// pass through by defaultreturn onSubscribe;}@Deprecatedpublic <T> Subscription onSubscribeReturn(Subscription subscription) {// pass through by defaultreturn subscription;}
......
}

最后在回来看new Observable(RxJavaHooks.onCreate(f)), Observable 的构造方法,Observable把传入的onSubscribe 保存了起来。至此饶了一大圈Observable对象产生。

public class Observable<T> {
.....protected Observable(OnSubscribe<T> f) {this.onSubscribe = f;}.....
}

下面会继续调用Observable的subscribe方法并传入Observer(观察者),完成订阅操作。现在来查看Observable的subscribe方法做了什么

public class Observable<T> {
.....public final Subscription subscribe(Subscriber<? super T> subscriber) {return Observable.subscribe(subscriber, this);}
.....static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {// validate and proceedif (subscriber == null) {throw new IllegalArgumentException("subscriber can not be null");}if (observable.onSubscribe == null) {  //此处的onSubscribe正是我们创建订阅的时候传入的onSubscribe throw new IllegalStateException("onSubscribe function can not be null.");/** the subscribe function can also be overridden but generally that's not the appropriate approach* so I won't mention that in the exception*/}// new Subscriber so onStart itsubscriber.onStart();/** See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls* to user code from within an Observer"*/// if not already wrappedif (!(subscriber instanceof SafeSubscriber)) {// assign to `observer` so we return the protected versionsubscriber = new SafeSubscriber<T>(subscriber);}// The code below is exactly the same an unsafeSubscribe but not used because it would// add a significant depth to already huge call stacks.try {// allow the hook to intercept and/or decorateRxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);return RxJavaHooks.onObservableReturn(subscriber);} catch (Throwable e) {// special handling for certain Throwable/Error/Exception typesExceptions.throwIfFatal(e);// in case the subscriber can't listen to exceptions anymoreif (subscriber.isUnsubscribed()) {RxJavaHooks.onError(RxJavaHooks.onObservableError(e));} else {// if an unhandled error occurs executing the onSubscribe we will propagate ittry {subscriber.onError(RxJavaHooks.onObservableError(e));} catch (Throwable e2) {Exceptions.throwIfFatal(e2);// if this happens it means the onError itself failed (perhaps an invalid function implementation)// so we are unable to propagate the error correctly and will just throwRuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);// TODO could the hook be the cause of the error in the on error handling.RxJavaHooks.onObservableError(r);// TODO why aren't we throwing the hook's return value.throw r; // NOPMD}}return Subscriptions.unsubscribed();}}.....
}

在正式的订阅关系产生之前,首先会执行subscriber.onStart()方法,这里可以做一些初始化工作。继续往下看又判断是subscriber 实例是否是一个SafeSubscriber,不是则会新建一个SafeSubscriber来包装subscriber

       if (!(subscriber instanceof SafeSubscriber)) {// assign to `observer` so we return the protected versionsubscriber = new SafeSubscriber<T>(subscriber);}

老规矩,查看SafeSubscriber源码

public class SafeSubscriber<T> extends Subscriber<T> {private final Subscriber<? super T> actual;boolean done;public SafeSubscriber(Subscriber<? super T> actual) {super(actual);this.actual = actual;}/*** Notifies the Subscriber that the {@code Observable} has finished sending push-based notifications.* <p>* The {@code Observable} will not call this method if it calls {@link #onError}.*/@Overridepublic void onCompleted() {if (!done) {done = true;try {actual.onCompleted();} catch (Throwable e) {// we handle here instead of another method so we don't add stacks to the frame// which can prevent it from being able to handle StackOverflowExceptions.throwIfFatal(e);RxJavaHooks.onError(e);throw new OnCompletedFailedException(e.getMessage(), e);} finally { // NOPMDtry {// Similarly to onError if failure occurs in unsubscribe then Rx contract is broken// and we throw an UnsubscribeFailureException.unsubscribe();} catch (Throwable e) {RxJavaHooks.onError(e);throw new UnsubscribeFailedException(e.getMessage(), e);}}}}/*** Notifies the Subscriber that the {@code Observable} has experienced an error condition.* <p>* If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or* {@link #onCompleted}.** @param e*          the exception encountered by the Observable*/@Overridepublic void onError(Throwable e) {// we handle here instead of another method so we don't add stacks to the frame// which can prevent it from being able to handle StackOverflowExceptions.throwIfFatal(e);if (!done) {done = true;_onError(e);}}/*** Provides the Subscriber with a new item to observe.* <p>* The {@code Observable} may call this method 0 or more times.* <p>* The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or* {@link #onError}.** @param args*          the item emitted by the Observable*/@Overridepublic void onNext(T args) {try {if (!done) {actual.onNext(args);}} catch (Throwable e) {// we handle here instead of another method so we don't add stacks to the frame// which can prevent it from being able to handle StackOverflowExceptions.throwOrReport(e, this);}}/*** The logic for {@code onError} without the {@code isFinished} check so it can be called from within* {@code onCompleted}.** @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>*/protected void _onError(Throwable e) { // NOPMDRxJavaHooks.onError(e);try {actual.onError(e);} catch (OnErrorNotImplementedException e2) { // NOPMD/** onError isn't implemented so throw** https://github.com/ReactiveX/RxJava/issues/198** Rx Design Guidelines 5.2** "when calling the Subscribe method that only has an onNext argument, the OnError behavior* will be to rethrow the exception on the thread that the message comes out from the observable* sequence. The OnCompleted behavior in this case is to do nothing."*/try {unsubscribe();} catch (Throwable unsubscribeException) {RxJavaHooks.onError(unsubscribeException);throw new OnErrorNotImplementedException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException))); // NOPMD}throw e2;} catch (Throwable e2) {/** throw since the Rx contract is broken if onError failed** https://github.com/ReactiveX/RxJava/issues/198*/RxJavaHooks.onError(e2);try {unsubscribe();} catch (Throwable unsubscribeException) {RxJavaHooks.onError(unsubscribeException);throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));}throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));}// if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catchtry {unsubscribe();} catch (Throwable unsubscribeException) {RxJavaHooks.onError(unsubscribeException);throw new OnErrorFailedException(unsubscribeException);}}/*** Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.** @return the {@link Subscriber} that was used to create this {@code SafeSubscriber}*/public Subscriber<? super T> getActual() {return actual;}
}

SafeSubscriber是Subscriber的一个具体实现类,看SafeSubscriber像不像一个代理模式,具体的工作都是由actual来做,SafeSubscriber负责更完善的处理操作。
继续回到订阅部分的代码,类似之前的分析,代码已经在上面类贴出 RxJavaHooks.onObservableStart(observable, observable.onSubscribe)也只是返回了observable.onSubscribe实例,最后的.call(subscriber)也就是直接调用了我们在创建observable时传入的匿名实例call方法,最后返回subscriber。
RxJava的订阅过程就基本分析完了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_72752.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

缺少IT人员的服装行业该如何进行数字化转型?

服装行业上、下游产业链长&#xff0c;产品属性复杂&#xff0c;是劳动密集型和技术密集型紧密结合的产物&#xff0c;是典型的实体经济代表。 近二十年是服装业发展的机遇和挑战之年&#xff0c;从“世界工厂”“中国制造”&#xff0c;逐渐向“中国设计”转变,中国服装产业经…

c++常用stl算法

1、头文件 这些算法通常包含在头文件<algorithm> <functional> <numeric>中。 2、常用遍历算法 for_each(v.begin(),v.end(), 元素处理函数/仿函数) 注意&#xff1a;在使用transform转存时&#xff0c;目标容器需要提取开辟合适的空间。 void printfunc(…

数学小课堂:数学的线索(从猜想到定理再到应用的整个过程)

文章目录 引言I 勾股定理1.1 勾三股四弦五1.2 数学和自然科学的三个本质差别1.3 总结引言 从猜想到定理再到应用的整个过程是数学发展和体系构建常常经历的步骤。 I 勾股定理 勾股定理: 直角三角形两条直角边的平方之和等于斜边的平方,这个定理在国外都被称为毕达哥拉斯定理…

渗透中超全的Google hack语法

inurl:Login 将返回url中含有Login的网页intitle:后台登录管理员 将返回含有管理员后台的网页intext:后台登录 将返回含有后台的网页inurl:/admin/login.php 将返回含有admin后台的网页inurl:/phpmyadmin/index.php 将返回含有phpmyadmin后台的网页site:http://baidu.com inur:…

云计算|OpenStack|使用VMware安装华为云的R006版CNA和VRM

前言&#xff1a; FusionCompute架构 (CNA、VRM) CNA(ComputingNode Agent):计算节点代理VNA虚拟节点代理&#xff0c;部署在CNA上&#xff0c;实施计算、存储、网络的虚拟化的配置管理。VRM(Virtual Resource Manager):虚拟资源管理器 VNA可以省略不安装 本次实验使用的是V…

还在用chatGPT聊天?《元宇宙2086》已开始用AIGC做漫画连载了!

ChatGPT 是由 OpenAI开发的一个人工智能聊天机器人程序&#xff0c;于 2022 年 11 月推出。该程序使用基于 GPT-3.5架构的大型语言模型并通过强化学习进行训练。 ChatGPT 目前仍以文字方式互动&#xff0c;而除了可以透过人类自然对话方式进行交互&#xff0c;还可以用于相对复…

关于微前端,你想知道的都在这!

更多请关注微前端专题 https://codeteenager.github.io/Micro-Frontends/ 介绍 微前端官网&#xff1a;https://micro-frontends.org/ 问题&#xff1a;如何实现多个应用之间的资源共享&#xff1f; 之前比较多的处理方式是npm包形式抽离和引用&#xff0c;比如多个应用项目之…

EMR Studio Workspace 访问 Github ( 公网Git仓库 )

EMR Studio Workspace访问公网Git仓库 会遇到很多问题,由于EMR Studio不能给出任何有用的错误信息,导致排查起来非常麻烦。下面总结了若干项注意事项,可以避免踩坑。如果你遇到了同样的问题,请根据以下部分或全部建议去修正你的环境,问题即可解决。本文地址:https://laur…

因子的有效性检验(IC)

使用神经网络的预测值作为因子载荷&#xff08;因子暴露&#xff0c;因子值 factor&#xff09;时&#xff0c; 我们需要知道这个因子是否是有效的&#xff0c;所以要做因子的有效性检验。 当前的学术论文给出的IC&#xff0c; rankIC 这些都是属于判断因子是否有效的metric 因…

gdb的简单练习

题目来自《ctf安全竞赛入门》1.用vim写代码vim gdb.c#include "stdio.h" #include "stdlib.h" void main() {int i 100;int j 101;if (i j){printf("bingooooooooo.");system("/bin/sh");}elseprintf("error............&quo…

面向对象的程序设计C++课堂复盘总结 C语言复习+C++基础语法

Stay Hungry&#xff0c;Stay Foolish. 任何人都能写出机器能看懂的代码&#xff0c;但只有优秀的程序员才能写出人能看懂的代码。 有两种写程序的方式&#xff1a;一种是把代码写得非常复杂&#xff0c;以至于 “看不出明显的错误”&#xff1b;另一种是把代码写得非常简单&am…

DolphinScheduler第一章:环境安装

系列文章目录 DolphinScheduler第一章&#xff1a;环境安装 文章目录系列文章目录前言一、环境准备1.上传文件2.数据库配置3.配置安装文件二、集群部署1.数据部署2.部署 DolphinScheduler3. DolphinScheduler 启停命令总结前言 我们现在开始学习hadoop中的DolphinScheduler组…

Spring Cloud Nacos源码讲解(一)- Nacos源码分析开篇

Nacos源码开篇 Nacos服务注册与发现源码剖析 Nacos核心功能点 服务注册&#xff1a;Nacos Client会通过发送REST请求的方式向Nacos Server注册自己的服务&#xff0c;提供自身的元数据&#xff0c;比如ip地址、端口等信息。Nacos Server接收到注册请求后&#xff0c;就会把这…

Linux基础命令-uname显示系统内核信息

前言 这个命令主要是显示系统内核的相关信息&#xff0c;一般需要查看内核信息才会使用到这个命令&#xff0c;一起来看看吧。 一 命令的介绍 uname命令来自于英文词组“Unix name”的缩写&#xff0c;其功能是用于查看系统主机名、内核及硬件架构等信息。如果不加任务参数&am…

GPT-4——比GPT-3强100倍

GPT-4——比GPT-3强100倍 当前世界上最强大的人工智能系统当属ChatGPT。推出2个月用户数就突破1亿。ChatGPT是当下最炙手可热的话题&#xff0c;科技圈几乎人人都在讨论。这边ChatGPT的热度还在不断攀升&#xff0c;另一边来自《纽约时报》的最新报道称ChatGPT即将被自家超越&…

深入浅出C++ ——二叉搜索树

文章目录一、二叉搜索树概念二、二叉搜索树操作1. 二叉搜索树的查找2. 二叉搜索树的插入3. 二叉搜索树的删除三、二叉搜索树的实现四、二叉搜索树的性能分析一、二叉搜索树概念 二叉搜索树又称二叉排序树/二次查找树&#xff0c;它是一棵空树或者是每颗子树都具有以下性质的二叉…

[qiankun]-多页签缓存

[qiankun]-多页签缓存环境功能需求多页签缓存方案方案1.主服务进行html替换方案2.微服务vnode 替换方案3.每个微服务都不卸载微服务加载方式的选择微服务的路由路径选择微服务的缓存工具微服务的容器使用tab作为微服务的挂载容器使用微服务路由作为微服务的挂载容器场景描述微服…

干货解答:如何设置Facebook Messenger 自动回复?

Facebook Messenger 自动回复消息是提升客户体验的有效方法。在本文中&#xff0c;我们将探讨设置Facebook 自动响应和不同的创建方法 Facebook 自动回复。另外&#xff0c;我们准备了一些最受欢迎的 Facebook Messenger 自动回复消息。Facebook Messenger 自动回复&#xff1a…

https加密原理详解,带你搞懂它为什么比http更安全

文章目录http的缺点对称加密非对称加密数字签名数字证书验证身份数字摘要数字签名验证内容的完整性总结http的缺点 http是超文本传输协议&#xff0c;使用http协议进行通信有如下缺点&#xff1a; http没有提供任何数据加密机制&#xff0c;数据通信使用明文通信&#xff0c;…

x86架构设备的OpenWrt的空间扩容问题

openwrt固件是squashfs-combined-efi非exf4格式 直接将原有根分区扩容 用插件是&#xff1a;fdisk,losetup,resize2fs,blkid df -h fdisk -l fdisk /dev/sda //进入fdisk分区管理工具注意fdisk后参数是磁盘名称&#xff0c;是要根据实际情况填写 fdisk /dev/sda //进入fdi…