Linux线程的生产者消费者模型 --- 阻塞队列(blockqueue)

news/2024/4/28 14:14:52/文章来源:https://blog.csdn.net/CHJBL/article/details/131693126

文章目录

  • 线程同步
  • 条件变量
    • 条件变量的接口
  • 生产者消费者场景
    • 消费者和消费者的关系
    • 生产者和生产者的关系
    • 生产者和消费者的关系
      • 从何体现出效率的提高
  • Blockqueue
    • blockqueue.hpp
      • 为什么条件变量的接口有锁作为参数
    • CP.cc
    • 生产者 -> queue -> 消费者兼生产者 -> queue -> 消费者
      • 实现大致目的
      • 大致步骤
    • blockqueue.hpp
    • Task.hpp -- 任务头文件
    • CP.cc
    • 实现效果
  • 总结

线程同步

在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题就叫做同步

也就是说当一个线程申请锁成功后,一旦它解锁了就不能够再申请锁,而是要到整个线程队尾进行排队,让下一个线程去申请锁。这样有序的去申请锁就叫做同步。

条件变量

条件变量的使用:一个线程等待条件变量的条件成立而被挂起;另一个线程使条件成立后唤醒等待的线程。

也就是说使用条件变量后,所有的线程必须同步去执行,但条件满足时线程就挂起直到另一个线程唤醒它。

条件变量相当于一个线程执行的必要条件,只有满足条件的线程才能继续执行

条件变量的接口

定义条件变量:pthread_cond_t XXX

全局初始化 = PTHREAD_COND_INITALIZER

局部初始化使用:pthread_cond_init

#include <pthread.h>
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

参数一为条件变量的地址,参数二为可以不关心设为nullptr

销毁条件变量:pthread_cond_destroy

int pthread_cond_destroy(pthread_cond_t *cond);

满足条件变量则挂起等待:pthread_cond_wait

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);

参数一为条件变量的地址,参数二为锁的地址(后面会谈到为什么有锁作为参数)

唤醒线程:pthread_cond_signal

 int pthread_cond_broadcast(pthread_cond_t *cond);//唤醒全部线程int pthread_cond_signal(pthread_cond_t *cond);//唤醒一个线程

参数都为条件变量的地址

生产者消费者场景

在日常的生活中,这个场景并不会陌生。例如:供货商 -> 超市 -> 顾客。而供货商就相当于生产者,顾客就相当于消费者,超市就充当一个中间商。顾客不可能直接去跟供货商买东西,而供货商也不会直接卖给顾客东西,超市就充当了这样一个中间的角色。

image-20230712234449523

在线程的角度也是如此,假设现在一批线程充当着生产者的角色,另一批线程充当着消费者的角色。那么生产者线程不会直接就将数据传给消费者线程,而是会将数据放入到一个相当于缓冲区中,而这个过程又可以称为 生产者和消费者之间的解耦

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题

那么对于这种模型又会衍生出三种关系:

消费者和消费者的关系

对于消费者而言,因为缓冲区中的空间有限,而消费者线程只需要将数据写入到缓冲区,可是当缓冲区中已经有线程在写入中了,其他的线程就不能往缓冲区里写了,而是要等到前面的线程写完后再判断缓冲区是否还没满才可以写入。这就形成了消费者和消费者之间的互斥关系

生产者和生产者的关系

对于生产者也是同理,一个线程读时其他线程同样需要等待。这也就形成了互斥关系

生产者和消费者的关系

生产者和消费者既有互斥又有同步,当一个线程写时,另一个线程去读这种情况就会导致数据的不安全性,因此互斥就是为了保证共享资源的安全性。而每次缓冲区都只有一个线程去执行时,其他线程在等待一旦执行缓冲区的线程解锁了,等待的线程就可以马上申请锁去执行缓冲区,这样效率就会大大提高,因此同步是为了提高效率的

从何体现出效率的提高

上面谈到缓冲区每次都只有一个线程在执行。那么在这个线程访问执行时其他线程在干什么呢?

