Flink RPC初探

news/2024/4/27 14:39:24/文章来源:https://blog.csdn.net/weixin_44852067/article/details/137116271

1.RPC概述

  RPC( Remote Procedure Call ) 的主要功能目标是让构建分布式计算(应用)更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。 为实现该目标,RPC 框架需提供一种透明调用机制让使用者不必显式的区分本地调用和远程调用。 总而言之: RPC是为了解决分布式系统中,各个服务中的调用问题,在进行远程调用时,也像本地调用一样方便,让调用者感知不到远程调用的逻辑

2.RPC的调用分类

RPC的调用主要为两种:

  • 同步调用
    • 客户端等待调用执行完成并返回结果。
  • 异步调用
    • 客户端调用后不用等待结果返回,但是依然可以通过回调通知等方式获取返回结果。若客户端不关心调用返回结果,则变成单向异步调用,单向调用不用返回结果。

3.RPC流程图解

  当 user 想发起一个远程调用时,它实际是通过本地调用user-stub。user-stub 负责将调用的接口、方法和参数通过约定的协议规范进行编码并通过本地的 RPCRuntime 实例传输到远端的实例。远端 RPCRuntime 实例收到请求后交给 server-stub 进行解码后发起本地端调用,调用结果再返回给 user 端。
在这里插入图片描述

以计算器Calculator为例,如果实现类CalculatorImpl是放在本地的,那么直接调用即可:

在这里插入图片描述
现在系统变成分布式了,CalculatorImpl和调用方不在同一个地址空间,那么就必须要进行远程过程调用:

在这里插入图片描述
如何实现远程过程调用(RPC)?一个完整的RPC流程,可以用下面这张图来描述:

在这里插入图片描述
其中左边的Client,对应的就是前面的Service A,而右边的Server,对应的则是Service B。 下面详解流程:

  • Service A的应用层代码中,调用了Calculator的一个实现类的add方法,目标是执行加法运算;
  • Calculator实现类,内部并不是直接实现计算器的加减乘除逻辑,而是通过远程调用Service B的RPC接口,来获取运算结果,因此称之为Stub
  • 远程通信工具(图中的Run-time Library)实现Stub与Service B之间的通信,比如Java的Socket,,当然也可以用基于Http协议的HttpClient,或者其他通讯工具类,都可以,RPC并没有规定说要用何种协议进行通信
  • Stub通过调用调用远程通信工具提供的方法与Service B 建立连接,然后将请求数据发送给Service B。数据传输格式底层为二进制格式,例如 calculator.add(1,2),必须把参数值1和2封装到一个Request对象(包含数据以及其他服务调用对应RPC接口的信息),然后序列化为二进制格式,再传给通信工具类;
  • Service B 接收到Service A中通信工具传递过来的数据后,通过自己的通信工具接收二进制数据请求;
  • Service B中的Stub对二进制数据进行反序列化为请求对象;
  • Service B中通过反射去获取 Calculator的实际实现类去执行add方法;
  • RPC接口处理完成,返回执行结果。即为Service B如何将数据传送给 Service A ?
    • Service B 序列化结果–> Service A 通信工具解析请求–> Service A Stub反序列化请求–>结果返回Service A中的Application

4.RPC Demo

4.1 client(客户端)

  • 客户端发起RPC请求,ComsumerApp

    public class ComsumerApp {private static Logger log = LoggerFactory.getLogger(ComsumerApp.class);public static void main(String[] args) {Calculator calculator = new CalculatorRemoteImpl();int result = calculator.add(1, 2);log.info("result is {}", result);}
    }
    
  • 把RPC的逻辑封装到CalculatorRemoteImpl类中,客户端调用时感知不到远程调用的麻烦。 CalculatorRemoteImpl:

