nett学习
其实netty就是帮我们封装了java的nio,所以学习netty,就围绕着它是怎么封装java的原始nio实现就可以了。
先看看server要怎么写
SelectorProvider provider = SelectorProvider.provider();/*创建选择器的实例*/selector = provider.openSelector();/*创建ServerSocketChannel的实例*/serverSocketChannel = ServerSocketChannel.open();serverSocketChannel = provider.openServerSocketChannel();/*设置通道为非阻塞模式*/serverSocketChannel.configureBlocking(false);/*注册事件,表示关心客户端连接*/serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);/*绑定端口*/serverSocketChannel.socket().bind(new InetSocketAddress(port));
其实就是几步而已,
- 首先创建一个serverSocketChannel和一个selector,并把channel设置成非阻塞的。
- 把channel注册到selector上。
- 最后把channel绑定一个端口号
来看看如果用netty要怎么实现一个server.
ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc()));}//p.addLast(new LoggingHandler(LogLevel.INFO));p.addLast(serverHandler);}});// Start the server.ChannelFuture f = b.bind(PORT).sync();// Wait until the server socket is closed.f.channel().closeFuture().sync();
来看看netty是怎么实现上面的三步的
第一步:创建一个serverSocketChannel和一个selector,并把channel设置成非阻塞的。
创建serverSocketChannel很简单,通过.channel(NioServerSocketChannel.class)
来设置一个类型。后面会通过反射调用这个类的构造方法,我们看看NioServerSocketChannel这个类的构造方法。
public NioServerSocketChannel(SelectorProvider provider, InternetProtocolFamily family) {this(newChannel(provider, family));}private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {try {ServerSocketChannel channel =SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family);// 重点是下面这行return channel == null ? provider.openServerSocketChannel() : channel;} catch (IOException e) {throw new ChannelException("Failed to open a socket.", e);}}
可以看到最后也是用provider.openServerSocketChannel() 保存到ServerSocketChannel的成员变量中。
最后在父类中设置成非阻塞模式
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;this.readInterestOp = readInterestOp;......// 设置成非阻塞ch.configureBlocking(false);......}
那selector在哪创建的呢?netty中selector是根据线程绑定的,因为每次selector发现了事件,都是要交给线程处理的,所以netty设计一个selector对应一个线程,这个线程就死循环处理事件。所以这个线程名字叫做NioEventLoop,保存在我们最开始创建的线程池中:EventLoopGroup bossGroup = new NioEventLoopGroup(1);
在这个类的构造函数里面,先拿到了SelectorProvider.provider(),这个方式单例实现,所以虽然前面创建ServerSocketChannel也是用这个provider,但拿到的都是同一个。
public NioEventLoopGroup(int nThreads, Executor executor) {this(nThreads, executor, SelectorProvider.provider());}
之后经过层层构造方法,最后会通过你设置的线程数,设置对应数量的线程,nio的对应线程就是NioEventLoop。可以看看它的构造方法:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),rejectedExecutionHandler);this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");final SelectorTuple selectorTuple = openSelector();this.selector = selectorTuple.selector;this.unwrappedSelector = selectorTuple.unwrappedSelector;}
provider就是我们提到的privider,之后在openSelector()方法里面用到了:
private SelectorTuple openSelector() {final Selector unwrappedSelector;try {unwrappedSelector = provider.openSelector();} catch (IOException e) {throw new ChannelException("failed to open a new selector", e);}if (DISABLE_KEY_SET_OPTIMIZATION) {return new SelectorTuple(unwrappedSelector);}......}
就是通过provider.openSelector(),拿到了selector,放在NioEventLoop里面。代码后面省略的地方是netty对jdk的selector的优化,把里面set的数据结构,替换成数组。
接着来看第二步
第二步:把channel注册到selector上。
全部在这一个方法中ChannelFuture f = b.bind(PORT).sync()
看名字是绑定,其实这个绑定是把nio的注册和绑定都做了。
一层层点进去,注册是在这个方法里面
final ChannelFuture initAndRegister() {Channel channel = null;try {// 这里就是前面提到的通过反射调用构造方法,创建channelchannel = channelFactory.newChannel();init(channel);} catch (Throwable t) {......}// 注册方法ChannelFuture regFuture = config().group().register(channel);......return regFuture;
}
最底层是在这个方法里面io.netty.channel.nio.AbstractNioChannel#doRegister
@Overrideprotected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {......}}}
javaChannel就是我们创建的那个原始channel,eventLoop().unwrappedSelector()拿到的就是我们的原始selector。
相比于我们前面nio的demo中的register方法,netty多了最后一个参数,就是把this,这个netty自己封装的channel作为附件传进去,这样子就可以把netty自己的channel和原生的selector关联上了。
注册讲完了,那什么时候绑定呢?回到前面netty的bind方法。
private ChannelFuture doBind(final SocketAddress localAddress) {// 这里就是我们刚才说的注册方法。final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {promise.setFailure(cause);} else {promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;}}
注册是一个异步的动作,netty里面用了很多异步+回调的方式,感兴趣可以学习一下。在这里通过回调确保了,只有在注册完成后才会进行绑定的操作,绑定的方法是doBind0
最后是交给pipeline来执行,可能是为了可以让pipeline可以对绑定进行处理吧。
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);
}
pipeline是一个双向链表,最后是头结点来处理。
io.netty.channel.DefaultChannelPipeline.HeadContext#bind
io.netty.channel.socket.nio.NioServerSocketChannel#doBind
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}
}
最后就是调用我们的原生的绑定方法。
绑定成功后就会发送一个active事件,每个handler都会收到,还是刚才提到的头节点HeadContext
@Overridepublic void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();readIfIsAutoRead();}
在active里面会触发read事件,还是在HeadContext的read事件里面,会注册自己感兴趣的select事件。
@Overrideprotected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp);}}
readPending这个值,就是SelectionKey.OP_ACCEPT,一开始创建NioServerSocketChannel的时候设置的。
好了,整个server创建就结束了,等着客户端连接了。