IO知识整理

news/2024/4/20 7:17:57/文章来源:https://blog.csdn.net/baidu_38900596/article/details/129049169

IO

面向系统IO

page cache

程序虚拟内存到物理内存的转换依靠cpu中的mmu映射

物理内存以page(4k)为单位做分配

多个程序访问磁盘上同一个文件,步骤

  • kernel将文件内容加载到pagecache
  • 多个程序读取同一份文件指向的同一个pagecache
  • 多个程序各自维护各自的fd,fd中seek记录偏移量,指向具体数据

pagecache特点

  • 为内核维护的中间层
  • 淘汰机制
  • 持久化机制,是否会丢失数据
  • 占用内存大小

Java输出流

  • 不使用buffer的输出流
  • 使用buffer的输出流
    • 速度超过不使用buffer的输出流,是因为先在jvm内存中写入,默认到8kb写完调用一次systemcall
  • 随机输出流

内存充足的情况下,会优先写入pagecache,只要未达到脏页持久化阈值,就不会写入磁盘,不论pagecache里面多少数据,关机重启后将全部丢失;内存不充足的情况下,pagecache中的新数据的写入会导致老数据的持久化,进而写入磁盘。

可以通过手动调用系统flush将脏页写入磁盘

脏页持久化之后cache依然存在于内存,只有内存不够分配时才会淘汰cache,且淘汰的cache一定不是脏页。

ByteBuffer

@Test
public void whatByteBuffer() {// 堆内分配内存// ByteBuffer buffer = ByteBuffer.allocate(1024);// 堆外分配内存ByteBuffer buffer = ByteBuffer.allocateDirect(1024);System.out.println("postition: " + buffer.position());System.out.println("limit: " + buffer.limit());System.out.println("capacity: " + buffer.capacity());// position起点 limit终点 capacity容量System.out.println("mark: " + buffer);// position->3 buffer.put("123".getBytes());System.out.println("-------------put:123......");System.out.println("mark: " + buffer);// 读写交替 position->0 limit->3buffer.flip();System.out.println("-------------flip......");System.out.println("mark: " + buffer);// 读取一个byte pos->1 limit->3buffer.get();System.out.println("-------------get......");System.out.println("mark: " + buffer);// 读写交替 pos->2 limit->1024 已读取的字节删除buffer.compact();System.out.println("-------------compact......");System.out.println("mark: " + buffer);buffer.clear();System.out.println("-------------clear......");System.out.println("mark: " + buffer);
}

mmap

堆外创建一个地址空间,只有文件系统可以直接调用

该内存空间的数据不需要通过系统调用及用户态和内核态的切换,直接写入数据就可以同步

但是依然收到系统page cache限制,存在数据丢失的可能

Direct IO

可以绕过系统控制的page cache,不受系统page cache参数控制

程序自己维护page cache,通过自定义代码逻辑维护一致性/dirty等…

好处是自己维护page cache刷磁盘的阈值,与系统通用配置隔离,但是依然存在丢失数据的逻辑

@Test
public void testRandomAccessFileWrite() throws  Exception {RandomAccessFile raf = new RandomAccessFile(path, "rw");raf.write("hello mashibing\n".getBytes());raf.write("hello seanzhou\n".getBytes());System.out.println("write------------");System.in.read();// 指针调整,往前调整后继续写会直接覆盖原有数据raf.seek(4);raf.write("ooxx".getBytes());System.out.println("seek---------");System.in.read();FileChannel rafchannel = raf.getChannel();// mmap  堆外(jvm堆外且是linux的java进程堆外的内存)  和文件映射的   byte  not  object// 此时文件大小会变为4096kbMappedByteBuffer map = rafchannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);map.put("@@@".getBytes());  //不是系统调用  但是数据会到达 内核的pagecache//曾经我们是需要out.write()  这样的系统调用,才能让程序的data 进入内核的pagecache//曾经必须有用户态内核态切换//mmap的内存映射,依然是内核的pagecache体系所约束的!!!//换言之,丢数据System.out.println("map--put--------");System.in.read();// 数据刷入磁盘 等于flush// map.force(); raf.seek(0);ByteBuffer buffer = ByteBuffer.allocate(8192);// ByteBuffer buffer = ByteBuffer.allocateDirect(1024);// 将channel中的数据读入buffer 等价于buffer.put()int read = rafchannel.read(buffer);   System.out.println(buffer);// 此时翻转后可以开始读取数据 pos->0 limit->4096buffer.flip();System.out.println(buffer);for (int i = 0; i < buffer.limit(); i++) {Thread.sleep(200);System.out.print(((char)buffer.get(i)));}
}

