项目亮点:请求合并-线程之间通信,解决高并发请求

news/2024/6/16 10:25:28/文章来源:https://blog.csdn.net/baidu_41907361/article/details/128883328

请求合并-线程之间通信

常见的使用场景:

在我们平时业务中,经常会遇到一些情况,请求频率很高,需要频繁请求第三方接口,或者需要频繁操作数据库。

比如,如下几个例子:

(1)电商系统,秒杀场景 ,需要频繁的去数据库修改库存。

(2)业务场景,当前接口需要频繁的调用三方接口,当三方接口有反爬虫,或者有固定时间请求次数限制的话,就会导致请求报错或者超时。

常见的方案:

已有的方案,比如先用redis进行缓存,再通过定时任务,将消息先存储在rabbitMQ中,然后在通过消费者,从rabbitMQ队列中异步消费。

D2CA7AF0-7965-4A9D-AD4D-B74F1A19A473_1_105_c

但是,这种情况,只是可以应对响应速度不需要很高的请求,针对一直页面上用户点击的请求来说,这种响应速度是很致命的。

那针对这种既需要请求快,有需要QPS高的情况,有什么解决方案呢?

相信看见文章标题,大家应该也都猜到了,我们可以通过请求合并的方式进行处理。

请求合并的要点:

将一段短暂时间内的请求,先进行阻塞,进行合并之后,一次性去处理,然后在拆分结果,最后唤醒被阻塞的请求。

请求合并的前提:

(1)如果是数据库操作,如果是插入、修改、删除,需要支持批量操作的sql语句,并且如果修改失败了,支持回滚;如果是查询,需要支持结果和请求的拆分,也就是要能够将查询结果进行拆分,可以将结果分配给每个请求。

(2)如果是请求第三方接口,三方接口要支持批量操作,同时请求和响应也需要有能够标识区分的字段,以便可以将结果进行拆分。

请求合并的流程图:

通过对请求和响应对象封装成请求响应体,然后将请求响应体,加入缓存队列后,对请求响应体wait()一段时间,让请求阻塞住,合并线程去缓存队列中定时去取一部分请求响应体,进行合并后,统一处理,然后再将结果进行拆分,将响应结果填入请求响应体中的响应后,最后,将已经封装结果的请求响应体notify()。注意,要确保这块的操作是在一个锁里面,防止下面响应结果还没有装进去,就已经返回了。

F23E44A2-8F4D-4558-A006-29766D17051A_1_105_c

请求合并的步骤:

