Linux 进程通信:匿名管道、实现进程池

news/2024/4/28 15:01:36/文章来源:https://blog.csdn.net/m0_73800602/article/details/136853301

目录

 一、进程间通信

1、 为什么需要进程通信

2、发展和分类

二、管道

1、概念

2、特点

2、复制并共享

3、用fork来共享管道原理

4、站在文件描述符角度-深度理解管道

5、站在内核角度-管道本质

三、匿名管道

1、概念 

2、创建

3、snprintf 

4、父子进程中进行单向通信 

四、实现简单的进程池模型

Task.hhp:任务函数

1、全局变量

2、任务函数

3、初始化函数

4、辅助函数

ProcessPool.cc:进程池

1、初始化任务和进程槽

2、创建子进程和管道

3、waitCommand函数 

4、进行通信&执行任务

5、sendAndWakeup() 函数

6、父进程派发任务

运行示例


 一、进程间通信

1、 为什么需要进程通信

进程运行的独立性意味着它们在默认情况下是隔离的,使得进程间通信成为一项挑战。进程间通信(IPC)的核心目的是允许不同的进程访问共享资源,例如内存空间,以便于交流、控制和协同工作。进行进程间通信的主要动机包括:

  • 数据传输:实现数据的有效传递,允许一个进程将其数据发送至另一进程,促进信息的共享与处理。
  • 资源共享:通过允许多个进程访问相同的系统资源,优化资源的使用效率和系统性能。
  • 通知事件:使进程能够发送消息通知其他进程某些事件的发生,例如,一个进程在终止时需要告知其父进程。
  • 进程控制:支持特定进程(如调试进程)对其他进程进行控制,包括拦截异常、监视状态变化等,以实现更细致的系统管理和错误调试。

进程间通信是现代操作系统中不可或缺的一部分,它不仅增强了进程之间的协作能力,也提升了系统的整体效率和灵活性。 

2、发展和分类

管道、System V进程间通信(IPC)和POSIX进程间通信是操作系统中实现不同进程之间进行数据交换与同步的三种主要机制。

管道(Pipes)

  • 管道是一种最早的进程间通信机制,最初出现在Unix系统中。它允许两个相关进程之间通过一个单向或双向的数据通道传递字节流。在Linux环境下,有两种类型的管道:
  • 匿名管道(Anonymous Pipes):通常用于父子进程之间的通信,由pipe()系统调用创建,不具有文件系统的接口,生命周期依赖于创建它的进程。
  • 命名管道(FIFOs, Named Pipes):也称为命名队列,它是一个存在于文件系统中的特殊文件,任何知道其路径名的进程都可以打开并使用它进行通信,由mkfifo()系统调用创建。

System V 进程间通信 (IPC)

  • System V IPC 是一套较为复杂的进程间通信方法,主要用于多进程间的协作。它包括以下几种形式:
  • 消息队列(Message Queues):提供了一种异步通信方式,进程可以通过发送和接收消息来交换数据,消息队列可以保证消息的顺序和可靠传输。
  • 共享内存(Shared Memory Segments):允许多个进程直接访问同一块物理内存区域,从而实现高速的数据共享。
  • 信号量(Semaphores):提供了一种进程间同步手段,用于控制对共享资源的并发访问,避免竞态条件。

POSIX 进程间通信

  • POSIX(Portable Operating System Interface for UNIX)定义了一系列跨平台的标准API,为进程间通信提供了更为一致且易于移植的解决方案,主要包括:
  • 消息队列(POSIX Message Queues):类似于System V的消息队列,但接口更符合POSIX标准,旨在提高可移植性。
  • 共享内存(POSIX Shared Memory Objects):同样基于内存共享原理,但使用了不同的API如shm_open()mmap()等,提供了更多的灵活性。
  • 信号量(POSIX Semaphores):不同于System V的信号量,POSIX信号量提供了更统一的操作接口,可通过sem_open()sem_post()sem_wait()等函数进行操作。
  • 此外,POSIX还引入了其他同步机制,如互斥量(Mutexes)、条件变量(Condition Variables)、读写锁(Read-Write Locks),这些不仅适用于进程间同步,也是线程同步的重要工具。

二、管道

1、概念

管道是Unix系统中最早的进程间通信机制,其概念可以被形象地描述为连接两个进程之间的一个数据流通道。在操作系统内部实现上,管道本质上是一种特殊的内核管理的临时存储区,具有类似文件的行为特征,但与普通文件的关键区别在于,管道不需要将数据持久化到磁盘,而是在内存中缓冲和传输。

具体实现时,一个管道拥有两端:读端(入口)和写端(出口)。为了创建并使用管道进行通信,通常采取以下步骤:

  1. 父进程通过调用特定的系统调用如pipe()来创建一个管道,这会返回一对文件描述符,分别对应管道的读端和写端。
  2. 父进程接着执行fork()系统调用以创建子进程,此时,由于父子进程共享同一文件描述符表,因此双方都能访问到这个刚创建的管道资源。
  3. 为了确保正确的数据流向,父进程关闭它不需要的管道读端(如果它只负责写入),子进程则关闭其不需要的管道写端(如果它只负责读取)。

