Spring架构篇--2.5 远程通信基础Select 源码篇--window--Select.open()

news/2024/4/25 8:30:46/文章来源:https://blog.csdn.net/l123lgx/article/details/129097554

前言:在Socket通信中使用Select 来对NIO 进行实现,那么它们的实现方式是怎样的呢,本文从 Selector.open() 进行第一步的分析;

Selector.open() :
Selector 类:

   public static Selector open() throws IOException {// 通过 SelectorProvider.provider() 获取SelectorProvider实例// 通过openSelector() 获取不同系统的Selector 实现return SelectorProvider.provider().openSelector();}

先看:SelectorProvider.provider():
SelectorProvider 类:

private static final Object lock = new Object();
private static SelectorProvider provider = null;public static SelectorProvider provider() {synchronized (lock) {// 通过 lock 对象锁,保证当前进程只会有一个SelectorProvider 对象if (provider != null)// 如果发现进程中已经实例化过SelectorProvider 对象则直接返回return provider;// 进程下没有过SelectorProvider 对象则进行加载return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {public SelectorProvider run() {if (loadProviderFromProperty())return provider;if (loadProviderAsService())return provider;provider = sun.nio.ch.DefaultSelectorProvider.create();return provider;}});}
}

SelectorProvider.provider()方法会在System Property中不存在java.nio.channels.spi.SelectorProvider属性和不能找到SelectorProvider的实现类时,创建一个默认的sun.nio.ch.DefaultSelectorProvider来作为SelectorProvide;DefaultSelectorProvider 会根据不同的操作系统返回:

window 下的DefaultSelectorProvider:

package sun.nio.ch;import java.nio.channels.spi.SelectorProvider;public class DefaultSelectorProvider {private DefaultSelectorProvider() {}public static SelectorProvider create() {return new WindowsSelectorProvider();}
}

DefaultSelectorProvider.create() 创建方法:可以看到返回了WindowsSelectorProvider一个实例对象;

package sun.nio.ch;import java.io.IOException;
import java.nio.channels.spi.AbstractSelector;public class WindowsSelectorProvider extends SelectorProviderImpl {public WindowsSelectorProvider() {}public AbstractSelector openSelector() throws IOException {return new WindowsSelectorImpl(this);}
}

在回到Selector 中的 SelectorProvider.provider().openSelector(), SelectorProvider.provider() 实际上返回了WindowsSelectorProvider一个实例对象,然后调用WindowsSelectorProvider的openSelector()方法,可以看到这里创建了一个WindowsSelectorImpl 实例对象并进行了返回:

这里看下对象的关系:
1 ) WindowsSelectorImpl extends SelectorImpl;
2) SelectorImpl extends AbstractSelector;
3) AbstractSelector extends Selector
4) Selector implements Closeable

WindowsSelectorImpl 类:

    //poll数组和channel数组的初始容量