(1)每新来的请求UserRequest,先将请求UserRequest和响应对象Result进行封装成请求响应体RequestPromise,存放到一个阻塞队列中,将当前线程wait固定时间(请求合并间隔时间 + 请求大概需要时间),防止超时。
(2)启动一个线程,用于请求合并,在固定时间内,将队列中的一部分(根据实际情况,不能超过三方接口的最大限制)请求响应体拿出。
(3)将请求信息进行合并,将所有请求中的retrieval列表拿出来组装成一个请求,因为每个retrieval中都有一个唯一的retrievalId,可以区分。
(4)先去请求第三方接口(或者去请求
(5)再拿到结果里面的FaceImageId,即memberId,去请求云库,获取人员信息。
(6)最终将结果进行拆分,根据retrievalId分到每个请求的响应中。
(7)最后将这一批请求线程notify唤醒。

注:

  • retrievalId 指的时,每个请求来的时候唯一的标识,需要和响应结果中retrieval列表对应。

请求合并模版的实现:

我们可以使用模版的设计模式,可以将请求合并的模版设计出来,后面在具体应用的时候,只需要将业务相关的代码实现即可,做到代码复用。

首先,我们先创建请求和响应的基类。

UserRequest.java

@Data
public class UserRequest {
}

UserRequest.java

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Result {private Boolean success;private String msg;
}

再创建一个请求响应体的封装类RequestPromise,将请求和响应封装起来

RequestPromise.java

@Data
@AllArgsConstructor
@NoArgsConstructor
public class RequestPromise<T extends UserRequest,R extends Result> {private T userRequest;private R result;
}

到这我们的基本类就创建结束,下面我们实现请求合并模版类。

TreadMerge.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;@Slf4j
/**
* T 是请求
* R 是响应
* E 是合并请求后去共同请求后的响应体
*/
public abstract class TreadMerge<T extends UserRequest, R extends Result, E> {/*** 定义一个阻塞队列*/public BlockingDeque<RequestPromise<T, R>> requestQueue;/*** 入口操作,用户来调用,将用户请求先添加到缓存队列中** @param userRequest 一次的用户请求* @return 一次响应的结果* @throws InterruptedException*/public R entry(T userRequest) throws InterruptedException {RequestPromise requestPromise = new RequestPromise(userRequest, null);synchronized (requestPromise) {boolean enqueueSuccess = requestQueue.offer(requestPromise, getTaktTime(), TimeUnit.MILLISECONDS);if (!enqueueSuccess) {throw new ServiceException(BizCommonErrorCode.REQUEST_QUEUE_FULL);}try {requestPromise.wait(this.getTaktTime() + getRequestTime());if (requestPromise.getResult() == null) {throw new ServiceException(BizCommonErrorCode.CLIENT_ERROR);}} catch (InterruptedException e) {throw new ServiceException(BizCommonErrorCode.CLIENT_ERROR);}}return (R) requestPromise.getResult();}/*** 合并线程,将一段时间内的请求进行合并*/public void mergeJob() {//获取子类线程池,执行请求合并线程getMargeRequestExecutorService().execute(() -> {if (requestQueue == null) {requestQueue = new LinkedBlockingDeque<>(getRequestQueueSize());}while (true) {try {List<RequestPromise<T, R>> temp = new ArrayList<>(getTempQueueSize());if (requestQueue.isEmpty()) {Thread.sleep(100);continue;}int batchSize = requestQueue.size();log.info("------------requestQueue请求队列大小:{}", batchSize);for (int i = 0; i < batchSize; i++) {// 合并请求if (otherMergeCondition(requestQueue, temp)) {temp.add(requestQueue.poll());} else {break;}}if (!CollectionUtils.isEmpty(temp)) {log.info("------------temp中间队列大小:{}", temp.size());//合并之后的操作getHandleMargeRequestExecutorService().execute(() -> {Stopwatch stopwatch = Stopwatch.createStarted();mergeOperate(temp);temp.clear();stopwatch.stop();log.info("当前线程执行时间为{}", stopwatch.elapsed(TimeUnit.MILLISECONDS));});log.info("当前排队线程数{}", getHandleMargeRequestExecutorService().getQueue().size());log.info("当前活动线程数{}", getHandleMargeRequestExecutorService().getActiveCount());log.info("执行完线程数{}", getHandleMargeRequestExecutorService().getCompletedTaskCount());log.info("总线程数{}", getHandleMargeRequestExecutorService().getTaskCount());}Thread.sleep(getTaktTime());} catch (Exception e) {e.printStackTrace();}}});}/*** 合并请求额外的条件,默认返回true* 如有需要,子类可以自定义实现** @param requestQueue* @param temp* @return*/public boolean otherMergeCondition(BlockingDeque<RequestPromise<T, R>> requestQueue, List<RequestPromise<T, R>> temp){return true;}/*** 请求合并之后的处理** @param requestPromises* @return*/public void mergeOperate(List<RequestPromise<T, R>> requestPromises) {try {//将合并的线程,共同去请求E mergeResult = mergeAndHandle(requestPromises);//拆分响应,将结果塞到requestPromisessplitOperate(mergeResult, requestPromises);} catch (BusinessException e) {log.error("业务请求异常,异常信息:code:{}message:{}",e.getErrorCode(),e.getMessage());e.printStackTrace();} catch (Exception e) {log.error("请求合并之后的操作mergeOperate:{}",e.toString());e.printStackTrace();} finally {//将这批线程全部唤醒for (RequestPromise<T, R> requestPromise : requestPromises) {synchronized (requestPromise) {requestPromise.notify();}}}}/*** 参数合并的具体操作,并且处理操作** @param requestPromises* @return*/public abstract E mergeAndHandle(List<RequestPromise<T, R>> requestPromises) throws Exception;/*** 拆分响应结果mergeResult,将结果封装到请求中RequestPromise.result中** @param mergeResult     合并的结果* @param requestPromises 请求列表* @return*/public abstract void splitOperate(E mergeResult, List<RequestPromise<T, R>> requestPromises);/*** 继承此类必须实现的方法* 用于返回请求队列大小* @return*/protected abstract Integer getRequestQueueSize();/*** 继承此类必须实现的方法* 用于返回中间队列大小* @return*/protected abstract Integer getTempQueueSize();/*** 继承此类必须实现的方法* 用于返回请求间歇时间* @return*/protected abstract Long getTaktTime();/*** 继承此类必须实现的方法* 用于返回请求的大概时间,防止超时* @return*/protected abstract Long getRequestTime();/*** 继承此类必须实现的方法* 用于所需要的线程池* @return*/protected abstract ExecutorService getMargeRequestExecutorService();
}

具体应用场景:

以下只有部分逻辑,并不全,主要是看大概使用方法。

(1)首先,基于上面的请求、响应类实现具体类。

DtoUserRequest.java

/*** 封装请求信息*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DtoUserRequest extends UserRequest {private BatchSearch search;
}

DtoResult.java

/*** 封装响应信息* @author <a href="zhang_gengying@dahuatech.com">zhang_gengying</a>* @Date: 2022/7/18* @Since 1.1.1*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DtoResult extends Result {private ResultsVO<BatchRetrievalVO> result;
}

(2)再去实现请求合并的具体类

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;/*** 实现DTO请求合并**/
// 可以在配置文件中,设置以下变量值
@ConfigurationProperties(prefix = "dto.request.merge")
@Component
@Data
@Slf4j
public class DtoRequestMerge extends TreadMerge<DtoUserRequest, DtoResult, BatchRetrieval VO> {/*** 请求队列大小*/public Integer requestQueueSize = 1000;/*** 请求队列大小*/public Integer tempQueueSize = 64;/*** 请求合并间隔时间*/public Long taktTime = 500L;/*** 请求大概需要时间,防止超时*/public Long requestTime = 10000L;/*** 最大检索数*/public int maxRetrieval = 64;/*** 线程池,用于请求合并* / @Autowired@Qualifier("margeRequestExecutorService")private ExecutorService margeRequestExecutorService;/*** 在bean初始化结束后执行,启动合并请求线程 */@PostConstructpublic void initMode() {//启动合并请求线程mergeJob();try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}/*** 自定义实现一些其他的合并条件* 计算temp列表是否有足够空间放入请求队列中第一个请求的retrieval列表** @param requestQueue* @param temp* @return*/@Overridepublic boolean otherMergeCondition(BlockingDeque<RequestPromise<DtoUserRequest, DtoResult>> requestQueue, List<RequestPromise<DtoUserRequest, DtoResult>> temp) {if (CollectionUtils.isEmpty(requestQueue)) {return false;}//拿到队列头的第一个元素final RequestPromise<DtoUserRequest, DtoResult> requestPromise = requestQueue.peekFirst();//获取队头第一个请求的 retrieval 列表的大小final int firstRequestRetrievalSize = Optional.ofNullable(requestPromise).map(RequestPromise::getUserRequest).map(DtoUserRequest::getSearch).map(BatchRetrievalRepositorySearch::getRetrieval).orElse(new ArrayList<>()).size();// 计算temp中已经存在的请求中的retrieval列表的之和final Integer tempHasSize = temp.stream().mapToInt(t ->Optional.ofNullable(t).map(RequestPromise::getUserRequest).map(DtoUserRequest::getSearch).map(BatchRetrievalRepositorySearch::getRetrieval).orElse(new ArrayList<>()).size()).sum();//返回是否有足够空间return firstRequestRetrievalSize <= (dtoMaxRetrieval - tempHasSize);}@Overridepublic DtoMergeResult mergeAndHandle(List<RequestPromise<DtoUserRequest, DtoResult>> requestPromises) {//将请求合并,封装到search对象中。BatchRetrievalRepositorySearch search = new BatchRetrievalRepositorySearch();//........// 合并请求后,一起请求try {ResultsDTO<BatchRetrievalRepositoryVO> resultsVO = requestDtoAndComplete(search, userDTO);return resultsVO;} catch (BusinessException e) {log.error("远程请求异常,异常信息如下:", e);throw e;} catch (Exception e) {throw e;}}/*** 合并请求信息后,具体的请求* @param search* @param userDTO* @return*/public ResultsDTO<BatchRetrievalRepositoryVO> requestDtoAndComplete(BatchRetrievalRepositorySearch search, UserDTO userDTO) {// 合并请求信息后,具体的请求}@Overridepublic void splitOperate(DtoMergeResult mergeResult, List<RequestPromise<DtoUserRequest, DtoResult>> requestPromises) {// 实现具体的拆分}}
	/*** 一个模拟的请求*/@PostMapping("/retrieval/repository/merge")public ResultsVO<BatchRetrievalRepositoryVO> retrievalRepository(@RequestBody BatchRetrievalRepositorySearch search) {Result dtoResult = null;DtoUserRequest DtoUserRequest = new DtoUserRequest(search);try {dtoResult = dtoRequestMerge.entry(DtoUserRequest);} catch (InterruptedException e) {e.printStackTrace();} finally {if (dtoResult == null) {throw new ServiceException(BizCommonErrorCode.CLIENT_ERROR);}return dtoResult.getResult();}}

以上,我们的请求合并的模版和实例就已经编写结束了,如果对您有帮助或者还存在异议的话,欢迎下面评论和讨论。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_253681.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

项目框架及多模块开发

目录 项目模式 技术栈 项目架构图 模块 案例演示 主模块 子模块 zmall-common子模块 zmall-user子模块 项目模式 电商模式&#xff1a;市面上有5种常见的电商模式&#xff0c;B2B、B2C、 C2B、 C2C、O2O; 1、B2B模式 B2B (Business to Business)&#xff0c;是指 商家与…

GcExcel-JAVA 6.0.3-Documents for Excel

在更短的时间内生成 Excel 电子表格&#xff0c;不依赖于 Excel&#xff01; 在任何应用程序中转换、计算、格式化和解析电子表格。快速高效&#xff1a;其轻巧的尺寸意味着 Documents for Excel 针对快速处理大型 Excel 文档进行了优化使用适用于 Windows、Linux 和 Mac 的 J…

5.5 多级放大电路的频率响应

在多级放大电路中含有多个放大管&#xff0c;因而在高频等效电路中就含有多个 Cπ′C_πCπ′​&#xff08;或 Cgs′C_{gs}Cgs′​&#xff09;&#xff0c;即有多个低通电路。在阻容耦合放大电路中&#xff0c;如有多个耦合电容或旁路电容&#xff0c;则在低频等效电路中就含…

【C++】—— 异常

目录 一、C语言传统的处理错误的方式 二、C异常的概念 三、异常的使用 1. 异常的抛出和捕获 2. 异常的重新抛出 3. 异常安全 4. 异常规范 四、自定义异常体系 五、C标准库的异常体系 六、异常的优缺点 一、C语言传统的处理错误的方式 传统的错误处理机制&#xff1a;…

下一代Jupyter Notebook?也太好用了吧...

程序员宝藏库&#xff1a;https://gitee.com/sharetech_lee/CS-Books-Store Jupyter Notebook以其交互式调试&#xff0c;支持markdown/latex&#xff0c;支持多种编程语言等优点&#xff0c;使得它在Python开发过程中具有很多不可取代的优势。 如果经常浏览GitHub会发现&…

arm32栈回溯原理学习以及示例代码

arm32栈回溯原理学习栈回溯原理缺点简单介绍下传统栈回溯原理&#xff0c;方便理解。栈回溯原理 如上图所示&#xff0c;是一个传统的arm架构下函数栈数据分布&#xff0c;需要编译选项-fno-omit-fram-pointer -mapcs -mno-sched-prolog 函数进入时&#xff0c;首先会 mov ip…

详细掌握java和数据结构中七大基于比较的排序算法基本原理及实现(非常详细的讲解)

常见的内部排序算法有&#xff1a;插入排序、希尔排序、选择排序、冒泡排序、归并排序、快速排序、堆排序、基数排序等。本篇文章将详细介绍一下七大算法&#xff01;&#xff01;&#xff01; 文章目录 文章目录 一、排序的概念及引用 1.1排序的概念 二、常见的七大基于比较的…

mysql搜索引擎

宏观图&#xff1a; 分为三个层次 客户端&#xff1a; 命令行、jdbc、navicat可视化工具都归属于客户端&#xff0c;用来提交SQL语句 mysqlserver&#xff1a; 要运行sql必须要启动mysql的服务&#xff0c;如果不启动的话是运行不了的 第三层-存储引擎&#xff1a;可以认…

视图存储过程存储函数

文章目录视图常见数据库对象视图概述为什么使用视图&#xff1f;视图的理解创建视图创建单表视图创建多表联合视图基于视图创建视图查看视图更新视图的数据一般情况不可更新的视图修改、删除视图修改视图删除视图总结视图优点视图不足存储过程&存储函数存储过程概述理解分类…

C语言学习笔记-指针

什么是指针&#xff1f; 指针也就是内存地址&#xff0c;指针变量是用来存放内存地址的变量。就像其他变量或常量一样&#xff0c;必须在使用指针存储其他变量地址之前&#xff0c;对其进行声明。指针变量声明的一般形式为&#xff1a; type *var_name;在这里&#xff0c;typ…

缓存的基本实现

目录1. 什么是缓存2. 缓存的作用3. 如何使用缓存4. 添加商户缓存4.1 缓存思路4.3 代码实现1. 什么是缓存 缓存(Cache),就是数据交换的缓冲区,俗称的缓存就是缓冲区内的数据,一般从数据库中获取,存储于本地代码。 2. 缓存的作用 缓存主要有两个作用 “避震”&#xff1a;在越…

Apache Oozie(1):Apache Oozie简介

1 Oozie 概述 Oozie 是一个用来管理 Hadoop 生态圈 job 的工作流调度系统。由Cloudera 公司贡献给 Apache。Oozie 是运行于 Java servlet 容器上的一个 java web 应用。Oozie 的目的是按照 DAG&#xff08;有向无环图&#xff09;调度一系列的 Map/Reduce或者Hive 等任务。Ooz…

javascript对象

JS对象是属性和方法的命名值的容器 他是易变的因为是通过引用寻址的 JS中除了原始值&#xff08;string&#xff0c;number&#xff0c;boolean&#xff0c;null&#xff0c;undefined&#xff09;之外都是对象 以名称值书写的对象类似于java中的哈希映射 对象创建方法&…

Python获取重庆市农场品行情

文章目录前言一、需求二、分析三、运行前言 本系列文章来源于真实的需求本系列文章你来提我来做本系列文章仅供学习参考 one:Leave a message at the end of the article two:Get wechat contact information 一、需求 http://sc.cqnync.cn/marketSta/ 获取数据&#xff0c;日…

效率倍增,VS Code瞬间好用多了

程序员宝藏库&#xff1a;https://gitee.com/sharetech_lee/CS-Books-Store 如果说评近期热度较高的技术产品&#xff0c;估计非ChatGPT莫属&#xff0c;甚至可以说是热度最高&#xff0c;没有之一的那种。 ChatGPT不仅在国内社交平台连续很多天的讨论度居高不下&#xff0c;在…

【javaEE】文件

目录 文件概念 文件路径 绝对路径 相对路径 文件类型 文本文件 二进制文件 Java中对文件的操作 对文件系统的操作 get相关方法 文件类型判断和创建 文件删除 文件目录的创建 文件重命名 对文件内容的操作 字符流(操作字符数据) 代码例子 删除文件 复制文件 …

DELL服务器R230旧磁盘换新磁盘,新磁盘复制旧磁盘所以数据

背景 该需求是因为公司服务器使用期限已满四年,服务器上内容比较重要,使用四年的磁盘有一定坏的风险,于是需要将旧磁盘的所有内容复制到新磁盘上,把新磁盘插上进行使用,旧磁盘拔除备用。 第一步 插入新磁盘阶段 盘位分布,我这边只有0和1的盘符位置,有一个是空的,在空…

WebRTC音频系统 peerconnection初始化

文章目录2.1 peerconnection conductor2.2 PeerConnectionFactory和PeerConnection2.2.1 CreatePeerConnectionFactory2.2.2 PeerConnection2.2.3 PeerConnection::Create2.3 Conductor::AddTracks2.3.1 LocalAudioSource2.3.2 AudioTrack创建2.3.4 AudioTrack绑定2.4 开启发送…

IDEA插件开发入门.01

环境准备Idea插件SDK文档在线地址&#xff1a;https://plugins.jetbrains.com/docs/intellij/welcome.html安装IntelliJ IDEA&#xff0c;这里使用版本2020.1.3 X64IDEA中安装Plugin DevKit插件创建插件项目新建工程。File ->New -> Project选择工程类型&#xff0c;Inte…

滑动窗口的最大值问题

今天解决一道算法中的滑动窗口问题&#xff0c;依次给出几种解决思路。 目录 题目描述 解题思路 方法一&#xff1a;暴力解法 方法二&#xff1a;辅助队列 方法三&#xff1a;大顶堆法 题目描述 给定一个数组 nums 和滑动窗口的大小 k&#xff0c;请找出所有滑动窗口里的…