1.RPC概述
RPC( Remote Procedure Call ) 的主要功能目标是让构建分布式计算(应用)更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。 为实现该目标,RPC 框架需提供一种透明调用机制让使用者不必显式的区分本地调用和远程调用。 总而言之: RPC是为了解决分布式系统中,各个服务中的调用问题,在进行远程调用时,也像本地调用一样方便,让调用者感知不到远程调用的逻辑 。
2.RPC的调用分类
RPC的调用主要为两种:
- 同步调用
- 客户端等待调用执行完成并返回结果。
- 异步调用
- 客户端调用后不用等待结果返回,但是依然可以通过回调通知等方式获取返回结果。若客户端不关心调用返回结果,则变成单向异步调用,单向调用不用返回结果。
3.RPC流程图解
当 user 想发起一个远程调用时,它实际是通过本地调用user-stub。user-stub 负责将调用的接口、方法和参数通过约定的协议规范进行编码并通过本地的 RPCRuntime 实例传输到远端的实例。远端 RPCRuntime 实例收到请求后交给 server-stub 进行解码后发起本地端调用,调用结果再返回给 user 端。
以计算器Calculator为例,如果实现类CalculatorImpl是放在本地的,那么直接调用即可:
现在系统变成分布式了,CalculatorImpl和调用方不在同一个地址空间,那么就必须要进行远程过程调用:
如何实现远程过程调用(RPC)?一个完整的RPC流程,可以用下面这张图来描述:
其中左边的Client,对应的就是前面的Service A,而右边的Server,对应的则是Service B。 下面详解流程:
- Service A的应用层代码中,调用了Calculator的一个实现类的add方法,目标是执行加法运算;
- Calculator实现类,内部并不是直接实现计算器的加减乘除逻辑,而是通过远程调用Service B的RPC接口,来获取运算结果,因此称之为Stub;
- 远程通信工具(图中的Run-time Library)实现Stub与Service B之间的通信,比如Java的Socket,,当然也可以用基于Http协议的HttpClient,或者其他通讯工具类,都可以,RPC并没有规定说要用何种协议进行通信;
- Stub通过调用调用远程通信工具提供的方法与Service B 建立连接,然后将请求数据发送给Service B。数据传输格式底层为二进制格式,例如 calculator.add(1,2),必须把参数值1和2封装到一个Request对象(包含数据以及其他服务调用对应RPC接口的信息),然后序列化为二进制格式,再传给通信工具类;
- Service B 接收到Service A中通信工具传递过来的数据后,通过自己的通信工具接收二进制数据请求;
- Service B中的Stub对二进制数据进行反序列化为请求对象;
- Service B中通过反射去获取 Calculator的实际实现类去执行add方法;
- RPC接口处理完成,返回执行结果。即为Service B如何将数据传送给 Service A ?
- Service B 序列化结果–> Service A 通信工具解析请求–> Service A Stub反序列化请求–>结果返回Service A中的Application
4.RPC Demo
4.1 client(客户端)
-
客户端发起RPC请求,ComsumerApp
public class ComsumerApp {private static Logger log = LoggerFactory.getLogger(ComsumerApp.class);public static void main(String[] args) {Calculator calculator = new CalculatorRemoteImpl();int result = calculator.add(1, 2);log.info("result is {}", result);} }
-
把RPC的逻辑封装到CalculatorRemoteImpl类中,客户端调用时感知不到远程调用的麻烦。 CalculatorRemoteImpl:
public class CalculatorRemoteImpl implements Calculator {public static final int PORT = 9090;private static Logger log = LoggerFactory.getLogger(CalculatorRemoteImpl.class);public int add(int a, int b) {//在分布式系统中,一个服务可能有多个实例,比如Service B,可能有ip地址为198.168.1.11和198.168.1.13两个实例,lookupProviders是在寻找要调用服务的实例列表。在分布式应用下,通常会有一个服务注册中心,来提供查询实例列表的功能。List<String> addressList = lookupProviders("Calculator.add");//查到实例列表之后,chooseTarget是要选择调用哪一个实例,其实内部就是一个负载均衡String address = chooseTarget(addressList);//实现一个简单的RPC,所以暂时不考虑服务注册中心和负载均衡,因此代码里返回ip地址为127.0.0.1。try {Socket socket = new Socket(address, PORT);//Socket通信// 将请求序列化CalculateRpcRequest calculateRpcRequest = generateRequest(a, b);ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());// 将请求发给服务提供方objectOutputStream.writeObject(calculateRpcRequest);// 将响应体反序列化ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());Object response = objectInputStream.readObject();log.info("response is {}", response);if (response instanceof Integer) {return (Integer) response;} else {throw new InternalError();}} catch (Exception e) {log.error("fail", e);throw new InternalError();}}private CalculateRpcRequest generateRequest(int a, int b) {CalculateRpcRequest calculateRpcRequest = new CalculateRpcRequest();calculateRpcRequest.setA(a);calculateRpcRequest.setB(b);calculateRpcRequest.setMethod("add");return calculateRpcRequest;}private String chooseTarget(List<String> providers) {if (null == providers || providers.size() == 0) {throw new IllegalArgumentException();}return providers.get(0);}public static List<String> lookupProviders(String name) {List<String> strings = new ArrayList();strings.add("127.0.0.1");return strings;}
}
4.2 server(服务端)
-
服务端是接收到请求,响应需求, ProviderApp:
public class ProviderApp {private static Logger log = LoggerFactory.getLogger(ProviderApp.class);private Calculator calculator = new CalculatorImpl();public static void main(String[] args) throws IOException {new ProviderApp().run();}private void run() throws IOException {ServerSocket listener = new ServerSocket(9090);try {while (true) {Socket socket = listener.accept();try {// 将请求反序列化ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());Object object = objectInputStream.readObject();log.info("request is {}", object);// 调用服务int result = 0;if (object instanceof CalculateRpcRequest) {CalculateRpcRequest calculateRpcRequest = (CalculateRpcRequest) object;if ("add".equals(calculateRpcRequest.getMethod())) {result = calculator.add(calculateRpcRequest.getA(), calculateRpcRequest.getB());} else {throw new UnsupportedOperationException();}}// 返回结果ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());objectOutputStream.writeObject(new Integer(result));} catch (Exception e) {log.error("fail", e);} finally {socket.close();}}} finally {listener.close();}} }
Server端主要是通过ServerSocket的accept方法,来接收Client端的请求,接着就是反序列化请求->执行->序列化执行结果,最后将二进制格式的执行结果返回给Client。
5.Flink RPC
5.1 概述
在学习Flink RPC框架前,总结一哈常见大数据组件的RPC实现:
技术组件 | RPC实现 |
---|---|
HDFS | Netty |
HBase | HBase-2.x 以前:NIO + ProtoBuf HBase-2.x 以后:Netty |
ZooKeeper | BIO(集群启动选举) + NIO(服务端处理客户端请求3.4) + Netty(3.6) |
Spark | Spark-1.x 基于 Akka Spark-2.x 基于 Netty |
Flink | Akka + Netty |
总言之:Flink的RPC实现:基于Scala的网络编程库:Akka。
5.2 前置知识
- ActorSystem(重量级的系统对象) 是管理 Actor生命周期的组件, Actor是负责进行通信的组件。
- 每个 Actor 都有一个 MailBox,其他 Actor 发送给它的消息首先储存在 MailBox 中,通过这种方式可以实现异步通信。
- 每个 Actor 是单线程处理方式,不断从 MailBox 拉取消息执行处理,所以对于 Actor 的消息处理,不适合调用会阻塞的处理方法。
- 每个ActorSystem 和 Actor在启动时会给定一个 name,如果要从ActorSystem中获取一 个 Actor,则通过以下的方式来进行 Actor的获取:akka.tcp://asname@bigdata02:9527/user/actorname 。
- 如果一个Actor和另外一个 Actor进行通信,则必须先获取对方 Actor 的 ActorRef 对象,然后通过该对象发送消息即可。
- 通过 tell 发送异步消息,不接收响应,通过 ask 发送异步消息,得到 Future 返回,通过异步返回处理结果。
5.3 Flink RPC详解
Flink 中 RPC 的框架设计的主要类:
Flink 中的 RPC 实现主要在 flink-runtime 模块下的 org.apache.flink.runtime.rpc 包中,涉及到的最重要的 API 主要是以下这四个: RpcGateway 、RpcServer、RpcEndpoint 以及 RpcService
5.3.1 RpcGateway
Flink的RPC协议通过RpcGateway来定义,主要定义通信行为;用于远程调用RpcEndpoint的某些方法,可以理解为客服端代理。
/*** Rpc gateway interface which has to be implemented by Rpc gateways.*/
public interface RpcGateway {/*** Returns the fully qualified address under which the associated rpc endpoint is reachable.** @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable*/String getAddress();/*** Returns the fully qualified hostname under which the associated rpc endpoint is reachable.** @return Fully qualified hostname under which the associated rpc endpoint is reachable*/String getHostname();
}
5.3.2 RpcServer
RpcServer负责接收响应远端RPC消息请求。有两个实现:AkkaInvocationHandler和FencedAkkaInvocationHandler。RpcServer 是 Actor 与 RpcEndpoint 两层之间的粘合层。
/*** Interface for self gateways.*/
public interface RpcServer extends StartStoppable, MainThreadExecutable, RpcGateway {/*** Return a future which is completed when the rpc endpoint has been terminated.** @return Future indicating when the rpc endpoint has been terminated*/CompletableFuture<Void> getTerminationFuture();
}
5.3.3 RpcEndPoint
RpcEndpoint是通信终端,提供RPC服务组件的生命周期管理(start、stop)。每个RpcEndpoint对应了一个路径(endpointId和actorSystem共同确定),每个路径对应一个Actor,其实现了RpcGateway接口,其构造函数如下:
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {// 保存rpcService和endpointIdthis.rpcService = checkNotNull(rpcService, "rpcService");this.endpointId = checkNotNull(endpointId, "endpointId");/** 注释:通过RpcService启动 ResourceManager的RPCServer服务* 此处启动的是 ResourceManager 的 RPC 服务端,在接收TaskManager启动完成信息之后,进行注册和心跳,来汇报Taskmanager的资源情况。* 通过动态代理的形式构建了一个Server。*/this.rpcServer = rpcService.startServer(this);// 主线程执行器,所有调用在主线程中串行执行this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);}
RpcEndpoint 下面有四个比较重要的子类: TaskExecutor、Dispatcher、JobMaster、ResourceManager ;当在任意地方要创建这四个组件中任何一个组件的实例对象时,创建成功之后,都会要去执行其中的 onStart()方法 ,在集群启动的源码分析中,其实这些组件很多的工作流程,都被放在 onStart() 里面。
5.3.4 RpcService
RpcService 是Rpc服务的接口,其主要作用如下:
- 根据提供的RpcEndpoint来启动和停止RpcServer(Actor);
- 根据提供的地址连接到RpcServer,并返回一个RpcGateway;
- 延迟/立刻调度Runnable、Callable;
6.总结
- RPC是为了解决分布式系统中,不同服务之间的调用问题,让远程调用也如同本地调用一样丝滑。
- RpcGateway 是所有 RPC 的祖宗,各种RPC组件均是RpcGateway的子类。
- RpcEndpoint是业务载体,对应Actor的封装。
- RpcService 是 Rpc服务的接口,对应ActorSystem的封装。
- RpcServer 是 RpcService 与 RpcEndpoint 之间的粘合层。
- RpcEndpoint 下面有四个比较重要的子类: TaskExecutor、Dispatcher、JobMaster、ResourceManager,且当实例化其中的一个组件对象成功后会执行对应的onStart()方法。