2、特点

  1. 面向血缘进程通信:管道主要用于亲缘进程(如父子进程)之间的数据交换,通过调用pipe()系统调用创建,并在fork()后由父子进程共享同一管道资源。

  2. 访问控制与同步:管道的读写两端提供了自然的访问控制,确保了有序的数据流传递。当一个进程在管道的某一端进行读取或写入时,其他进程必须遵循适当的同步规则,以避免数据冲突或阻塞。

  3. 流式通信服务:管道提供的是面向字节流的通信服务,即数据是以连续的、无结构的字节序列进行传输的,协议相对简单且透明。

  4. 基于文件实现:尽管概念上管道类似于文件,但它是内核管理的临时内存区域,生命周期与关联进程紧密相关,随进程结束而自动清理。不同于磁盘上的普通文件,管道内容不会持久化存储。

  5. 半双工通信:管道通常只能单向传输数据,也就是说,它是一个半双工通信通道,同一时刻只允许单向的数据流动,要么从写端流向读端,要么反之。

    1. 需要双方通信时,需要建立起两个管道

  6. 管道容量与读写行为

    • 当写入速度超过读取速度时,管道内部的缓冲区会逐渐填满,直至达到上限,此时继续写入会导致写进程阻塞,直到有足够的空间。
    • 反之,如果读取速度快于写入速度,当管道中的数据被完全读取后,读进程将阻塞等待新的数据到来。
    • 如果写端关闭而读端仍在读取,当所有已写入的数据都被读出后,读取操作会返回0值,表示达到了逻辑上的“文件结尾”。
    • 若读端先关闭,则写进程在尝试写入管道时,由于没有读者,操作系统可能会发送SIGPIPE信号终止写进程。

2、复制并共享

在创建子进程时,操作系统执行了一个“复制并共享”的过程 

 在Linux操作系统,当通过fork()函数创建子进程时,子进程会继承父进程的文件描述符表。文件描述符(file descriptor)是一个指向内核中文件表项的索引,用于标识进程中打开的文件。每个进程都有自己的文件描述符表,但是这些文件描述符指向的是同一个内核文件表中的条目。

这里有几个关键点需要注意:

  1. 文件描述符的继承:当fork()被调用时,子进程获得父进程文件描述符表的副本。这意味着在fork()调用时,父进程中打开的文件在子进程中也会处于打开状态,且具有相同的文件描述符号。因此,父子进程可以共享打开的文件状态,如当前文件偏移量(file offset)和文件打开模式(例如,读、写)。

  2. 共享内核文件表:父子进程中的同一文件描述符虽然各自存在于各自的文件描述符表中,但它们实际上指向的是同一个内核级别的文件对象。这意味着对文件的操作会影响到所有引用该文件的进程。例如,如果父进程在一个文件描述符上进行了读取或写入操作,那么文件内部的偏移量将同时影响子进程在同一文件描述符上的读取结果。

  3. 文件的实际复制并不发生:重要的是要理解,子进程创建时,并没有对打开的文件数据进行物理复制。相反,复制的是文件描述符表的条目,这些条目指向内核中的文件表。这种机制是高效的,因为它避免了不必要的数据复制。

  4. 独立操作文件描述符:尽管父进程和子进程共享打开的文件,但他们可以独立地操作自己的文件描述符。例如,子进程可以关闭或改变它继承的文件描述符指向的文件的某些属性,而不会影响父进程。然而,对于共享的文件本身(如文件偏移量),更改会影响到所有拥有该文件描述符的进程。

3、用fork来共享管道原理

 

  • 当一个进程调用了 pipe 函数之后,它就会得到一对文件描述符(fd[0](读) 和 fd[1](写))用于访问管道。如果此时该进程又调用了 fork 函数创建了一个子进程,那么这个子进程也会继承这对文件描述符。
  • 在这个过程中,父子进程共享同一个管道,也就是说它们都可以通过这两个文件描述符来访问管道。但是,每个进程只能看到自己打开的那部分管道,l例如:父进程只能看到管道的写端,而子进程只能看到管道的读端。
  • 这样设计的原因是为了保证数据的安全性。因为管道是一个共享的数据结构,如果多个进程同时对它进行操作,就可能会出现数据冲突的问题。因此,操作系统规定,每个进程只能看到自己打开的那一部分管道,从而避免了这种问题的发生。
  • 另外,当一个进程不再需要使用某个文件描述符时,它可以将其关闭。这样做的好处是可以释放相应的系统资源,提高系统的性能。

4、站在文件描述符角度-深度理解管道

图中展示的是一个父进程通过 fork 创建子进程,并且使用管道进行通信的过程。

  • 首先,父进程创建了一个管道,这个管道有两个文件描述符:读端和写端。然后,父进程 fork 出了子进程。在 fork 的时候,子进程会继承父进程的所有资源,包括管道的两个文件描述符。
  • 接着,父进程关闭了管道的读端(fd[0]),而子进程则关闭了管道的写端(fd[1])。这样做的目的是为了确保只有父进程可以向管道写入数据,而只有子进程可以从管道读取数据。
  • 最后,父进程和子进程就可以通过管道进行通信了。父进程可以通过 write 函数将数据写入到管道的写端,而子进程则可以通过 read 函数从管道的读端读取数据。由于管道是全双工的,所以父进程也可以从管道读取数据,而子进程也可以向管道写入数据。

5、站在内核角度-管道本质

在图中,我们看到有两个进程分别对同一个文件进行了读写操作。在这个过程中,内核需要知道这个文件的相关信息,以便正确地处理这些操作。这就是inode的作用,它提供了所有必要的信息,让内核能够正确地处理文件的操作。

 

在图中,我们看到有两个进程通过管道进行通信。从 Linux 内核角度来看,管道是一种特殊的文件,它由内核维护,可以在不同的进程之间传递数据。管道的本质就是一个内存缓冲区,它被映射到了所有使用它的进程的地址空间中。它允许两个进程共享一个缓冲区,从而实现在不同进程中传递数据的功能。

  • 当一个进程想要向管道写入数据时,它会先检查管道的缓冲区是否已满。如果缓冲区未满,则可以直接将数据写入缓冲区;否则,就需要等待其他进程从管道中读取数据,直到缓冲区有空闲的空间为止。
  • 当另一个进程想要从管道中读取数据时,它会先检查管道的缓冲区是否有数据。如果有数据,则可以直接从缓冲区中读取;否则,就需要等待其他进程向管道中写入数据,直到缓冲区中有数据为止。
  • 管道的读写操作都是原子性的,这意味着一次读或写操作要么全部完成,要么不完成。这样可以防止数据的丢失或者损坏。
  • 总的来说,管道是一种非常有用的进程间通信机制,它可以帮助不同进程之间的数据交换变得更加简单和高效。

