MyBatis-Plus多数据源dynamic-datasource解决多线程情境下数据源切换失效问题

news/2024/5/20 17:31:31/文章来源:https://blog.csdn.net/l123lgx/article/details/130262218

前言:项目中使用MyBatis-Plus多数据源dynamic-datasource,完成多数据源的切换;但是在并发场景下,我们会发现线程会一直访问默认数据源(配置的Master数据),并没有访问我们在上一步切换后的数据源,之前切换的数据源失效了;显然多数据源对于并发的处理并不友好,那么我们怎么解决这个问题呢。

本文是在springboot项目已集成dynamic-datasource 基础上延伸的问题,项目集成多数据源可以参考:Idea+maven+spring-cloud项目搭建系列–13 整合MyBatis-Plus多数据源dynamic-datasource

1 问题产生的原因:

问题的产生来源于多数据源com.baomidou.dynamic.datasource.toolkit 包下DynamicDataSourceContextHolder 类的问题,当我们打开这个类,会发现,存储当前线程的数据源使用了 ThreadLocal:

package com.baomidou.dynamic.datasource.toolkit;import java.util.ArrayDeque;
import java.util.Deque;
import org.springframework.core.NamedThreadLocal;
import org.springframework.util.StringUtils;public final class DynamicDataSourceContextHolder {// 线程数据源的存储private static final ThreadLocal<Deque<String>> LOOKUP_KEY_HOLDER = new NamedThreadLocal<Deque<String>>("dynamic-datasource") {protected Deque<String> initialValue() {return new ArrayDeque();}};private DynamicDataSourceContextHolder() {}public static String peek() {//  访问数据库时 从队列中peek 出来数据源return (String)((Deque)LOOKUP_KEY_HOLDER.get()).peek();}// 放入要切换的数据源public static String push(String ds) {String dataSourceStr = StringUtils.isEmpty(ds) ? "" : ds;((Deque)LOOKUP_KEY_HOLDER.get()).push(dataSourceStr);return dataSourceStr;}// 从队列获取数据源public static void poll() {Deque<String> deque = (Deque)LOOKUP_KEY_HOLDER.get();deque.poll();if (deque.isEmpty()) {LOOKUP_KEY_HOLDER.remove();}}// 清除数据源public static void clear() {LOOKUP_KEY_HOLDER.remove();}
}

再来看下 NamedThreadLocal:

// 此处可以看到继承了 ThreadLocal 类
public class NamedThreadLocal<T> extends ThreadLocal<T> {private final String name;/*** Create a new NamedThreadLocal with the given name.* @param name a descriptive name for this ThreadLocal*/public NamedThreadLocal(String name) {Assert.hasText(name, "Name must not be empty");this.name = name;}@Overridepublic String toString() {return this.name;}}

简单概况下数据源的切换流程:
当我们进行数据源切换的时候,实际上是向当前线程所持有的LOOKUP_KEY_HOLDER 的ThreadLocal 对象放入数据源,这样在当前线程在进行数据库访问的时候,会得到当前的数据源,然后找到对应的jdbc 连接,完成数据的访问;
因为LOOKUP_KEY_HOLDER 对象是用ThreadLocal 修饰的,也就是说它是线程隔离的,所以当我们在切换完数据源之后,在子线程中维护的LOOKUP_KEY_HOLDER 是空的,再找不到数据源的情况下,就访问到了默认的数据源;

2 问题处理的思路:

既然是由于线程中保存数据源是每个线程隔离的,要想在并发的情形下仍然可以正常的数据源切换,要就需要打破其隔离性:
解决思路1:在开启线程执行任务时 ,先获取到父线程的数据源,然后在子线程内手动完成数据源的切换,保证子父线程数据源的一致性;
解决思路2:在项目中创建一个特殊的线程池,当有任务的执行时,进行拦截,获取父线程的数据源然后手动进行数据源的切换;
解决思路3:项目中覆盖DynamicDataSourceContextHolder 类修改LOOKUP_KEY_HOLDER 的对象,使得子线程在执行任务时,可以拿到父线程的数据源标识,这样也可以保证,子父线程访问数据源的一致性;改方法可以在不入侵原有业务代码的情况下,在业务开发者无感知的情况下,做到统一拦截并进行代理,完成父类数据源的传递;

3 问题解决的办法:

3.1 针对于解决思路1:
在执行线程任务时,进行手动的切换 demo:

// 获取当前父线程的数据源
String parentDb = "";new Thread(()->{// 切换数据源DynamicDataSourceContextHolder.push(parentDb ); try {// do some thing}finally {// 最后移除数据源DynamicDataSourceContextHolder.clear();}    }).start();
// 在子线程执行任务时

3.2 针对于解决思路2:
创建一个线程池,当执行任务时,都使用改线程池:
线程配置类:TaskExecutionConfig

import org.springframework.context.annotation.*;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;@Configuration
public class TaskExecutionConfig {// cpu 核心数private static final int DEFAULT_THREADS = Math.max(1, Runtime.getRuntime().availableProcessors());@Primary@Bean(name = {"taskHolderExecutorProxy", "executor"})public TaskHolderExecutorProxy threadPoolTaskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setCorePoolSize(DEFAULT_THREADS);threadPoolTaskExecutor.setMaxPoolSize(DEFAULT_THREADS << 1);threadPoolTaskExecutor.setQueueCapacity(Integer.MAX_VALUE);threadPoolTaskExecutor.setKeepAliveSeconds(120);threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());threadPoolTaskExecutor.initialize();return new TaskHolderExecutorProxy(threadPoolTaskExecutor);}
}

线程执行任务时进行拦截进行数据源切换:TaskHolderExecutorProxy

import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;import java.util.concurrent.Executor;/*** 使用多线程并行查询时,非主线程 尝试获取 用户上下文 (即httpServletRequest)时* 用户上下文为空,会导致 使用多线程查询的服务 无法使用多租户功能* 所以这个proxy在提交任务到线程池之前先保存线程的上下文,* 这样非主线程也能拿到主线程的用户上下文,从而使用多租户*/public class TaskHolderExecutorProxy implements Executor {/*** 被代理的线程池*/private final Executor executor;public TaskHolderExecutorProxy(Executor executor) {this.executor = executor;}@Overridepublic void execute(Runnable command) {
//		保存主线程的 用户上下文// RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes();// 获取当前父线程的数据源String parentDb = "";executor.execute(() -> {//			为线程池 设置用户上下文//  RequestContextHolder.setRequestAttributes(requestAttributes);// 切换数据源DynamicDataSourceContextHolder.push(parentDb ); try {command.run();} finally {
//				清理线程池线程的上下文
//                RequestContextHolder.resetRequestAttributes();// 最后移除数据源DynamicDataSourceContextHolder.clear();}});}
}

3.3 针对于解决思路3:重写DynamicDataSourceContextHolder 类覆盖掉MyBatis-Plus 原有的类,并进行代理,在子线程任务执行之前放入父线程的数据源标识,并在子线程任务执行结束之后移除改数据源标识:
3.3.1 首先需要引入一个阿里的jar ,让其可以帮助我们将父线程ThreadLocal 修饰的常量,可以继承到子线程中:

  <!-- https://mvnrepository.com/artifact/com.alibaba/transmittable-thread-local --><dependency><groupId>com.alibaba</groupId><artifactId>transmittable-thread-local</artifactId><version>2.12.1</version></dependency>

3.3.2 重写 DynamicDataSourceContextHolder 类:
我们需要在项目中创建一个路径和MyBatis-Plus 下 DynamicDataSourceContextHolder 类 路径相同,类名相同的DynamicDataSourceContextHolder 类:
在这里插入图片描述
DynamicDataSourceContextHolder 中我们重新定义LOOKUP_KEY_HOLDER

package com.baomidou.dynamic.datasource.toolkit;import org.springframework.util.StringUtils;import java.util.ArrayDeque;
import java.util.Deque;public class DynamicDataSourceContextHolder {private static final ThreadLocal<Deque<String>> LOOKUP_KEY_HOLDER = new ChildThreadTreadLocal<Deque<String>>("dynamic-datasource") {protected Deque<String> initialValue() {return new ArrayDeque();}};private DynamicDataSourceContextHolder() {}public static String peek() {return (String)((Deque)LOOKUP_KEY_HOLDER.get()).peek();}public static String push(String ds) {String dataSourceStr = StringUtils.isEmpty(ds) ? "" : ds;((Deque)LOOKUP_KEY_HOLDER.get()).push(dataSourceStr);return dataSourceStr;}public static void poll() {Deque<String> deque = (Deque)LOOKUP_KEY_HOLDER.get();deque.poll();if (deque.isEmpty()) {LOOKUP_KEY_HOLDER.remove();}}public static void clear() {LOOKUP_KEY_HOLDER.remove();}
}

相同包路径下定义ChildThreadTreadLocal类:在该类中我们继承TransmittableThreadLocal 类帮我进行父子线程数据的传递

package com.baomidou.dynamic.datasource.toolkit;import com.alibaba.ttl.TransmittableThreadLocal;
import org.springframework.util.Assert;public class ChildThreadTreadLocal<T> extends TransmittableThreadLocal {private final String name;public ChildThreadTreadLocal(String name) {Assert.hasText(name, "Name must not be empty");this.name = name;}public String toString() {return this.name;}
}

3.3.3 对项目中所以线程任务的执行增加代理
在需要代理的项目跟路径下放入之前pom 下载到maven 仓库的transmittable-thread-local-2.12.1.jar 包
新建buildlocal 文件包,并放入transmittable-thread-local-2.12.1.jar 包:
在这里插入图片描述
3.3.4 项目启动的jvm 参数增加代理:
在这里插入图片描述
-javaagent:xxxx/buildlocal/transmittable-thread-local-2.12.1.jar

3.3.5 对于线上部署docker 时 ,在doker 容器启动时增加代理:
在这里插入图片描述
4 总结:

  • 针对方法1和方法2:都需要侵入代码进行数据源的切换和移除;
  • 针对方法3 因为重新了DynamicDataSourceContextHolder 并且对数据源对象LOOKUP_KEY_HOLDER 使用TransmittableThreadLocal 进行修饰,当启动项目是使用-javaagent:完成代理后,每次在子线程进行任务执行时子线程都可以获取到父线程中的数据源,从而保证了子父线程数据源的一致性,并且该方法不需要入侵原有的业务代码;

5 扩展:
在项目开启-javaagent:xxxx/buildlocal/transmittable-thread-local-2.12.1.jar 线程的代理后,测试ThreadLocal 数据的可见性:


import com.alibaba.ttl.TransmittableThreadLocal;
import com.cric.zhongjian.common.datasource.Master;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@RestController
public class ThreadTestController {private final static ThreadLocal<String> threadLocal1 = new ThreadLocal<>();private final static ThreadLocal<String> threadLocal2 = new InheritableThreadLocal<>();private final static ThreadLocal<String> threadLocal3= new TransmittableThreadLocal<>();@Master@GetMapping("threadlocal")public void testThread() throws InterruptedException {List<String > a = Arrays.asList("100,200".split(","));threadLocal1.set("x");threadLocal2.set("y");threadLocal3.set("z");new Thread(()->{System.out.println( Thread.currentThread().getId()+":"+ Thread.currentThread().getName());System.out.println("ExecutorServicex threadLocal1.get() = " + threadLocal1.get());System.out.println("ExecutorServicex threadLocal2.get() = " + threadLocal2.get());System.out.println("ExecutorServicex threadLocal3.get() = " + threadLocal3.get());System.out.println("================== ");}).start();Thread.sleep(1000);threadLocal1.set("a");threadLocal2.set("b");threadLocal3.set("c");ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);fixedThreadPool.submit(()->{System.out.println( Thread.currentThread().getId()+":"+ Thread.currentThread().getName());System.out.println("ExecutorService1 threadLocal1.get() = " + threadLocal1.get());System.out.println("ExecutorService1 threadLocal2.get() = " + threadLocal2.get());System.out.println("ExecutorService1 threadLocal3.get() = " + threadLocal3.get());System.out.println("================== ");});Thread.sleep(1000);threadLocal1.set("1");threadLocal2.set("2");threadLocal3.set("3");fixedThreadPool.submit(()->{System.out.println( Thread.currentThread().getId()+":"+ Thread.currentThread().getName());System.out.println("ExecutorService2 threadLocal1.get() = " + threadLocal1.get());System.out.println("ExecutorService2 threadLocal2.get() = " + threadLocal2.get());System.out.println("ExecutorService2 threadLocal3.get() = " + threadLocal3.get());System.out.println("================== ");});Thread.sleep(1000);threadLocal1.set("aa");threadLocal2.set("bb");threadLocal3.set("cc");a.parallelStream().forEach(e->{System.out.println(Thread.currentThread().getName()+":parallelStream threadLocal1.get() = " + threadLocal1.get());System.out.println(Thread.currentThread().getName()+":parallelStream threadLocal2.get() = " + threadLocal2.get());System.out.println(Thread.currentThread().getName()+":parallelStream threadLocal3.get() = " + threadLocal3.get());System.out.println("================== ");});}
}

测试结果:

160:Thread-30
ExecutorServicex threadLocal1.get() = null
ExecutorServicex threadLocal2.get() = y
ExecutorServicex threadLocal3.get() = z
================== 
161:pool-9-thread-1
ExecutorService1 threadLocal1.get() = null
ExecutorService1 threadLocal2.get() = b
ExecutorService1 threadLocal3.get() = c
================== 
161:pool-9-thread-1
ExecutorService2 threadLocal1.get() = null
ExecutorService2 threadLocal2.get() = b
ExecutorService2 threadLocal3.get() = 3
================== 
http-nio-9201-exec-2:parallelStream threadLocal1.get() = aa
http-nio-9201-exec-2:parallelStream threadLocal2.get() = bb
http-nio-9201-exec-2:parallelStream threadLocal3.get() = cc
================== 
ForkJoinPool.commonPool-worker-3:parallelStream threadLocal1.get() = null
ForkJoinPool.commonPool-worker-3:parallelStream threadLocal2.get() = bb
ForkJoinPool.commonPool-worker-3:parallelStream threadLocal3.get() = cc
================== 

可以看到当使用TransmittableThreadLocal 修饰后,在项目中进行子线程任务的执行时,子线程都可以拿到父线程的ThreadLocal 数据;

6 参考:
6.1 TransmittableThreadLocal的使用及原理解析;
6.2 springboot springmvc 拦截线程池线程执行业务逻辑;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_102132.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

参展第六届中国城市轨道交通智慧运维大会 | 图扑软件

2022&#xff08;第六届&#xff09;中国城市轨道交通智慧运维大会在西安顺利举行。此次大会由现代轨道交通网联合中国机械工程学会设备智能运维分会主办&#xff0c;西安市轨道交通集团有限公司运营分公司、轨道交通工程信息化国家重点实验室(中铁一院)协办。来自行业学会、地…

STM32的GPIO重映射配置(解除下载端口的重映射)

在设计一个项目的时候&#xff0c;因为用的是STMF103C8T6&#xff0c;引脚较少&#xff0c;所以把可以用的GPIO都需要用上&#xff0c;但是由于下载的引脚在出生时&#xff0c;被厂家已经配置好了&#xff0c;所以我们得利用软件配置一下&#xff0c;使引脚变成正常的GPIO。 手…

R语言风险评分绘图

生信分析中&#xff0c;经常要建立分险模型&#xff0c;对每个患者进行分险评分&#xff0c;根据这些评分对患者进行分组&#xff0c;不同分组的预后差异很大。 ### 1. 构造数据 risk_df<- data.frame(samplespaste0("S",1:100),scorerunif(100,1,10),surv_time …

Vue监视数据的学习笔记

Vue监测数据变化的更新 <div id"monitor"><h2>人员列表</h2><button click"updateMei">更新马冬梅信息</button><ul><li v-for"(p,index) of persons" :key"index">{{p.name}}--{{p.age}}…

Selenium安装及环境配置

目录 一、Selenium 简介1. 组件2. 特点 二、安装Selenium✨三、下载对应版本的Chromedriver1.查看Chrome的版本号2.下载驱动 chromedriver和配置3.解压到本地4.复制文件放入python安装目录的Scripts文件夹中5.Selenium启动Chrome 一、Selenium 简介 1. 组件 Selenium IDE&…

QinQ技术与Portal技术

QinQ 802.1Q-in-802.1Q&#xff0c;是一种扩展VLAN标签技术。在城域网中&#xff0c;需要大量的VLAN来隔离区分不同的用户&#xff0c;但是原有的802.1Q只有12个比特&#xff0c;仅能标识4096个VLANQinQ即在802.1Q的基础上&#xff0c;再增加一层外层标签。使得可以标识4096*40…

项目结束倒数2

今天,解决了,多个点的最短路问题 用的dfs,配上了floyed计算出的广源距离 难点是要记录路线,dfs记录路线就很烦 但是好在结束了,经过无数的测试,确保没啥问题(应该把) 来看看我的代码 void dfs(int b[], int x, int* sum, int last, int sums, int a[], BFS& s, Floyd_A…

微信小程序uniapp基于Android的大学生社交论坛交流app系统

实现一个基于Android的社交APP小程序,一共3个身份&#xff0c;包括老师、学生和管理员&#xff0c;其中老师和学生在手机端注册登录&#xff0c;管理员在web端后台登录。学生和老师登录后可以查询通知新闻信息&#xff0c;收藏信息&#xff0c;查看好友推荐&#xff0c;论坛发帖…

antDesignPro6项目:供应链系统—实战问题解决汇总

系统使用的技术&#xff1a;antDesignPro6 Umi4 antDesign antDesignProComponents 其他技术 1、如何设置ModalForm组件&#xff0c;销毁时&#xff0c;自动重置表单&#xff1f; modalProps{{ destroyOnClose: true }} // 重置表单 答&#xff1a;给ModalForm组件添加mo…

React Native 9个好用的开发工具盘点

近几年在大前端的开发领域&#xff0c;选择跨端方案的公司和部门越来越多&#xff0c;曾一何时市面有不下10种跨端框架&#xff0c;但随着“生物进化论”的推动&#xff0c;目前市面上仅剩两种主流方案&#xff0c;就是经常听到的 React Native 和 Flutter。去年终于引来了 Rea…

【Docker01】入门

目录 概述 Docker平台 Docker可以做什么 快速、一致地交付应用程序 响应式部署和扩展 在同一硬件上运行更多工作负载 Docker架构 Docker守护程序&#xff08;The Docker daemon&#xff09; Docker客户端&#xff08;The Docker client&#xff09; Docker桌面&#x…

Redis框架与SpringBoot的整合及详细学习汇总

目录 springBoot整合Redis Redis 的优势 Redis安装 Redis数据类型 springboot操作Redis springboot 配置redis RedisTemplate及其相关方法 springBoot实现上传下载 RedisTemplate及其相关方法 springBoot实现上传下载 springBoot CORS&#xff08;跨域资源共享&#…

使用opencv进行场景识别

opencv场景识别 文章目录 opencv场景识别一、需求1、现状2、设想 二、模型使用1、opencv dnn支持的功能2、ANN_MLP相关知识3、图像分类模型训练学习4、目标检测模型5、opencv调用darknet物体识别模型 三、模型训练1、现状2、步骤-模型编译3、步骤-模型训练 一、需求 1、现状 …

按照条件向Spring容器中注册bean

1.Conditional注解概述 Conditional注解可以按照一定的条件进行判断&#xff0c;满足条件向容器中注册bean&#xff0c;不满足条件就不向容器中注册bean。 package org.springframework.context.annotation;import java.lang.annotation.Documented; import java.lang.annota…

9. 树的进阶

9. 树的进阶 ​ 之前我们学习过二叉查找树&#xff0c;发现它的查询效率比单纯的链表和数组的查询效率要高很多&#xff0c;大部分情况下&#xff0c;确实是这样的&#xff0c;但不幸的是&#xff0c;在最坏情况下&#xff0c;二叉查找树的性能还是很糟糕。 例如我们依次往二叉…

RelativeLayout相对布局

一、官方地址&#xff1a; https://developer.android.google.cn/reference/kotlin/android/widget/RelativeLayout?hlen 二、概述 相对布局&#xff08;RelativeLayout&#xff09;是一种根据父容器和兄弟控件作为参照来确定控件位置的布局方式 三、基本格式 <RelativeLay…

Jenkins配置邮箱发送报告

本文以qq邮箱为例 1.下载Email Extension Plugin插件 2.在Manage Jenkins--System&#xff0c;Jenkins Location下配置理员邮件 Extended E-mail Notification 下配置Jenkins SMTP server&#xff08;邮箱服务&#xff09;、SMTP Port&#xff08;邮箱端口&#xff09;、Cred…

无法解析的外部符号 __mingw_vsprintf

windows下的ffmpeg是采取mingw平台上编译&#xff0c;本人采用的是msys2&#xff0c;本人需要h264&#xff0c;于是先在msys2里面编译了x264静态库&#xff0c;注意这里是静态库&#xff0c;动态库经过了链接&#xff0c;不会出现下面的问题&#xff0c;然后在ffmpeg里面用下面…

Java项目打包exe运行文件

Java项目打包exe运行文件 JavaSE打包成exe运行文件的方法有很多种&#xff0c;此处我们主要讲解我常用的一种exe4j&#xff0c;打包前我们需要先安装exe4j这个工具。 注意&#xff1a;exe4j仅支持最低JDK1.8最高JDK11&#xff0c;所以在安装之前一定要查看自己的JDK版本&#…

【移动端网页布局】移动端网页布局基础概念 ⑤ ( 视网膜屏技术 | 二倍图概念 | 代码示例 )

文章目录 一、视网膜屏技术二、二倍图概念三、代码示例 一、视网膜屏技术 PC 端 和 早期的 移动端 网页中 , CSS 中配置的 1 像素 对应的就是物理屏幕中的 1 像素 ; Retina 视网膜屏幕 技术出现后 , 将多个物理像素压缩到一块屏幕中 , 可以达到更高的分辨率 , 画面显示效果更好…