public class CalculatorRemoteImpl implements Calculator {public static final int PORT = 9090;private static Logger log = LoggerFactory.getLogger(CalculatorRemoteImpl.class);public int add(int a, int b) {//在分布式系统中,一个服务可能有多个实例,比如Service B,可能有ip地址为198.168.1.11和198.168.1.13两个实例,lookupProviders是在寻找要调用服务的实例列表。在分布式应用下,通常会有一个服务注册中心,来提供查询实例列表的功能。List<String> addressList = lookupProviders("Calculator.add");//查到实例列表之后,chooseTarget是要选择调用哪一个实例,其实内部就是一个负载均衡String address = chooseTarget(addressList);//实现一个简单的RPC,所以暂时不考虑服务注册中心和负载均衡,因此代码里返回ip地址为127.0.0.1。try {Socket socket = new Socket(address, PORT);//Socket通信// 将请求序列化CalculateRpcRequest calculateRpcRequest = generateRequest(a, b);ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());// 将请求发给服务提供方objectOutputStream.writeObject(calculateRpcRequest);// 将响应体反序列化ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());Object response = objectInputStream.readObject();log.info("response is {}", response);if (response instanceof Integer) {return (Integer) response;} else {throw new InternalError();}} catch (Exception e) {log.error("fail", e);throw new InternalError();}}private CalculateRpcRequest generateRequest(int a, int b) {CalculateRpcRequest calculateRpcRequest = new CalculateRpcRequest();calculateRpcRequest.setA(a);calculateRpcRequest.setB(b);calculateRpcRequest.setMethod("add");return calculateRpcRequest;}private String chooseTarget(List<String> providers) {if (null == providers || providers.size() == 0) {throw new IllegalArgumentException();}return providers.get(0);}public static List<String> lookupProviders(String name) {List<String> strings = new ArrayList();strings.add("127.0.0.1");return strings;}
}

4.2 server(服务端)

  • 服务端是接收到请求,响应需求, ProviderApp:

    public class ProviderApp {private static Logger log = LoggerFactory.getLogger(ProviderApp.class);private Calculator calculator = new CalculatorImpl();public static void main(String[] args) throws IOException {new ProviderApp().run();}private void run() throws IOException {ServerSocket listener = new ServerSocket(9090);try {while (true) {Socket socket = listener.accept();try {// 将请求反序列化ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());Object object = objectInputStream.readObject();log.info("request is {}", object);// 调用服务int result = 0;if (object instanceof CalculateRpcRequest) {CalculateRpcRequest calculateRpcRequest = (CalculateRpcRequest) object;if ("add".equals(calculateRpcRequest.getMethod())) {result = calculator.add(calculateRpcRequest.getA(), calculateRpcRequest.getB());} else {throw new UnsupportedOperationException();}}// 返回结果ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());objectOutputStream.writeObject(new Integer(result));} catch (Exception e) {log.error("fail", e);} finally {socket.close();}}} finally {listener.close();}}
    }
    

    Server端主要是通过ServerSocket的accept方法,来接收Client端的请求,接着就是反序列化请求->执行->序列化执行结果,最后将二进制格式的执行结果返回给Client。

5.Flink RPC

5.1 概述

  在学习Flink RPC框架前,总结一哈常见大数据组件的RPC实现:

技术组件RPC实现
HDFSNetty
HBaseHBase-2.x 以前:NIO + ProtoBuf HBase-2.x 以后:Netty
ZooKeeperBIO(集群启动选举) + NIO(服务端处理客户端请求3.4) + Netty(3.6)
SparkSpark-1.x 基于 Akka Spark-2.x 基于 Netty
FlinkAkka + Netty

总言之:Flink的RPC实现:基于Scala的网络编程库:Akka。

5.2 前置知识

  • ActorSystem(重量级的系统对象) 是管理 Actor生命周期的组件, Actor是负责进行通信的组件。
  • 每个 Actor 都有一个 MailBox,其他 Actor 发送给它的消息首先储存在 MailBox 中,通过这种方式可以实现异步通信。
  • 每个 Actor 是单线程处理方式,不断从 MailBox 拉取消息执行处理,所以对于 Actor 的消息处理,不适合调用会阻塞的处理方法。
  • 每个ActorSystem 和 Actor在启动时会给定一个 name,如果要从ActorSystem中获取一 个 Actor,则通过以下的方式来进行 Actor的获取:akka.tcp://asname@bigdata02:9527/user/actorname 。
  • 如果一个Actor和另外一个 Actor进行通信,则必须先获取对方 Actor 的 ActorRef 对象,然后通过该对象发送消息即可。
  • 通过 tell 发送异步消息,不接收响应,通过 ask 发送异步消息,得到 Future 返回,通过异步返回处理结果。

5.3 Flink RPC详解