三、匿名管道

1、概念 

匿名管道(Anonymous Pipe)是操作系统提供的一种简单的进程间通信机制,主要用于父子进程或者有直接亲缘关系的进程之间进行数据交换。它是一种半双工的通信方式,即数据只能单向流动,或从父进程流向子进程,或从子进程流向父进程。

特点:

  1. 内存中存在:匿名管道是在内存中开辟的一段缓冲区,而不是在文件系统中创建一个实际的文件对象。

  2. 无名称标识:与命名管道不同,匿名管道没有明确的名称标识,它由操作系统在创建时分配,并通过句柄(文件描述符)来引用和操作。

  3. 血缘关系限制:通常情况下,匿名管道只能在创建它的进程及其直接子进程中使用。也就是说,只有具有直接亲缘关系的进程才能共享同一匿名管道,其他无关进程无法访问。

  4. 读写模式:匿名管道中的数据传输遵循先进先出(FIFO)原则。一个进程负责写入数据到管道的一端,而另一个进程则从管道的另一端读取数据。

  5. 单向或双向通信:由于匿名管道的半双工特性,若要实现双向通信,需要创建两个管道,分别用于两个方向的数据传输。

  6. 阻塞行为

    • 当管道为空时,尝试从管道读取数据的操作会阻塞,直到有数据可读。
    • 当管道已满时,尝试向管道写入数据的操作也会阻塞,直到有足够的空间可供写入。

2、创建

pipe() 函数是Unix和类Unix系统(包括Linux)中的一个用于创建匿名(无名)管道的系统调用。匿名管道是一种简单的进程间通信机制,允许父子进程或相关联的进程之间进行单向或双向数据传输。

#include <unistd.h>int pipe(int fd[2]);

参数说明:

  • fd: 一个大小为2的整数数组,类型为 int[2]。这个数组由 pipe() 函数填充,并返回两个文件描述符。
    • fd[0]:指向管道的读端(Read End)。从这个文件描述符可以读取通过管道传递过来的数据。
    • fd[1]:指向管道的写端(Write End)。数据可以通过这个文件描述符写入到管道中,进而被连接到读端的进程读取。

功能描述: 当调用 pipe() 函数时,操作系统会在内存中创建一段缓冲区,作为管道的内部实现。任何进程将数据写入到管道的写端时,这些数据会暂时存储在缓冲区中,然后可以从管道的读端读取出来。

返回值:

  • 成功时,pipe() 函数返回0,并且已成功分配了两个文件描述符给 fd 数组。
  • 失败时,返回负值表示错误发生,错误原因可以通过 errno 获取。

使用注意事项:

  1. 管道是半双工的,即一次只能在一个方向上传输数据。虽然理论上可以创建两个管道来模拟全双工通信,但每个管道只支持单向数据流。
  2. 管道具有一定的容量限制,当写入端连续写入数据而读取端没有及时读取时,如果管道满载,后续的写操作将会阻塞,直到有空间可写。
  3. 当读取端关闭后,写入端继续写入数据时,将会收到SIGPIPE信号,通常默认行为是导致进程终止;当然,也可以捕获该信号并采取其他行动。
  4. 管道可用于父子进程间的通信,或者不同进程中需要同步和协作的部分。在多进程编程中,常结合 fork() 和 exec() 家族函数使用,以实现在多个进程间传递信息的目的。

3、snprintf 

学习使用管道前,先拓展一下会用到的函数:

#include <stdio.h>int snprintf(char *str, size_t size, const char *format, ...);

snprintf 是C语言标准库中的一个函数,用于格式化输出到字符串,与 printf 类似,但它的输出受限于指定的缓冲区大小,能够防止缓冲区溢出的安全风险。 

参数

  • str:指向目标缓冲区的指针,用于存放格式化后的字符串。
  • size:指定缓冲区 str 的大小(以字节为单位),包括结束符 \0 所需的空间。如果生成的字符串长度小于或等于 size-1,则在字符串末尾添加 \0 结束符;若生成的字符串过长,则按 size 字节截断,并确保仍能正确终止(即至少包含一个结束符\0)。
  • format:是一个格式字符串,其中可能包含转换说明符(如 %d%s 等),它们将与可变参数列表中的相应数据匹配并进行格式化。
  • ...:是可变参数列表,包含了与 format 中转换说明符相匹配的数据项。

返回值

  • 如果成功且未发生截断,返回实际写入 str 缓冲区的字符数(不包括结束符 \0)。
  • 如果发生截断,返回需要写入的总字符数(即使它大于 size 参数),此时字符串仍然会被适当地截断并在缓冲区中填充了 \0 结束符。
char buffer[50];
int value = 12345;
snprintf(buffer, sizeof(buffer), "The value is %d", value);// 如果buffer足够大,例如大于"The value is 12345" + '\0'所需的长度,则结果将是:
// buffer == "The value is 12345"// 如果buffer太小,例如只有5个字符,则结果可能是:
// buffer == "The v"
// 返回值为10,表示如果没有截断的话,完整的字符串应该是10个字符(包括'\0')。

4、父子进程中进行单向通信 

