技术学习-消息队列

news/2024/4/26 7:54:09/文章来源:https://blog.csdn.net/cjl13694270972/article/details/129144888

什么是消息队列

可以简单理解为存放消息的队列,数据结构模型和队列一样,都是先进先出。主要用不同线程(Thread)/进程(Process)

为什么需要消息队列

(1)不同进程之间传递消息是,因为进程的耦合度高,改动一个进程,引发必须修改另一个进程。为了隔离两个进程,因此需要在进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,不会影响另一个。
(2)在两个不停运行的线程中,一个线程对另一个线程,进行通信,往往容易因为消息量大,而导致另一个线程处理不过来,导致请求丢失或是请求顺序错乱。当出现多生产多消费者的情况时,这种消息错乱的问题会更加明显。因此,我们需要一个消息队列维护消息的规范。

消息队列的应用场景

(1)上游不关心下游处理结果的场景
这一类场景就是类似请求生产者-消费者的场景。这种场景往往就是生产者(只需要生产,将消息给到消费者,打印机拿到消息后只需要消费,并且生产者不需要等待回应结果。举例说,A线程是负责对数据进行计算分析,B线程是负责对A分析的结果进行打印输出的,A+B线程完成一个数据计算分析后,打印输出的任务。A完成一次计算分析任务是不需要去等待B的回应,A只管计算分析,B只管拿到A的结果输出。这种情况下去使用消息队列是比较合适的,A只需要把分析结果写入消息队列,B只需要拿消息队列里面的消息进行打印,这样两者是互不影响的,也能解决并发引起的顺序和丢失问题。
值得注意的是,消息队列传递的是线程的输入输出数据,而且上游线程不需要关注输出数据。

(2)基于数据驱动的场景
这两类任务往往是多个线程配合完成一个工作,而且每一个线程的处理时长都比较久,外部传进来一个data数据包,A线程进行操作1,B线程要等待A线程处理后的data数据包完成操作2,C线程也同样需要B线程的处理结果进行操作3。
这种场景最常用的解决方案,往往是根据经验统计每一个任务的执行时间,然后人工定制一个排班时间表,然后通过时间表执行,这样往往会出现执行时长变化导致的任务无法正常执行
这时候消息队列主要的任务不是传输输入输出数据,传输的是信号,是通知下游线程执行的信号。同样这种场景也是和场景(1)一样,生产者不需要等待回应结果。那为什么不用场景(1)那样去直接用消息队列去传输处理完的data包呢?根据不同的业务可能会有以下原因:a,data数据包体积大,用在消息队列上面传输影响存储性能,这种情况往往是data数据包在一个地方因为某些原因不方便转移,而ABC线程是分别取到那里进行处理的。

(3)上游关注下游执行结果,但是执行时间很长。
有时候上游需要关注执行结果,但是执行结果时间很长(典型的就是调用离线处理,或是跨公网调用),也常用回调网关+MQ来解耦。
例如:微信支付,跨公网调用微信接口,执行时间会比较长,但调用方式又非常关注执行结果

这时候的解决方案流程就是
1)调用方直接跨公网调用微信接口
2)微信返回调用成功,此时不代表返回成功
3)微信执行完成后,回调统一网关
4)网关将返回结果通知MQ
5)请求方收到通知结果

这里需要注意的是,不应该由回调网关调用上游来通知结果,如果是这样的话,每次新增调用方,回调网关都需要修改代码,仍然会反向依赖,使用回调网关+MQ的方案,新增任何对微信支付的调用,都不需要修改代码,因为微信支付直接把调用接口的行为结果返回给你,告诉调用者已经成功调用了支付功能,但是微信后台可能也存在一个消息队列去接收这些支出请求去处理,处理完成了才会通知网关回调通知,至于支付有没有真正成功,就由网关这边进行进行MQ通知。所以这种场景,上游得到的执行结果也不是真正的执行结果,只是通知主线程可以进行下一个异步处理的通知,因为也会有调用支付成功的返回,但是又因为内部系统原因,没有支付成功的情况。所以,消息队列并不适合上游需要实时处理结果的场景