首先线程在读或写之前肯定是会有其他的任务需要做的,比如创建写的数据,创建存放读到的数据空间等等。那么在一个线程访问缓冲区时,其他的线程就可以做这些访问缓冲区前的任务,一旦访问缓冲区的线程完成了,其他的线程就不需要再去完成访问缓冲区前的任务直接就可以访问缓冲区了,这就是效率提高的表现

因此:效率的提高真正体现的并不是访问缓冲区的过程,而是访问缓存区之前的过程,这也就是多消费者多生产者的意义

Blockqueue

阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

阻塞队列为空时,消费者线程将被阻塞,直到阻塞队列被放入元素
阻塞队列已满时,生产者线程将被阻塞,直到有元素被取出

那么利用这个阻塞队列结合条件变量和锁,就可以编写出一套简单的模型。

blockqueue.hpp

#pragma once#include <iostream>
#include <pthread.h>
#include <queue>// 设置默认的最大容量
static int max = 10;template <class T>
class blockqueue
{
public:blockqueue(const int &maxnum = max): _maxnum(maxnum){pthread_mutex_init(&_lock, nullptr);pthread_cond_init(&_pcond, nullptr);pthread_cond_init(&_ccond, nullptr);}// 插入数据void push(const T &in){// 加锁pthread_mutex_lock(&_lock);// 判断队列是否满了,如果为空则等待// 充当条件判断的语法必须是while,不能用ifwhile (_q.size() == _maxnum)pthread_cond_wait(&_pcond, &_lock);// 插入数据_q.push(in);// 走到这里说明队列一定有数据,就可以唤醒消费者的线程pthread_cond_signal(&_ccond);// 解锁pthread_mutex_unlock(&_lock);}// 拿到头部数据并删除void pop(T *out){// 加锁pthread_mutex_lock(&_lock);// 判断队列是否满了,如果为空则等待// 充当条件判断的语法必须是while,不能用ifwhile (_q.size() == 0)pthread_cond_wait(&_ccond, &_lock);// 拿到头部数据并删除*out = _q.front();_q.pop();// 走到这里说明队列一定不会满,就可以唤醒生产者的线程pthread_cond_signal(&_pcond);// 解锁pthread_mutex_unlock(&_lock);}~blockqueue(){pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}private:std::queue<T> _q;int _maxnum; // 最大容量pthread_mutex_t _lock;pthread_cond_t _pcond; // 生产者的条件变量pthread_cond_t _ccond; // 消费者的条件变量
};

注意:为什么上面的代码里判断条件需要用循环而不是if呢,这就要说到上面的为什么pthread_cond_wait的参数里会有锁了。

为什么条件变量的接口有锁作为参数

首先,能够执行到这个接口说明该线程必定申请锁成功了。如果现在线程执行到了wait这个接口,那么线程就会被阻塞。但是这个线程已经申请了锁其他的线程就没有办法再去申请锁了,那么此时这个线程就一定要解锁,而pthread_cond_wait这个接口就会自动的帮这个线程解锁。

当这个线程阻塞后被其他的线程唤醒后,pthread_cond_wait这个接口就会自动帮这个线程再次加锁,所以为了确保再次加锁的锁是和之前的一样的,pthread_cond_wait接口就会带上锁的地址作为参数。

那么为什么要用循环而不是if呢?最主要的原因是,即使这个线程被唤醒了,但是它仍有可能还是处于不满足条件的情况,因此为了确保数据的安全要再次对这个线程进行判断,直到该线程满足条件才能继续往下执行。

CP.cc