//Makefile
mypipe:mypipe.ccg++ -o $@ $^ #-DDEBUG
.PHONY:clean
clean:rm -f mypipe// 引入必要的头文件
#include <iostream>
#include <string>
#include <cstdio>
#include <cstring>
#include <cassert>
#include <unistd.h> // 提供fork、close、write、read等系统调用
#include <sys/types.h> // 定义pid_t等类型
#include <sys/wait.h> // 提供waitpid函数using namespace std;// 主要功能:创建管道并在父子进程中实现单向通信
int main()
{// 1. 创建管道,pipefd[0]是读端口,pipefd[1]是写端口int pipefd[2];int n = pipe(pipefd); // 创建无名管道assert(n != -1); // 断言检查管道创建是否成功(void)n; //消除编译器可能发出的未使用变量n的警告
#ifdef DEBUG// 输出调试信息,显示管道两端的文件描述符cout << "pipefd[0]: " << pipefd[0] << endl;cout << "pipefd[1]: " << pipefd[1] << endl;
#endif// 2. 创建子进程pid_t child_pid = fork(); // 调用fork创建子进程assert(child_pid != -1); // 断言检查fork是否成功if (child_pid == 0){// 子进程部分(读端)// 3.1 子进程关闭不需要的管道写端close(pipefd[1]);// 缓冲区,用于接收父进程发送的消息char buffer[8 * 1024];// 循环读取管道中的数据,直到读到文件结束符(表示父进程已关闭写端口)while (true){// 从管道读取数据,返回读取的字节数,如果没有数据则阻塞等待ssize_t bytes_read = read(pipefd[0], buffer, sizeof(buffer) - 1);if (bytes_read > 0){// 将读取的数据转为C风格字符串buffer[bytes_read] = '\0';// 输出接收到的消息cout << "Child [" << getpid() << "] received a message: " << buffer << endl;}else if (bytes_read == 0) // 读取到文件结束,父进程已关闭写端{cout << "Writer quit (Father), child quitting too!" << endl;break;}}// close(pipefd[0]); // 实际上在while循环条件中可以判断并在此处关闭读端// 子进程完成任务后退出exit(0);}else{// 父进程部分(写端)// 3.1 父进程关闭不需要的管道读端close(pipefd[0]);// 初始化一条要发送的消息string message = "我是父进程,我正在给你发消息";int count = 0;char send_buffer[8 * 1024];// 循环向管道写入消息,直到达到指定次数while (true){// 构造要发送的消息内容snprintf(send_buffer, sizeof(send_buffer), "%s[%d] : %d", message.c_str(), getpid(), count++);// 向管道写入数据write(pipefd[1], send_buffer, strlen(send_buffer));// 模拟延时,每次发送消息后等待1秒sleep(1);cout << count << endl;if (count == 5) // 发送指定数量的消息后退出循环{cout << "Writer quit (Father)" << endl;break;}}// 发送完毕所有消息后,关闭写端close(pipefd[1]);// 4. 父进程等待子进程结束,并获取其退出状态pid_t result = waitpid(child_pid, nullptr, 0); // 等待子进程结束cout << "Child PID: " << child_pid << ", Return Value from waitpid: " << result << endl;assert(result > 0); // 断言检查waitpid是否成功(void)result;//消除编译器可能发出的未使用变量的警告}return 0;
}
[hbr@VM-16-9-centos mypipe]$ ./mypipe 
child get a message[30197] Father# 我是父进程,我正在给你发消息[30196] : 0
1
child get a message[30197] Father# 我是父进程,我正在给你发消息[30196] : 1
2
child get a message[30197] Father# 我是父进程,我正在给你发消息[30196] : 2
3
child get a message[30197] Father# 我是父进程,我正在给你发消息[30196] : 3
4
child get a message[30197] Father# 我是父进程,我正在给你发消息[30196] : 4
5
writer quit(father)
writer quit(father), me quit!!!
id : 30197 ret: 30197

四、实现简单的进程池模型

Task.hhp:任务函数

Task.hpp 是一个头文件,定义了一系列任务(函数)和全局变量,用于定义和管理在进程池模型中执行的任务。通过全局的任务列表和描述,父进程可以根据索引分配任务给子进程执行。

下面我们来逐一分析每个部分:

#pragma once#include <iostream>
#include <string>
#include <vector>
#include <unordered_map>
#include <unistd.h>// 提供fork、close等系统调用
#include <functional>// 用于定义函数对象// 定义函数对象类型
typedef std::function<void()> func;std::vector<func> callbacks;// 存储回调函数的全局向量
std::unordered_map<int, std::string> desc;// 存储回调函数描述的哈希表// 示例回调函数
void readMySQL()
{std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n" << std::endl;
}void execuleUrl()
{std::cout << "sub process[" << getpid() << " ] 执行url解析\n" << std::endl;
}void cal()
{std::cout << "sub process[" << getpid() << " ] 执行加密任务\n" << std::endl;
}void save()
{std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n" << std::endl;
}void load()// 加载回调函数到全局向量和描述符
{desc.insert({callbacks.size(), "readMySQL: 读取数据库"});callbacks.push_back(readMySQL);desc.insert({callbacks.size(), "execuleUrl: 进行url解析"});callbacks.push_back(execuleUrl);desc.insert({callbacks.size(), "cal: 进行加密计算"});callbacks.push_back(cal);desc.insert({callbacks.size(), "save: 进行数据的文件保存"});callbacks.push_back(save);
}void showHandler()// 显示回调函数列表
{for(const auto &iter : desc ){std::cout << iter.first << "\t" << iter.second << std::endl;}
}int handlerSize()// 获取回调函数数量
{return callbacks.size();
}

1、全局变量

 std::function是一个模板类,用于封装几乎任何可调用的实体,包括普通函数、Lambda表达式、函数对象以及成员函数指针。std::function的一个重要特性是其类型安全,同时提供了足够的灵活性来存储不同类型的可调用实体。 

typedef std::function<void()> func;