总结:
什么时候使用MQ?
1)数据驱动的任务依赖
2)上游不关心多下游执行结果
3)异步返回执行时间长

其他思考

MQ的缺点

(1)系统复杂程度提升,逻辑变得复杂。同时引入MQ组件,将会对原来的系统设计,通信方式产生大变化。同时系统的改造需要花费大量的资源,同时也要承担引入新组件带来的系统不稳定性。
(2)系统可用性降低,引入mq组件后,通信模型将会发生改变,消息的传递将会依赖于MQ,假如MQ挂了,将会直接导致系统崩溃,这应该怎么处理
(3)一致性问题。一个任务需要ABCD四个系统轮流去处理,A 系统处理完了直接返回成功了,大家都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?这样就会导致数据不一致。

流量整形,削峰填谷

(1)为什么要流量整形?
流量冲击(高并发情况下带来的突发流量):上游调用方(push)不限速。很快就会把下游压垮,这种场景往往发生在上游逻辑容易突然暴增,而同时下游的操作非常多,完成一次需要较多的时间。例如:上游发起下单操作,下游完成秒杀业务逻辑(库存检查,库存加锁,余额检查,订单生成,余额扣减,库存扣减,生成流水,余额解锁,库存解锁),上游业务简单,每秒发生10000个请求,下游业务复杂,每秒只能处理2000个请求,上游不限速下单,会导致下游系统处理不了庞大信息量,引发雪崩。
(2)常见的优化方案
a.上游队列缓冲(put阻塞),限速发送
b.下游队列缓冲(定时或者批量拉取pull,可以起到削平流量),限速执行

ps:如果上流发送流量过大,mq提供拉模式可以起到下游自我保护的作用,会不会引起mq队列的消息堆积呢?
答:下游MQ-client拉取消息,消息接收方能够批量获取消息,需要下游消息接收方进行优化(提供批处理,比如批量写),否则整体吞吐量低,也会导致mq堆积。

高并发系统保护策略

1.缓存
缓存不单单能够提升系统访问速度,也是保护数据库,保护系统的有效方式。大型网站一般主要是”读“,数据先进数据库,然后再走缓存。在大型”写“系统中,先走缓存,在走数据库,对数据库进行批处理操作。(积累一些数据,批量写入;内存里面的缓存队列,mq像是缓存队列)
2.降级
根据服务器压力,指定某些服务或者页面的级别(需求不同,降级策略也不同),为此释放服务器资源,保证核心任务的正常运行
根据服务方式:可以拒接服务,可以延迟服务,也有时候可以随机服务。
根据服务范围:可以砍掉某个功能,可以砍掉某些模块
如果不是核心链路,那么就把这个服务降级掉。打个比喻,现在的APP都讲究千人千面,拿到数据后,做个性化排序展示,如果在大流量下,这个排序就可以降级掉!
3.限流
限制系统的输入和输出流量以达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,一旦达到阈值,就需要限制流量。比如:延迟处理,拒绝处理,部分处理等等

实际场景中常用的限流策略:
(1)nginx前端限流,
按照一定的规则如ip,账号,调用逻辑等在nginx层面做限流
(2)业务应用系统限流
(a)客户端限流(验证码;获取动态请求路径pathvariable,达到接口地址隐藏的效果)
(b)服务端限流(redis限速器,延迟队列)
(3)数据库限流
数据库连接池化,mysql(如max_connections),redis(如tcp-backlog)都会有类似的限制连接数的配置

限流算法

(1)计算器

计算器是一种比较简单的限流算法,用途比较广泛,在接口层面,很多地方使用这种方式限流。在一段时间内,进行计数,与阈值进行比较,到了时间临界点,将计数清0.

示例代码:

public class CustomerDemo {private static long timeStamp = System.currentTimeMillis();//限制为1s内 限制100请求private static long limitCount = 100;private static long interval = 1000;//请求数private static long reqCount = 0;public static boolean grant(){ //判断是否需要限流long now = System.currentTimeMillis();if (now < timeStamp+interval){ //当前时间在1s内if (reqCount < limitCount){//当前请求的数量不超过最大限制数,允许请求++reqCount;return true;}else{//否则限流return false;}}else{//超过了当前时间,计数器清0 timeStamp = System.currentTimeMillis();reqCount = 0;return  false;}}public static void main(String[] args) {for (int i =0;i<500;i++){//模拟500个订单同时请求new Thread(new Runnable() {@Overridepublic void run() {if(grant()){System.out.println("执行业务逻辑");}else{System.out.println("限流");}}}).start();}}
}

这里需要注意的是,存在一个时间临界点缺陷的问题。举个栗子,在12:01:00到12:01:58这段时间内没有用户请求,然后在12:01:59这一瞬时发出100个请求,OK,然后在12:02:00这一瞬时又发出了100个请求。这里你应该能感受到,在这个临界点可能会承受恶意用户的大量请求,甚至超出系统预期的承受。

滑动窗口

由于计数器存在临界点缺陷,后来出现了 滑动窗口算法来解决
滑动窗口的意思是说把固定时间片,进行划分,并且随着时间的流逝,进行移动,这样就巧妙的避开了计数器的临界点问题。也就是说这些固定数量的可以移动的格子,将会进行计数判断阈值,因此格子的数量影响这滑动窗口算法的精度。
在这里插入图片描述

漏桶算法

虽然滑动窗口有效避免了时间临界点的问题,但是依然有时间片的概念,而漏桶算法在这方面比滑动窗口而言更加先进。
思想:有一个固定的桶,进水的速率是不确定的,但是出水的速率是恒定的,当水满的时候会溢出。

代码实现

public class LeakBucketDemo {//时间刻度private static long time = System.currentTimeMillis();//桶里面现在的水private static int water = 0;//桶的大小private static int size = 10;//出水率private static int rate = 3;public static boolean grant(){//计算出水数量long now = System.currentTimeMillis();int out = (int)((now - time)/700*rate);//出水量//漏水后的剩余water = Math.max(0,water-out);//避免剩余水量为负数time = now;if((water+1)<size){++water;return true;}else{return  false;}}public static void main(String[] args) {for (int i =0;i<500;i++){ //模拟500个订单同时请求new Thread(new Runnable() {@Overridepublic void run() {if(grant()){System.out.println("执行业务逻辑");}else{System.out.println("限流");}}}).start();}}
}

令牌桶算法

注意到,漏桶的出水速度是恒定的,那么意味着如果瞬时大流量的话,将有大部分请求被丢弃掉(也就是所谓的溢出)。为了解决这个问题,令牌桶进行了算法的改进
思想:令牌桶算法和漏桶算法不同的是,令牌桶是将以恒定的速度生成令牌放入到桶中,然后让请求过来了,拿到了令牌就请求,否则就丢弃;而漏桶是把请求放入漏桶,恒定速度去处理。这种方式可以避免了瞬间大流量而丢掉大量的请求。

在这里插入图片描述

生成令牌的速度是恒定的,而请求去拿令牌是没有速度限制的。这意味,面对瞬时大流量,该算法可以在短时间内请求拿到大量令牌,而且拿令牌的过程并不是消耗很大的事情。(有一点生产令牌,消费令牌的意味) 不论是对于令牌桶拿不到令牌被拒绝,还是漏桶的水满了溢出,都是为了保证大部分流量的正常使用,而牺牲掉了少部分流量,这是合理的,如果因为极少部分流量需要保证的话,那么就可能导致系统达到极限而挂掉,得不偿失。

代码实现

