Netty源码解读

news/2024/4/27 10:16:31/文章来源:https://blog.csdn.net/qq_19734597/article/details/130317126

Netty源码解读

Netty线程模型

在这里插入图片描述
1、定义了两组线程池BossGroup和WorkerGroup,BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写
2、BossGroup和WorkerGroup类型都是NioEventLoopGroup,Group中维护了多个事件循环线程NioEventLoop,每个NioEventLoop维护了一个Selector和TaskQueue
3、每个Boss NioEventLoop线程内部循环执行的步骤有 3 步
3.1、处理accept事件 , 与client 建立连接 , 生成 NioSocketChannel
3.2、将NioSocketChannel注册到某个worker NIOEventLoop上的selector
3.3、runAllTasks处理任务队列TaskQueue的任务
4、 每个worker NioEventLoop线程循环执行的步骤
4.1、轮询注册到自己selector上的所有NioSocketChannel 的read, write事件
4.2、处理 I/O 事件, 即read , write 事件, 在对应NioSocketChannel 处理业务
4.3、runAllTasks处理任务队列TaskQueue的任务 ,一些耗时的业务处理一般可以放入TaskQueue中慢慢处理,这样不影响数据在pipeline中的流动处理
4.4、处理NioSocketChannel业务时,会使用 pipeline (管道),管道中维护了很多 handler处理器用来处理 channel 中的数据

Netty服务启动示例

// 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(2);
EventLoopGroup workerGroup = new NioEventLoopGroup(4);
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//对workerGroup的SocketChannel设置handler处理器ch.pipeline().addLast(new NettyServerHandler());}});
// 启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(9099).sync();

Netty源码分析

从bootstrap.bind作为入口分析启动流程,进入后可以看到会调用AbstractBootstrap#doBind,最终会调用initAndRegister()方法,主要逻辑都在前三步中实现,本次也主要分析这三个步骤

# AbstractBootstrap类
// 1、创建一个服务端Channel,即NioServerSocketChannel
channel = channelFactory.newChannel();
// 2、初始化NioServerSocketChannel,在pipeline中添加一些处理器hander
init(channel);
// 3、进行注册
ChannelFuture regFuture = config().group().register(channel);
// 把NioServerSocketChannel绑定到指定端口
channel.bind(localAddress, promise);

channelFactory.newChannel();

bootstrap.channel(NioServerSocketChannel.class) 会将serverChannel绑定到ReflectiveChannelFactory上

# AbstractBootstrap类public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");}return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

channelFactory.newChannel()会调用ReflectiveChannelFactory的newChannel方法,进而调用constructor.newInstance(),而该constructor正好是NioServerSocketChannel类;所以new的对象就是NioServerSocketChannel
服务端NioServerSocketChannel进行初始化
1、设置感兴趣事件为连接事件OP_ACCEPT
2、设置channel为非阻塞
3、初始化服务端pipeline

# NioServerSocketChannel类public NioServerSocketChannel(ServerSocketChannel channel) {// 将感兴趣的事件设置为连接事件OP_ACCEPTsuper(null, channel, SelectionKey.OP_ACCEPT);config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}// 父类初始化方法 ch 即为NioServerSocketChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;this.readInterestOp = readInterestOp;// 设置为非阻塞ch.configureBlocking(false);
}// 父类的父类中初始化pipeline,此时只有HeadContext和TailContext
protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();
}

init(channel)

调用ServerBootstrap.init方法,向服务端NioServerSocketChannel的pipeline中添加hander处理器ChannelInitializer;此时服务端pipeline链表中的hander如下
在这里插入图片描述

# ServerBootstrap 类void init(Channel channel) throws Exception {ChannelPipeline p = channel.pipeline();//向 pipeline中添加hander处理器ChannelInitializerp.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
}

config().group().register(channel)