typedef std::function<void()> func; 这行代码定义了一个类型别名func。这里,std::function<void()>是一个特化形式,表示它可以封装任何没有参数并且返回void的可调用实体。

std::vector<func> callbacks;
std::unordered_map<int, std::string> desc;
  • std::vector<func> callbacks;:一个函数指针的向量,用于存储可执行的任务。这些任务在运行时被添加到向量中,并且可以通过索引来调用。
  • std::unordered_map<int, std::string> desc;:一个哈希表,用于存储任务的描述。键是任务在callbacks向量中的索引,值是对任务的文字描述。

2、任务函数

void readMySQL()
{std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n" << std::endl;
}void execuleUrl()
{std::cout << "sub process[" << getpid() << " ] 执行url解析\n" << std::endl;
}void cal()
{std::cout << "sub process[" << getpid() << " ] 执行加密任务\n" << std::endl;
}void save()
{std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n" << std::endl;
}
  • 文件中定义了几个任务函数,例如readMySQLexeculeUrlcalsave。这些函数模拟了不同的任务,如访问数据库、解析URL、执行计算和数据持久化。每个函数都打印出它正在执行的任务和当前子进程的ID。

3、初始化函数

void load()
{desc.insert({callbacks.size(), "readMySQL: 读取数据库"});callbacks.push_back(readMySQL);desc.insert({callbacks.size(), "execuleUrl: 进行url解析"});callbacks.push_back(execuleUrl);desc.insert({callbacks.size(), "cal: 进行加密计算"});callbacks.push_back(cal);desc.insert({callbacks.size(), "save: 进行数据的文件保存"});callbacks.push_back(save);
}
  • void load():这个函数初始化任务列表和任务描述。它将每个任务函数添加到callbacks向量中,并且在desc哈希表中为每个任务添加一个描述。这样,每个任务都有一个唯一的索引和描述。

4、辅助函数

void showHandler()
{for(const auto &iter : desc ){std::cout << iter.first << "\t" << iter.second << std::endl;}
}int handlerSize()
{return callbacks.size();
}
  • void showHandler():遍历desc哈希表,并打印出所有任务的索引和描述。这个函数可以用来显示当前可用的任务列表。
  • int handlerSize():返回当前任务列表callbacks的大小,即可用任务的数量。

ProcessPool.cc:进程池

#include <iostream>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <cassert>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include "Task.hpp"#define PROCESS_NUM 5using namespace std;int waitCommand(int waitFd, bool &quit) //如果对方不发,我们就阻塞
{uint32_t command = 0;ssize_t s = read(waitFd, &command, sizeof(command));if (s == 0){quit = true;return -1;}assert(s == sizeof(uint32_t));return command;
}void sendAndWakeup(pid_t who, int fd, uint32_t command)
{write(fd, &command, sizeof(command));cout << "main process: call process " << who << " execute " << desc[command] << " through " << fd << endl;
}int main()
{// 代码中关于fd的处理,有一个小问题,不影响我们使用,但是你能找到吗??load();// pid: pipefdvector<pair<pid_t, int>> slots;// 先创建多个进程for (int i = 0; i < PROCESS_NUM; i++){// 创建管道int pipefd[2] = {0};int n = pipe(pipefd);assert(n == 0);(void)n;pid_t id = fork();assert(id != -1);// 子进程我们让他进行读取if (id == 0){// 关闭写端close(pipefd[1]);// childwhile (true){// pipefd[0]// 等命令bool quit = false;int command = waitCommand(pipefd[0], quit); //如果对方不发,我们就阻塞if (quit)break;// 执行对应的命令if (command >= 0 && command < handlerSize()){callbacks[command]();}else{cout << "非法command: " << command << endl;}}exit(1);}// father,进行写入,关闭读端close(pipefd[0]); // pipefd[1]slots.push_back(pair<pid_t, int>(id, pipefd[1]));}// 父进程派发任务srand((unsigned long)time(nullptr) ^ getpid() ^ 23323123123L); // 让数据源更随机while (true){// 选择一个任务, 如果任务是从网络里面来的?int command = rand() %  handlerSize();// 选择一个进程 ,采用随机数的方式,选择进程来完成任务,随机数方式的负载均衡int choice = rand() % slots.size();// 把任务给指定的进程sendAndWakeup(slots[choice].first, slots[choice].second, command);sleep(1);}// 关闭fd, 所有的子进程都会退出for (const auto &slot : slots){close(slot.second);}// 回收所有的子进程信息for (const auto &slot : slots){waitpid(slot.first, nullptr, 0);}
}

1、初始化任务和进程槽