  Flink 中 RPC 的框架设计的主要类:
在这里插入图片描述

  Flink 中的 RPC 实现主要在 flink-runtime 模块下的 org.apache.flink.runtime.rpc 包中,涉及到的最重要的 API 主要是以下这四个: RpcGatewayRpcServerRpcEndpoint 以及 RpcService

5.3.1 RpcGateway

  Flink的RPC协议通过RpcGateway来定义,主要定义通信行为;用于远程调用RpcEndpoint的某些方法,可以理解为客服端代理。

/*** Rpc gateway interface which has to be implemented by Rpc gateways.*/
public interface RpcGateway {/*** Returns the fully qualified address under which the associated rpc endpoint is reachable.** @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable*/String getAddress();/*** Returns the fully qualified hostname under which the associated rpc endpoint is reachable.** @return Fully qualified hostname under which the associated rpc endpoint is reachable*/String getHostname();
}
5.3.2 RpcServer

  RpcServer负责接收响应远端RPC消息请求。有两个实现:AkkaInvocationHandler和FencedAkkaInvocationHandler。RpcServer 是 Actor 与 RpcEndpoint 两层之间的粘合层。

/*** Interface for self gateways.*/
public interface RpcServer extends StartStoppable, MainThreadExecutable, RpcGateway {/*** Return a future which is completed when the rpc endpoint has been terminated.** @return Future indicating when the rpc endpoint has been terminated*/CompletableFuture<Void> getTerminationFuture();
}
5.3.3 RpcEndPoint

