在之前的聊天项目中,新增 Rpc 请求和响应消息
消息
新增消息
@Data
public abstract class Message implements Serializable {// 省略旧的代码public static final int RPC_MESSAGE_TYPE_REQUEST = 101;public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;static {// ...messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);}}
请求消息
@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {/*** 调用的接口全限定名,服务端根据它找到实现,清楚远程调用哪一个接口*/private String interfaceName;/*** 调用接口中的方法名:远程调用的方法名*/private String methodName;/*** 方法返回类型:远程调用的方法的返回值类型*/private Class<?> returnType;/*** 方法参数类型数组:远程调用的方法的参数类型*/private Class[] parameterTypes;/*** 方法参数值数组:远程调用的方法的实际参数的值*/private Object[] parameterValue;//构造方法:对上面的参数进行赋值public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {super.setSequenceId(sequenceId);this.interfaceName = interfaceName;this.methodName = methodName;this.returnType = returnType;this.parameterTypes = parameterTypes;this.parameterValue = parameterValue;}@Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_REQUEST;}
}
响应消息
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {/*** 返回值*/private Object returnValue;/*** 异常值:远程调用如果出现了错误,便使用这个异常值进行获取*/private Exception exceptionValue;@Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_RESPONSE;}
}
根据接口名称找到实现
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;public class ServicesFactory {static Properties properties;static Map<Class<?>, Object> map = new ConcurrentHashMap<>();static {try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {properties = new Properties();properties.load(in);Set<String> names = properties.stringPropertyNames();for (String name : names) {if (name.endsWith("Service")) {Class<?> interfaceClass = Class.forName(name);Class<?> instanceClass = Class.forName(properties.getProperty(name));map.put(interfaceClass, instanceClass.newInstance());}}} catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {throw new ExceptionInInitializerError(e);}}public static <T> T getService(Class<T> interfaceClass) {return (T) map.get(interfaceClass);}
}
service方法
package cn.abc.server.service;public interface HelloService {String sayHello(String name);
}
package cn.abc.server.service;public class HelloServiceImpl implements HelloService {@Overridepublic String sayHello(String msg) {return "你好, " + msg;}
}
application.properties
serializer.algorithm=Json
cn.abc.server.service.HelloService=cn.abc.server.service.HelloServiceImpl
序列化
将java的class转换为json字符串
public interface Serializer {// 反序列化方法<T> T deserialize(Class<T> clazz, byte[] bytes);// 序列化方法<T> byte[] serialize(T object);enum Algorithm implements Serializer {Java {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {try {ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));return (T) ois.readObject();} catch (IOException | ClassNotFoundException e) {throw new RuntimeException("反序列化失败", e);}}@Overridepublic <T> byte[] serialize(T object) {try {ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(object);return bos.toByteArray();} catch (IOException e) {throw new RuntimeException("序列化失败", e);}}},Json {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {//注册的时候说明是哪一种类型需要这种转换器//现在是Class类型需要这个转换器Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();String json = new String(bytes, StandardCharsets.UTF_8);return gson.fromJson(json, clazz);}@Overridepublic <T> byte[] serialize(T object) {//注册的时候说明是哪一种类型需要这种转换器//现在是Class类型需要这个转换器Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();String json = gson.toJson(object);return json.getBytes(StandardCharsets.UTF_8);}}}//将java的哪一个类型(class)转换为jsonclass ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {@Overridepublic Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {try {//拿到字符串形式:类名String str = json.getAsString();//将类名还原为java中的Classreturn Class.forName(str);} catch (ClassNotFoundException e) {throw new JsonParseException(e);}}//将java中的Class变为json@Override public JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {// class -> jsonreturn new JsonPrimitive(src.getName());}}
}
SequenceId
import java.util.concurrent.atomic.AtomicInteger;public abstract class SequenceIdGenerator {private static final AtomicInteger id = new AtomicInteger();public static int nextId() {return id.incrementAndGet();}
}
第一版本(客户端与服务器网络通信)
服务器端
@Slf4j
public class RpcServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc 请求消息处理器RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//处理半包黏包的处理器ch.pipeline().addLast(new ProcotolFrameDecoder());//日志handler的处理器ch.pipeline().addLast(LOGGING_HANDLER);//自定义协议的编解码器ch.pipeline().addLast(MESSAGE_CODEC);//rpc 请求消息处理器ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}
服务器端Handler(请求)
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {RpcResponseMessage response = new RpcResponseMessage();response.setSequenceId(message.getSequenceId());try {//通过接口名称获取对应的实现类对象HelloService service = (HelloService)ServicesFactory.getService(Class.forName(message.getInterfaceName()));//通过实现类对象的方法名找到对应的方法Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());//获取执行结果Object invoke = method.invoke(service, message.getParameterValue());response.setReturnValue(invoke);} catch (Exception e) {e.printStackTrace();String msg = e.getCause().getMessage();response.setExceptionValue(new Exception("远程调用出错:" + msg));}ctx.writeAndFlush(response);}}
客户端
public class RpcClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc 响应消息处理器RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//处理半包黏包的处理器ch.pipeline().addLast(new ProcotolFrameDecoder());//日志handler的处理器ch.pipeline().addLast(LOGGING_HANDLER);//自定义协议的编解码器ch.pipeline().addLast(MESSAGE_CODEC);// rpc 响应消息处理器ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();//发送请求消息,开始运行出站处理器(MESSAGE_CODEC、LOGGING_HANDLER)//然后进入服务器端ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(1,"cn.abc.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"})).addListener(f-> {if (!f.isSuccess()) {//获取异常信息Throwable cause = f.cause();log.error("error", cause);}});channel.closeFuture().sync();} catch (Exception e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}
客户端Handler(响应)
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug("{}", msg);}
}
流程说明
-
客户端建立连接
Channel channel = bootstrap.connect("localhost", 8080).sync().channel()
-
客户端这边发送一个请求消息
ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(1,"cn.abc.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"})).addListener(f-> {if (!f.isSuccess()) {//如果没有成功//获取异常信息Throwable cause = f.cause();log.error("error", cause);}});
-
客户端的请求消息找到客户端的pipeline的出站处理器(现在客户端只有两个出站处理器)
ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC);
-
客户端的出站处理器,处理完后,进入服务器端pipeline的入站处理器,对消息进行处理
ch.pipeline().addLast(new ProcotolFrameDecoder()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); //将消息交给了请求handler ch.pipeline().addLast(RPC_HANDLER);
-
服务器端的入站处理器,进入请求handler:该handler拿到rpc的请求消息,通过接口名称得到要正的要调用的实现对象,找到要调用的方法,过后反射进行调用获取到执行的结果,将这个过程中成功或异常信息封装到响应消息里,通过ctx返回
@Override protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {RpcResponseMessage response = new RpcResponseMessage();response.setSequenceId(message.getSequenceId());try {//通过接口名称获取对应的实现类对象HelloService service = (HelloService)ServicesFactory.getService(Class.forName(message.getInterfaceName()));//通过实现类对象的方法名找到对应的方法Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());//获取执行结果Object invoke = method.invoke(service, message.getParameterValue());response.setReturnValue(invoke);} catch (Exception e) {e.printStackTrace();String msg = e.getCause().getMessage();response.setExceptionValue(new Exception("远程调用出错:" + msg));}ctx.writeAndFlush(response); }
-
ctx返回的响应消息,要经过服务器端的出站处理器:从上往下找到出站处理器,当前服务器端的两次出站处理器处理完后,便将消息发送到了客户端
ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC);
-
客户端这边收到消息后,做入站处理,当前客户端要结果下面的四次入站处理,到第四次交给客户端的handler
ch.pipeline().addLast(new ProcotolFrameDecoder()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); //将消息交给了请求handler ch.pipeline().addLast(RPC_HANDLER);
-
客户端的handler,接收到消息后,将消息进行了打印
@Slf4j @ChannelHandler.Sharable public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug("{}", msg);} }
第二版本
第一版本是将消息写死了,
ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(1,"cn.abc.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"})).addListener(promise -> {if (!promise.isSuccess()) {Throwable cause = promise.cause();log.error("error", cause);}});
第一版本中,方法名、方法参数、类型等,是自己去做一些转换封装成了消息对象,带来了使用者的一些工作量
new RpcRequestMessage(1,"cn.abc.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"})
下面第二版本的任务便是将客户端的代码进行改造,其他内容不变
客户端
import cn.abc.client.handler.RpcResponseMessageHandler;
import cn.abc.message.RpcRequestMessage;
import cn.abc.protocol.MessageCodecSharable;
import cn.abc.protocol.ProcotolFrameDecoder;
import cn.abc.protocol.SequenceIdGenerator;
import cn.abc.server.service.HelloService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;import java.lang.reflect.Proxy;@Slf4j
public class RpcClientManager {public static void main(String[] args) {HelloService service = getProxyService(HelloService.class);System.out.println(service.sayHello("zhangsan"));}/* 创建代理类:将service.sayHello("zhangsan")这个方法的调用再转换为rpc消息(RpcRequestMessage);再由当前这个代理类去发送这条消息*/public static <T> T getProxyService(Class<T> serviceClass) {//拿到接口类型的类加载器ClassLoader loader = serviceClass.getClassLoader();//代理要实现的接口的数组Class<?>[] interfaces = new Class[]{serviceClass};// sayHello "张三"//创建代理类Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {/*proxy:代理对象method:代理类正在执行的方法args:方法的实际参数数组*/int sequenceId = SequenceIdGenerator.nextId();// 1. 将方法调用转换为 消息对象RpcRequestMessage msg = new RpcRequestMessage(sequenceId,serviceClass.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),args);// 2. 将消息对象发送出去getChannel().writeAndFlush(msg);// 3. 暂时返回nullreturn null; });return (T) o;}private static Channel channel = null;private static final Object LOCK = new Object();// 获取唯一的 channel 对象public static Channel getChannel() {if (channel != null) {return channel;}/*如果有两个线程同时进入这个方法,才开始时,二者都判断出当前channel为null;过后,两个线程(t1、t2)同时进入下面的synchronized;t1线程首先抢到锁,往下运行,如果此时是channel确实是null,则触发init方法,返回channel对象;t1线程执行完后,t2线程开始运行,首先还是需要判断当前channel是否为null,现在不为null了,则直接返回channel;*///对线程加锁synchronized (LOCK) { // t2if (channel != null) { // t1return channel;}//当channel为空,则创建channelinitChannel();return channel;}}// 初始化 channel 方法private static void initChannel() {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});try {channel = bootstrap.connect("localhost", 8080).sync().channel();//不在使用sync方法在这里阻塞,所以使用异步方式,当channel关闭时触发channel.closeFuture().addListener(future -> {group.shutdownGracefully();});} catch (Exception e) {log.error("client error", e);}}
}
流程说明
- 在客户端创建出来一个代理对象,代理对象将方法调用转换为消息发送
HelloService service = getProxyService(HelloService.class);
-
当调用代理对象中的任何方法时,
service.sayHello("zhangsan")
-
当上面的方法进行调用时,便会进入代理对象的下面代码:zhangsan将会传递为args,开始序列化生成rpc的消息进行发送
(proxy, method, args) -> {/*proxy:代理对象method:代理类正在执行的方法args:方法的实际参数数组*/int sequenceId = SequenceIdGenerator.nextId();// 1. 将方法调用转换为 消息对象RpcRequestMessage msg = new RpcRequestMessage(sequenceId,serviceClass.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),args);// 2. 将消息对象发送出去getChannel().writeAndFlush(msg);// 3. 暂时返回nullreturn null; }
第三版本
第二版本的客户端代码并没有去接收结果,现在将在第三版本中去接收结果。
第二版本中创建的代理对象内容,都是在主线程中去调用运行的,但是服务器真正将响应消息返回是在:
- 客户端的RpcResponseMessageHandler里面的channelRead0中获取到并打印的,
- 这是在netty的Nio的线程里面执行的
现在就是在两个线程中运行,两个线程中通信可以使用Promise
用图说明过程
- t0、t2、t4:主线程的三次方法调用
- t1、t3、t5:Nio得到响应消息的线程
- 这两个线程之间使用Promise(多个)去交换数据
- t0开始调用时,需要准备好一个promise;将来t1拿到结果了,就将结果填到当前t0准备好的这个promise里面
- t2和t3;t4和t5类似
- 这些promise需要有一个集合去管理,promise之间需要对应起来,这时候就需要一个id(在代码里面是SequenceId)去对应起来
下面还是只是需要修改客户端的代码
客户端
@Slf4j
public class RpcClientManager {//...其他代码不变,这里只是将变化的代码进行说明public static <T> T getProxyService(Class<T> serviceClass) {//拿到接口类型的类加载器ClassLoader loader = serviceClass.getClassLoader();//代理要实现的接口的数组Class<?>[] interfaces = new Class[]{serviceClass};// sayHello "张三"//创建代理类Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {/*proxy:代理对象method:代理类正在执行的方法args:方法的实际参数数组*/int sequenceId = SequenceIdGenerator.nextId();// 1. 将方法调用转换为 消息对象RpcRequestMessage msg = new RpcRequestMessage(sequenceId,serviceClass.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),args);// 2. 将消息对象发送出去getChannel().writeAndFlush(msg);// 3. 准备一个空 Promise 对象,来接收结果// getChannel().eventLoop():指定 promise 对象异步接收结果线程DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());//将id(sequenceId) 和 promise 进行对应RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);
// promise.addListener(future -> {//getChannel().eventLoop():指定 promise 对象异步接收结果线程//当前这个Listener方法里面与上面的getChannel().eventLoop()将会是同一个线程
// // 线程
// });// 4. 等待 promise 结果promise.await();//直到有消息:// 即RpcResponseMessageHandler里面调用了 setFailure() 或 setSuccess()方法promise.await()便会结束等待if(promise.isSuccess()) {// 调用正常:返回promise存放的东西(在RpcResponseMessageHandler里面存放的)return promise.getNow();} else {// 调用失败throw new RuntimeException(promise.cause());}});return (T) o;}//....其他代码不变,这里只是将变化的代码进行说明
客户端Handler
import cn.abc.message.RpcResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {/*使用map集合去存放所有的promise,ConcurrentHashMap线程安全Integer: 序号Promise<Object>:用来接收结果(结果不确定,Object去接收)的 promise 对象*/public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug("{}", msg);// 拿到空的 promise:根据消息序号拿到还没有填充结果的promise//remove:拿到移除后的promise,传递完消息后,这个promise就没有用了,所以使用removePromise<Object> promise = PROMISES.remove(msg.getSequenceId());//如果promise为空,则判断结果是长成还是异常if (promise != null) {//正常结果Object returnValue = msg.getReturnValue();//异常结果Exception exceptionValue = msg.getExceptionValue();//setFailure 和 setSuccess 不管调用哪一个,都会使promise.await()方法结束等待if(exceptionValue != null) {promise.setFailure(exceptionValue);} else {promise.setSuccess(returnValue);}}}
}