Reactor 模式全解:实现非阻塞 I/O 多路复用

news/2024/4/29 6:49:13/文章来源:https://blog.csdn.net/fofcn/article/details/136970946

4302f48a32e14b49bdb6314e2b0af1ba.png

8d21465ea8f14802aced98dcb5f9bb43.png

6ed7c2d2f8f64ce898ef83d0c1aed539.png

Reactor网络模式是什么?

Reactor网络模式时目前网络最常用的网络模式。如果你使用Netty,那么你在使用Reactor;如果你使用Twisted,那么你子啊使用Reactor;如果你使用netpoll,那么你在使用Reactor。

这里先给出答案:
Reactor = I/O多路复用+非阻塞I/O。

什么是I/O多路复用?

我们还是先使用文字拆解来看看每个词是什么意思吧。

拆词解释

I/O

I/O表示输入和输出,英文为Input/Output。I为输入,O为输出。我们日常编程中操作最多的无非就是网络和文件了,这两类就属于I/O,我们通常称为网络I/O和文件I/O。

下面是两个Java和Go操作文件I/O的例子:
java按行读取文件:

  try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {String line;while ((line = br.readLine()) != null) {System.out.println(line);}} catch (IOException e) {e.printStackTrace();}

 

Go按行读取文件:

    file, _ := os.Open(fileName)defer file.Close()reader := bufio.NewReader(file)for {line,err := reader.ReadString('\n')if err != nil {break}fmt.Print(line)

 

好的,I/O搞清楚了我们就是搞清楚多路。

多路

多路字面意思就是多条路,放在计算机网络编程中的话,一般是指多个通道或者数据源。比如:你的进程或者需要打开很多的文件或者有很多网络连接,并监控这些文件或者网络连接是否发生变化(也就是是否产生一些事件)以进行必要的处理。

复用

复用的字面意思就是重复使用。我们把多路放到计算机网络编程中的话,一般是指重复使用一个或者几个线程,这里的关键是线程一定要很少而且重复使用它来完成I/O+多路。
总的来说就是:重复使用一个或者几个献策会给你来完成多路I/O的变化(事件)处理。

给I/O多路复用下个定义

好了,有了以上的背景或许你已经对I/O多路复用有了自己的理解和定义。这儿我根据自己的理解来对I/O多路复用进行定义:
I/O多路复用就是使用一个或者几个进程或者线程来完成大量通道或者数据源的事件监控和处理。

I/O多路复用复用了什么?

复用了线程。
在没有I/O多路复用之前,可能客户端每创建一个连接服务端都需要新建一个线程来处理事件,这样10K个客户端连接就需要10K个线程,服务端应对这些连接很吃力,因为创建线程有开销,切换线程有开销,还有同步,锁,死锁等问题。

那有了I/O多路复用之后,服务端可能1个线程就可以应对10K个连接的事件。

一般使用什么技术实现I/O多路复用

I/O多路复用技术实现依赖于操作系统,但是主流操作系统都是支持,下面是三大操作系统对I/O多路复用的支持:

  1. Linux: 这个是目前的大哥,Linux使用epoll,当然还有select, poll,目前网络上基本都用epoll
  2. MacOS: Kqueue
  3. Windows: IOCP I/O完成端口

I/O多路复用就告一段落,看下非阻塞I/O。

什么是非阻塞I/O

非阻塞I/O是相对于阻塞I/O而言的,它们之间的区别就是你进行I/O操作时是否阻塞你后续的执行。非阻塞不会阻塞后续执行,而阻塞会。这就好比:
你用某App网上下单到店取一样。假设你直接到店里面用手机下单,你必须在店里等待食物准备好。在这个过程中,你不能去做其他任何事情,直到拿到东西后,你才离开。

而非阻塞I/O就像是你网上下单起手配送,在起手配送期间你可以和你的朋友或者同时唠唠嗑,等骑手把东西送到给你打电话的时候你就下去拿。

总的来说:阻塞I/O中的程序在等待I/O完成时会一直停留在相应操作上,不会执行后续的代码。与之相反,在非阻塞I/O模式下,程序会立即返回一个状态值,如果I/O尚未完成,则程序可以继续执行其他任务,然后随后再次检查I/O状态。
阻塞I/O: 死等
非阻塞I/O:立即返回,下次重试

I/O多路复用和非阻塞I/O组合到一起擦出什么样的火花?

Reactor设计模式结合了非阻塞I/O和I/O多路复用,使得单个线程就能高效地处理多个网络通信。这种结合擦出的“火花”就是使事件驱动的网络服务器变得可能,这种服务器可以以非常轻量级的方式支持大规模并发连接。

在Reactor模式中,一个中央分派器(Reactor)负责监听所有I/O事件(使用select、poll、epoll等系统调用),并且当某个事件发生时,它将调用预先注册的回调函数来处理这些事件。由于采用了非阻塞I/O,这个中央分派器在等待I/O事件时不会被阻塞,这使得它可以在任何给定时间处理上千甚至上万个不同的I/O请求。
下面是单线程的Reactor模型:

单线程Reactor模式

e47b5644f5874f69998ce30f1f2099e7.png

快速实现一个Reactor

  1. 代码关键点1:Reactor线程创建一个事件循环(可能这会勾起你想起Netty的Boss)
 // 创建线程,执行事件循环pthread_t accept_threads[2];for (int i = 0; i < 1; i++) {printf("create acceptor thread. index: %d\n", i);// run_event_loop为事件的处理函数,循环处理pthread_create(&accept_threads[i], NULL, run_event_loop, &server_fd);pthread_detach(accept_threads[i]);}

 

  1. 代码关键点2: 事件处理线程(可能这会勾起你想起Netty的Worker)
// 事件发生后的处理函数: handle_io_event
pthread_create(&worker_thread, NULL, handle_io_event, &client_fd);

 

  1. 代码关键点3: 非阻塞I/O
// 设置文件描述符为非阻塞I/O
fcntl(client_fd, F_SETFL, fcntl(client_fd, F_GETFL, 0) | O_NONBLOCK);// 设置非阻塞
fcntl(server_fd, F_SETFL, fcntl(server_fd, F_GETFL, 0) | O_NONBLOCK);

 

  1. 完整代码
#include <stdio.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>#define PORT 12345
#define MAX_EVENTS 10
#define BUFF_SIZE 1024
#define WORKER_SIZE 4// epoll file descriptor
int epoll_fd;// handlers
void* run_event_loop(void* arg);
void* handle_io_event(void* arg);
void wait_to_death();int main() {int server_fd;struct sockaddr_in server_addr;// 创建serverserver_fd = socket(AF_INET, SOCK_STREAM, 0);// 设置非阻塞fcntl(server_fd, F_SETFL, fcntl(server_fd, F_GETFL, 0) | O_NONBLOCK);// 绑定memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = htonl(INADDR_ANY);server_addr.sin_port = htons(PORT);printf("binding\n");bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));printf("listen\n");// 监听listen(server_fd, MAX_EVENTS);printf("epoll create\n");// epoll创建epoll_fd = epoll_create1(0);struct epoll_event event;event.events = EPOLLIN;event.data.fd = server_fd;printf("epoll add\n");epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event);// 创建线程,执行事件循环pthread_t accept_threads[2];for (int i = 0; i < 1; i++) {printf("create acceptor thread. index: %d\n", i);pthread_create(&accept_threads[i], NULL, run_event_loop, &server_fd);pthread_detach(accept_threads[i]);}wait_to_death();close(epoll_fd);close(server_fd);return 0;
}void* run_event_loop(void* arg) {int server_fd = *(int*)arg;while (1) {struct epoll_event events[MAX_EVENTS];int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);for (int i = 0; i < n; i++) {if (events[i].data.fd == server_fd) {// 新连接int client_fd = accept(server_fd, NULL, NULL);// 设置非阻塞fcntl(client_fd, F_SETFL, fcntl(client_fd, F_GETFL, 0) | O_NONBLOCK);// worker线程负责处理这个事件pthread_t worker_thread;pthread_create(&worker_thread, NULL, handle_io_event, &client_fd);}}}
}void* handle_io_event(void* arg) {int client_fd = *(int*)arg;while (1) {char buff[BUFF_SIZE] = {0};int len = read(client_fd, buff, BUFF_SIZE);if (len <= 0) {close(client_fd);struct epoll_event event;event.events = EPOLLIN;event.data.fd = client_fd;epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, &event);break;}else {printf("Received %s from client\n", buff);}}return NULL;
}void wait_to_death() {sigset_t allset;sigemptyset(&allset);sigaddset(&allset, SIGINT); // Ctrl+Csigaddset(&allset, SIGQUIT); // Ctrl+\int sig;for (;;) {int err = sigwait(&allset, &sig);if (err == 0) {printf("received signal %d, prepare to exit\n", sig);break;}}
}

 

搞定收工,如有错误请指正,谢谢

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1028289.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

K8S之DaemonSet控制器

DaemonSet控制器 概念、原理解读、应用场景概述工作原理典型的应用场景介绍DaemonSet 与 Deployment 的区别 解读资源清单文件实践案例 概念、原理解读、应用场景 概述 DaemonSet控制器能够确保K8S集群所有的节点都分别运行一个相同的pod副本&#xff1b; 当集群中增加node节…

华为升级FIT AP示例(通过AC的命令行)

升级FIT AP示例&#xff08;通过AC的命令行&#xff09; 前提条件 从官网下载升级目标版本对应的系统软件包&#xff0c;保存在PC本地。如果下载的文件是压缩文件&#xff0c;则需要解压缩出系统软件包。 AP已在WAC上线。 背景信息 升级的过程是先将系统软件包传到设备上&…

数据结构基础(三)链表

链表&#xff08;Linked List&#xff09;是一种常见的线性数据结构&#xff0c;由一系列称为节点&#xff08;Node&#xff09;的元素组成&#xff0c;每个节点包含两部分&#xff1a;数据&#xff08;Data&#xff09;和指向下一个节点的引用&#xff08;Pointer 或者 Link&a…

STM32CubeMX学习笔记27---FreeRTOS事件

一、简介 1、 基本概念 事件是一种实现任务间通信的机制&#xff0c;主要用于实现多任务间的同步&#xff0c;但事件通信只能是事件类型的通信&#xff0c;无数据传输。 与信号量不同的是&#xff0c;它可以实现一对多&#xff0c;多对多的同步。即一个任务可以等待多个事件的…

CentOS使用Docker部署Halo并结合内网穿透实现公网访问本地博客

文章目录 1. Docker部署Halo1.1 检查Docker版本如果未安装Docker可参考已安装Docker步骤&#xff1a;1.2 在Docker中部署Halo 2. Linux安装Cpolar2.1 打开服务器防火墙2.2 安装cpolar内网穿透 3. 配置Halo个人博客公网地址4. 固定Halo公网地址 本文主要介绍如何在CentOS 7系统使…

C语言例4-33:求调和级数中第多少项的值大于10

代码如下&#xff1a; //求调和级数中第多少项的值大于10 //调和级数的第n项为11/21/3...1/n #include<stdio.h> #define LIMIT 10 int main(void) {int n1;float sum0.0;for(;;) //死循环&#xff0c;或者while&#xff08;1&#xff09;{sumsum1.0/n;if(sum&g…

GitLab更新失败(Ubuntu)

在Ubuntu下使用apt更新gitlab报错如下&#xff1a; An error occurred during the signature verification.The repository is not updated and the previous index files will be used.GPG error: ... Failed to fetch https://packages.gitlab.com/gitlab/gitlab-ee/ubuntu/d…

Solidity Uniswap V2 Router swapTokensForExactTokens

最初的router合约实现了许多不同的交换方式。我们不会实现所有的方式&#xff0c;但我想向大家展示如何实现倒置交换&#xff1a;用未知量的输入Token交换精确量的输出代币。这是一个有趣的用例&#xff0c;可能并不常用&#xff0c;但仍有可能实现。 GitHub - XuHugo/solidit…

elasticsearch 8.12+kibana 8.12

准备工作&#xff1a;1.下载相关的安装包放到/usr/local/ES下面 elasticsearch下载地址:Download Elasticsearch | Elastic elasticsearch-head-master下载地址:https://github.com/mobz/elasticsearch-head/archive/master.zip node下载地址:Index of /dist/ kibana地址:Downl…

设计模式之桥接模式解析

桥接模式 1&#xff09;概述 1.定义 桥接模式(Bridge Pattern) 将抽象部分与它的实现部分分离&#xff0c;使它们都可以独立地变化。 2.作用 如果系统中某个类存在两个独立变化的维度&#xff0c;通过该模式可以将这两个维度分离出来&#xff0c;使两者可以独立扩展。 3.…

(一)基于IDEA的JAVA基础5

Scanner的使用 使用scanner可以接收键盘上输入的数据&#xff0c; Scanner inputnew Scanner(System.in)&#xff1b; 导包的方式: 什么是导包&#xff0c;导入的是jdk提供的java开发工具包&#xff0c;我们建一个java文件&#xff0c;psvm快捷输入后&#xff0c;打上new S…

静态住宅IP优缺点,究竟要怎么选?

在进行海外 IP 代理时&#xff0c;了解动态住宅 IP 和静态住宅 IP 的区别以及如何选择合适的类型非常重要。本文将介绍精态住宅 IP 特点和&#xff0c;并提供选择建议&#xff0c;帮助您根据需求做出明智的决策。 静态住宅 IP 的特点 静态住宅 IP 是指 IP 地址在一段时间内保…

论文研读:Transformers Make Strong Encoders for Medical Image Segmentation

论文&#xff1a;TransUNet&#xff1a;Transformers Make Strong Encoders for Medical Image Segmentation 目录 Abstract Introduction Related Works 各种研究试图将自注意机制集成到CNN中。 Transformer Method Transformer as Encoder 图像序列化 Patch Embed…

47 vue 常见的几种模型视图不同步的问题

前言 这里主要是来看一下 关于 vue 中的一些场景下面 可能会出现 模型和视图 不同步更新的情况 然后 这种情况主要是 vue 中的对象 属性没有响应式的 setter, getter 然后 我们这里就来看一下 大多数的情况下的一个场景, 和一些处理方式 当然 处理方式主要是基于 Vue.set, …

书生浦语训练营2期-第一节课笔记

笔记总结: 了解大模型的发展方向、本质、以及新一代数据清洗过滤技术、从模型到应用的典型流程、获取数据集的网站、不同微调方式的使用场景和训练数据是什么&#xff0c;以及预训练和微调在训练优势、通信/计算调度、显存管理上的区别。 收获&#xff1a; 理清了预训练和微调…

【优选算法】专题1 -- 双指针 -- 复写0

前言&#xff1a; 补充一下前文没有写到的双指针入门知识&#xff1a;专题1 -- 双指针 -- 移动零 目录 基础入门知识&#xff1a; 1. 复写零&#xff08;easy&#xff09; 1. 题⽬链接&#xff1a;1089.复习0 - 力扣&#xff08;LeetCode&#xff09; 2. 题⽬描述&#xff…

windwos权限维持

1.php 不死马权限维持 <?php ignore_user_abort(); //关掉浏览器&#xff0c;PHP脚本也可以继续执行. set_time_limit(0);//通过set_time_limit(0)可以让程序无限制的执行下去 $interval 5; // 每隔*秒运行 do { $filename test.php; if(file_exists($filename)) { echo…

iOS网络抓包工具全解析

摘要 本文将深入探讨iOS平台上常用的网络抓包工具&#xff0c;包括Charles、克魔助手、Thor和Http Catcher&#xff0c;以及通过SSH连接进行抓包的方法。此外&#xff0c;还介绍了克魔开发助手作为iOS应用开发的辅助工具&#xff0c;提供的全方面性能监控和调试功能。 在iOS应…

一小时学习redis!

redis 基于内存的数据存储系统 三种使用方式 redis优势 安装redis 最后一种方式只能得到5.0的redis版本 比较老&#xff01; 启动redis redis-server.exe 命令 停止ctrlc或关闭 启动客户端 redis-cli redisinsight安装 字符串 redis区分大小写 默认使用字符串存储 二进制…

2024年,如何实现高效的自动化渗透测试?

随着当前网络安全威胁的不断扩展与升级&#xff0c;开展渗透测试工作已经成为广大企业组织主动识别安全漏洞与潜在风险的关键过程。然而&#xff0c;传统的人工渗透测试模式对测试人员的专业能力和经验水平有很高的要求&#xff0c;企业需要投入较大的时间和资源才能完成。在此…