请添加图片描述

面向网络IO

TCP

面向连接的,可靠的传输协议

  • 三次握手
  • 四次分手
  • 内核级开辟资源,建立连接
    • 即使服务端没有accept,也可以建立连接(established状态),只是不分配具体的pid
    • 而从java进程内部,看到的连接依然是listen状态
    • 此时可以从客户端发送数据,而服务端不能接收,服务端开启accept后可以接收到已发送的数据

三次握手报文

  • win 滑动窗口长度 协商结果为115
    • tcp拥塞控制,提速
      • 服务端窗口已满
      • 客户端阻塞不再继续发包(如果继续发包会丢弃后发的数据)
  • seq 序列号 client->server server将序列号+1作为ack返回给client
  • mtu ifconfig可以看到网口字节长度
  • mss 实际数据字节长度,基本上为mtu-ip长度-port长度 各自20字节

请添加图片描述

四次分手

  • 异常状态
    • CLOSE_WAIT(接收方第三次FIN信号没有发送成功)
    • TIME_WAIT (连接关闭后,为防止接收方没有收到最后一次ACK,保留连接对应资源)
    • FIN_WAIT2(没有收到FIN信号)

请添加图片描述

Socket

四元组(cip + sport +sip + sport)

  • 服务端不需要给客户端连接分配新的端口号(四元组标识唯一,客户端服务端各存一份)

内核级别

关键配置

  • backlog

    • 配置后续队列,超过配置条数+1之后,会将连接状态置为SYNC_REC状态,不可连接

    • 应当根据处理速度、cpu条数来进行配置,防止建立过多连接

  • nodelay

    • false:使用优化,会积攒超过buffer长度的字节一次发到服务端,但是会有延时
    • true:不使用优化,根据内核调度尽快发送,会多出几次网络io
  • oobinline

    • 结合nodelay配置使用效果明显,会单独发出第一个字节
  • keepalive

    • 开启后会保持心跳,判断相互之间连接是否生效

IO模型

模型分类

  • 同步:程序自己进行R/W操作
  • 异步:内核完成R/W 程序像是不访问IO一样,直接使用buffer,linux不可用
  • 阻塞:BLOCKING BIO
  • 非阻塞:NONBLOCKING NIO

linux

  • 同步阻塞:BIO
  • 同步非阻塞:NIO、多路复用

BIO

多线程BIO模型,主线程accept+创建子线程,子线程做recv

public static void main(String[] args) throws Exception {ServerSocket server = new ServerSocket(9090,20);System.out.println("step1: new ServerSocket(9090) ");while (true) {// linux指令 socket() ->fd3 bind(fd3,8090) accept(fd3)->fd5Socket client = server.accept();  //阻塞1System.out.println("step2:client\t" + client.getPort());// 主线程只负责创建子线程用来接收数据new Thread(new Runnable(){public void run() {InputStream in = null;try {in = client.getInputStream();BufferedReader reader = new BufferedReader(new InputStreamReader(in));while(true){// linux指令 recv(fd5) 子线程负责读取数据String dataline = reader.readLine(); //阻塞2if(null != dataline){System.out.println(dataline);}else{client.close();break;}}System.out.println("客户端断开");} catch (IOException e) {e.printStackTrace();}}}).start();}

请添加图片描述

BIO多线程模型需要创建新线程(系统调用clone)+线程调度,导致速率降低

BIO的弊端主要来自于阻塞,系统内核级别阻塞(accept+recv)

  • accept等待连接和recv接收数据会阻塞,所以通过创建子线程的方式进行规避,但是clone指令+线程切换也是有很高成本的,当客户端连接数量增加时,处理速度会明显降低
  • 因为recv接收数据会发生阻塞,所以当多个客户端连接的时候只能使用多个线程的方式来进行读取,如果只使用一个线程,当连接阻塞时,没办法读取其他连接发送的数据

NIO

accept指令不会阻塞,如果没有连接会直接返回-1,在Java api中会体现为对象为null

不阻塞的好处

  • 可以不额外创建线程,在一个线程中执行建立连接和接收信息两个操作,节省了创建连接和线程切换的成本

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kpaWtaly-1676460489179)(IO.assets/image-20230209151431908.png)]