bootstrap.group(bossGroup, workerGroup)构造时,将group设置为bossGroup,childGroup设置为workerGroup; config().group().register(channel)会调用bossGroup的register方法,从bossGroup的MultithreadEventLoopGroup线程组中取一个线程SingleThreadEventLoop进行调用register方法

register注册逻辑

服务端的NioServerSocketChannel和客户端的NioSocketChannel都会调用此方法进行注册
1、服务启动时,NioServerSocketChannel注册到selector上,对客户端OP_ACCEPT操作感兴趣
2、当有客户端连接时,通过NioServerSocketChannel的accept()得到每个客户端的NioSocketChannel,将其注册到selector上,对客户端OP_READ操作感兴趣

# SingleThreadEventLoop extends SingleThreadEventExecutor 类public ChannelFuture register(final ChannelPromise promise) {promise.channel().unsafe().register(this, promise);return promise;
}

调用AbstractChannel的register方法,创建一个注册的task交给EventLoop线程处理

# AbstractChannel 类public final void register(EventLoop eventLoop, final ChannelPromise promise) {.......AbstractChannel.this.eventLoop = eventLoop;.......// 1、处理连接事件时,用的是bossGroup里的NioEventLoop// 2、处理读写事件时,用的是workGroup里的NioEventLoopeventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});
}private void register0(ChannelPromise promise) {doRegister();// 1、NioServerSocketChannel 处理逻辑// 调用NioServerSocketChannel服务端pipeline中hander的handlerAdded方法// 此时会调用到ChannelInitializer的handlerAdded,然后调用其initChannel,该方法中// 会向服务端pipeline中加入ServerBootstrapAcceptor// 调用服务端pipeline中hander的channelRegistered方法// 调用服务端pipeline中hander的ChannelActive方法// 2、NioSocketChannel 处理逻辑 调用我们自定义hander中的方法// 调用客户端pipeline中hander的handlerAdded方法// 调用客户端pipeline中hander的channelRegistered方法// 调用客户端pipeline中hander的ChannelActive方法,我们自定义hander的ChannelActive在此调用pipeline.invokeHandlerAddedIfNeeded();pipeline.fireChannelRegistered();pipeline.fireChannelActive();}
// doRegister()逻辑由子类AbstractNioChannel实现
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {// 将channel注册到Selector上// 1、NioServerSocketChannel注册到Selector上// 2、NioSocketChannel注册到Selector上selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {}}
}

eventLoop.execute就是调用SingleThreadEventExecutor#execute

# SingleThreadEventExecutor 类
@Override
public void execute(Runnable task) {// 将注册register0逻辑加入队列taskQueueaddTask(task);// 开启线程循环监听事件,会调用SingleThreadEventExecutor.run方法// 最终调用子类NioEventLoop的run()方法startThread();
}

死循环执行 selector.select方法,直到监听到事件或者超时,才会执行processSelectedKeys逻辑
1、服务端启动后,NioServerSocketChannel若监听到客户端OP_ACCEPT操作,则会执行processSelectedKeys逻辑,若超时,则继续下一次循环
2、客户端连接成功后,NioSocketChannel若监听到客户端OP_READ操作,则会执行processSelectedKeys逻辑,若超时,则继续下一次循环

# NioEventLoop 类
@Override
protected void run() {for (;;) {try {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {....case SelectStrategy.SELECT:// 该方法监听到事件(OP_ACCEPT|OP_READ)时才会返回select(wakenUp.getAndSet(false));default:}} catch (IOException e) {.....}// 监听到事件执行try {// 1、获取SelectionKey处理事件processSelectedKeys();} finally {// 2、执行taskQueue中其他的注册方法register0runAllTasks();}}}
}   private void select(boolean oldWakenUp) throws IOException {// 一直循环遍历int selectCnt = 0;for (;;) {// 根据注册的定时任务,获取本次select的阻塞时间long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;// 没有监听到事件或没有超时,则一直阻塞(会让出cpu资源)int selectedKeys = selector.select(timeoutMillis);selectCnt ++;if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {// 正常场景// 当有连接|读写操作或者selector被唤醒了,则直接返回break;}long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// 正常场景// 说明没有监听到事件,而是超时了,则重置selectCntselectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// 异常场景  select 空轮询bug修复// 若空轮询次数超过SELECTOR_AUTO_REBUILD_THRESHOLD配置// 则关闭老的select,建立新的selectselector = selectRebuildSelector(selectCnt);selectCnt = 1;break;}currentTimeNanos = time;}
}private void processSelectedKeysOptimized() {// 遍历所有的selectedKeys进行处理for (int i = 0; i < selectedKeys.size; ++i) {processSelectedKey(k, (AbstractNioChannel) a);}
}private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();int readyOps = k.readyOps();if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {// 连接|读写操作会调用该方法// 1、连接操作调用NioMessageUnsafe的read方法// 2、读写操作调用NioByteUnsafe的read方法unsafe.read();}
}

OP_ACCEPT连接操作处理
1、为每个客户端创建NioSocketChannel,并进行初始化
1.1、设置感兴趣事件为OP_READ
1.2、设置channel为非阻塞
1.3、初始化客户端pipeline
2、调用服务端NioServerSocketChannel的pipeline,将客户端的NioSocketChannel作为参数传过去,最终会调用到ServerBootstrapAcceptor,将NioSocketChannel注册到workGroup上

# NioMessageUnsafe 类
public void read() {final ChannelPipeline pipeline = pipeline();// 创建每个客户端的NioSocketChanneldoReadMessages(readBuf);int size = readBuf.size();// readBuf为NioSocketChannel,遍历客户端所有的NioSocketChannel// 执行服务端NioServerSocketChannel的pipeline,循环执行fireChannelRead,// 最终会调用服务端hander的ChannelRead方法,此处会调用到ServerBootstrapAcceptor的ChannelRead方法for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}// 调用服务端pipeline的读完成方法pipeline.fireChannelReadComplete();}protected abstract int doReadMessages(List<Object> buf) throws Exception;
// 调用子类NioServerSocketChannel#doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {// 获取客户端的连接得到SocketChannel,每个客户端在服务端都有一个对应的SocketChannelSocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {// NioSocketChannel处理方式同NioServerSocketChannel// 1、设置感兴趣事件为连接事件OP_READ// 2、设置channel为非阻塞// 3、初始化客户端NioSocketChannel的pipelinebuf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {}return 0;
}

将我们自定义的hander添加到NioSocketChannel的pipeline上,然后将NioSocketChannel注册到workGroup上,此时客户端pipeline链表中的hander如下
在这里插入图片描述

# ServerBootstrapAcceptor 类public void channelRead(ChannelHandlerContext ctx, Object msg) {// 传过来的NioSocketChannelfinal Channel child = (Channel) msg;// 将我们手动添加的Hander添加到pipelinechild.pipeline().addLast(childHandler);try {// 将NioSocketChannel注册workGroup的一个线程的selector上,// 方式同NioServerSocketChannel,执行register注册逻辑childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {.....}});} catch (Throwable t) {}
}

OP_READ操作处理
进行数据读写,并执行pipeline中自定义的hander

# NioByteUnsafe类// 接受到客户端OP_READ事件时调用
public void read() {// 获取客户端NioSocketChannel的pipelinefinal ChannelPipeline pipeline = pipeline();do {// 数据读写// 调用pipeline.fireChannelRead时会依次调用pipeline中hander的ChannelRead方法// 我们自定义的hander的ChannelRead方法就会在此处调用byteBuf = allocHandle.allocate(allocator);allocHandle.lastBytesRead(doReadBytes(byteBuf));pipeline.fireChannelRead(byteBuf);} while (allocHandle.continueReading());allocHandle.readComplete();// 调用pipeline的读完成方法pipeline.fireChannelReadComplete();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_103910.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

案例3:Java汽车保养维修系统设计与实现开题报告

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…

linux系统防火墙开启放行其他端口

linux系统防火墙开启放行其他端口 弹性云主机防火墙放行对应端口 1、WDCP 系统端口放行&#xff1a; &#xff08;1&#xff09;3.2版本 登录wdcp面板后点击“安全管理”——“防火墙设置”——“快速添加”&#xff0c;如图填写端口&#xff08;协议默认都使用tcp&#xf…

【Linux基本指令和权限(1)】

本文思维导图&#xff1a; 文章目录 一、Linux操作的特点二、使用指令从Xhell登录云服务器三、基本指令1.ls指令2. pwd指令&#xff1a;3.cd指令4. touch指令5. rm指令 写在最后 Linux是一个操作系统&#xff0c;操作系统是一款做软硬件管理的软件。 一、Linux操作的特点 Li…

transformer 网络概述

1. RNN存在的问题 RNN对并行计算并不友好&#xff0c;下一输出依赖于上一输入&#xff0c;难以实现并行高效计算RNN相比较与self-attension模块&#xff0c;缺少对部分变量权重的预估&#xff0c;输出的数据默认拥有一致的权重 2. self-attension self-attension是干嘛的&am…

2023年商票研究报告

第一章 行业概况 1.1 定义 商票是指出票人依托商业汇票系统&#xff0c;以数据电文形式制作的&#xff0c;委托付款人在指定日期无条件支付确定的金额给收款人或者持票人的票据。按承兑人的不同&#xff0c;商业汇票分为银行承兑汇票和商业承兑汇票&#xff08;即商票&#x…

Linux进程(1)

目录 ⛹&#x1f3fd;进程简介⛹&#x1f3fd;查看进程⛹&#x1f3fd;系统调用&#x1f6b4;&#x1f3fd;获取进程标示符&#x1f6b4;&#x1f3fd;创建进程 ⛹&#x1f3fd;进程状态&#x1f6b4;&#x1f3fd;孤儿进程&#xff1a;&#x1f6b4;&#x1f3fd;进程优先级 ⛹…

新一代边缘计算盒子,英码科技边缘计算盒子SY-E160

SY-E160 是英码科技推出的新一代智能工作站&#xff0c;内部集成了 4 核强悍处理器 A551.5 GHz&#xff0c;其内置的算力核拥有 16Tops 超强算力。SY-E160 工作站采用低功耗技术设计&#xff0c;支持 宽温度环境工作&#xff0c;可以灵活部署于各种 AI 场景中。 SY-E160 深元 A…

每日一个小技巧:1招教你提取伴奏怎么做

伴奏是指在演唱或演奏时&#xff0c;用来衬托或补充主唱或乐器的音乐声音。而伴奏提取是一种技术&#xff0c;它可以帮助我们从歌曲中将人声和乐器分离出来。当我们听到一些喜欢的歌曲时&#xff0c;往往会被它的旋律深深吸引&#xff0c;想要将其作为自己的演唱曲目&#xff0…

Domino的线程ID和操作系统的进程ID对应关系

大家好&#xff0c;才是真的好。 很多时候&#xff0c;在Domino中运行的任务出现一些错误提示&#xff0c;如果能够准确定位到和提示信息相关任务时&#xff0c;对我们排错有着巨大的帮助&#xff0c;也能节省很多时间。 例如&#xff0c;我们可能在Domino实时控制台上看到以…

InstructGPT原理讲解及ChatGPT类开源项目

InstructGPT原理讲解及ChatGPT类开源项目 Generative Pre-Trained Transformer&#xff08;GPT&#xff09; 是OpenAI的提出的生成式预训练语言模型&#xff0c;目前已经发布了GPT-1、GPT-2、GPT-3和GPT-4&#xff0c;未来也将发布GPT-5。 最近非常火的ChatGPT是基于Instruct…

想提高应用程序的用户满意度——APK体积包优化少不了

作者&#xff1a;子不语Any 前言 减少应用程序安装包的大小&#xff0c;不仅仅减少用户的网络数据流量&#xff0c;还减少了下载等待的时间。毋庸置疑&#xff0c;尽量减少程序安装包的大小是十分有必要的。 通常来说&#xff0c;减少程序安装包的大小有两条规律&#xff1a;…

Java_常用API

Java_常用API ​ API即Application Programming Interface&#xff0c;即应用程序接口。一般来说API就是软件组件之间信息交互的桥梁&#xff0c;通过它无需访问源码。API除了有应用程序接口的含义外&#xff0c;还特质API的说明文档&#xff0c;也称为帮助文档。 1.字符串的…

Linux服务使用宝塔面板搭建网站,并发布公网访问 - 内网穿透(1)

文章目录 前言1. 环境安装2. 安装cpolar内网穿透3. 内网穿透4. 固定http地址5. 配置二级子域名6. 创建一个测试页面 转载自远程内网穿透的文章&#xff1a;Linux使用宝塔面板搭建网站&#xff0c;并内网穿透实现公网访问 前言 宝塔面板作为简单好用的服务器运维管理面板&#…

读书笔记---植物基因组学(樊龙江主编)

读书笔记---植物基因组学&#xff08;樊龙江主编&#xff09; 最近看了这本书&#xff0c;作者是樊龙江教授&#xff08;浙江大学&#xff09;&#xff0c;里面主要分为两个大部分&#xff1a; 总论&#xff1a;15章 各论&#xff1a;10章详细目录可以参看下面链接&#xff1a…

MySQL中使用批量插入,但需要校验每条数据是否重复且是否已经存在数据库中

问题 批量插入一组数据&#xff0c;数据库中name和age字段组合起来不能有重复&#xff0c;如果出现重复&#xff0c;则直接跳过不插入。 name和age字段组合起来不重复&#xff0c;要求如下&#xff1a; 解决方法 建立name和age的复合索引&#xff0c;并设置为唯一索引 场景…

数列分段 马蹄集

数列分段 难度&#xff1a;黄金 0时间限制&#xff1a;1秒 巴占用内存&#xff1a;128M 对于给定的一个长度为N的正整数数列A,,现要将其分成连续的若干段&#xff0c; 并且每段和不超过M(可以等于M),问最少能将其分成多少段使得满足 要求。 格式 输入格式&#xff1a;第一行包含…

基于Java开发的分布式在线教育系统,支持考试、直播、问答

一、开源项目简介 知道学习平台是一个基于 Java 开发的分布式在线教育系统项目采用前后端分离的企业级微服务架构引入组件化的思想实现高内聚低耦合&#xff0c;项目代码简洁注释丰富上手容易注重代码规范&#xff0c;严格控制包依赖可以帮助个人、企业或机构快速搭建一个在线…

刷爆朋友圈!前百度总裁陆奇最新AI重磅演讲:我的大模型世界观

文 / 高扬 陆奇的演讲刷屏了&#xff0c;我认真看了他的演讲稿&#xff0c;收获颇丰。 陆奇提到&#xff0c;人类社会的发展&#xff0c;大致可分为&#xff1a;农业化、工业化和数字化三个阶段。 在三个递进的阶段中&#xff0c;人类一直在探索如何减少烦琐且消耗能量的体力和…

案例1:Java超市管理系统设计与实现开题报告

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…

Java文件IO操作基础

目录 前言 java.io.File 1. 构造方法 2. 方法 get类方法 文件的创建和删除 目录的创建与删除 输入输出流 InputStream FileInputStream 概述 代码实例1 代码实例2 字符集问题? Scanner 读取 OutputStream Java输入输出流的使用案例 创作不易, 多多支持&#x1f636;‍&…