说明
netty是java重要的企业级NIO,使用它可以快速实现很多功能通信功能如:http、ftp、socket、websocket、udp等。 本站使用自定义网包实现网络通信。
分享
大数据博客列表 开发记录汇总 个人java工具库 项目https://gitee.com/wangzonghui/object-tool 包含json、string、集合、excel、zip压缩、pdf、bytes、http等多种工具,欢迎使用。
内置编码器和解码器
解码器
名称 说明 ByteToMessageDecoder 如果想实现自己的半包解码器,实现该类 MessageToMessageDecoder 一般作为二次解码器,当我们在 ByteToMessageDecoder 将一个 bytes 数组转换成一个 java 对象的时候,我们可能还需要将这个对象进行二次解码成其他对象,我们就可以继承这个类; LineBasedFrameDecoder 通过在包尾添加回车换行符 \r\n 来区分整包消息; StringDecoder 字符串解码器; DelimiterBasedFrameDecoder 特殊字符作为分隔符来区分整包消息; FixedLengthFrameDecoder 报文大小固定长度,不够空格补全; ProtoBufVarint32FrameDecoder 通过 Protobuf 解码器来区分整包消息; ProtobufDecoder Protobuf 解码器; LengthFieldBasedFrameDecoder 指定长度来标识整包消息,通过在包头指定整包长度来约定包长。
编码器
名称 说明 ProtobufEncoder Protobuf 编码器; MessageToByteEncoder 将 Java 对象编码成 ByteBuf; MessageToMessageEncoder 如果不想将 Java 对象编码成 ByteBuf,而是自定义类就继承这个; LengthFieldPrepender LengthFieldPrepender 是一个非常实用的工具类,如果我们在发送消息的时候采用的是:消息长度字段+原始消息的形式,那么我们就可以使用 LengthFieldPrepender。这是因为 LengthFieldPrepender 可以将待发送消息的长度(二进制字节长度)写到 ByteBuf 的前两个字节。
代码实现
创建核心类
消息实体类
public class MyMessage { private int len; private byte [ ] content; public int getLen ( ) { return len; } public void setLen ( int len) { this . len = len; } public byte [ ] getContent ( ) { return content; } public void setContent ( byte [ ] content) { this . content = content; }
}
自定义编码类
import io. netty. buffer. ByteBuf ;
import io. netty. channel. ChannelHandlerContext ;
import io. netty. handler. codec. MessageToByteEncoder ; public class MyMessageEncoder extends MessageToByteEncoder < MyMessage > { @Override protected void encode ( ChannelHandlerContext channelHandlerContext, MyMessage myMessage, ByteBuf byteBuf) throws Exception { byteBuf. writeInt ( myMessage. getLen ( ) ) ; byteBuf. writeBytes ( myMessage. getContent ( ) ) ; } }
自定义解码类
import java. util. List ; import io. netty. buffer. ByteBuf ;
import io. netty. channel. ChannelHandlerContext ;
import io. netty. handler. codec. ByteToMessageDecoder ; public class MyMessageDecoder extends ByteToMessageDecoder { int length= 0 ; @Override protected void decode ( ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List < Object > list) throws Exception { if ( byteBuf. readableBytes ( ) >= 4 ) { if ( length== 0 ) { length= byteBuf. readInt ( ) ; } if ( byteBuf. readableBytes ( ) < length) { return ; } byte [ ] content= new byte [ length] ; byteBuf. readBytes ( content) ; MyMessage message= new MyMessage ( ) ; message. setLen ( length) ; message. setContent ( content) ; list. add ( message) ; length= 0 ; } } }
服务端
ServerHandler
import com. netty. cn. rpc. selfmessage. core. MyMessage ; import io. netty. channel. ChannelHandlerContext ;
import io. netty. channel. SimpleChannelInboundHandler ;
import io. netty. util. CharsetUtil ; public class MyServerHandler extends SimpleChannelInboundHandler < MyMessage > { private int count; @Override protected void channelRead0 ( ChannelHandlerContext ctx, MyMessage myMessage) throws Exception { System . out. println ( "服务端收到消息:" ) ; System . out. println ( "长度:" + myMessage. getLen ( ) ) ; System . out. println ( "内容: " + new String ( myMessage. getContent ( ) , CharsetUtil . UTF_8 ) ) ; System . out. println ( "收到消息数量:" + ( ++ count) ) ; String msg= "服务端收到请求" ; MyMessage message= new MyMessage ( ) ; message. setContent ( msg. getBytes ( CharsetUtil . UTF_8 ) ) ; message. setLen ( msg. getBytes ( CharsetUtil . UTF_8 ) . length) ; ctx. writeAndFlush ( message) ; } @Override public void channelReadComplete ( ChannelHandlerContext ctx) throws Exception { super . channelReadComplete ( ctx) ; }
}
入口类
import com. netty. cn. rpc. selfmessage. core. MyMessageDecoder ;
import com. netty. cn. rpc. selfmessage. core. MyMessageEncoder ; import io. netty. bootstrap. ServerBootstrap ;
import io. netty. channel. ChannelFuture ;
import io. netty. channel. ChannelInitializer ;
import io. netty. channel. ChannelOption ;
import io. netty. channel. ChannelPipeline ;
import io. netty. channel. EventLoopGroup ;
import io. netty. channel. nio. NioEventLoopGroup ;
import io. netty. channel. socket. SocketChannel ;
import io. netty. channel. socket. nio. NioServerSocketChannel ; public class MyServer { public static void main ( String [ ] args) { int port= 8080 ; EventLoopGroup bossGroup= new NioEventLoopGroup ( 1 ) ; EventLoopGroup workerGroup= new NioEventLoopGroup ( ) ; try { ServerBootstrap bootstrap= new ServerBootstrap ( ) ; bootstrap. group ( bossGroup, workerGroup) . channel ( NioServerSocketChannel . class ) . option ( ChannelOption . SO_BACKLOG , port) . childHandler ( new ChannelInitializer < SocketChannel > ( ) { protected void initChannel ( SocketChannel socketChannel) { ChannelPipeline channelPipeline= socketChannel. pipeline ( ) ; channelPipeline. addLast ( new MyMessageDecoder ( ) ) ; channelPipeline. addLast ( new MyMessageEncoder ( ) ) ; channelPipeline. addLast ( new MyServerHandler ( ) ) ; } } ) ; System . out. println ( "netty server start" ) ; ChannelFuture cf= bootstrap. bind ( port) . sync ( ) ; cf. channel ( ) . closeFuture ( ) . sync ( ) ; } catch ( Exception e) {
System . out. println ( e. toString ( ) ) ; } finally { bossGroup. shutdownGracefully ( ) ; workerGroup. shutdownGracefully ( ) ; } }
}
客户端
ClientHandler
import com. netty. cn. rpc. selfmessage. core. MyMessage ; import io. netty. channel. ChannelHandlerContext ;
import io. netty. channel. SimpleChannelInboundHandler ;
import io. netty. util. CharsetUtil ; public class MyClientHandler extends SimpleChannelInboundHandler < MyMessage > { @Override public void channelActive ( ChannelHandlerContext ctx) throws Exception { System . out. println ( "连接服务端 " + ctx. channel ( ) . remoteAddress ( ) + " 成功" ) ; String msg= "你好,我是张asdfasdfsadfwerwerwerwerewrewrewrewr三。" ; for ( int i= 0 ; i< 20 ; i++ ) { MyMessage message= new MyMessage ( ) ; message. setContent ( msg. getBytes ( CharsetUtil . UTF_8 ) ) ; message. setLen ( msg. getBytes ( CharsetUtil . UTF_8 ) . length) ; ctx. writeAndFlush ( message) ; } } @Override protected void channelRead0 ( ChannelHandlerContext channelHandlerContext, MyMessage myMessage) throws Exception { System . out. println ( "client 接收到信息:" + new String ( myMessage. getContent ( ) ) . toString ( ) ) ; } @Override public void exceptionCaught ( ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx. close ( ) ; } }
入口类
import com. netty. cn. rpc. selfmessage. core. MyMessageDecoder ;
import com. netty. cn. rpc. selfmessage. core. MyMessageEncoder ; import io. netty. bootstrap. Bootstrap ;
import io. netty. channel. ChannelFuture ;
import io. netty. channel. ChannelInitializer ;
import io. netty. channel. ChannelPipeline ;
import io. netty. channel. EventLoopGroup ;
import io. netty. channel. nio. NioEventLoopGroup ;
import io. netty. channel. socket. SocketChannel ;
import io. netty. channel. socket. nio. NioSocketChannel ; public class MyClient { public static void main ( String [ ] args) { int port= 8080 ; EventLoopGroup group= new NioEventLoopGroup ( ) ; try { Bootstrap bootstrap= new Bootstrap ( ) ; bootstrap. group ( group) . channel ( NioSocketChannel . class ) . handler ( new ChannelInitializer < SocketChannel > ( ) { protected void initChannel ( SocketChannel socketChannel) { ChannelPipeline channelPipeline= socketChannel. pipeline ( ) ; channelPipeline. addLast ( new MyMessageDecoder ( ) ) ; channelPipeline. addLast ( new MyMessageEncoder ( ) ) ; channelPipeline. addLast ( new MyClientHandler ( ) ) ; } } ) ; ChannelFuture cf = bootstrap. connect ( "127.0.0.1" , port) . sync ( ) ; cf. channel ( ) . closeFuture ( ) . sync ( ) ; System . out. println ( "启动客户端" + port) ; } catch ( Exception e) {
System . out. println ( e. toString ( ) ) ; } finally { group. shutdownGracefully ( ) ; } }
}
测试
先启动 MyServer
,再启动 MyClient
,可以看到控制台打印如下内容: Server
netty server start
服务端收到消息:
长度:60
内容: 你好,我是张asdfasdfsadfwerwerwerwerewrewrewrewr三。
收到消息数量:1
连接服务端 /127.0.0.1:8080 成功
client 接收到信息:服务端收到请求
参考
总结
该方式定义了数据传输结构,传输过程中由编码器ByteBuf
完成数据处理。 由于内容是二进制格式,可以存储数据,如json字符串、protobuf二次处理后数据,提升了数据传输灵活性。