int main()
{load();vector<pair<pid_t, int>> slots;
  • 加载任务:通过调用load()函数,初始化全局的任务列表callbacks和任务描述desc
  • 定义进程槽:使用vector<pair<pid_t, int>> slots;定义一个容器来存储子进程ID和对应的管道写端文件描述符。

2、创建子进程和管道

for (int i = 0; i < PROCESS_NUM; i++)
{// 创建管道int pipefd[2] = {0};int n = pipe(pipefd);assert(n == 0);(void)n;pid_t id = fork();assert(id != -1);
  • 循环创建子进程:通过for循环,创建PROCESS_NUM个子进程。每次循环中,都会创建一个管道(pipe(pipefd)),用于父子进程间的通信。
    • 使用 assert(n == 0) 来确保管道创建成功(在Linux系统中,成功时返回0)
    • (void)n; 是为了Release模式下消除编译器可能产生的未使用变量警告。
  • 管道文件描述符pipefd[0]是管道的读端,pipefd[1]是管道的写端。
  • 使用 fork() 系统调用来创建一个新的进程。fork() 调用后,会生成一个与父进程几乎完全相同的子进程。
    • assert(id != -1); 来检查 fork() 是否成功执行,如果失败则 fork() 会返回-1,程序将终止执行并输出错误信息。

3、waitCommand函数 

waitCommand 函数是这段代码中自定义的一个用于从管道读取命令的函数,其作用是阻塞等待父进程通过管道发送过来的命令,并在接收到特定信号时决定是否退出循环。

int waitCommand(int waitFd, bool &quit) //如果对方不发,我们就阻塞
{uint32_t command = 0;ssize_t s = read(waitFd, &command, sizeof(command));if (s == 0){quit = true;return -1;}assert(s == sizeof(uint32_t));return command;
}

 参数:

  • int waitFd: 这是管道的读端文件描述符(如上文中的 pipefd[0]),用来从管道中读取数据。
  • bool &quit: 这是一个引用类型的布尔变量,用来标记子进程是否需要退出。

函数逻辑:

  • 首先定义一个整型变量 command 用于存储从管道读取到的命令。
  • 使用 read() 系统调用尝试从给定的管道读取指定大小的数据(这里是4字节,假设命令是一个无符号32位整数)。
  • 如果 read() 返回0,这意味着管道另一端关闭了连接,通常这表示父进程打算结束与子进程的通信,因此将 quit 设置为 true 并返回一个非正常值(这里为 -1),以指示子进程应该退出其任务执行循环。
  • 如果 read() 成功读取到了4个字节的数据(等于 sizeof(uint32_t)),则断言成功,并将读取到的命令作为整数值返回。

4、进行通信&执行任务

这段代码描述的是子进程和父进程各自执行的任务,基于之前创建的管道进行通信: 

if (id == 0)
{// 关闭写端close(pipefd[1]);// childwhile (true){   // 等命令bool quit = false;int command = waitCommand(pipefd[0], quit); //如果对方不发,我们就阻塞if (quit)break;// 执行对应的命令if (command >= 0 && command < handlerSize()){callbacks[command]();}else{cout << "非法command: " << command << endl;}}exit(1);
}
// father,进行写入,关闭读端
close(pipefd[0]); // pipefd[1]
slots.push_back(pair<pid_t, int>(id, pipefd[1]));

子进程部分if (id == 0)):

  • 关闭写端: 子进程中不需要向管道写入数据,所以它会关闭管道的写端 close(pipefd[1])
  • 循环等待命令: 子进程进入无限循环,不断从管道读端口 pipefd[0] 等待接收来自父进程的命令。

接收并执行命令

  • waitCommand()用于从管道读取并解析命令,并在接收到特定命令表示退出时设置 quit 为 true

  • 处理命令: 当接收到父进程发来的命令值 command 之后,执行 callbacks[command](); 就是调用预先注册到向量中的对应任务函数,完成实际的工作内容。如果命令非法,则输出错误信息。

  • 退出循环: 当检测到 quit 为 true,即接收到父进程发送的退出命令时,子进程跳出循环并调用 exit(1) 结束自身。

父进程部分

  • 关闭读端: 父进程不需要从管道读取数据,因此关闭管道的读端 close(pipefd[0])

  • 存储子进程信息: 将子进程的ID (id) 和该子进程对应的管道写端口 (pipefd[1]) 保存在一个结构体(这里是一个 pair<pid_t, int> 类型的对象)中,并将其添加到名为 slots 的容器(如向量或列表)中。这样父进程可以管理多个子进程及其对应的管道写端口,以便将来向每个子进程发送不同的命令。

5、sendAndWakeup() 函数

sendAndWakeup()函数用于向指定进程通过管道发送一个命令,并在控制台上打印相关信息。具体说明如下:

void sendAndWakeup(pid_t who, int fd, uint32_t command)
{write(fd, &command, sizeof(command));cout << "main process: call process " << who << " execute " << desc[command] << " through " << fd << endl;
}

参数:

  • pid_t who:表示接收命令的目标子进程的进程ID。
  • int fd:这是管道的写端文件描述符,父进程通过这个描述符将命令写入管道,以通知目标子进程执行任务。
  • uint32_t command:要发送的命令编号,对应于之前定义的任务函数。

函数实现:

  • 使用 write(fd, &command, sizeof(command)) 将命令写入到管道中。这里的 command 是一个整数索引,指向存储在全局变量 callbacks 中的任务函数列表。

  • 在控制台输出一条消息,显示主进程正在通过管道 fd 呼叫进程 who 执行任务 desc[command]。这里的 desc 是一个无序_map(std::unordered_map<int, std::string> desc; ),键是命令索引,值是对应的描述信息。

6、父进程派发任务

父进程持续地以随机方式向各个子进程派发任务,并在完成任务调度后有序地回收子进程资源的功能。

    srand((unsigned long)time(nullptr) ^ getpid() ^ 23323123123L); // 让数据源更随机while (true){// 选择一个任务int command = rand() %  handlerSize();// 选择一个进程 ,采用随机数的方式,选择进程来完成任务,随机数方式的负载均衡int choice = rand() % slots.size();// 把任务给指定的进程sendAndWakeup(slots[choice].first, slots[choice].second, command);sleep(1);}// 关闭fd, 所有的子进程都会退出for (const auto &slot : slots){close(slot.second);}// 回收所有的子进程信息for (const auto &slot : slots){waitpid(slot.first, nullptr, 0);}
}

初始化随机数种子

  • 这行代码使用当前时间戳、当前进程ID以及一个常数值来初始化随机数生成器(srand() 函数)。
  • 异或操作符 (^) 将多个不同的随机源混合起来,以提高生成种子的随机性。这样可以确保不同时间启动的进程或同一进程中多次调用 rand() 都能得到不同的随机数。

