一、Netty简介
Netty 是一个异步事件驱动的网络通信应用框架,用于快速开发可维护的高性能服务器和客户端。简单地说Netty封装了JDK的NIO,不用再写一大堆复杂的代码,从NIO各种繁复的细节中脱离出来,让开发者重点关心业务逻辑。
二、Netty重要API
说明:大部分是模板代码,重点应该关注的是处理业务逻辑的Handler, 其按照角色不同分为ServerHandler和ClientHandler,按照数据处理流向不同,分为InboundHandler(对应接口是ChannelInboundHandlerAdapter)-处理输入数据,比如从客户端的角度,是处理从服务端发送过来的数据, 和OutboundHandler(对应接口是ChannelOutboundHandlerAdapter)-处理输出数据,比如从客户端的角度,是处理其发送给服务端的数据。
1、服务端
//用于接收客户端的连接请求的线程池
val bossGroup = new NioEventLoopGroup()
//用与处理客户端SocketChannel的网络读写的线程池
val workerGroup = new NioEventLoopGroup()
//是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度
val bootstrap = new ServerBootstrap()
//将两个NIO线程组作为参数传入到ServerBootstrap
bootstrap.group(bossGroup, workerGroup)//创建NioServerSocketChannel.channel(classOf[NioServerSocketChannel])//绑定事件处理类.childHandler(new ChannelInitializer[SocketChannel] {override def initChannel(ch: SocketChannel): Unit = {// 日常主要要写的是ServerHandler逻辑 // 并且通常会添加多个ServerHandlerch.pipeline().addLast(new ServerHandler1)}})
//绑定端口地址端口
bootstrap.bind(host, port)
2、客户端
//创建客户端NIO线程组
val eventGroup = new NioEventLoopGroup
//创建客户端辅助启动类
val bootstrap = new Bootstrap
//将NIO线程组传入到Bootstrap
bootstrap.group(eventGroup)//创建NioSocketChannel.channel(classOf[NioSocketChannel])//绑定事件处理类.handler(new ChannelInitializer[SocketChannel] {override def initChannel(ch: SocketChannel): Unit = {// 日常主要要写的是ClientHandler逻辑 // 并且通常会添加多个ClientHandlerch.pipeline().addLast(new ClientHandler1)}})
//发送连接操作
bootstrap.connect(host, port)
三、Handler执行顺序
如第二部分所述,日常开发主要是写Handler,用于处理不同的业务逻辑,并且通常需要添加多个Handler,那么多个Handler的执行顺序如何呢?示例如下
比如一个客户端添加了5个Handler, 其中1和2是inBound,3和4是OutBound,5是InboundOutBound,
该客户端向外发送的消息会依次经过如下Handle的处理:5->4->3,而其接收到的外部信息会依次经过如下Handle的处理:1->2->5。
四、应用举例
1、案例1
描述:客户端与服务端建立连接,会各自执行Handler中的channelActive方法
客户端:ClientHandler1d的channelActive方法被调用[已跟服务器建立连接]
服务端: ServerHandler1的channelActive方法被调用[一个客户端连接上]
(1) 服务端代码
① NettyServer
package top.doe.netty.demo1import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannelclass NettyServer1 {def bind(host: String, port: Int): Unit = {//用于接收客户端的连接请求的线程池val bossGroup = new NioEventLoopGroup()//用与处理客户端SocketChannel的网络读写的线程池val workerGroup = new NioEventLoopGroup()//是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度val bootstrap = new ServerBootstrap()//将两个NIO线程组作为参数传入到ServerBootstrapbootstrap.group(bossGroup, workerGroup)//创建NioServerSocketChannel.channel(classOf[NioServerSocketChannel])//绑定事件处理类.childHandler(new ChannelInitializer[SocketChannel] {override def initChannel(ch: SocketChannel): Unit = {ch.pipeline().addLast(new ServerHandler1)}})//绑定端口地址端口bootstrap.bind(host, port)}
}object NettyServer1 {def main(args: Array[String]) {val host = "localhost"val port = 8888val server = new NettyServer1server.bind(host, port)}
}
②ServerHandler
package top.doe.netty.demo1import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}class ServerHandler1 extends ChannelInboundHandlerAdapter {/*** 有客户端与服务端建立连接后调用*/override def channelActive(ctx: ChannelHandlerContext): Unit = {println("ServerHandler的channelActive方法被调用【一个客户端连接上】")}/*** 有客户端与服务端断开连接后调用*/override def channelInactive(ctx: ChannelHandlerContext): Unit = {println("ServerHandler的channelInactive方法被调用【一个客户端与服务端断开连接了】")}/*** 接受客户端发送来的消息*/override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {println("ServerHandler的channelRead方法被调用【收到客户端发送的消息了】")}}
(2) 客户端代码
① NettyClient
package top.doe.netty.demo1import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannelclass NettyClient1 {def connect(host: String, port: Int): Unit = {//创建客户端NIO线程组val eventGroup = new NioEventLoopGroup//创建客户端辅助启动类val bootstrap = new Bootstrap//将NIO线程组传入到Bootstrapbootstrap.group(eventGroup)//创建NioSocketChannel.channel(classOf[NioSocketChannel])//绑定事件处理类.handler(new ChannelInitializer[SocketChannel] {override def initChannel(ch: SocketChannel): Unit = {ch.pipeline().addLast(new ClientHandler1)}})//发送连接操作bootstrap.connect(host, port)}
}object NettyClient1 {def main(args: Array[String]) {val host = "localhost"val port = 8888val client = new NettyClient1client.connect(host, port)}
}
②ClientHandler
package top.doe.netty.demo1import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}class ClientHandler1 extends ChannelInboundHandlerAdapter {/*** 一旦跟服务端建立上连接,channelActive方法将被调用*/override def channelActive(ctx: ChannelHandlerContext): Unit = {println("ClientHandler的channelActive方法被调用!【已经跟服务端连接上了】")}/*** 服务端返回消息后,channelRead方法被调用,该方法用于接送服务端返回的消息*/override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {println("ClientHandler的channelRead方法被调用!")}}
(3) 执行结果
>>客户端
>>服务端
2、案例2
在案例1的基础上,服务端和客户端给彼此发送一条消息。
(1) 基本流程
(2) 服务端代码
① NettyServer
package com.wakedata.demo2import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannelclass NettyServer2 {def bind(host:String,port:Int) = {val bossGroup = new NioEventLoopGroup()val workerGroup = new NioEventLoopGroup()val bootstrap = new ServerBootstrap()bootstrap.group(bossGroup,workerGroup).channel(classOf[NioServerSocketChannel]).childHandler(new ChannelInitializer[SocketChannel] {override def initChannel(ch: SocketChannel): Unit = {ch.pipeline().addLast(new ServerHandler2)}})bootstrap.bind(host,port)}}object NettyServer2{def main (args: Array[String] ): Unit = {val host = "localhost"val port = 8888val server2 = new NettyServer2server2.bind(host,port)
}
}
②ServerHandler
package com.wakedata.demo2import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}class ServerHandler2 extends ChannelInboundHandlerAdapter {override def channelActive(ctx: ChannelHandlerContext): Unit = {println("ServerHandler1的channelActive方法被调用[一个客户端连接上]")}override def channelInactive(ctx: ChannelHandlerContext): Unit = {println("ServerHandler1的channelInactive方法被调用[一个客户端断开连接]")}override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {//接收客户端发送过来的消息val byteBuf = msg.asInstanceOf[ByteBuf]val bytes = new Array[Byte](byteBuf.readableBytes())byteBuf.readBytes(bytes)val message = new String(bytes, "UTF-8")println("ServerHandler2的channelRead方法被调用[收到客户端发送的消息了]" + message)//将数据发送到客户端val back = "你好"val resp = Unpooled.copiedBuffer(back.getBytes("UTF-8"))ctx.writeAndFlush(resp)}
}
(3) 客户端代码
① NettyClient
package com.wakedata.demo2import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannelclass NettyClient2 {def connect(host:String,port:Int):Unit = {//创建客户端的NIO线程组val eventGroup = new NioEventLoopGroup()//创建客户端辅助启动类val bootstrap = new Bootstrap()//将NIO线程组传入到Bootstrapbootstrap.group(eventGroup).channel(classOf[NioSocketChannel])//绑定事件处理类.handler(new ChannelInitializer[SocketChannel] {override def initChannel(ch: SocketChannel): Unit = {ch.pipeline().addLast(new ClientHandler2)}})//发送连接操作bootstrap.connect(host,port)}
}object NettyClient2{def main(args: Array[String]): Unit = {val host = "localhost"val port = 8888val client = new NettyClient2()client.connect(host,port)}
}
②ClientHandler
package com.wakedata.demo2import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}class ClientHandler2 extends ChannelInboundHandlerAdapter{override def channelActive(ctx: ChannelHandlerContext): Unit = {println("ClientHandler2的channelActive方法被调用[已跟服务器建立连接]")//向服务端发送消息val msg = "hello"ctx.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes("UTF-8")))}override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {//读取服务端返回的消息val byteBuf = msg.asInstanceOf[ByteBuf]val bytes = new Array[Byte](byteBuf.readableBytes())byteBuf.readBytes(bytes)val message = new String(bytes, "UTF-8")print("ClientHandler2的channelRead方法被调用,接收到服务器端发送过来的消息:" + message)}
}
(4) 执行结果
>> 客户端
>> 服务端
3、案例3
在案例2的基础上,添加多个Handler处理器(共计3个)。因为发送的case class对象,所以消息发出之前,会经过一个OurBoundHandler进行序列化,然后接受消息时,首先经过一个InBoundHandler进行解码,然后再经过另外一个 InBoundHandler对解码后的数据进行读取。
(1) 服务端代码
① NettyServer
package com.wakedata.demo3import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder}class NettyServer3 {def bind(host: String, port: Int): Unit = {//配置服务端线程池组//用于服务器接收客户端的连接val bossGroup = new NioEventLoopGroup()//用户进行SocketChannel的网络读写val workerGroup = new NioEventLoopGroup()//是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度val bootstrap = new ServerBootstrap()//将两个NIO线程组作为参数传入到ServerBootstrapbootstrap.group(bossGroup, workerGroup)//创建NioServerSocketChannel.channel(classOf[NioServerSocketChannel])//绑定I/O事件处理类.childHandler(new ChannelInitializer[SocketChannel] {override def initChannel(ch: SocketChannel): Unit = {//处理输入的数据执行顺序 decoder -> handler//处理返回的数据执行顺序 encoderch.pipeline().addLast("encoder", new ObjectEncoder) //实现了ChannelOutboundHandlerch.pipeline().addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader))) //实现了ChannelInboundHandlerch.pipeline().addLast("handler", new ServerHandler3) //实现了ChannelInboundHandler}})val channelFuture = bootstrap.bind(host, port)channelFuture.syncUninterruptibly}
}object NettyServer3 {def main(args: Array[String]) {val host = "localhost"val port = 8888val server = new NettyServer3server.bind(host, port)}
}
② ServerHandler
package com.wakedata.demo3import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}class ServerHandler3 extends ChannelInboundHandlerAdapter {/*** 有客户端建立连接后调用*/override def channelActive(ctx: ChannelHandlerContext): Unit = {println("一个客户端连接上了...")}/*** 接受客户端发送来的消息*/override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {//进行模式匹配msg match {case RequestMsg(msg) => {println("收到客户端发送的消息:" + msg)//将数据发送到客户端ctx.writeAndFlush(ResponseMsg("haha"))}}}}
③ ResponseMsg
package com.wakedata.demo3case class ResponseMsg(msg: String)
(2) 服务端代码
① NettyClient
package com.wakedata.demo3import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder}class NettyClient3 {def connect(host: String, port: Int): Unit = {//创建客户端NIO线程组val eventGroup = new NioEventLoopGroup//创建客户端辅助启动类val bootstrap = new Bootstrap//将NIO线程组传入到Bootstrapbootstrap.group(eventGroup)//创建NioSocketChannel.channel(classOf[NioSocketChannel])//绑定I/O事件处理类.handler(new ChannelInitializer[SocketChannel] {override def initChannel(ch: SocketChannel): Unit = {ch.pipeline().addLast("encoder", new ObjectEncoder)ch.pipeline().addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)))ch.pipeline().addLast("handler", new ClientHandler3)}})//发起异步连接操作val channelFuture = bootstrap.connect(host, port)channelFuture.syncUninterruptibly}
}object NettyClient3 {def main(args: Array[String]) {val host = "localhost"val port = 8888val client = new NettyClient3client.connect(host, port)}
}
② ClientHandler
package com.wakedata.demo3import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}class ClientHandler3 extends ChannelInboundHandlerAdapter {//一旦跟服务端建立上连接channelActive将被调用override def channelActive(ctx: ChannelHandlerContext): Unit = {println("已经跟服务端连接上了")//向服务端发送case class实例ctx.writeAndFlush(RequestMsg("hello"))}override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {msg match {case ResponseMsg(msg) => {println("收到服务端返回的消息:" + msg)}}}
}
③ RequestMsg
package com.wakedata.demo3case class RequestMsg(content: String)
(3) 执行结果
>>客户端
>>服务端