信号量
在Linux下,POSIX信号量是一种线程同步机制,用于控制多个线程之间的访问顺序。POSIX信号量可以用于实现线程之间的互斥或者同步。
在之前的阻塞队列生产者消费者模型中,阻塞队列是一个共享资源,不管是生产者还是消费者,任何时刻只允许一个线程访问共享资源,所以对阻塞队列进行了加锁保护,使阻塞队列成为为临界资源。在这里,是将整个共享资源(阻塞队列)当作一个整体使用的。
信号量使用思路:一个共享资源,不当作整体使用,将其划分为若干个小的共享资源,使得不同的执行流访问不同的小共享资源,本质上,当不同执行流访问不同的小共享资源时,不需要进行加锁保护,可以并发执行。而如果两个执行流访问到了同一个小的共享资源时,再进行互斥保护共享资源。
POSIX信号量提供了两种类型的信号量:二进制信号量和计数信号量。二进制信号量只有两个取值:0和1,它用于表示资源是否被占用。计数信号量可以有多个取值,它表示可用资源的数量。
二进制信号量可以充当互斥锁的作用。因为信号量的初始值为1,p相当于加锁,v相当于解锁。
计数信号量的值通常被称为“资源计数器”,它记录了当前可用的共享资源数量。当一个线程需要使用共享资源时,它会尝试获取计数信号量,如果当前可用资源数量大于0,则线程可以获取资源并将计数信号量的值减1。如果当前可用资源数量为0,则表示共享资源已被占用,线程需要等待直到有资源可用。
POSIX信号量接口
int sem_init(sem_t *sem, int pshared, unsigned int value); // 初始化int sem_destroy(sem_t *sem); // 销毁int sem_wait(sem_t *sem); // 申请信号量,P操作int sem_post(sem_t *sem); // 释放信号量,V操作
因为信号量本质就是一个计数器,记录着当前的资源数目,其中初始化时的value值就表示初始时的资源数目。
在使用POSIX信号量时,线程可以通过调用sem_wait()函数来请求获取信号量,如果当前信号量的值大于0,则线程可以继续执行,同时信号量的值会减1;否则线程将被阻塞,等待信号量的值变为大于0。当线程释放资源时,可以通过调用sem_post()函数来释放信号量,使得其它线程可以获取资源。
基于环形队列的生产者消费者模型
1.缓冲区的改变
基于环形队列的生产者消费者模型,对比之前的基于阻塞队列的生产者消费者模型,只是缓冲区的具体数据结构变了。两个角色:生产者,消费者。三种关系:生产者与生产者:互斥。消费者与消费者:互斥。生产者与消费者:互斥与同步。
如上图,环节队列是逻辑抽象结构,底层的物理结构是数组。要想达到环形队列的效果,只需在访问数组的最后一个元素的下一个元素时将下标 %= size即可。再者,环形队列有一个判空和判满的问题,因为这里的容量是有上限的,从数据结构的角度来说,一般会有一个start和end下标,当start == end时为空,而当数据为满时,不能也将start == end标定为满,因为这样空和满的判断条件就相同了。解决方法:1. 空出一个位置,当(end + 1) % size == start时,判定为满。 2. 加一个计数器,当计数器 == size时为满,计数器 == 0时为空。
但是,因为信号量的作用,在环形队列生产消费模型中,我们不需要考虑判空判满的问题。见下文。
2. 结合生产者消费者
先考虑单生产者,单消费者的环形队列生产消费模型,有以下前提:
通过给生产者线程和消费者线程分别设定一个下标,从而标定它们当前生产和消费的下标位置。
我们不需要通过空出一个位置或者添加计数器的方式来避免队列为空和为满时生产消费线程的下标相同这样的判定条件重复。因为有信号量的存在。
因此,队列为空和为满时,生产者线程的下标一定等于消费者线程的下标。一种情况是当前缓冲区中没有数据,一种是当前缓冲区中数据已满。
目的期望:
当生产线程和消费线程指向了环形队列的同一个位置时(此时队列为空or为满),也就是访问了同一个共享资源,此时需要进行多线程互斥与同步。
当生产线程和消费线程不指向环形队列的同一个位置时,此时不需要进行互斥与同步,可以并发执行,因为没有访问同一个共享资源。
当环形队列为空时,必须让生产者先执行,消费者必须等待生产者生产完至少一个数据。这也就是第一点中的互斥与同步。
当环形队列为满时,必须让消费者先执行,生产者必须等待消费者消费完至少一个数据。这也就是第一点中的互斥与同步。
当环形队列不为空不为满时,生产线程与消费线程并发执行。
上方5点,其实345和12是一个原则。34对应1,5对应2。
3.结合信号量
信号量表征的是共享资源的数量,在环形队列生产消费模型中,设定两个信号量,分别表征数据资源数目和空间资源数目。生产者关注空间资源信号量,初始为N。消费者关注数据资源信号量,初始为0。
按照上面生产者和消费者的执行逻辑来看,每次在生产或者消费之前必须先申请信号量,申请信号量的本质是一种对资源的预定机制。若信号量大于0,则--信号量,继续执行。若为0,则阻塞等待信号量被其他线程释放。
这样一来,起始时,或当其他环形队列为空时,dataSem为0,消费者没有数据资源可以申请,故必须阻塞等待。这也就是我们期望的,当生产和消费的下标相等时,且队列为空时,必须让生产者先执行,消费者阻塞等待。
当环形队列为满时,spaceSem为0,dataSem为N,生产者没有空间资源可以申请,故必须阻塞等待。这也就是我们期望的,当生产和消费下标相等时,且队列为满时,必须让消费者先执行,生产线程必须阻塞等待。
(注意,上方两种互斥情况下,只有当另一方生产/消费完,执行完V操作之后,阻塞方才能P成功,这也就实现了互斥和同步
当环形队列不为空时,生产和消费的下标不同,此时并没有访问同一个资源,故可以并发执行。
4.代码...
ringQueue.hpp
#ifndef _RINGQUEUE_HPP_
#define _RINGQUEUE_HPP_#include <vector>
#include "sem.hpp"
#include "mutex.hpp"const int g_default_size = 5;template <class T>
class RingQueue
{
public:RingQueue(int size = g_default_size): ring_queue_(size), size_(size), space_sem_(size), data_sem_(0), p_index_(0), c_index_(0), p_mutex_(), c_mutex_(){}~RingQueue() = default;void push(const T &in){// 生产者: 环形队列的push的操作space_sem_.p(); // 空间资源的p操作p_mutex_.lock();ring_queue_[p_index_++] = in;p_index_ %= size_;p_mutex_.unlock();data_sem_.v(); // 数据资源v操作}void pop(T *out){data_sem_.p(); // 数据资源的p操作,消费者进行。若没有数据资源,则阻塞挂起消费者线程c_mutex_.lock();*out = ring_queue_[c_index_++];c_index_ %= size_;c_mutex_.unlock();space_sem_.v(); // 空间资源v操作}private:std::vector<T> ring_queue_; // 使用vector模拟环形队列int size_; // 环形队列的大小int p_index_; // 生产者下标int c_index_; // 消费者下标Sem space_sem_; // 空间资源信号量,生产者使用Sem data_sem_; // 数据资源信号量,消费者使用Mutex p_mutex_;Mutex c_mutex_;
};// 自己写的:环形队列利用互斥锁+条件变量进行。
// template <class T>
// class RingQueue
// {
// public:
// RingQueue(int size = g_default_size)
// : ring_queue_(size)
// , size_(size)
// , c_index_(0), p_index_(0)
// , c_mutex_(), p_mutex_()
// {
// pthread_cond_init(¬_empty_, nullptr);
// pthread_cond_init(¬_full_, nullptr);
// }
// ~RingQueue()
// {
// pthread_cond_destroy(¬_empty_);
// pthread_cond_destroy(¬_full_);
// }
// void push(const T &in)
// {
// // 生产者
// p_mutex_.lock(); // 加锁// while((p_index_ + 1) % size_ == c_index_)
// {
// // 满的
// pthread_cond_wait(¬_full_, &p_mutex_.mtx_);
// }
// // 不满了
// ring_queue_[p_index_++] = in;
// p_index_ %= size_;
// pthread_cond_signal(¬_empty_);// p_mutex_.unlock();
// }
// void pop(T *out)
// {
// // 消费者
// c_mutex_.lock();// while(c_index_ == p_index_) // 为空,没有数据资源,等待条件变量。
// {
// pthread_cond_wait(¬_empty_, &c_mutex_.mtx_);
// }
// *out = ring_queue_[c_index_++];
// c_index_ %= size_;
// pthread_cond_signal(¬_full_);// c_mutex_.unlock();
// }
// private:
// std::vector<T> ring_queue_; // 环形队列
// int size_;
// int c_index_;
// int p_index_;
// Mutex c_mutex_;
// Mutex p_mutex_;
// pthread_cond_t not_empty_;
// pthread_cond_t not_full_;
// };#endif
testMain.cc
#include "ringQueue.hpp"
#include <pthread.h>
#include <iostream>
#include <unistd.h>
using namespace std;// 消费者
void *consumer(void *args)
{RingQueue<int> *rq = (RingQueue<int> *)args; // 生产消费的缓冲区:环形队列while (true){sleep(1);int x;rq->pop(&x);cout << "消费:" << x << " [" << pthread_self() << "]" << endl;}pthread_exit(nullptr);
}// 生产者
void *producer(void *args)
{RingQueue<int> *rq = (RingQueue<int> *)args;while (true){int x = rand() % 100 + 1;rq->push(x);cout << "生产:" << x << " [" << pthread_self() << "]" << endl;}pthread_exit(nullptr);
}// int main()
// {
// srand((unsigned int)time(nullptr));// // RingQueue<int>* ring_queue = new RingQueue<int>();
// RingQueue<int> ringQueue;
// pthread_t c[2], p[3]; // 多生产多消费,3生产2消费// for(int i = 0; i < 2; ++i)
// pthread_create(c + i, nullptr, consumer, &ringQueue);
// for(int i = 0; i < 3; ++i)
// pthread_create(p + i, nullptr, producer, &ringQueue);// for(int i = 0; i < 2; ++i)
// pthread_join(c[i], nullptr);
// for(int i = 0; i < 3; ++i)
// pthread_join(p[i], nullptr);
// return 0;
// }
int main()
{srand((unsigned int)time(nullptr));RingQueue<int> ringQueue;pthread_t c, p;pthread_create(&c, nullptr, consumer, &ringQueue);pthread_create(&p, nullptr, producer, &ringQueue);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}// 二元信号量充当互斥锁// int tickets = 10000;
// pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;// void *getTickets(void *args)
// {
// Sem *sem = (Sem *)args;
// while (true)
// {
// sem->p(); // 申请信号量,原子的,只有一个线程会申请成功
// // pthread_mutex_lock(&mtx);
// if (tickets > 0)
// {
// usleep(1000); // 1ms,模拟抢票过程
// cout << "tickets : " << tickets << " [" << pthread_self() << "]" << endl;
// tickets--;
// sem->v();
// // pthread_mutex_unlock(&mtx);
// }
// else
// {
// sem->v();
// // pthread_mutex_unlock(&mtx);
// break;
// }
// }
// pthread_exit(nullptr);
// }// int main()
// {
// pthread_t tids[4];
// Sem sem(1);
// for (int i = 0; i < 4; ++i)
// {
// pthread_create(tids + i, nullptr, getTickets, &sem);
// }// for (int i = 0; i < 4; ++i)
// {
// pthread_join(tids[i], nullptr);
// }
// return 0;
// }
sem.hpp
#ifndef _SEM_HPP_
#define _SEM_HPP_#include <semaphore.h>// 信号量的封装
class Sem
{
public:Sem(int value){sem_init(&sem_, 0, value);}~Sem(){sem_destroy(&sem_); }void p(){sem_wait(&sem_);}void v(){sem_post(&sem_);}
private:sem_t sem_;
};#endif
5.多生产多消费
多生产多消费时,任何一个生产者和一个消费者之间都已经通过信号量达到了应有的互斥与同步,现在需要考虑的是生产生产之间和消费消费之间的互斥关系。
生产生产之间,消费消费之间,需要保护的共享资源为下标:c_index_或者p_index_,环形队列:ring_queue_(STL:vector,不是线程安全的),故,需要在push和pop内部,进行加锁,构建临界区,保护临界资源。
因为这里生产和消费之间的互斥,已经在当下标相等时,为空或为满时,由信号量维护了,所以这里用两把锁,分别维护生产生产之间和消费消费之间。(见上方代码实现)
6.基于环形队列的生产消费模型为什么效率高,优势?
我们可以看到,多个生产之间,在对环形队列push时,内部是互斥的,也就是串型执行的。多个消费之间,在对环形队列pop时,内部是互斥的,也是串行执行的。对比阻塞队列的生产消费模型来说,改进就是,当缓冲区,环形队列不为空且不为满时,生产和消费的push和pop可以并发执行。
还是那句话,生产消费的过程并非只是pushpop的过程(放数据和拿数据的过程),还有生产者放数据之前生产数据和消费者拿数据之后处理数据的过程(这里才是最耗时间的)。多生产多消费的意义是,可以让生产数据和处理数据时多个线程并发执行,从而提高效率。 (和基于阻塞队列的一样)
7.信号量的理解
信号量的本质就是一个计数器,表示当前资源数目。计数器的意义是什么呢?
在之前使用互斥锁,条件变量的使用场景下,常规流程是:申请锁->判断临界资源是否就绪->就绪则访问,不就绪则在条件变量下等待->访问临界资源->释放锁。
而信号量这个计数器的意义就是:可以不进入临界区就得知资源情况,减少了临界区内的资源是否就绪的判断。本来,信号量就是一种资源预定机制,当P操作成功时,就代表着此时有一个资源可以供你获取/访问(比如环形队列中的空间资源/数据资源)。