无限循环派发任务

  • 使用 rand() % handlerSize() 从所有可用的任务中随机选择一个任务索引。
  • 使用 rand() % slots.size() 从所有已创建的子进程中随机选择一个子进程。
  • 调用 sendAndWakeup() 函数,向选定的子进程发送选中的任务命令。这里通过管道写端口将命令传递给子进程,并唤醒其执行相应任务。
    sendAndWakeup(slots[choice].first, slots[choice].second, command);
    这一行代码是调用 sendAndWakeup() 函数并传入三个参数,来向一个子进程发送命令。详细讲解如下:
    • slots[choice].firstslots 是一个存储了子进程信息的 vector<pair<pid_t, int>> 类型容器,在循环中每个元素代表一个子进程及其管道写端口的文件描述符。choice 是通过随机数生成器确定的一个随机索引,用于从 slots 容器中选择一个子进程。

    • slots[choice].first 就是根据这个随机索引获取到的子进程ID(pid_t 类型),它将作为 sendAndWakeup() 函数的第一个参数传递给函数,用来标识要唤醒执行任务的具体子进程。

    • 同样地,slots[choice].second 代表与所选子进程对应的管道写端口的文件描述符(int 类型)。这是第二个参数,用于在函数内部调用 write() 系统调用,将命令通过管道写入到选定子进程,从而通知该子进程开始执行相应的任务。

    • commandcommand 是之前通过 rand() % handlerSize(); 随机生成的任务编号,它是一个整数值(uint32_t 类型)。此值作为第三个参数传递给 sendAndWakeup() 函数,表示要派发给子进程执行的具体任务。

  • 每次派发完任务后,让父进程休眠1秒(sleep(1)),模拟任务之间的间隔。

关闭管道写端口

    for (const auto &slot : slots){close(slot.second);}
  • 当不再需要向子进程发送任务时,父进程遍历 slots 容器,关闭与每个子进程关联的管道写端口。这会导致读取端(在子进程中)检测到 EOF 或异常,进而促使子进程退出其等待命令的循环。

回收子进程信息

    for (const auto &slot : slots){waitpid(slot.first, nullptr, 0);}
  • 父进程再次遍历 slots 容器,对每个子进程调用 waitpid() 函数,用于等待子进程结束并回收其资源。传入参数 nullptr 表示不关心子进程的退出状态码,0 表示阻塞直到子进程结束。通过这种方式,父进程能够确保所有的子进程都已正常结束,并正确释放系统资源。

运行示例

[hbr@VM-16-9-centos ProcessPoll]$ ./ProcessPool 
main process: call process 32283 execute readMySQL: 读取数据库 through 6
sub process[32283 ] 执行访问数据库的任务main process: call process 32281 execute execuleUrl: 进行url解析 through 4
sub process[32281 ] 执行url解析main process: call process 32285 execute readMySQL: 读取数据库 through 8
sub process[32285 ] 执行访问数据库的任务main process: call process 32282 execute cal: 进行加密计算 through 5
sub process[32282 ] 执行加密任务main process: call process 32283 execute execuleUrl: 进行url解析 through 6
sub process[32283 ] 执行url解析main process: call process 32283 execute execuleUrl: 进行url解析 through 6
sub process[32283 ] 执行url解析main process: call process 32283 execute save: 进行数据的文件保存 through 6
sub process[32283 ] 执行数据持久化任务main process: call process 32281 execute execuleUrl: 进行url解析 through 4
sub process[32281 ] 执行url解析main process: call process 32281 execute readMySQL: 读取数据库 through 4
sub process[32281 ] 执行访问数据库的任务main process: call process 32281 execute cal: 进行加密计算 through 4
sub process[32281 ] 执行加密任务main process: call process 32281 execute execuleUrl: 进行url解析 through 4
sub process[32281 ] 执行url解析^C

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1026353.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java Swing游戏开发学习19

内容来自RyiSnow视频讲解 这一节讲的是**Entity ArrayList(Render Order Revised)**实体数组列表&#xff08;渲染顺序修改&#xff09;。 前言 由于NPC和player的实体碰撞区域比他们本身的大小要小&#xff0c;所以会造成一个bug&#xff0c;当前的绘制顺序是&#xff0c;NP…

High 级别反射型 XSS 攻击演示(附链接)

环境准备 如何搭建 DVWA 靶场保姆级教程&#xff08;附链接&#xff09;https://eclecticism.blog.csdn.net/article/details/135834194?spm1001.2014.3001.5502 测试 打开靶场找到该漏洞页面 先右键检查输入框属性 还是和之前一样的&#xff0c;所以直接输入 HTML 标签提交…

StringRedisTemplate与RedisTemplate详解【序列化的方式不同】

spring 封装了 RedisTemplate 对象来进行对redis的各种操作&#xff0c;它支持所有的 redis 原生的 api。在RedisTemplate中提供了几个常用的接口方法的使用&#xff0c;分别是: private ValueOperations<K, V> valueOps; private HashOperations<K, V> hashOps; …

微服务(基础篇-006-Docker安装-CentOS7)

目录 05-初识Docker-Docker的安装_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1LQ4y127n4?p46&spm_id_frompageDriver&vd_source60a35a11f813c6dff0b76089e5e138cc 0.安装Docker 1.CentOS安装Docker 1.1.卸载&#xff08;可选&#xff09; 1.2.安装dock…

HCIP —— 生成树 (下)

目录 STP&#xff08;生成树&#xff09;的角色选举 根网桥 根端口 选举规则&#xff1a; 指定端口 生成树的端口状态 STP的接口状态&#xff1a;禁用、阻塞、侦听、学习、转发 五种状态 禁用状态 阻塞状态 侦听状态 学习状态 转发状态 当生成树拓扑结构发生变化 …

球面数据的几何深度学习--球形 CNN