#include "BlockQueue.hpp"
#include <ctime>
#include <unistd.h>// 生产
void *Producer(void *argc)
{blockqueue<int> *t = (blockqueue<int> *)argc;while (1){// 随机产生数据插入int x = rand() % 100 + 1;t->push(x);std::cout << "生产计算数据:" << x << std::endl;sleep(1);}return nullptr;
}// 消费
void *Consumer(void *argc)
{blockqueue<int> *t = (blockqueue<int> *)argc;while (1){// 拿出数据int x;t->pop(&x);std::cout << "消费计算数据:" << x << std::endl;}return nullptr;
}int main()
{// 设置随机种子srand(time(nullptr));blockqueue<int>* dq = new blockqueue<int>();pthread_t c, p;// 创建计算生产者pthread_create(&p, nullptr, Producer, dq);// 创建计算消费者pthread_create(&c, nullptr, Consumer, dq);pthread_join(p, nullptr);pthread_join(c, nullptr);return 0;
}

上面的代码就可以实现单消费者和单生产者的模型。生产者就会往阻塞队列里面写入数据,消费者就可以往阻塞队列里面读数据

image-20230713002252801

那么根据这个模式再来实现一个加大点难度的模型代码

生产者 -> queue -> 消费者兼生产者 -> queue -> 消费者

image-20230713002831696

实现大致目的

  1. 一个生产者,一个消费者兼生产者,一个消费者
  2. 计算过程由随机数,随机符号
  3. 第一个消费者读到数据后传到第二个队列中
  4. 最后读取计算结果的消费者将数据读到文件中

大致步骤

  1. 因为有不同的任务,所以创建一个任务头文件
  2. 由于是两个不同的队列,因此可以创建一个队列组的类
  3. ±*/ 随机
  4. 以下代码均有注释

blockqueue.hpp

#pragma once#include <iostream>
#include <pthread.h>
#include <queue>// 设置默认的最大容量
static int max = 10;template <class T>
class blockqueue
{
public:blockqueue(const int &maxnum = max): _maxnum(maxnum){pthread_mutex_init(&_lock, nullptr);pthread_cond_init(&_pcond, nullptr);pthread_cond_init(&_ccond, nullptr);}// 插入数据void push(const T &in){// 加锁pthread_mutex_lock(&_lock);// 判断队列是否满了,如果为空则等待// 充当条件判断的语法必须是while,不能用ifwhile (_q.size() == _maxnum)pthread_cond_wait(&_pcond, &_lock);// 插入数据_q.push(in);// 走到这里说明队列一定有数据,就可以唤醒消费者的线程pthread_cond_signal(&_ccond);// 解锁pthread_mutex_unlock(&_lock);}// 拿到头部数据并删除void pop(T *out){// 加锁pthread_mutex_lock(&_lock);// 判断队列是否满了,如果为空则等待// 充当条件判断的语法必须是while,不能用ifwhile (_q.size() == 0)pthread_cond_wait(&_ccond, &_lock);// 拿到头部数据并删除*out = _q.front();_q.pop();// 走到这里说明队列一定不会满,就可以唤醒生产者的线程pthread_cond_signal(&_pcond);// 解锁pthread_mutex_unlock(&_lock);}~blockqueue(){pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}private:std::queue<T> _q;int _maxnum; // 最大容量pthread_mutex_t _lock;pthread_cond_t _pcond; // 生产者的条件变量pthread_cond_t _ccond; // 消费者的条件变量
};// 将负责计算的队列和负责保存的队列归并成一个类以便后续调用
// 队列组的类
template <class C, class S>
class blockqueues
{
public:// 计算队列blockqueue<C>* _cp;// 保存队列blockqueue<S>* _sc;
};

Task.hpp – 任务头文件

#include <iostream>
#include <string>
#include <functional>
#include <cstdio>// 负责计算的任务类
class CPTask
{// 调用的计算方法,根据传入的字符参数决定typedef std::function<int(int, int, char)> func_t;public:CPTask(){}CPTask(int x, int y, char op, func_t func): _x(x), _y(y), _op(op), _func(func){}// 实现传入的函数调用std::string operator()(){int count = _func(_x, _y, _op);// 将结果以自定义的字符串形式返回char res[2048];snprintf(res, sizeof res, "%d %c %d = %d", _x, _op, _y, count);return res;}// 显示出当前传入的参数std::string tostring(){char res[1024];snprintf(res, sizeof res, "%d %c %d = ", _x, _op, _y);return res;}private:int _x;int _y;char _op;// +-*/func_t _func;// 实现方法
};// 负责计算的任务函数
// 实现+-*/ 随机
int Math(int x, int y, char c)
{int count;switch (c){case '+':count = x + y;break;case '-':count = x - y;break;case '*':count = x * y;break;case '/':{if (y == 0){std::cout << "div zero" << std::endl;count = -1;}elsecount = x / y;break;}default:break;}return count;
}class SCTask
{// 获取保存数据的方法typedef std::function<void(std::string)> func_t;public:SCTask(){}SCTask(const std::string &str, func_t func): _str(str), _func(func){}//调用方法void operator()(){_func(_str);}private:std::string _str;// 数据func_t _func;// 实现方法
};// 负责保存的方法,将数据读取到保存至文件
void Save(const std::string &str)
{std::string res = "./log.txt";FILE *fd = fopen(res.c_str(), "a+");if (!fd)return;fwrite(str.c_str(), 1, sizeof str.c_str(), fd);fputs("\n", fd);fclose(fd);
}

CP.cc

#include "BlockQueue.hpp"
#include <ctime>
#include <unistd.h>
#include "Task.hpp"// 生产
void *Producer(void *argc)
{// 将参数转换回计算队列的类型blockqueue<CPTask> *t = (blockqueue<CPTask> *)((blockqueues<CPTask, SCTask> *)argc)->_cp;while (1){std::string ops("+-*/");// 随机产生数据插入int x = rand() % 100 + 1;int y = rand() % 100 + 1;int opnum = rand() % ops.size();// 随机提取+-*/char op = ops[opnum];// 定义好实现类的对象CPTask C(x, y, op, Math);//将整个对象插入到计算队列中t->push(C);std::cout << "生产计算数据:" << C.tostring() << std::endl;sleep(1);}return nullptr;
}// 消费
void *Consumer(void *argc)
{// 因为这个是身兼两者身份// 因此要有两种队列的类型对象blockqueue<CPTask> *t = (blockqueue<CPTask> *)((blockqueues<CPTask, SCTask> *)argc)->_cp;blockqueue<SCTask> *s = (blockqueue<SCTask> *)((blockqueues<CPTask, SCTask> *)argc)->_sc;while (1){// 计算队列类型拿出数据std::string res;CPTask c;t->pop(&c);res = c();std::cout << "消费计算数据:" << res << std::endl;// 插入保存数据队列SCTask sc(res, Save);s->push(sc);std::cout << "生产保存数据: ......done" << std::endl;}return nullptr;
}void *Saver(void *argc)
{// 将参数转换回保存队列的类型blockqueue<SCTask> *s = (blockqueue<SCTask> *)((blockqueues<CPTask, SCTask> *)argc)->_sc;while (1){// 拿出数据SCTask t;s->pop(&t);//调用方法t();std::cout << "消费保存数据:......done" << std::endl;}return nullptr;
}int main()
{// 设置随机种子srand(time(nullptr));// 创建队列对象blockqueues<CPTask, SCTask> dqs;dqs._cp = new blockqueue<CPTask>;dqs._sc = new blockqueue<SCTask>;pthread_t c, p, s;// 创建计算生产者pthread_create(&p, nullptr, Producer, &dqs);// 创建计算消费者兼保护生产者pthread_create(&c, nullptr, Consumer, &dqs);// 创建保存消费者pthread_create(&c, nullptr, Saver, &dqs);pthread_join(p, nullptr);pthread_join(c, nullptr);pthread_join(s, nullptr);delete dqs._cp;delete dqs._sc;return 0;
}

实现效果

image-20230713004021629

log.txt:

image-20230713004034207

总结

上面的代码都是单线程去做一个工作的,事实上多线程也是可以的,因为对于访问共享资源(缓冲区、阻塞队列)一次只能有一个线程做这个工作。上面也提到了对于效率的提高并不是体现在共享资源内的,而是访问共享资源前的工作。因此多线程的效率提高也就在这方面。

线程的学习需要熟知各个概念和多动手写代码,像这个生产者消费者模型理解起来不算很难,但是上手写代码就非常复杂。线程的接口较多,多练才能熟记

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_331088.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【HarmonyOS】Stage模型二维码/条码生成与解析

HarmonyOS的官方API中提供了QRCode组件&#xff08;QRCode-基础组件-组件参考&#xff08;基于ArkTS的声明式开发范式&#xff09;-ArkTS API参考-HarmonyOS应用开发&#xff09;&#xff0c;这个组件有个缺点只能用于显示二维码&#xff0c;无法显示条码与解析码内容&#xff…

【已解决】Flask项目报错TypeError: tuple indices must be integers or slices, not str

文章目录 问题情境报错及分析报错代码分析 解决方案必要的解决方法可能有用的解决方法 问题情境 本解决方案适用情境&#xff1a;在本地可以正常运行的flask项目&#xff0c;放到云服务器报错TypeError: tuple indices must be integers or slices, not str&#xff0c;即代码…

《深度学习推荐系统》笔记

目录 一、推荐系统是什么1.作用和意义2.推荐系统的架构2.1 逻辑架构2.2 技术架构 二、传统的推荐系统方法1. 协同过滤算法1.1 userCF&&ItemCF1.3 矩阵分解算法 2. 逻辑回归算法3. 因子分解机3.1 POLY2模型3.2 FM模型3.3 FFM模型3.4 小结 4. 组合模型4.1 GBDTLR组合模型…

【C++/嵌入式笔试面试八股】二、24.TCP三次握手四次挥手 | TCP可靠性

TCP三次握手四次挥手 64.TCP头部中有哪些信息?❤️ TCP数据报格式(左图) UDP数据报格式也放这(右图),不具体解释了。 结合三次握手四次挥手来看 端口: 区分应用层的不同应用进程 扩展:应用程序的端口号和应用程序所在主机的 IP 地址统称为 socket(套接字),IP:端口…

Docker安装ElasticSearch/ES

目录 前言准备拉取ElasticSearch镜像安装ElasticSearch拉取elasticsearch-head镜像安装elasticsearch-head参考 前言 TencentOS Server 3.1Docker version 19.03.14, build 5eb3275d40 准备 docker 已安装。 安装 docker 参考&#xff1a;【Centos 8】【Centos 7】安装 docke…

基于STM32 ARM+FPGA伺服控制系统总体设计方案(一)

设计需求 一套完整的伺服控制方案包括了上位机、驱控一体控制器和功率板三者。操作人员 通过上位机发送各种不同指令&#xff0c;然后控制器解析指令后执行相应的伺服功能&#xff0c;其次控 制器将驱动信号传输至功率板驱动电机&#xff0c;最后控制器采集反馈信息进行闭环…

了解PostgreSQL sql shell和VACUUM命令

从SQL Shell进入PostgreSQL&#xff1b;没用过这东西&#xff0c;看一下&#xff1b; 一直回车&#xff1b;最后输入口令就登入了&#xff1b;此时是登入默认的数据库postgres&#xff1b;这个数据库是默认安装的&#xff1b; 看一下有没有表&#xff0c;根据资料可以用 \d 或…

大坝安全监测中需要做好检查监测

大坝安全监测是人们了解大坝运行状态和安全状况的有效手段和方法。它的目的主要是了解大坝安全状况及其发展态势&#xff0c;是一个包括由获取各种环境、水文、结构、安全信息到经过识别、计算、判断等步骤&#xff0c;最终给出一个大坝安全 程度的全过程。 此过程包括&#xf…

Linux中常用的监控性能的命令(sar、mpstat,vmstat, iostat,)详解

Linux中常用的监控性能的命令有&#xff1a; sar&#xff1a;能查看CPU的平均信息&#xff0c;还能查看指定CPU的信息。与mpstat相比&#xff0c;sar能查看CPU历史信息 mpstat&#xff1a;能查看所有CPU的平均信息&#xff0c;还能查看指定CPU的信息。 与sar相比&#xff0c…

九五从零开始的运维之路(其二十)

[TOC](文章目录) 文章目录 前言一、LAMP是什么二、配置环境及安装1.配置yum源2.关闭防火墙、网络图形化工具及SElinux3.安装软件包 三、配置apache服务器内容四、启动服务五、访问验证总结 前言 本篇将简述的内容&#xff1a;Linux系统下的LAMP平台部署 基于discuz框架的论坛搭…

阿里云无影云电脑价格_企业办公型1元_云桌面入口

阿里云无影云电脑配置费用&#xff0c;4核8G企业办公型云电脑可以免费使用3个月&#xff0c;无影云电脑地域不同费用不同&#xff0c;无影云电脑是由云桌面配置、云盘、互联网访问带宽、AD Connector、桌面组共用桌面session等费用组成&#xff0c;阿里云百科分享阿里云无影云电…

中文数据下载

研究AI离不开数据&#xff0c;数据库可以说是AI的半壁天下。有链接的数据库下载是很nice的。 语音数据集整理 目录 1.Mozilla Common Voice. 2 2.翻译和口语音频的大型数据库Tatoeba. 2 3.VOiCES Dataset 3 4. LibriSpeech. 4 5.2000 HUB5 English&#xff1a;... 4 6.…

如何用Three.js + Blender打造一个web 3D展览馆

作者&#xff1a;vivo 互联网前端团队- Wei Xing 运营活动新玩法层出不穷&#xff0c;web 3D炙手可热&#xff0c;本文将一步步带大家了解如何利用Three.js和Blender来打造一个沉浸式web 3D展览馆。 一、前言 3D展览馆是什么&#xff0c;先来预览下效果&#xff1a; 看起来像…

Linux离线环境Jenkins部署SpringBoot

Jenkins服务器 把Jar包上传到Linux服务器的/jenkins/目录下 Dashboard----》新建任务----》构建一个自由风格的软件项目----》test 修改jenkins工作空间 新建构建前执行命令stop.sh&#xff0c;停止SpringBoot并备份 &#xff08;这里是目标服务器&#xff0c;即部署项目的…

激斗云计算:互联网大厂打响新一轮排位战

大模型如同一辆时代列车&#xff0c;所有科技大厂都想上车。 自去年底ChatGPT一炮而红&#xff0c;国内外数十家科技大厂、创业公司、机构相继下场&#xff0c;一时间掀起大模型的热浪。 《中国人工智能大模型地图研究报告》显示&#xff0c;截至今年5月28日&#xff0c;中国…

第八章:SegNet——一个用于强大的语义像素级标注的深度卷积编码-解码架构

0.摘要 我们提出了一种新颖的深度架构SegNet&#xff0c;用于语义像素级图像标注。SegNet具有一些吸引人的特性&#xff1a; (i)它只需要对完全学习的函数进行前向评估&#xff0c;就可以获得平滑的标签预测&#xff1b; (ii)随着深度增加&#xff0c;像素标注考虑了更大的上下…

SpringBoot+actuator和admin-UI实现监控中心

使用SpringBoot很久了&#xff0c;但是很少使用到SpringBoot的查看和监控&#xff0c;将来八成也不会用到&#xff0c;万一有机会用到呢&#xff1f;所以记录一下以前学习SpringBootactuator和adminUI实现监控中心的方式 Springboot的版本2.0.x <parent><groupId>…

keepalived安装配置详解

文章目录 高可用介绍keepalived安装、使用vip漂移抓包脑裂脑裂有没有危害&#xff1f;如果有危害对业务有什么影响&#xff1f; keepalived架构双vip架构 Healthcheck实现 notifyVRRP选举格式 高可用 介绍 高可用性&#xff08;High Availability&#xff09;是指系统或服务能…

Word2Vec实现文本识别分类

深度学习训练营之使用Word2Vec实现文本识别分类 原文链接环境介绍前言前置工作设置GPU数据查看构建数据迭代器 Word2Vec的调用生成数据批次和迭代器模型训练初始化拆分数据集并进行训练 预测 原文链接 &#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&…

pycharm新建分支并提送至GitHub

文章目录 前言pycharm创建本地分支Push至远程分支 前言 当我们写的项目代码越来越多时&#xff0c;一个master分支无法满足需求了&#xff0c;这个时候就需要创建分支来管理代码。 创建分支可以快速的回滚到某个节点的版本&#xff0c;也可以多个开发者同时开发一个项目&#…