public static void main(String[] args) throws Exception {LinkedList<SocketChannel> clients = new LinkedList<>();ServerSocketChannel ss = ServerSocketChannel.open();  //服务端开启监听:接受客户端ss.bind(new InetSocketAddress(9090));ss.configureBlocking(false); //重点  OS  NONBLOCKING!!!  //只让接受客户端  不阻塞while (true) {//接受客户端的连接Thread.sleep(1000);SocketChannel client = ss.accept(); //不会阻塞?  -1 NULL//accept  调用内核了:1,没有客户端连接进来在BIO 的时候一直卡着,但是在NIO不卡着,返回-1,NULL//如果来客户端的连接,accept 返回的是这个客户端的fd  5,client  object//NONBLOCKING 就是代码能往下走了,只不过有不同的情况if (client == null) {System.out.println("null.....");} else {client.configureBlocking(false); //重点 连接socket非阻塞// socket// 服务端的listen socket(连接请求三次握手后,通过accept 得到 连接的socket)// 连接socket(连接后的数据读写使用的)int port = client.socket().getPort();System.out.println("client..port: " + port);clients.add(client);}ByteBuffer buffer = ByteBuffer.allocateDirect(4096);  //可以在堆里   堆外//遍历已经连接进来的客户端能不能读写数据for (SocketChannel c : clients) {   //串行化!!!!  多线程!!int num = c.read(buffer);  // >0  -1  0   //不会阻塞if (num > 0) {buffer.flip();byte[] aaa = new byte[buffer.limit()];buffer.get(aaa);String b = new String(aaa);System.out.println(c.socket().getPort() + " : " + b);buffer.clear();}}}
}

NIO的弊端

  • 每次获取数据是O(n)级别的时间复杂度

    • 例如有10w连接,只有100个数据传输,nio需要10w次recv系统调用,这里面大部分的系统调用是无意义的(没有数据传输,徒增成本)

多路复用IO

程序通过一次系统调用获得其中IO状态,然后程序自己实现对于有状态的IO进行R/W,时间负责度O(m) + O(1)

无论select、poll还是nio,本质上都是对程序持有的fds依次遍历,只不过区别是nio的依次遍历发生在程序侧,需要发生n次系统调用,以及n次用户态内核态的切换;而select和poll,是需要程序传递fds给内核,内核触发遍历,只发生一次系统调用。

linux下 多路复用器

  • SELECT POSIX规范
    • 受到FD_SETSIZE的限制,一次最多1024个fd
    • 每次都要重新传递fds,每次内核调用都需要触发全量调用传递的fds
  • POLL
    • 不受到FD_SETSIZE的限制
    • 每次都要重新传递fds,每次内核调用都需要触发全量调用传递的fds
  • EPOLL
    • 内核开辟空间保存fd,规避程序重复传递fd的问题和遍历全量fd的问题

请添加图片描述

public class SocketMultiplexingSingleThreadv1 {private ServerSocketChannel server = null;//linux 多路复用器(select poll    epoll kqueue) nginx  event{}private Selector selector = null;   int port = 9090;public void initServer() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));//如果在epoll模型下,open--》  epoll_create -> fd3selector = Selector.open();  //  select  poll  *epoll  优先选择:epoll  但是可以 -D修正//server 约等于 listen状态的 fd4/*register如果:select,poll:jvm里开辟一个数组 fd4 放进去epoll:  epoll_ctl(fd3,ADD,fd4,EPOLLIN*/server.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void start() {initServer();System.out.println("服务器启动了。。。。。");try {while (true) {  //死循环Set<SelectionKey> keys = selector.keys();System.out.println(keys.size()+"   size");//1,调用多路复用器(select,poll  or  epoll  (epoll_wait))/*select()是啥意思:1,select,poll  其实  内核的select(fd4)  poll(fd4)2,epoll:  其实 内核的 epoll_wait()*, 参数可以带时间:没有时间,0  :  阻塞,有时间设置一个超时selector.wakeup()  结果返回0懒加载:其实再触碰到selector.select()调用的时候触发了epoll_ctl的调用*/while (selector.select() > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys();  //返回的有状态的fd集合Iterator<SelectionKey> iter = selectionKeys.iterator();//  NIO 对着每一个fd调用系统调用,浪费资源//  多路复用IO调用一次select方法,返回的那些fc可以R/Wwhile (iter.hasNext()) {SelectionKey key = iter.next();iter.remove(); //set  不移除会重复循环处理if (key.isAcceptable()) {//看代码的时候,这里是重点,如果要去接受一个新的连接//语义上,accept接受连接且返回新连接的FD对吧?//那新的FD怎么办?//select,poll,因为他们内核没有空间,那么在jvm中保存和前边的fd4那个listen的一起//epoll: 我们希望通过epoll_ctl把新的客户端fd注册到内核空间acceptHandler(key);} else if (key.isReadable()) {readHandler(key);  //连read 还有 write都处理了//在当前线程,这个方法可能会阻塞  ,如果阻塞了十年,其他的IO早就没电了。。。//所以,为什么提出了 IO THREADS//redis  是不是用了epoll,redis是不是有个io threads的概念 ,redis是不是单线程的//tomcat 8,9  异步的处理方式  IO  和   处理上  解耦}}}}} catch (IOException e) {e.printStackTrace();}}public void acceptHandler(SelectionKey key) {try {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel client = ssc.accept(); //来啦,目的是调用accept接受客户端  fd7client.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(8192);//你看,调用了register/*select,poll:jvm里开辟一个数组 fd7 放进去epoll:  epoll_ctl(fd3,ADD,fd7,EPOLLIN*/client.register(selector, SelectionKey.OP_READ, buffer);System.out.println("-------------------------------------------");System.out.println("新客户端:" + client.getRemoteAddress());System.out.println("-------------------------------------------");} catch (IOException e) {e.printStackTrace();}}public void readHandler(SelectionKey key) {SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.clear();int read = 0;try {while (true) {read = client.read(buffer);if (read > 0) {buffer.flip();while (buffer.hasRemaining()) {client.write(buffer);}buffer.clear();} else if (read == 0) {break;} else {client.close();break;}}} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) {SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1();service.start();}
}