目录 一、说明二、球形 CNN概述三、球面数据的对称性四、标准&#xff08;平面&#xff09;CNN的局限性五、卷积并发症六、球面卷积七、球面卷积是不够的 一、说明 球面数据的几何深度学习–球形 CNN。通过对物理世界的平移对称性进行编码&#xff0c;卷积神经网络 &#xff0…

3.21系统栈、数据结构栈、栈的基本操作、队列、队列的基本操作------------》

栈 先进后出、后进先出 一、系统栈 大小&#xff1a;8MB 1、局部变量 2、未经初始化为随机值 3、代码执行到变量定义时为变量开辟空间 4、当变量的作用域结束时回收空间 5、函数的形参和返回值 6、函数的调用关系、保护现场和恢复现场 7、栈的增长方向&#xff0c;自高…

yolov8 pose keypoint解读

yolov8进行关键点检测的代码如下&#xff1a; from ultralytics import YOLO# Load a model model YOLO(yolov8n.pt) # pretrained YOLOv8n model# Run batched inference on a list of images results model([im1.jpg, im2.jpg]) # return a list of Results objects# Pr…

SD卡备份和烧录ubuntu20.04镜像

设备及系统&#xff1a;nuc幻影峡谷工控机&#xff0c;ubuntu20.04&#xff0c;树莓派4B&#xff0c;SD卡读卡器 一、确定SD卡设备号的两种方法 方法1&#xff1a; 将有ubuntu镜像的SD卡插入读卡器&#xff0c;再将读卡器插入电脑主机&#xff0c;在 工具 中打开 磁盘&#…

PostgreSQL FDW(外部表) 简介

1、FDW: 外部表 背景 提供外部数据源的透明访问机制。PostgreSQL fdw(Foreign Data Wrapper)是一种外部访问接口,可以在PG数据库中创建外部表,用户访问的时候与访问本地表的方法一样,支持增删改查。 而数据则是存储在外部,外部可以是一个远程的pg数据库或者其他数据库(…

企业微信可以更换公司主体吗?

企业微信变更主体有什么作用&#xff1f;当我们的企业因为各种原因需要注销或已经注销&#xff0c;或者运营变更等情况&#xff0c;企业微信无法继续使用原主体继续使用时&#xff0c;可以申请企业主体变更&#xff0c;变更为新的主体。企业微信变更主体的条件有哪些&#xff1…

springboot多模块

这里springboot使用idea中的 Spring Initializr 来快速创建。 一、demo 1、创建父项目 首先使用 Spring Initializr 来快速创建好一个父Maven工程。然后删除无关的文件&#xff0c;只需保留pom.xml 文件。 &#xff08;1&#xff09;new Project -> spring initializr快…

基于spring boot的个人博客系统的设计与实现(带源码)

随着国内市场经济这几十年来的蓬勃发展&#xff0c;突然遇到了从国外传入国内的互联网技术&#xff0c;互联网产业从开始的群众不信任&#xff0c;到现在的离不开&#xff0c;中间经历了很多挫折。本次开发的个人博客系统&#xff0c;有管理员&#xff0c;用户&#xff0c;博主…

从一次 RPC 请求,探索 MOSN 的工作流程

王程铭&#xff08;呈铭&#xff09; 蚂蚁集团技术工程师&#xff0c;Apache Committer 专注 RPC、Service Mesh 和云原生等领域。 本文 7368 字&#xff0c;预计阅读 15 分钟 前言 MOSN&#xff08;Modular Open Smart Network&#xff09;是一款主要使用 Go 语言开发的云…

吴恩达深度学习笔记:神经网络的编程基础2.5-2.8

目录 第一门课&#xff1a;神经网络和深度学习 (Neural Networks and Deep Learning)第二周&#xff1a;神经网络的编程基础 (Basics of Neural Network programming)2.5 导数&#xff08;Derivatives&#xff09;2.6 更多的导数例子&#xff08;More Derivative Examples&…

Node.js学习(一)

版权声明 本文章由B站上的黑马课程整理所得&#xff0c;仅供个人学习交流使用。如涉及侵权问题&#xff0c;请立即与本人联系&#xff0c;本人将积极配合删除相关内容。感谢理解和支持&#xff0c;本人致力于维护原创作品的权益&#xff0c;共同营造一个尊重知识产权的良好环境…

【二叉树】Leetcode 543. 二叉树的直径【简单】

二叉树的直径 给你一棵二叉树的根节点&#xff0c;返回该树的 直径 。 二叉树的 直径 是指树中任意两个节点之间最长路径的 长度 。这条路径可能经过也可能不经过根节点 root 。 两节点之间路径的 长度 由它们之间边数表示。 示例1&#xff1a; 输入&#xff1a;root [1,2…

C语言实现顺序表(增,删,改,查)

目录 一.概念&#xff1a; 1.静态顺序表&#xff1a;使用定长数组存储元素。 2.动态顺序表&#xff1a;使用动态开辟的数组存储。 二.顺序表的实现: 1.顺序表增加元素 1.检查顺序表 2.头插 3.尾插 2.顺序表删除元素 1.头删 2.尾删 3.指定位置删 3.顺序表查找元素 …

使用Qt生成图片

Qt之生成png/jpg/bmp格式图片_qt生成图片-CSDN博客 (1)使用QPainter 示例关键代码&#xff1a; QImage image(QSize(this->width(),this->height()),QImage::Format_ARGB32);image.fill("white");QPainter *painter new QPainter(&image);painter->…

深入浅出:探索Hadoop生态系统的核心组件与技术架构

目录 前言 HDFS Yarn Hive HBase Spark及Spark Streaming 书本与课程推荐 关于作者&#xff1a; 推荐理由&#xff1a; 作者直播推荐&#xff1a; 前言 进入大数据阶段就意味着 进入NoSQL阶段&#xff0c;更多的是面向OLAP场景&#xff0c;即数据仓库、BI应用等。 …