请求合并-线程之间通信
常见的使用场景:
在我们平时业务中,经常会遇到一些情况,请求频率很高,需要频繁请求第三方接口,或者需要频繁操作数据库。
比如,如下几个例子:
(1)电商系统,秒杀场景 ,需要频繁的去数据库修改库存。
(2)业务场景,当前接口需要频繁的调用三方接口,当三方接口有反爬虫,或者有固定时间请求次数限制的话,就会导致请求报错或者超时。
常见的方案:
已有的方案,比如先用redis进行缓存,再通过定时任务,将消息先存储在rabbitMQ中,然后在通过消费者,从rabbitMQ队列中异步消费。
但是,这种情况,只是可以应对响应速度不需要很高的请求,针对一直页面上用户点击的请求来说,这种响应速度是很致命的。
那针对这种既需要请求快,有需要QPS高的情况,有什么解决方案呢?
相信看见文章标题,大家应该也都猜到了,我们可以通过请求合并的方式进行处理。
请求合并的要点:
将一段短暂时间内的请求,先进行阻塞,进行合并之后,一次性去处理,然后在拆分结果,最后唤醒被阻塞的请求。
请求合并的前提:
(1)如果是数据库操作,如果是插入、修改、删除,需要支持批量操作的sql语句,并且如果修改失败了,支持回滚;如果是查询,需要支持结果和请求的拆分,也就是要能够将查询结果进行拆分,可以将结果分配给每个请求。
(2)如果是请求第三方接口,三方接口要支持批量操作,同时请求和响应也需要有能够标识区分的字段,以便可以将结果进行拆分。
请求合并的流程图:
通过对请求和响应对象封装成请求响应体,然后将请求响应体,加入缓存队列后,对请求响应体wait()一段时间,让请求阻塞住,合并线程去缓存队列中定时去取一部分请求响应体,进行合并后,统一处理,然后再将结果进行拆分,将响应结果填入请求响应体中的响应后,最后,将已经封装结果的请求响应体notify()。注意,要确保这块的操作是在一个锁里面,防止下面响应结果还没有装进去,就已经返回了。
请求合并的步骤:
(1)每新来的请求UserRequest,先将请求UserRequest和响应对象Result进行封装成请求响应体RequestPromise,存放到一个阻塞队列中,将当前线程wait固定时间(请求合并间隔时间 + 请求大概需要时间),防止超时。
(2)启动一个线程,用于请求合并,在固定时间内,将队列中的一部分(根据实际情况,不能超过三方接口的最大限制)请求响应体拿出。
(3)将请求信息进行合并,将所有请求中的retrieval列表拿出来组装成一个请求,因为每个retrieval中都有一个唯一的retrievalId,可以区分。
(4)先去请求第三方接口(或者去请求
(5)再拿到结果里面的FaceImageId,即memberId,去请求云库,获取人员信息。
(6)最终将结果进行拆分,根据retrievalId分到每个请求的响应中。
(7)最后将这一批请求线程notify唤醒。
注:
- retrievalId 指的时,每个请求来的时候唯一的标识,需要和响应结果中retrieval列表对应。
请求合并模版的实现:
我们可以使用模版的设计模式,可以将请求合并的模版设计出来,后面在具体应用的时候,只需要将业务相关的代码实现即可,做到代码复用。
首先,我们先创建请求和响应的基类。
UserRequest.java
@Data
public class UserRequest {
}
UserRequest.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Result {private Boolean success;private String msg;
}
再创建一个请求响应体的封装类RequestPromise,将请求和响应封装起来
RequestPromise.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RequestPromise<T extends UserRequest,R extends Result> {private T userRequest;private R result;
}
到这我们的基本类就创建结束,下面我们实现请求合并模版类。
TreadMerge.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;@Slf4j
/**
* T 是请求
* R 是响应
* E 是合并请求后去共同请求后的响应体
*/
public abstract class TreadMerge<T extends UserRequest, R extends Result, E> {/*** 定义一个阻塞队列*/public BlockingDeque<RequestPromise<T, R>> requestQueue;/*** 入口操作,用户来调用,将用户请求先添加到缓存队列中** @param userRequest 一次的用户请求* @return 一次响应的结果* @throws InterruptedException*/public R entry(T userRequest) throws InterruptedException {RequestPromise requestPromise = new RequestPromise(userRequest, null);synchronized (requestPromise) {boolean enqueueSuccess = requestQueue.offer(requestPromise, getTaktTime(), TimeUnit.MILLISECONDS);if (!enqueueSuccess) {throw new ServiceException(BizCommonErrorCode.REQUEST_QUEUE_FULL);}try {requestPromise.wait(this.getTaktTime() + getRequestTime());if (requestPromise.getResult() == null) {throw new ServiceException(BizCommonErrorCode.CLIENT_ERROR);}} catch (InterruptedException e) {throw new ServiceException(BizCommonErrorCode.CLIENT_ERROR);}}return (R) requestPromise.getResult();}/*** 合并线程,将一段时间内的请求进行合并*/public void mergeJob() {//获取子类线程池,执行请求合并线程getMargeRequestExecutorService().execute(() -> {if (requestQueue == null) {requestQueue = new LinkedBlockingDeque<>(getRequestQueueSize());}while (true) {try {List<RequestPromise<T, R>> temp = new ArrayList<>(getTempQueueSize());if (requestQueue.isEmpty()) {Thread.sleep(100);continue;}int batchSize = requestQueue.size();log.info("------------requestQueue请求队列大小:{}", batchSize);for (int i = 0; i < batchSize; i++) {// 合并请求if (otherMergeCondition(requestQueue, temp)) {temp.add(requestQueue.poll());} else {break;}}if (!CollectionUtils.isEmpty(temp)) {log.info("------------temp中间队列大小:{}", temp.size());//合并之后的操作getHandleMargeRequestExecutorService().execute(() -> {Stopwatch stopwatch = Stopwatch.createStarted();mergeOperate(temp);temp.clear();stopwatch.stop();log.info("当前线程执行时间为{}", stopwatch.elapsed(TimeUnit.MILLISECONDS));});log.info("当前排队线程数{}", getHandleMargeRequestExecutorService().getQueue().size());log.info("当前活动线程数{}", getHandleMargeRequestExecutorService().getActiveCount());log.info("执行完线程数{}", getHandleMargeRequestExecutorService().getCompletedTaskCount());log.info("总线程数{}", getHandleMargeRequestExecutorService().getTaskCount());}Thread.sleep(getTaktTime());} catch (Exception e) {e.printStackTrace();}}});}/*** 合并请求额外的条件,默认返回true* 如有需要,子类可以自定义实现** @param requestQueue* @param temp* @return*/public boolean otherMergeCondition(BlockingDeque<RequestPromise<T, R>> requestQueue, List<RequestPromise<T, R>> temp){return true;}/*** 请求合并之后的处理** @param requestPromises* @return*/public void mergeOperate(List<RequestPromise<T, R>> requestPromises) {try {//将合并的线程,共同去请求E mergeResult = mergeAndHandle(requestPromises);//拆分响应,将结果塞到requestPromisessplitOperate(mergeResult, requestPromises);} catch (BusinessException e) {log.error("业务请求异常,异常信息:code:{}message:{}",e.getErrorCode(),e.getMessage());e.printStackTrace();} catch (Exception e) {log.error("请求合并之后的操作mergeOperate:{}",e.toString());e.printStackTrace();} finally {//将这批线程全部唤醒for (RequestPromise<T, R> requestPromise : requestPromises) {synchronized (requestPromise) {requestPromise.notify();}}}}/*** 参数合并的具体操作,并且处理操作** @param requestPromises* @return*/public abstract E mergeAndHandle(List<RequestPromise<T, R>> requestPromises) throws Exception;/*** 拆分响应结果mergeResult,将结果封装到请求中RequestPromise.result中** @param mergeResult 合并的结果* @param requestPromises 请求列表* @return*/public abstract void splitOperate(E mergeResult, List<RequestPromise<T, R>> requestPromises);/*** 继承此类必须实现的方法* 用于返回请求队列大小* @return*/protected abstract Integer getRequestQueueSize();/*** 继承此类必须实现的方法* 用于返回中间队列大小* @return*/protected abstract Integer getTempQueueSize();/*** 继承此类必须实现的方法* 用于返回请求间歇时间* @return*/protected abstract Long getTaktTime();/*** 继承此类必须实现的方法* 用于返回请求的大概时间,防止超时* @return*/protected abstract Long getRequestTime();/*** 继承此类必须实现的方法* 用于所需要的线程池* @return*/protected abstract ExecutorService getMargeRequestExecutorService();
}
具体应用场景:
以下只有部分逻辑,并不全,主要是看大概使用方法。
(1)首先,基于上面的请求、响应类实现具体类。
DtoUserRequest.java
/*** 封装请求信息*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DtoUserRequest extends UserRequest {private BatchSearch search;
}
DtoResult.java
/*** 封装响应信息* @author <a href="zhang_gengying@dahuatech.com">zhang_gengying</a>* @Date: 2022/7/18* @Since 1.1.1*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DtoResult extends Result {private ResultsVO<BatchRetrievalVO> result;
}
(2)再去实现请求合并的具体类
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;/*** 实现DTO请求合并**/
// 可以在配置文件中,设置以下变量值
@ConfigurationProperties(prefix = "dto.request.merge")
@Component
@Data
@Slf4j
public class DtoRequestMerge extends TreadMerge<DtoUserRequest, DtoResult, BatchRetrieval VO> {/*** 请求队列大小*/public Integer requestQueueSize = 1000;/*** 请求队列大小*/public Integer tempQueueSize = 64;/*** 请求合并间隔时间*/public Long taktTime = 500L;/*** 请求大概需要时间,防止超时*/public Long requestTime = 10000L;/*** 最大检索数*/public int maxRetrieval = 64;/*** 线程池,用于请求合并* / @Autowired@Qualifier("margeRequestExecutorService")private ExecutorService margeRequestExecutorService;/*** 在bean初始化结束后执行,启动合并请求线程 */@PostConstructpublic void initMode() {//启动合并请求线程mergeJob();try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}/*** 自定义实现一些其他的合并条件* 计算temp列表是否有足够空间放入请求队列中第一个请求的retrieval列表** @param requestQueue* @param temp* @return*/@Overridepublic boolean otherMergeCondition(BlockingDeque<RequestPromise<DtoUserRequest, DtoResult>> requestQueue, List<RequestPromise<DtoUserRequest, DtoResult>> temp) {if (CollectionUtils.isEmpty(requestQueue)) {return false;}//拿到队列头的第一个元素final RequestPromise<DtoUserRequest, DtoResult> requestPromise = requestQueue.peekFirst();//获取队头第一个请求的 retrieval 列表的大小final int firstRequestRetrievalSize = Optional.ofNullable(requestPromise).map(RequestPromise::getUserRequest).map(DtoUserRequest::getSearch).map(BatchRetrievalRepositorySearch::getRetrieval).orElse(new ArrayList<>()).size();// 计算temp中已经存在的请求中的retrieval列表的之和final Integer tempHasSize = temp.stream().mapToInt(t ->Optional.ofNullable(t).map(RequestPromise::getUserRequest).map(DtoUserRequest::getSearch).map(BatchRetrievalRepositorySearch::getRetrieval).orElse(new ArrayList<>()).size()).sum();//返回是否有足够空间return firstRequestRetrievalSize <= (dtoMaxRetrieval - tempHasSize);}@Overridepublic DtoMergeResult mergeAndHandle(List<RequestPromise<DtoUserRequest, DtoResult>> requestPromises) {//将请求合并,封装到search对象中。BatchRetrievalRepositorySearch search = new BatchRetrievalRepositorySearch();//........// 合并请求后,一起请求try {ResultsDTO<BatchRetrievalRepositoryVO> resultsVO = requestDtoAndComplete(search, userDTO);return resultsVO;} catch (BusinessException e) {log.error("远程请求异常,异常信息如下:", e);throw e;} catch (Exception e) {throw e;}}/*** 合并请求信息后,具体的请求* @param search* @param userDTO* @return*/public ResultsDTO<BatchRetrievalRepositoryVO> requestDtoAndComplete(BatchRetrievalRepositorySearch search, UserDTO userDTO) {// 合并请求信息后,具体的请求}@Overridepublic void splitOperate(DtoMergeResult mergeResult, List<RequestPromise<DtoUserRequest, DtoResult>> requestPromises) {// 实现具体的拆分}}
/*** 一个模拟的请求*/@PostMapping("/retrieval/repository/merge")public ResultsVO<BatchRetrievalRepositoryVO> retrievalRepository(@RequestBody BatchRetrievalRepositorySearch search) {Result dtoResult = null;DtoUserRequest DtoUserRequest = new DtoUserRequest(search);try {dtoResult = dtoRequestMerge.entry(DtoUserRequest);} catch (InterruptedException e) {e.printStackTrace();} finally {if (dtoResult == null) {throw new ServiceException(BizCommonErrorCode.CLIENT_ERROR);}return dtoResult.getResult();}}
以上,我们的请求合并的模版和实例就已经编写结束了,如果对您有帮助或者还存在异议的话,欢迎下面评论和讨论。