private final int INIT_CAP = 8;
//select操作时,每个线程处理的最大FD数量。为INIT_CAP乘以2的幂
private final static int MAX_SELECTABLE_FDS = 1024;
//由这个选择器服务的SelectableChannel的列表
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
//存放所有FD的包装器,主要用于poll操作
private PollArrayWrapper pollWrapper;
//注册到当前选择器上总的通道数量,初始化为1是因为实例化选择器时加入了wakeupSourceFd
private int totalChannels = 1;
//选择操作所需要的辅助线程数量。每增加一组MAX_SELECTABLE_FDS - 1个通道,就需要一个线程。
private int threadsCount = 0;
//辅助线程列表
private final List<SelectThread> threads = new ArrayList();
//创建一个Pipe实例,用于实现唤醒选择器的功能
private final Pipe wakeupPipe ;
//管道的read端FD,用于实现唤醒选择器的功能
private final int wakeupSourceFd;
//管道的write端FD,用于实现唤醒选择器的功能
private final int wakeupSinkFd;
//关闭锁,通常在注册、注销,关闭,修改选择键的interestOps时都存在竞态条件,主要保护channelArray、pollWrapper等
private Object closeLock = new Object();
//FD为键,SelectionKeyImpl为value的内部map,方便通过FD查找SelectionKeyImpl
private final FdMap fdMap = new FdMap();
//内部类SubSelector中封装了发起poll调用和处理poll调用结果的细节。由主线程调用
private final SubSelector subSelector = new SubSelector();
//选择器每次选择的超时参数
private long timeout;
//中断锁,用于保护唤醒选择器使用的相关竞态资源,如interruptTriggered
private final Object interruptLock = new Object();
//是否触发中断,唤醒选择器的重要标志,由interruptLock保护
private volatile boolean interruptTriggered = false;
//启动锁,当使用多线程处理选择器上Channel的就绪事件时,用于协调这些线程向内核发起系统调用
//辅助线程会在该锁上等待
private final WindowsSelectorImpl.StartLock startLock = new WindowsSelectorImpl.StartLock();
//完成锁,当使用多线程处理选择器上Channel的就绪事件时,用于协调这些线程从系统调用中返回
//主线程会在该锁上等待
private final WindowsSelectorImpl.FinishLock finishLock = new WindowsSelectorImpl.FinishLock();
//updateSelectedKeys调用计数器
//SubSelector.fdsMap中的每个条目都有一个的updateCount值。调用processFDSet时,当我们增加numKeysUpdated,
//会同步将updateCount设置为当前值。 这用于避免多次计算同一个选择键更新多次numKeysUpdated。
//同一个选择键可能出现在readfds和writefds中。
private long updateCount = 0L;WindowsSelectorImpl(SelectorProvider var1) throws IOException {// 调用 SelectorImpl 的父类构造方法 // 将 上一步WindowsSelectorProvider的openSelector() SelectorProvider 实例传入super(var1);// Pipe wakeupPipe = Pipe.open() 的fd 文件描述符赋值this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal();SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink();// 禁用 Nagle 算法,当 sink 端写入 1 字节数据时,将立即发送,而不必等到将较小的包组合成较大的包再发送,// 这样 source 端就可以立马读取数据var2.sc.socket().setTcpNoDelay(true);// Pipe wakeupPipe = Pipe.open() 的fd 文件描述符赋值this.wakeupSinkFd = var2.getFDVal();//  Pipe wakeupPipe = Pipe.open() 的fd 文件描述符赋值加入到管道数组中// 把唤醒端的文件描述符(wakeupSourceFd)放到pollWrapper里this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
}

WindowsSelectorImpl 类中通过socket 建立了管道连接,并将管道符进行保存;

super(var1) 调用 SelectorImpl 的父类构造方法:

protected Set<SelectionKey> selectedKeys = new HashSet();
protected HashSet<SelectionKey> keys = new HashSet();
private Set<SelectionKey> publicKeys;
private Set<SelectionKey> publicSelectedKeys;
protected SelectorImpl(SelectorProvider var1) {// 调用 AbstractSelector 的父类构造方法super(var1);// publicKeys 和 publicSelectedKeys 进行映射赋值if (Util.atBugLevel("1.4")) {this.publicKeys = this.keys;this.publicSelectedKeys = this.selectedKeys;} else {this.publicKeys = Collections.unmodifiableSet(this.keys);this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);}}

SelectorImpl 的构造,对publicKeys和publicSelectedKeys 集合做了映射,方便操作selectedKeys和keys 集合可以直接影响数据;

super(var1) 调用 AbstractSelector的父类构造方法:

private final SelectorProvider provider;
protected AbstractSelector(SelectorProvider provider) {this.provider = provider;
}

因为WindowsSelectorImpl 的实例处理构造方法的调用就是对Pipe wakeupPipe = Pipe.open() 产生的文件操作符进行赋值,这里看下 Pipe.open():
Pipe 类:

public static Pipe open() throws IOException {// SelectorProvider.provider() 返回直接创建好的WindowsSelectorProvider的openSelector() SelectorProvider 实例传入// 然后调用openPipe()return SelectorProvider.provider().openPipe();
}

SelectorProviderImpl类中的openPipe():

public Pipe openPipe() throws IOException {// 返回 PipeImpl 的实例,传入创建好的WindowsSelectorProvider的openSelector() SelectorProvider 实例return new PipeImpl(this);
}

PipeImpl 的构造方法:

private static final int NUM_SECRET_BYTES = 16;
private static final Random RANDOM_NUMBER_GENERATOR = new SecureRandom();
private SourceChannel source;
private SinkChannel sink;PipeImpl(SelectorProvider var1) throws IOException {try {//  AccessController.doPrivileged() 借用权限完成方法的执行AccessController.doPrivileged(new PipeImpl.Initializer(var1));} catch (PrivilegedActionException var3) {throw (IOException)var3.getCause();}
}

重点看下PipeImpl 内部类中 new PipeImpl.Initializer(var1),在进行构造之后,执行run方法:

private class Initializer implements PrivilegedExceptionAction<Void> {private final SelectorProvider sp;private IOException ioe;private Initializer(SelectorProvider var2) {// 异常原因this.ioe = null;// SelectorProvider 实例this.sp = var2;}public Void run() throws IOException {// 启动线程执行LoopbackConnector 的run方法PipeImpl.Initializer.LoopbackConnector var1 = new PipeImpl.Initializer.LoopbackConnector();var1.run();if (this.ioe instanceof ClosedByInterruptException) {// 如果建立通道过程中发生了关闭异常则重新发起线程调用LoopbackConnector 的run方法this.ioe = null;Thread var2 = new Thread(var1) {public void interrupt() {}};var2.start();while(true) {try {var2.join();break;} catch (InterruptedException var4) {}}Thread.currentThread().interrupt();}if (this.ioe != null) {// 发生异常抛出异常throw new IOException("Unable to establish loopback connection", this.ioe);} else {return null;}}private class LoopbackConnector implements Runnable {private LoopbackConnector() {}public void run() {// 声明服务端的 ServerSocketChannel ServerSocketChannel var1 = null;// 声明客户端的SocketChannel SocketChannel var2 = null;SocketChannel var3 = null;try {// 声明var4 和 var5 两个内存空间分别为16个字节大小ByteBuffer var4 = ByteBuffer.allocate(16);ByteBuffer var5 = ByteBuffer.allocate(16);// 声明本机的地址InetAddress var6 = InetAddress.getByName("127.0.0.1");assert var6.isLoopbackAddress();// 声明var7 地址InetSocketAddress var7 = null;while(true) {if (var1 == null || !var1.isOpen()) {// 初始化服务端的ServerSocketChannel var1 = ServerSocketChannel.open();// 绑定服务端监听的端口ip 和端口var1.socket().bind(new InetSocketAddress(var6, 0));// 将地址赋值给var7 var7 = new InetSocketAddress(var6, var1.socket().getLocalPort());}// 打开客户端SocketChannel ,地址为var7 建立连接var2 = SocketChannel.open(var7);// 向var4 写入随机数PipeImpl.RANDOM_NUMBER_GENERATOR.nextBytes(var4.array());do {// 向SocketChannel 建立的连接写入数据var2.write(var4);} while(var4.hasRemaining());// 返回此缓冲区var4.rewind();// 服务端阻塞等待连接连接var3 = var1.accept();do {// 从服务端连接中读取数据var3.read(var5);} while(var5.hasRemaining());// 返回此缓冲区var5.rewind();// 如果客户端发送的数据和服务端接收到的数据相同说通道完成建立if (var5.equals(var4)) {// 放入通道描述PipeImpl.this.source = new SourceChannelImpl(Initializer.this.sp, var2);PipeImpl.this.sink = new SinkChannelImpl(Initializer.this.sp, var3);// 跳出循环break;}// 关闭 var3.close();var2.close();}} catch (IOException var18) {try {if (var2 != null) {var2.close();}if (var3 != null) {var3.close();}} catch (IOException var17) {}// 异常原因Initializer.this.ioe = var18;} finally {try {if (var1 != null) {// 关闭var1.close();}} catch (IOException var16) {}}}}
}
  • windows下的实现是创建两个本地的socketChannel,然后连接(链接的过程通过写一个随机long做两个socket的链接校验),两个socketChannel分别实现了管道的source与sink端。
  • source端由前面提到的WindowsSelectorImpl放到了pollWrapper中(pollWrapper.addWakeupSocket(wakeupSourceFd, 0));
  • 改通道的建立并不是为了确保连接,而是为了后续Select在获取内核准备好的数据时,一旦有socket返回可读/可写时间,方便唤醒对主线程的唤醒使用;

pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0):
PollArrayWrapper类:

void putDescriptor(int var1, int var2) {this.pollArray.putInt(SIZE_POLLFD * var1 + 0, var2);
}void putEventOps(int var1, int var2) {this.pollArray.putShort(SIZE_POLLFD * var1 + 4, (short)var2);
}void addWakeupSocket(int var1, int var2) {this.putDescriptor(var2, var1);this.putEventOps(var2, Net.POLLIN);
}

这里将source的POLLIN事件标识为感兴趣的,当sink端有数据写入时,source对应的文件描述符wakeupSourceFd就会处于就绪状态;

总结:

  • window–Select.open() 通过加锁的方式获取进程下的唯一一个 WindowsSelectorImpl对象实例。
  • WindowsSelectorImpl 的对象实例中通过 对127.0.0.1 端口为0 ,建立两个连接的方式拿到文件操作符,并把 把唤醒端的文件描述符(wakeupSourceFd)放到pollWrapper里;

参考:
Java NIO 之 Selector(第一部分Selector.open());

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_72599.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Npde.js】express以及nodemon

express初始Express什么是Express不使用Express可以创建web服务器吗&#xff1f;Express能做什么安装Express监听GET请求和post请求获取URL中携带的查询参数获取URL中携带的动态参数托管静态资源nodemon为什么使用nodemon初始Express 什么是Express 官方给出的概念&#xff0…

Vue3 基础

Vue3 基础 概述 Vue (发音为 /vjuː/&#xff0c;类似 view) 是一款用于构建用户界面的 JavaScript 框架。它基于标准 HTML、CSS 和 JavaScript 构建&#xff0c;并提供了一套声明式的、组件化的编程模型&#xff0c;帮助你高效地开发用户界面。无论是简单还是复杂的界面&…

Java:Java与Python — 编码大战

Java和Python是目前市场上最热门的两种编程语言&#xff0c;因为它们具有通用性、高效性和自动化能力。两种语言都有各自的优点和缺点&#xff0c;但主要区别在于Java 是静态类型的&#xff0c;Python是动态类型的。它们有相似之处&#xff0c;因为它们都采用了“一切都是对象”…

单片机输入输出模式

单片机输入输出模式输入模式模拟输入、浮空输入、上拉输入、下拉输入GPIO输出模式推挽输出、开漏输出、复用推挽输出、复用开漏输出。上下拉电阻上拉电阻下拉电阻输入模式 模拟输入、浮空输入、上拉输入、下拉输入 模拟输入&#xff1a;I/O端口的模拟信号&#xff08;电压信号…

日志收集笔记(架构设计、Log4j2项目初始化、Lombok)

1 架构设计 ELK 技术栈架构设计图&#xff1a; 从左往右看&#xff0c; Beats&#xff1a;主要是使用 Filebeat&#xff0c;用于收集日志&#xff0c;将收集后的日志数据发送给 Kafka&#xff0c;充当 Kafka 的生产者Kafka&#xff1a;高性能消息队列&#xff0c;主要起缓冲…

关于客户背景调查的两个案例,说下我的真实看法

这篇文章我只是想客观陈述下事实&#xff0c;并没有对他人的贬低与对自己的吹捧之意。只是想通过这样两件小事&#xff0c;传递出来一个观点&#xff1a;在外贸业务开发过程中&#xff0c;很多时候正是那些我们内心抗拒&#xff0c;不愿意沉下心去做的事&#xff0c;才给了我们…

C#与三菱PLC MC协议通信,Java与三菱PLC MC协议通信

三菱PLC的MC协议是一种常用的通信协议&#xff0c;用于实现三菱PLC与其他设备之间的通信。以下是一些关于MC协议的基本信息&#xff1a;协议格式MC协议的通信数据格式如下&#xff1a;数据头网络编号PC编号目标模块IO编号目标模块站号本机模块IO编号本机模块站号请求数据长度请…

嵌套走马灯Carousel

Carousel 的应用很广泛&#xff0c;基础用法这里不多做阐述&#xff0c;感兴趣的可以去element-gui了解Carousel 组件。 今天主要是梳理嵌套走马灯的逻辑&#xff0c;背景如下&#xff1a; 需要对项目做一个展示&#xff0c;项目可能有一个或多个&#xff0c;同时一个项目可能…

初探 qiling ( 麒麟 ):开源的二进制分析、高级代码模拟框架

官方介绍&#xff1a; 官网&#xff1a;https://qiling.io/&#xff1a;https://twitter.com/qiling_iogithub 地址&#xff1a;https://github.com/qilingframework/qiling 1、qiling 简介 qiling 是什么 qiling 基于 python 开发&#xff0c;是一个开源的、可模拟多种架构…

Web前端:全栈开发人员的责任

多年来&#xff0c;关于全栈开发人员有很多说法&#xff0c;全栈开发人员是一位精通应用程序全栈开发过程的专业人士。这包括数据库、API、前端技术、后端开发语言和控制系统版本。你一定遇到过前端和后端开发人员。前端开发人员将构建接口&#xff0c;而后端开发人员将开发、更…

狂神说:方法

何为方法方法是语句和集合&#xff0c;一起执行一个功能【实际上方法就是函数&#xff0c;说法不一样而已】定义方法加了static才能被main方法调用修饰符&#xff08;public static&#xff09; 返回类型 方法名&#xff08;参数类型 参数名&#xff09;// main方法public stat…

vscode SSH 保存密码自动登录服务器

先在win local上拿到秘钥&#xff0c;然后再把这秘钥copy 进服务器 1. 创建 RSA 密钥对 第一步是在客户端机器&#xff08;通常是您的计算机 win 10&#xff09;上创建密钥对&#xff1a;打开powershell, 输入 ssh-keygen默认情况下ssh-keygen将创建一个 2048 位 RSA 密钥对…

“双碳”目标下二氧化碳地质封存技术应用前景及模型构建实践方法与讨论

我国二氧化碳地质封存技术起步较晚&#xff0c;目前仍没有一套相对完整的行业规范&#xff1b;且就该技术而言&#xff0c;涉及环节众多&#xff0c;理论相对复杂&#xff0c;对于行业的新入局者不太友好。因此&#xff0c;结合时代背景&#xff0c;我们首次尝试对二氧化碳地质…

nodejs出现require is not defined和__dirname is not define 错误

参阅此&#xff0c; Cesium环境搭建成功和初步看一下它的示例_bcbobo21cn的博客-CSDN博客 运行Cesium入门示例&#xff0c;出现下图错误&#xff0c;根据资料&#xff0c;这是node版本的问题&#xff1b; 解决方法是&#xff0c;在server.js头部加入&#xff0c; import { cre…

Flink04: Flink核心API之DataStream

一、Flink 4种不同层次的API Flink中提供了4种不同层次的API&#xff0c;每种API在简洁和易表达之间有自己的权衡&#xff0c;适用于不同的场景。目前上面3个会用得比较多。 • 低级API(Stateful Stream Processing)&#xff1a;提供了对时间和状态的细粒度控制&#x…

Endless lseek导致的SQL异常

最近碰到同事咨询的一个问题&#xff0c;在执行一个函数时&#xff0c;发现会一直卡在那里。 strace抓了下发现会话一直在执行lseek&#xff0c;大致情况如下&#xff1a; 16:13:55.451832 lseek(33, 0, SEEK_END) 1368064 <0.000037> 16:13:55.477216 lseek(33, 0, SE…

linux下安装mongoDB

一、下载mongoDB包 下载地址&#xff1a; https://www.mongodb.com/try/download/community 个人建议&#xff1a;如果是学习阶段&#xff0c;使用5以下版本更好些。 二、安装及配置 1、安装 # 1、解压 $ tar -zxvf mongodb-linux-x86_64-rhel70-4.4.19-rc1.tgz# 2、迁移目…

【音视频处理】为什么MP3不是无损音乐?音频参数详解,码率、采样率、音频帧、位深度、声道、编码格式的关系

大家好&#xff0c;欢迎来到停止重构的频道。上期我们讨论了视频的相关概念&#xff0c;本期我们讨论音频的相关概念。包括采样率、码率、单双声道、音频帧、编码格式等概念。这里先抛出一个关于无损音频的问题。为什么48KHz采样率的.mp3不是无损音乐 &#xff0c;而48KHz采样率…

高性能爬虫之单线程、多进程、多线程的使用,线程池、进程池、协程池的使用

目录一、单线程爬虫代码实现二、 多线程爬虫1、多线程的方法使用2、队列模块的使用3、多线程实现思路剖析4、代码实现**注意点&#xff1a;**三、多进程爬虫1、多进程程的方法使用2、多进程中队列的使用3 代码实现**小结**四、线程池实现爬虫1、线程池使用方法介绍2、使用线程池…

365天深度学习训练营-第J3周:DenseNet算法实战与解析

目录 一、前言 二、论文解读 1、DenseNet的优势 2、设计理念 3、网络结构 4、与其他算法进行对比 三、代码复现 1、使用Pytorch实现DenseNet 2、使用Tensorflow实现DenseNet网络 四、分析总结 一、前言 &#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习…