  RpcEndpoint是通信终端,提供RPC服务组件的生命周期管理(start、stop)。每个RpcEndpoint对应了一个路径(endpointId和actorSystem共同确定),每个路径对应一个Actor,其实现了RpcGateway接口,其构造函数如下:

protected RpcEndpoint(final RpcService rpcService, final String endpointId) {// 保存rpcService和endpointIdthis.rpcService = checkNotNull(rpcService, "rpcService");this.endpointId = checkNotNull(endpointId, "endpointId");/**  注释:通过RpcService启动 ResourceManager的RPCServer服务*  此处启动的是 ResourceManager 的 RPC 服务端,在接收TaskManager启动完成信息之后,进行注册和心跳,来汇报Taskmanager的资源情况。*  通过动态代理的形式构建了一个Server。*/this.rpcServer = rpcService.startServer(this);// 主线程执行器,所有调用在主线程中串行执行this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);}

  RpcEndpoint 下面有四个比较重要的子类: TaskExecutor、Dispatcher、JobMaster、ResourceManager ;当在任意地方要创建这四个组件中任何一个组件的实例对象时,创建成功之后,都会要去执行其中的 onStart()方法 ,在集群启动的源码分析中,其实这些组件很多的工作流程,都被放在 onStart() 里面。

5.3.4 RpcService

  RpcService 是Rpc服务的接口,其主要作用如下:

  • 根据提供的RpcEndpoint来启动和停止RpcServer(Actor);
  • 根据提供的地址连接到RpcServer,并返回一个RpcGateway;
  • 延迟/立刻调度Runnable、Callable;

6.总结

  • RPC是为了解决分布式系统中,不同服务之间的调用问题,让远程调用也如同本地调用一样丝滑。
  • RpcGateway 是所有 RPC 的祖宗,各种RPC组件均是RpcGateway的子类。
  • RpcEndpoint是业务载体,对应Actor的封装。
  • RpcService 是 Rpc服务的接口,对应ActorSystem的封装。
  • RpcServer 是 RpcService 与 RpcEndpoint 之间的粘合层。
  • RpcEndpoint 下面有四个比较重要的子类: TaskExecutor、Dispatcher、JobMaster、ResourceManager,且当实例化其中的一个组件对象成功后会执行对应的onStart()方法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1026560.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jenkins权限分配

1.安装权限插件 Role-Based Strategy 2.创建用户 3.修改全局安全配置中的授权策略为Role-Based Strategy 4.进入Manage and Assign Roles创建Global roles和Item roles 4.进入Assign Roles给用户分配role

UI风格汇:材料设计(Material Design),是对扁平风格的延展。

Hello&#xff0c;我是大千UI工场&#xff0c;设计风格是我们新开辟的栏目&#xff0c;主要讲解各类UI风格特征、辨识方法、应用场景、运用方法等&#xff0c;本次带来的材料风格风格的解读&#xff0c;有设计需求&#xff0c;我们也可以接单。 一、什么是材料设计&#xff08;…

apisix创建https

总结了下apisix 使用https 的问题和方法 1、apisix 默认https 端口是9443 2、apisix 需要上传证书后才可以使用https 否二curl测试会报错 SSL routines:CONNECT_CR_SRVR_HELLO 3、apisix 上传证书方法 我是使用的自签名证书&#xff0c;注意自签名证书的Common Name 要写你…

静态路由表学习实验

实验要求&#xff1a;各个pc设备可以通信&#xff0c;并且可以访问外网&#xff0c;假设R1已连接外网 拓扑结构 思路&#xff1a;配置pc机ip地址&#xff0c;子网掩码&#xff0c;和网关&#xff08;网关地址是上层路由接口的地址&#xff09;&#xff0c;配置路由各个接口地址…

【Qt】使用Qt实现Web服务器(七):动态模板引擎

1、示例 2、源码 2.1 模板配置参数 配置文件中关于模板配置参数如下 path为存放模板的目录suffix为模板文件后缀[templates] path=templates suffix=.tpl encoding=UTF-8 cacheSize=1000000

OpenHarmony开发知识点记录之ABI

OpenHarmony系统支持丰富的设备形态&#xff0c;支持多种架构指令集&#xff0c;支持多种操作系统内核&#xff1b;为了应用在各种OpenHarmony设备上的兼容性&#xff0c;本文定义了"OHOS" ABI&#xff08;Application Binary Interface&#xff09;的基础标准&#…

缓冲区溢出漏洞相关知识点汇总

1.缓冲区基础知识相关定义 缓冲区定义&#xff1a;缓冲区一块连续的内存区域&#xff0c;用于存放程序运行时&#xff0c;加载到内存的运行代码和数据。 缓冲区溢出&#xff1a;缓冲区溢出是指程序运行时&#xff0c;向固定大小的缓冲区写入超过其容量的数据。多余的数据会越…

Java代码基础算法练习-求一个三位数的各位数字之和-2024.03.27

任务描述&#xff1a; 输入一个正整数n&#xff08;取值范围&#xff1a;100<n<1000&#xff09;&#xff0c;然后输出每位数字之和 任务要求&#xff1a; 代码示例&#xff1a; package M0317_0331;import java.util.Scanner;public class m240327 {public static voi…

Abaqus周期性边界代表体单元Random Sphere RVE 3D (Mesh)插件

插件介绍 Random Sphere RVE 3D (Mesh) - AbyssFish 插件可在Abaqus生成三维具备周期性边界条件(Periodic Boundary Conditions, PBC)的随机球体骨料及骨料-水泥界面过渡区(Interfacial Transition Zone, ITZ)模型。即采用周期性代表性体积单元法(Periodic Representative Vol…

信号量,sem_init/wait/post/destroy函数的使用

sem_init&#xff08;&#xff09;&#xff1b;--------------------------------------------------------------------------------------- 信号量的初始化函数定义在线程创建之前&#xff0c;资源变量定义为全局变量 一开始只有一个写资源&#xff0c;没有读资源 sem_wait(…

DC电源模块的设计与调试技巧

BOSHIDA DC电源模块的设计与调试技巧 DC电源模块的设计与调试是电子工程师在实际项目中常常需要面对的任务。一个稳定可靠的DC电源模块对于电路的正常运行起到至关重要的作用。以下是一些设计与调试的技巧&#xff0c;帮助工程师们更好地完成任务。 第一&#xff0c;正确选择…

mysql基础1sql分类

mysql基础 [rootvm ~]# docker run -itd -p 3306:3306 -e "MYSQL_ROOT_PASSWORD123456" mysql:5.7.26通用语法 1). SQL语句可以单行或多行书写&#xff0c;以分号结尾。 2). SQL语句可以使用空格/缩进来增强语句的可读性。 3). MySQL数据库的SQL语句不区分大小写…

【软考】UML中的图之状态图

目录 1. 说明2. 图示 1. 说明 1.状态图&#xff08;State Diagram&#xff09;展现了一个状态机。2.由状态、转换、事件和活动组成。3.关注系统的动态视图。4.对于接口、类和协作的行为建模尤为重要。5.强调对象行为的事件顺序。6.通常包括简单状态和组合状态、转换&#xff0…

Django之Celery篇(二)

一、Celery-任务的调用 1.1、异步任务调用 delay( )方法 from celery_task import *def delay():rs = send_email.delay(baizhan)print(rs.id)rs2 = send_msg.delay(SXT)print(rs2.id)1.2、定时任务调用 apply_async( )方法 from celery_task import * import datetimedef …

网络编程之流式套接字

流式套接字&#xff08;SOCK_STREAM&#xff09;是一种网络编程接口&#xff0c;它提供了一种面向连接的、可靠的、无差错和无重复的数据传输服务。这种服务保证了数据按照发送的顺序被接收&#xff0c;使得数据传输具有高度的稳定性和正确性。通常用于那些对数据的顺序和完整性…

Docker部署一个SpringBoot项目(超级详细)

注意&#xff1a;下面的教程主要是针对 Centos7 的&#xff0c;如果使用的其他发行版会有细微的差别&#xff0c;请查看官方文档。 Docker部署一个SpringBoot项目&#xff08;超级详细&#xff09; 一、安装Docker1.卸载旧版2.配置Docker的yum库3.安装Docker4.设置开机自启动5.…

Cornflakes: Zero-Copy Serialization for Microsecond-Scale Networking——论文泛读

SOSP 2023 Paper 论文阅读笔记整理 问题 数据序列化对于许多数据中心应用程序来说至关重要&#xff0c;序列化的主要开销在于数据移动&#xff0c;将应用程序数据移动到数据包中所需的内存拷贝成本高昂。最近的零拷贝API暴露了NIC分散收集功能&#xff0c;增加了将数据移动卸…

短视频矩阵系统--技术3年源头迭代

短视频矩阵系统核心技术算法主要包括以下几个方面&#xff1a; 1. 视频剪辑&#xff1a;通过剪辑工具或API从各大短视频平台抓取符合要求的视频。这些视频通常符合某些特定条件&#xff0c;如特定关键词、特定时间段发布的视频、视频点赞评论转发等数据表现良好的视频。 2. 视…

每日一题 --- 209. 长度最小的子数组[力扣][Go]

长度最小子数组 题目&#xff1a; 给定一个含有 n 个正整数的数组和一个正整数 target 。 找出该数组中满足其总和大于等于 target 的长度最小的 连续 子数组 [numsl, numsl1, ..., numsr-1, numsr] &#xff0c;并返回其长度**。**如果不存在符合条件的子数组&#xff0c…

深度学习项目-基于深度学习的股票价格预测研究

概要 随着经济的发展&#xff0c;中国股票市场的规模持续扩大&#xff0c;早已成为金融投资的重要部分&#xff0c;掌握股票市场的变化规律无论是对监管者还是投资者都具有极其重要的意义。正因如此&#xff0c;人们不断探索着股票市场的变化规律&#xff0c;其中使用深度学习预…