不同多路复用器对应的系统指令

  • POLL(jdk native 用户空间 保存了fd)
    • 创建server并进行监听
      • socket(PF_INET, SOCK_STREAM, IPPROTO_IP) = 4
      • fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0 //server.configureBlocking(false);
      • bind(4, {sa_family=AF_INET, sin_port=htons(9090)
      • listen(4, 50)
    • 多路复用建立连接
      • poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
      • accept(4, = 7 //获取到新的客户端
      • fcntl(7, F_SETFL, O_RDWR|O_NONBLOCK) //非阻塞
      • poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}, {fd=7, events=POLLIN}], 3, -1) = 1 //selector.select() 注册新的客户端并查询客户端fd是否有状态变更
  • EPOLL
    • 创建server并进行监听
      • socket(PF_INET, SOCK_STREAM, IPPROTO_IP) = 4
      • fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0 //server.configureBlocking(false);
      • bind(4, {sa_family=AF_INET, sin_port=htons(9090)
      • listen(4, 50)
    • 多路复用创建连接
      • epoll_create(256) = 7 (epfd)
      • epoll_ctl(7, EPOLL_CTL_ADD, 4,
      • epoll_wait(7, {{EPOLLIN, {u32=4, u64=2216749036554158084}}}, 4096, -1) = 1 // selector.select()
      • accept(4 =8 //建立连接获取新client的fd
      • fcntl(8, F_SETFL, O_RDWR|O_NONBLOCK) //设置为非阻塞
      • epoll_ctl(7, EPOLL_CTL_ADD, 8, {EPOLLIN, //注册新的客户端到多路复用器
      • epoll_wait(7, //等待有状态变更的fd返回

多线程Selector

多线程 多路复用IO

分一个bossGroup 和一个workerGroup;bossGroup负责listen,workGroup负责人R/W

public class MainThread {public static void main(String[] args) {//1,创建 IO Thread  (一个或者多个)SelectorThreadGroup boss = new SelectorThreadGroup(3);  //混杂模式//boss有自己的线程组SelectorThreadGroup worker = new SelectorThreadGroup(3);  //混杂模式//worker有自己的线程组boss.setWorker(worker);//但是,boss得多持有worker的引用:/*** boss里选一个线程注册listen , 触发bind,从而,这个不选中的线程得持有 workerGroup的引用* 因为未来 listen 一旦accept得到client后得去worker中 next出一个线程分配*/boss.bind(9999);boss.bind(8888);boss.bind(6666);boss.bind(7777);}
}
public class SelectorThread  extends  ThreadLocal<LinkedBlockingQueue<Channel>>  implements   Runnable{// 每线程对应一个selector,// 多线程情况下,该主机,该程序的并发客户端被分配到多个selector上//注意,每个客户端,只绑定到其中一个selector//其实不会有交互问题Selector  selector = null;LinkedBlockingQueue<Channel> lbq = get();  //lbq  在接口或者类中是固定使用方式逻辑写死了。你需要是lbq每个线程持有自己的独立对象SelectorThreadGroup stg;@Overrideprotected LinkedBlockingQueue<Channel> initialValue() {return new LinkedBlockingQueue<>();}SelectorThread(SelectorThreadGroup stg){try {this.stg = stg;selector = Selector.open();} catch (IOException e) {e.printStackTrace();}}@Overridepublic void run() {//Loopwhile (true){try {//1,select()int nums = selector.select();  //阻塞  wakeup()//2,处理selectkeysif(nums>0){Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> iter = keys.iterator();while(iter.hasNext()){  //线程处理的过程SelectionKey key = iter.next();iter.remove();if(key.isAcceptable()){  //复杂,接受客户端的过程(接收之后,要注册,多线程下,新的客户端,注册到那里呢?)acceptHandler(key);}else if(key.isReadable()){readHander(key);}else if(key.isWritable()){}}}//3,处理一些task :  listen  clientif(!lbq.isEmpty()){//只有方法的逻辑,本地变量是线程隔离的Channel c = lbq.take();if(c instanceof ServerSocketChannel){ServerSocketChannel server = (ServerSocketChannel) c;server.register(selector,SelectionKey.OP_ACCEPT);System.out.println(Thread.currentThread().getName()+" register listen");}else if(c instanceof  SocketChannel){SocketChannel client = (SocketChannel) c;ByteBuffer buffer = ByteBuffer.allocateDirect(4096);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println(Thread.currentThread().getName()+" register client: " + client.getRemoteAddress());}}} catch (Exception e) {e.printStackTrace();} }}private void readHander(SelectionKey key) {System.out.println(Thread.currentThread().getName()+" read......");ByteBuffer buffer = (ByteBuffer)key.attachment();SocketChannel client = (SocketChannel)key.channel();buffer.clear();while(true){try {int num = client.read(buffer);if(num > 0){buffer.flip();  //将读到的内容翻转,然后直接写出while(buffer.hasRemaining()){client.write(buffer);}buffer.clear();}else if(num == 0){break;}else {//客户端断开了System.out.println("client: " + client.getRemoteAddress()+"closed......");key.cancel();break;}} catch (IOException e) {e.printStackTrace();}}}private void acceptHandler(SelectionKey key) {System.out.println(Thread.currentThread().getName()+"   acceptHandler......");ServerSocketChannel server = (ServerSocketChannel)key.channel();try {SocketChannel client = server.accept();client.configureBlocking(false);stg.nextSelectorV3(client);// stg.nextSelectorV2(client);} catch (IOException e) {e.printStackTrace();}}public void setWorker(SelectorThreadGroup stgWorker) {this.stg =  stgWorker;}
}
public class SelectorThreadGroup {  //天生都是bossSelectorThread[] sts;ServerSocketChannel server=null;AtomicInteger xid = new AtomicInteger(0);SelectorThreadGroup  stg =  this;public void setWorker(SelectorThreadGroup  stg){this.stg =  stg;}SelectorThreadGroup(int num){//num  线程数sts = new SelectorThread[num];for (int i = 0; i < num; i++) {sts[i] = new SelectorThread(this);new Thread(sts[i]).start();}}public void bind(int port) {try {server =  ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));//注册到那个selector上呢?nextSelectorV3(server);} catch (IOException e) {e.printStackTrace();}}public void nextSelectorV3(Channel c) {try {if(c instanceof  ServerSocketChannel){SelectorThread st = next();  //listen 选择了 boss组中的一个线程后,要更新这个线程的work组st.lbq.put(c);st.setWorker(stg);st.selector.wakeup();}else {SelectorThread st = nextV3();  //在 main线程种,取到堆里的selectorThread对象//1,通过队列传递数据 消息st.lbq.add(c);//2,通过打断阻塞,让对应的线程去自己在打断后完成注册selectorst.selector.wakeup();}} catch (InterruptedException e) {e.printStackTrace();}}//无论 serversocket  socket  都复用这个方法private SelectorThread next() {int index = xid.incrementAndGet() % sts.length;  //轮询就会很尴尬,倾斜return sts[index];}private SelectorThread nextV3() {int index = xid.incrementAndGet() % stg.sts.length;  //动用worker的线程分配return stg.sts[index];}
}

Netty

ButeBuf

类似于jdk原生ByteBuffer封装

//initialCapacity maxCapacity 默认分配为堆外内存
//ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(8, 20);
ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(8, 20);System.out.println("buf.isReadable()    :" + buf.isReadable());
System.out.println("buf.readerIndex()   :" + buf.readerIndex());
System.out.println("buf.readableBytes() " + buf.readableBytes());
System.out.println("buf.isWritable()    :" + buf.isWritable());
System.out.println("buf.writerIndex()   :" + buf.writerIndex());
System.out.println("buf.writableBytes() :" + buf.writableBytes());
System.out.println("buf.capacity()  :" + buf.capacity());
System.out.println("buf.maxCapacity()   :" + buf.maxCapacity());
//是否为堆外内存
System.out.println("buf.isDirect()  :" + buf.isDirect());

Client端

NioEventLoopGroup

NioSocketChannel

客户端读写需要注册到类似于多路复用器

@Test
public void clientMode() throws Exception {NioEventLoopGroup thread = new NioEventLoopGroup(1);//客户端模式:NioSocketChannel client = new NioSocketChannel();thread.register(client);  //epoll_ctl(5,ADD,3)//响应式:输入处理ChannelPipeline p = client.pipeline();p.addLast(new MyInHandler());//reactor  异步的特征ChannelFuture connect = client.connect(new InetSocketAddress("192.168.150.11", 9090));ChannelFuture sync = connect.sync();ByteBuf buf = Unpooled.copiedBuffer("hello server".getBytes());ChannelFuture send = client.writeAndFlush(buf);//同步处理  send.sync();//同步处理 等待服务端断开连接sync.channel().closeFuture().sync();System.out.println("client over....");
}class MyInHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("client  registed...");}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client active...");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;// 这里readCharSequence会移动ByteBuffer里面的指针,所以再写回去是没有数据的// CharSequence str = buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8);CharSequence str = buf.getCharSequence(0, buf.readableBytes(), CharsetUtil.UTF_8);System.out.println(str);ctx.writeAndFlush(buf);}
}

Server端

NioEventLoopGroup

NioServerSocketChannel

还是响应式编程,有客户端连接后通过acceptHandler进行accept和注册R/W Handler

@Test
public void serverMode() throws Exception {NioEventLoopGroup thread = new NioEventLoopGroup(1);NioServerSocketChannel server = new NioServerSocketChannel();thread.register(server);ChannelPipeline p = server.pipeline();//这里通过ChannelInit处理注册R/W处理器//accept接收客户端,并且注册到selectorp.addLast(new MyAcceptHandler(thread, new ChannelInit())); ChannelFuture bind = server.bind(new InetSocketAddress("192.168.150.1", 9090));bind.sync().channel().closeFuture().sync();System.out.println("server close....");
}class MyAcceptHandler extends ChannelInboundHandlerAdapter {private final EventLoopGroup selector;private final ChannelHandler handler;public MyAcceptHandler(EventLoopGroup thread, ChannelHandler myInitHandler) {this.selector = thread;this.handler = myInitHandler;  //ChannelInit}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("server registerd...");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//  listen  socket   accept    client//  socket           R/W       ByteBufSocketChannel client = (SocketChannel) msg;//响应式的  handlerChannelPipeline p = client.pipeline();p.addLast(handler);  //1,client::pipeline[ChannelInit,]//注册selector.register(client);}
}/**
* @ChannelHandler.Sharable表示线程间共享
* 通过ChannelInit初始化来进行与业务处理器之间的解耦
*/
@ChannelHandler.Sharable
class ChannelInit extends ChannelInboundHandlerAdapter {@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {Channel client = ctx.channel();ChannelPipeline p = client.pipeline();p.addLast(new MyInHandler());//2,client::pipeline[ChannelInit,MyInHandler]//用完清除自己即可ctx.pipeline().remove(this);//3,client::pipeline[MyInHandler]}
}

Nio Bootstrap

Client端

@Test
public void nettyClient() throws InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup(1);Bootstrap bs = new Bootstrap();ChannelFuture connect = bs.group(group).channel(NioSocketChannel.class)// .handler(new ChannelInit()).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new MyInHandler());}}).connect(new InetSocketAddress("192.168.150.11", 9090));Channel client = connect.sync().channel();ByteBuf buf = Unpooled.copiedBuffer("hello server".getBytes());ChannelFuture send = client.writeAndFlush(buf);send.sync();client.closeFuture().sync();
}

Server端


@Test
public void nettyServer() throws InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup(1);ServerBootstrap bs = new ServerBootstrap();ChannelFuture bind = bs.group(group, group).channel(NioServerSocketChannel.class)// .childHandler(new ChannelInit()).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new MyInHandler());}}).bind(new InetSocketAddress("192.168.150.1", 9090));bind.sync().channel().closeFuture().sync();}

C10K problem

单机连接10k客户端问题,随着单服务端连接客户端越来越多,才逐渐出现Nio、多路复用IO等模型

http://www.kegel.com/c10k.html

public static void main(String[] args) {LinkedList<SocketChannel> clients = new LinkedList<>();InetSocketAddress serverAddr = new InetSocketAddress("192.168.150.11", 9090);//端口号的问题:65535//  windowsfor (int i = 10000; i < 65000; i++) {try {SocketChannel client1 = SocketChannel.open();SocketChannel client2 = SocketChannel.open();/*linux中你看到的连接就是:client...port: 10508client...port: 10508*/client1.bind(new InetSocketAddress("192.168.150.1", i));//  192.168.150.1:10000   192.168.150.11:9090client1.connect(serverAddr);clients.add(client1);client2.bind(new InetSocketAddress("192.168.110.100", i));//  192.168.110.100:10000  192.168.150.11:9090client2.connect(serverAddr);clients.add(client2);} catch (IOException e) {e.printStackTrace();}}System.out.println("clients "+ clients.size());try {System.in.read();} catch (IOException e) {e.printStackTrace();}
}

IO数据流向图

EPOLL即为在内核空间中通过链表保存有数据状态变更的fd,直接从链表中获取fd即可

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_258430.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VMware 修复了三个身份认证绕过漏洞

Bleeping Computer 网站披露&#xff0c;VMware 近期发布了安全更新&#xff0c;以解决 Workspace ONE Assist 解决方案中的三个严重漏洞&#xff0c;分别追踪为 CVE-2022-31685&#xff08;认证绕过&#xff09;、CVE-2022-31686 &#xff08;认证方法失败&#xff09;和 CVE-…

2023想转行软件测试的看过来,你想要了解的薪资、前景、岗位方向、学习路线都讲明白了

在过去的一年中&#xff0c;软件测试行业发展迅速&#xff0c;随着数字化技术应用的广泛普及&#xff0c;业界对于软件测试的要求也在持续迭代与增加。 同样的&#xff0c;有市场就有需求&#xff0c;软件测试逐渐成为企业中不可或缺的岗位&#xff0c;作为一个高薪又需求广的…

RTT 消息邮箱

1.邮箱概念 邮箱服务是实时操作系统中一种典型的线程间通信方法。举一个简单的例子&#xff0c;有两个线程&#xff0c;线程 1 检测按键状态并发送&#xff0c;线程 2 读取按键状态并根据按键的状态相应地改变 LED 的亮灭。这里就可以使用邮箱的方式进行通信&#xff0c;线程 …

软件测试选Python还是Java?

目录 前言 1、先从一门语言开始 2、两个语言的区别 3、两个语言的测试栈技术 4、如何选择两种语言&#xff1f; 总结 前言 对于工作多年的从业者来说&#xff0c;同时掌握java和Python两门语言再好不过&#xff0c;可以大大增加找工作时的选择范围。但是对于转行的人或者…

Spring Cloud Gateway集成Nacos实现负载均衡

&#x1f4a1;Nacas可以用于实现Spring Cloud Gateway中网关动态路由功能&#xff0c;也可以基于Nacos来实现对后端服务的负载均衡&#xff0c;前者利用Nacos配置中心功能&#xff0c;后者利用Nacos服务注册功能。接下来我们来看下Gateway集成Nacos实现负载均衡的架构图一. 环境…

央行数据-一款查逆回购 LPR 货币供应量 资产负债表 Shibor 数据的专业工具

自己开发的APP, App Store搜索"央行数据" 即可下载欢迎大家下载,给修改意见逆回购、正回购、MLF、票据&#xff0c;俗称央行发钱房贷基准利率多少? M2/M1/M0, 资产负债表,Shibor 了解下这款APP是经济,投资理财,股市,房价分析参考利器适用于关注经济、货币政策的用户…

Spring Cloud Alibaba环境搭建

环境依赖 SpringCloud Alibaba 依赖 Java环境来运行。还需要为此配置 Maven环境&#xff0c;请确保是在以下版本环境中安装使用: 1. 64 bit JDK 1.8&#xff1b;下载& 配置。 1.8.0_131 2. Maven 3.2.x&#xff1b;下载& 配置搭建微服务 1.建立微服务项目 1.idea通过…

Python编程 动态爱心

作者简介&#xff1a;一名在校计算机学生、每天分享Python的学习经验、和学习笔记。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​​ 目录 前言 一.所用库 1.random简介 2.math 简介 3.tkinter库的简介 二.实际图 三.…

机器学习笔记之生成模型综述(五)重参数化技巧(随机反向传播)

机器学习笔记之生成模型综述——重参数化技巧[随机反向传播]引言回顾神经网络的执行过程变分推断——重参数化技巧重参数化技巧(随机反向传播)介绍示例描述——联合概率分布示例描述——条件概率分布总结引言 本节将系统介绍重参数化技巧。 回顾 神经网络的执行过程 上一节…

android 混淆后的异常信息 工具处理

我们可以利用SDK中tools下的proguardgui.bat工具和混淆对应文档进行反混淆处理 D:\Android\sdk\tools\proguard\bin\proguardgui.bat 工具在SDK中的位置&#xff0c;有的SDK版本不同这个工具的具体位置可能有改变&#xff0c;也可以在tools中直接搜索proguardgui.bat&#x…

【并发编程】【2】进程与线程

并发编程 2.进程与线程 2.1 进程与线程 进程 程序由指令和数据组成&#xff0c;但这些指令要运行&#xff0c;数据要读写&#xff0c;就必须将指令加载至 CPU&#xff0c;数据加载至内存。在 指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管…

MQ技术选型

RocketMQ部署架构图NameServer&#xff1a;主要是对元数据的管理&#xff0c;包括Topic和路由信息的管理&#xff0c;底层由netty实现&#xff0c;是一个提供路由管理、路由注册和发现的无状态节点&#xff0c;类似于ZooKeeperBroker&#xff1a;消息中转站&#xff0c;负责收发…

chatGPT 配合excel /maxscript使用_初试

EXCEL 配合chatGPT方法一&#xff1a;利用excel的開發工具1打開excel的VB编辑器&#xff1a;如果頂部菜單上沒有看到開發工具&#xff0c;需要先按下面步驟打開開發工具&#xff1a;這樣按上面步驟就可以打開excel的開發工具~ 然後點擊VB~ 2让chatGPT帮忙写脚本在chatGPT上提问…

智慧校园人脸识别系统源码

智慧校园人脸识别系统源码 技术架构&#xff1a; 后端&#xff1a;Java 框架&#xff1a;springboot 前端页面&#xff1a;vue element-ui 小程序&#xff1a;小程序原生开发 电子班牌&#xff1a;Java Android 源码有演示&#xff0c;可正常上线运营可授权。 随着智慧校…

对撞双指针(一) 盛水最多的容器

描述 给定一个数组height&#xff0c;长度为n&#xff0c;每个数代表坐标轴中的一个点的高度&#xff0c;height[i]是在第i点的高度&#xff0c;请问&#xff0c;从中选2个高度与x轴组成的容器最多能容纳多少水 1.你不能倾斜容器 2.当n小于2时&#xff0c;视为不能形成容器&…

【Java 面试合集】HashMap中为什么引入红黑树,而不是AVL树呢

HashMap中为什么引入红黑树&#xff0c;而不是AVL树呢1. 概述 开始学习这个知识点之前我们需要知道&#xff0c;在JDK1.8 以及之前&#xff0c;针对HashMap有什么不同。 JDK 1.7的时候&#xff0c;HashMap的底层实现是数组 链表JDK1.8的时候&#xff0c;HashMap的底层实现是数…

秒杀项目的消息推送

目录 一、创建消费者 二、创建订单链路配置 1.定义RabbitMQ配置类 2.创建RabbitmqOrderConfig配置类 三、如何实现RabbitMQ重复投递机制 1.开启发送者消息确认模式 2.消息发送确认 ① 创建ConfirmCallBacker确认模式 ② 创建ReturnCallBack退回模式 3.创建生产者 …

零信任-Akamai零信任介绍(6)

​Akamai零信任介绍 Akamai是一家专注于分布式网络服务的公司&#xff0c;它提供了一系列的互联网内容和应用加速服务。关于Akamai的零信任&#xff0c;它指的是Akamai的安全架构中不存在任何一个环节是可以被单独的控制或影响的&#xff0c;因此可以提供更高的安全性。通过使…

Utkuici:一款功能强大的Nessus自动化任务实现工具

关于Utkuici 今天&#xff0c;随着信息技术系统的普及&#xff0c;网络安全领域的投资已大幅增加。各种规模的组织都需要进行漏洞管理、渗透测试和各种分析&#xff0c;以准确确定各自机构受网络威胁的影响程度。通过借助漏洞管理工具的行业领先者Tenable Nessus&#xff0c;我…

ChatGPT提示语编写指南

ChatGPT AI 对话模型自 2022 年 11 月下旬开始可用&#xff0c;此后用户一直在探索聊天机器人的局限性和功能。 然而&#xff0c;OpenAI 也在不断地进行调整&#xff0c;因此 ChatGPT 处于不断变化的状态。 但是我们在这个小指南中描述的提示应该是永恒的。 要获得想要的结果&…