public class TokenBucketDemo {private static long time = System.currentTimeMillis();private static int createTokenRate = 3;private static  int size = 10;//当前令牌数量private static int token = 0;public static boolean grant(){long now  = System.currentTimeMillis();//当前时间需要生产令牌数int in = (int)((now-time)/50*createTokenRate);token = Math.min(size,token+in);time = now;if (token>0){--token;return true;}else{return false;}}public static void main(String[] args) {for (int i =0;i<500;i++){ //模拟500个订单同时请求new Thread(new Runnable() {@Overridepublic void run() {if(grant()){System.out.println("执行业务逻辑");}else{System.out.println("限流");}}}).start();}}
}

常见消息队列对比

在这里插入图片描述

C++实现简单消息队列

在目前的工作中也用到多线程技术去完成一些相关的业务,其中利用消息队列在多线程之间通信中起到了非常重要的作用。因为我现在工作主要是用c++,所以下面附上自己在工作中要到c++简单实现的消息队列代码
MessageQueue.h

#ifndef MESSAGEQUEUE_H
#define MESSAGEQUEUE_H#include <queue>
#include <map>
#include <string>
#include <pthread.h>#define MSG_QUIT                    0using namespace std;typedef struct {int code;void* data;
} MSG;typedef struct {pthread_mutex_t qmutex;pthread_cond_t qready;
} CondMutex;typedef struct {int id;string strJson;
} CONFIG_PACKET;class MessageQueue
{
public:MessageQueue();virtual ~MessageQueue();bool push(int code, void* data, int max_msg = 100);bool pop(MSG** pmsg);void wait(void);void wait(MSG** pmsg);protected:CondMutex m_mutex;queue<MSG*> m_msgs;map<int, int> msg_cnt;
};#endif // MESSAGEQUEUE_H

MessageQueue.cpp

#include "MessageQueue.h"MessageQueue::MessageQueue()
{pthread_mutex_init(&m_mutex.qmutex, NULL);pthread_cond_init(&m_mutex.qready, NULL);
}MessageQueue::~MessageQueue()
{pthread_mutex_destroy(&m_mutex.qmutex);pthread_cond_destroy(&m_mutex.qready);
}bool MessageQueue::push(int code, void* data, int max_msg)
{bool bSuccess = false;pthread_mutex_lock(&m_mutex.qmutex);int& cnt = msg_cnt[code];if (cnt < max_msg) {MSG* pmsg = new MSG;pmsg->code = code;pmsg->data = data;m_msgs.push(pmsg);cnt++;pthread_cond_signal(&m_mutex.qready);bSuccess = true;}pthread_mutex_unlock(&m_mutex.qmutex);return bSuccess;
}bool MessageQueue::pop(MSG** pmsg)
{*pmsg = NULL;pthread_mutex_lock(&m_mutex.qmutex);if (!m_msgs.empty()) {*pmsg = m_msgs.front();msg_cnt[(*pmsg)->code]--;m_msgs.pop();}pthread_mutex_unlock(&m_mutex.qmutex);return *pmsg != NULL;
}void MessageQueue::wait(void)
{pthread_mutex_lock(&m_mutex.qmutex);while (m_msgs.empty()) {pthread_cond_wait(&m_mutex.qready, &m_mutex.qmutex);}pthread_mutex_unlock(&m_mutex.qmutex);
}void MessageQueue::wait(MSG** pmsg)
{*pmsg = NULL;pthread_mutex_lock(&m_mutex.qmutex);while (m_msgs.empty()) {pthread_cond_wait(&m_mutex.qready, &m_mutex.qmutex);}*pmsg = m_msgs.front();msg_cnt[(*pmsg)->code]--;m_msgs.pop();pthread_mutex_unlock(&m_mutex.qmutex);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_72096.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

npm 上传自己的包

mkdir demo 创建一个新的文件夹 npm init 初始化项目 生成一个package.json文件 name version description等等touch index.js 创建一个node 可执行脚本新的js 文件 #!/usr/bin/env node // 必须在文件头加如上内容指定运行环境为node console.log(hello cli)在package.json 中…

【教程】GitStats代码统计工具(附GitLab API相关)

使用GitStats进行代码统计 官方文档&#xff1a;GitStats - git history statistics generator GitStats是基于Git的数据统计生成器&#xff0c;输出格式为HTML&#xff0c;可直接在浏览器打开查看&#xff0c;展现为图表形式的可视化数据&#xff0c;内容包括&#xff1a; 常…

图像识别技术解析:手写数字识别(一)

本文通过构建一个手写数字识别的程序来解析来自机器学习与深度学习的不同算法的特点&#xff0c;以及如何对识别效果进行改进。 一、如何构建一个手写数字识别程序 首先可以考虑构建一个简单的页面用于用户输入&#xff0c;也就是前端&#xff1b;接下来需要准备一个后端用于…

mac 好用的类似Xshell工具

下载royal TSX 5.1.1 http://share.uleshi.com/f/9490615-685692355-33bf1e修改mac的etc/hosts文件权限访达(鼠标右键) -> 前往文件夹 ->输入/private --> 打开etc/hosts --> 显示简洁(鼠标右键) --> 权限改成读和写hosts文件写入如下内容&#xff1a;# Royal T…

空间直线方程及其与面线的夹角

一、空间直线的方程 1.1 空间直线的一般方程 空间直线 LLL 可以看做是两个平面 Π1\Pi_1Π1​ 和 Π2\Pi_2Π2​ 的交线&#xff0c;那么就可以用两个平面方程来表示这个直线&#xff1a; {A1xB1yC1zD10A2xB2yC2zD20(1)\left\{ \begin{aligned} A_1xB_1yC_1zD_10\\ A_2xB_2yC…

卷起来了,2023金三银四自动化测试面试题精选【字节二面】

面试一般分为技术面和hr面&#xff0c;形式的话很少有群面&#xff0c;少部分企业可能会有一个交叉面&#xff0c;不过总的来说&#xff0c;技术面基本就是考察你的专业技术水平的&#xff0c;hr面的话主要是看这个人的综合素质以及家庭情况符不符合公司要求&#xff0c;一般来…

Office 365 备份与恢复

Microsoft Office 365中的不同服务几乎可以随时访问&#xff0c;这要归功于Microsoft的99.9%正常运行时间记录。但是&#xff0c;Office 365步履蹒跚的一个方面是提供了一种从意外数据丢失中恢复的方法。Microsoft 提供的数据保留功能并非适用于所有数据丢失情况的可行解决方案…

简述操作系统的文件系统

前言 文件系统是操作系统中负责管理持久数据的子系统&#xff0c;将用户的文件保存在硬盘等硬件设备中&#xff0c;即使断电了数据也不会丢失。 对于用户而言&#xff0c;文件是存储的最小单位&#xff0c;再少的数据也需要以文件的形式存储在外部存储器中。以硬盘为例&#…

FLV-初学总结

FLV-初学总结 从零开始仅学习了一下午的总结&#xff0c;本文非常稚嫩… 本文为纯初学者的学习记录&#xff0c;为了方便理解&#xff0c;内容未必严谨&#xff0c;可以用作纯新手的入门了解篇。本文主要的参考链接如下⬇️ 详细了解FLV&#xff1a;FLV官方文档&#xff08;Ve…

论文解读 | [CVPR2019] 基于自适应文本区域表示的任意形状场景文本检测

目录 1 研究背景及意义 2 总体设计 3 方法论 3.1 自适应文本区域表示 3.2 文本建议 3.3 建议改进 4 损失函数 5 实验及结果 1 研究背景及意义 现有的场景文本检测方法使用固定点数的多边形来 表示文本区域。例如&#xff0c;水平文本使用2个点(左上/右下)表示文本区域&…

VR全景带你打卡《狂飙》经典取景地!

热度“狂飙”&#xff01;电视剧《狂飙》的取景地——江门墟顶老街人气火爆&#xff0c;720VR全景带您了解&#xff0c;这个具有新活力的老街区&#xff0c;蛙色3DVR提供技术支持&#xff01;通过航拍VR全景&#xff0c;全方位展示江门历史文化街区&#xff0c;720浏览&#xf…

【Java基础】反射

概述 引入 package ref;import java.io.FileInputStream;import java.io.FileNotFoundException;import java.io.IOException;import java.lang.reflect.Constructor;import java.lang.reflect.Field;import java.lang.reflect.InvocationTargetException;import java.lang.r…

Revit项目浏览器的标准设置应用和快速视图样板?

一、Revit项目浏览器的标准设置应用 设计院阶段的BIM应用&#xff0c;主要是Revit出施工图方面&#xff0c;需要涉及到很多标准的制定方面的问题&#xff0c;而且这个标准不仅仅是一个命名标准&#xff0c;还有很多的符合本院的出图标准等等&#xff0c;本期就不做详细讨论&…

实验室通风橱通风柜的构成

一、实验室通风橱通风柜简介通风柜是一个密闭的同时又能排风的工作空间。其设计目的是为了控制、稀释以及排除这个密闭空间内产生制造的烟气、气雾和微粒&#xff0c;同时它也是实验室预防泄露控制的重要组成部分。在大多数实验室中&#xff0c;通风柜是保护实验室操作者免受有…

vulnhub LordOfTheRoot_1.0.1

总结&#xff1a;端口敲门&#xff0c;CVE-2015-8660提权&#xff0c; 目录 下载地址 漏洞分析 信息收集 端口敲门 网站分析 方法一 ssh登录提权 方法二 下载地址 LordOfTheRoot_1.0.1.ova (Size: 1.6 GB)Download: http://www.mediafire.com/download/m5tbx0dua05szjm…

OpenGL学习日记之模型绘制

自己编译运行过程中遇到的一些问题 下载Assimp已编译的lib(因为我们公司的电脑有很多权限和限制&#xff0c;也不能自己安装一些没有报备的软件&#xff0c;所以愁方便我就没有用cMake自己编译了)找到一位免费分享的博主的。 https://blog.csdn.net/lady_killer9/article/deta…

【论文阅读】SCRFD: Sample and Computation 重分配的高效人脸检测

原始题目Sample and Computation Redistribution for Efficient Face Detection中文名称采样和计算 重分配的 高效人脸检测发表时间2021年5月10日平台ICLR-2022来源Imperial College&#xff0c; InsightFace文章链接https://arxiv.org/pdf/2105.04714.pdf开源代码官方实现&…

STM32开发(13)----获取唯一设备标识符UID

获取唯一设备标识符UID前言一、什么事UID二、实验过程1.CubeMx配置2.代码实现3.实验结果总结前言 这一章节介绍如何获取STM32芯片中的唯一的ID号的两种方法。 一、什么事UID 在许多项目中&#xff0c;识别设备是必要的。从简单的设备描述到更复杂的设备&#xff0c;如 USB 串…

uboot / linux添加/去除 版本号LOCALVERSION

背景 偶然的机会&#xff0c;在insmod驱动模块的时候&#xff0c;遇到报错&#xff1a; 查找原因&#xff0c;说是当前系统内核版本和模块编译使用版本不同&#xff01; 使用如下命令查看当前系统内核版本&#xff1a; uname -r 使用modinfo命令&#xff08;嵌入式设备没有此…

2022年中国前10电商GMV总结

我是卢松松&#xff0c;点点上面的头像&#xff0c;欢迎关注我哦&#xff01; 1&#xff0c;阿里8万亿;2&#xff0c;京东3万亿;3&#xff0c;拼多多3万亿;4&#xff0c;小程序私域电商3万亿;5&#xff0c;抖音电商1.4万亿。6&#xff0c;抖音本地生活服务电商600亿。7&#xf…