AOP在PowerJob中的使用,缓存锁保证并发安全,知识细节全总结

news/2024/4/20 9:57:21/文章来源:https://blog.csdn.net/z449077880/article/details/129260759

这是一篇简简单单的文章,需要你简简单单看一眼就好,如果有不明白的地方,欢迎留言讨论。

 

在之前的文章中出现过一次AOP的使用,就是在运行任务之前,需要判断一下,触发该任务执行的server,是不是数据库中对应任务所在app的直接server,使用的是注解@DesignateServer,本篇文章是从另一个注解,再一次顺一遍AOP的使用,而且本篇文章的注解,再一次用到了可重入锁ReentrantLock,这个也是之前的文章中说的内容,可以再熟悉一遍,本篇文章的入口就是注解——UseCacheLock。

从名字来看,该注解是一个使用缓存时的一个锁,该类位于tech.powerjob.server.core.lock包下,用来修饰方法,在运行时执行的,源码如下:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UseCacheLock {String type();String key();int concurrencyLevel();
}

type:从使用的代码处得出,目前只有两种一种是processJobInstance,另一种是processWfInstance

key:主要是任务id或者任务实例id,还有工作流id。其中任务id或者任务实例id的选取,是通过一个表达式来判断得出的。

concurrencyLevel:缓存要用到的字段,允许同时并发执行的写操作数。

UseCacheLock 的使用场景

该注解在powerjob中一共使用了8次,其中2次出现在任务的派发,6次出现在工作流的操作中,这一次就选在任务的派发,来讲一下该注解的使用场景。

@UseCacheLock(type = "processJobInstance", key = "#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceId", concurrencyLevel = 1024)
public void dispatch(JobInfoDO jobInfo, Long instanceId) {... ...
}

方法内部的代码不重要,主要是来看方法上面的注解,里面的三个关键字分别是

processJobInstance

#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceId

1024

第一个和最后一个没什么好说的,主要说一说中间那一条长长的表达式,该表达式通过解读,就是判断最大同时运行任务数是否大于0,以及任务的时间表达式类型是不是FIX_RATE或者FIX_DELAY。这一表达式可以说,除非人为的将MaxInstanceNum设置为0,否则该条数据默认值就是1,也就是说这个表达式,不负责任的说,99.99%都是真,也就是说都会使用JobId作为key值。

按照代码来看,就是当任务在派发的时候,会使用到该注解,为的是防止该方法同时运行派发同一个任务,如果是同时派发两个不同的任务,就不会有影响,毕竟在派发的过程中涉及到了对任务实例的数据修改,如果两个同时进行,确实会产生问题。

UseCacheLock 的AOP处理

处理该注解的类个该类在同一个包,处理的源代码如下所示:

@Around(value = "@annotation(useCacheLock)")public Object execute(ProceedingJoinPoint point, UseCacheLock useCacheLock) throws Throwable {Cache<String, ReentrantLock> lockCache = lockContainer.computeIfAbsent(useCacheLock.type(), ignore -> {int concurrencyLevel = useCacheLock.concurrencyLevel();log.info("[UseSegmentLockAspect] create Lock Cache for [{}] with concurrencyLevel: {}", useCacheLock.type(), concurrencyLevel);return CacheBuilder.newBuilder().initialCapacity(300000).maximumSize(500000).concurrencyLevel(concurrencyLevel).expireAfterWrite(30, TimeUnit.MINUTES).build();});final Method method = AOPUtils.parseMethod(point);Long key = AOPUtils.parseSpEl(method, point.getArgs(), useCacheLock.key(), Long.class, 1L);final ReentrantLock reentrantLock = lockCache.get(String.valueOf(key), ReentrantLock::new);long start = System.currentTimeMillis();reentrantLock.lockInterruptibly();try {long timeCost = System.currentTimeMillis() - start;if (timeCost > SLOW_THRESHOLD) {final SlowLockEvent slowLockEvent = new SlowLockEvent().setType(SlowLockEvent.Type.LOCAL).setLockType(useCacheLock.type()).setLockKey(String.valueOf(key)).setCallerService(method.getDeclaringClass().getSimpleName()).setCallerMethod(method.getName()).setCost(timeCost);monitorService.monitor(slowLockEvent);log.warn("[UseSegmentLockAspect] wait lock for method({}#{}) cost {} ms! key = '{}', args = {}, ", method.getDeclaringClass().getSimpleName(), method.getName(), timeCost,key,JSON.toJSONString(point.getArgs()));}return point.proceed();} finally {reentrantLock.unlock();}
}

代码看着挺长的,但是内容其实没有多少,可以一步一步拆开来看。

缓存的创建

第一步通过type来获取缓存,从文章开头我们知道,这个type就两个类型,processJobInstance就是用来派发任务的,processWfInstance就是用来操作工作流任务的,该代码里面就是processJobInstance,如果缓存存在,直接拿来用,如果不存在,则创建缓存,来看一眼创建缓存的代码:

CacheBuilder.newBuilder().initialCapacity(300000).maximumSize(500000).concurrencyLevel(concurrencyLevel).expireAfterWrite(30, TimeUnit.MINUTES).build();

这个代码的大意就是创建一个有如下属性的缓存,缓存有效时间是30分钟(expireAfterWrite(30, TimeUnit.MINUTES)),这就像鱼有7秒记忆一样,这个缓存只能记录30分钟,过期失效。缓存的最大条目数是50万(maximumSize(500000))。指定用于缓存的hash table最低总规模是300000,允许同时并发操作数是concurrencyLevel,也就是传进来的1024.

key值的获取

第二步就是获取key值,该值主要是为了获取可重入锁用的,获取该值的源代码如下所示:

final Method method = AOPUtils.parseMethod(point);
Long key = AOPUtils.parseSpEl(method, point.getArgs(), useCacheLock.key(), Long.class, 1L);

从这个代码可以看到,用到了AOPUtil这个工具类的两个方法,第一个方法是解析出当前的方法,第二个是获取key值,这个AOPUtil在tech.powerjob.server.common.utils包下。解析方法的源码如下,备注解释各代码的目的:

public static Method parseMethod(ProceedingJoinPoint joinPoint) {//获取接入点的签名,此处必须是方法的签名,否则会报异常Signature pointSignature = joinPoint.getSignature();if (!(pointSignature instanceof  MethodSignature)) {throw new IllegalArgumentException("this annotation should be used on a method!");}//强转成方法的签名MethodSignature signature = (MethodSignature) pointSignature;//获取方法Method method = signature.getMethod();//如果方法所处的类是一个interfaceif (method.getDeclaringClass().isInterface()) {try {//通过IoC容器获取目标对象,然后再获取对象的方法method = joinPoint.getTarget().getClass().getDeclaredMethod(pointSignature.getName(), method.getParameterTypes());} catch (SecurityException | NoSuchMethodException e) {ExceptionUtils.rethrow(e);}}return method;
}

获取到了方法之后,就是获取key值,源代码如下,备注解释各代码的目的:

public static <T> T parseSpEl(Method method, Object[] arguments, String spEl, Class<T> clazz, T defaultResult) {//获取到方法的参数值类型String[] params = discoverer.getParameterNames(method);assert params != null;//创建数据上下文EvaluationContext context = new StandardEvaluationContext();for (int len = 0; len < params.length; len++) {//将param[len] = arguments[len]context.setVariable(params[len], arguments[len]);}try {//执行表达式,也就是前面#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceIdExpression expression = parser.parseExpression(spEl);//返回表达式执行的结果,以clazz设置的类型返回return expression.getValue(context, clazz);} catch (Exception e) {log.error("[AOPUtils] parse SpEL failed for method[{}], please concat @tjq to fix the bug!", method.getName(), e);return defaultResult;}
}

经过以上两步,key值就获取完毕了

加锁

加锁的源代码如下所示,就是如果缓存里面保存了锁,就直接拿到,如果没有,就new一个出来,然后就启动锁,

那两条时间主要是记录加锁的时间,如果时间过长就要记录一条日志,记录加锁慢时的任务信息。

final ReentrantLock reentrantLock = lockCache.get(String.valueOf(key), ReentrantLock::new);
long start = System.currentTimeMillis();
reentrantLock.lockInterruptibly();
try {long timeCost = System.currentTimeMillis() - start;... ...
}
... ...

加锁结束之后,就可以执行注解修饰的方法了,执行就是下面这一行:

point.proceed();

执行结束之后,将锁打开就OK了。

总结

 

本篇文章涉及的知识主要是AOP的使用,可重入锁的使用,IoC容器相关,Spring的表达式的使用,缓存Cache的创建,每一个知识点都够我喝一壶了,所以大家如果想要了解这些知识的细节,请自行搜索去查想要了解的内容,如果你懒得查,也可以问我,当然我也懒,回不回答就看我心情了,哼,我外号就叫不高兴,所以大家看着办吧。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_75686.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AIGC被ChatGPT带火!底层基础算力有望爆发式增长

ChatGPT火爆全球的背后&#xff0c;可以窥见伴随人工智能技术的发展&#xff0c;数字内容的生产方式向着更加高效迈进。ChatGPT属于AIGC的具体应用&#xff0c;而AIGC是技术驱动的数字内容新生产方式。AIGC类产品未来有望成为5G时代新的流量入口&#xff0c;率先受益的有望是AI…

MySQL实战之深入浅出索引(下)

1.前言 在上一篇文章中&#xff0c;我们介绍了InnoDB索引的数据结构模型&#xff0c;今天我们再继续聊一下跟MySQL索引有关的概念。 在介绍之前&#xff0c;我们先看一个问题&#xff1a; 表初始化语句 mysql> create table T ( ID int primary key, k int NOT NULL DEFA…

03、SVN 建立版本库

SVN 建立版本库1 版本库2 版本库的建立步骤2.1 创建版本库的根目录2.2 创建子目录2.3 通过命令创建版本库2.4 生成目的介绍1 版本库 Subversion 是将文件数据信息保存到版本库中进行管理的Subversion 允许用户对版本库目录进行定制 2 版本库的建立步骤 2.1 创建版本库的根目…

RK3568平台开发系列讲解(驱动基础篇)Makefile 详解

🚀返回专栏总目录 文章目录 一、Makefile是什么二、Makefile 详解三、Makefile 语法沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇将详细介绍Makefile。 一、Makefile是什么 如果只编译一个hello.c文件,非常简单,所以直接执行下面的指令非常方便: gcc hel…

Java List去重 Lis集合去重 List去重效率对比 List去重复元素效率对比 List去重效率

Java List去重 Lis集合去重 List去重效率对比 List去重复元素效率对比 List去重效率 --- List 去重复元素的几种办法 一、概述 面试的时候&#xff0c;有个常见的问题&#xff1a;“List集合如何去除重复元素”。 常见的回答是&#xff1a;“set集合&#xff0c;for循环对比&a…

KingbaseES V8R3 表加密

前言 透明加密是指将数据库page加密后写入磁盘&#xff0c;当需要读取对应page时进行加密读取。此过程对于用户是透明&#xff0c; 用户无需干预。 该文档进行数据库V8R3版本测试透明加密功能&#xff0c;需要说明&#xff0c;该版本发布时间早于V8R6&#xff0c;所以只能进行表…

SQLite安装及常用语句

SQLite简介SQLite 是一个软件库&#xff0c;实现了自给自足的、无服务器的、零配置的、事务性的 SQL 数据库引擎。SQLite 是在世界上最广泛部署的 SQL 数据库引擎。SQLite 源代码不受版权限制。SQLite安装官网下载 SQLite Download Page新建一个sqlite文件夹&#xff0c;将下载…

【Servlet篇2】创建一个web项目

在上一篇文章当中&#xff0c;已经提到了什么是Maven&#xff0c;以及如何使用maven从中央仓库下载jar包。【Tomcat与Servlet篇1】认识Tomcat与Maven_革凡成圣211的博客-CSDN博客Tomcat&#xff0c;mavenhttps://blog.csdn.net/weixin_56738054/article/details/129228140?spm…

2023年java春招面试题及答案

2023年java春招面试题1、下面有关jdbc statement的说法错误的是&#xff1f;2、下面有关JVM内存&#xff0c;说法错误的是&#xff1f;3、下面有关servlet service描述错误的是&#xff1f;4、下面有关servlet和cgi的描述&#xff0c;说法错误的是&#xff1f;5、下面有关SPRIN…

LeetCode 1237. Find Positive Integer Solution for a Given Equation【双指针,二分,交互】

本文属于「征服LeetCode」系列文章之一&#xff0c;这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁&#xff0c;本系列将至少持续到刷完所有无锁题之日为止&#xff1b;由于LeetCode还在不断地创建新题&#xff0c;本系列的终止日期可能是永远。在这一系列刷题文章…

开发场景中前端交付的对于后端数据的获取功能书写+页面简繁体转换+页面链接跳转新页面

1&#xff0c;开发场景中前端交付对于后端数据的获取功能书写 首先&#xff0c;我们明确基本逻辑概念&#xff0c;前端获取数据本质是利用ajax中的api接口来获取变量&#xff0c;再将其导入我们的data&#xff1b; 明确基本概念开发就可以进行ajax的定义 下文中e变量是获取前端…

全志T3+FPGA国产核心板——Pango Design Suite的FPGA程序加载固化

本文主要基于紫光同创Pango Design Suite(PDS)开发软件,演示FPGA程序的加载、固化,以及程序编译等方法。适用的开发环境为Windows 7/10 64bit。 测试板卡为全志T3+Logos FPGA核心板,它是一款基于全志科技T3四核ARM Cortex-A7处理器 + 紫光同创Logos PGL25G/PGL50G FPGA设计…

【观察】连续八年霸榜云数据库“领导者”,揭秘亚马逊云科技背后的“统治力”...

日前&#xff0c;全球市场分析机构 Gartner发布《2022 云数据库管理系统魔力象限》报告。其中&#xff0c;在Gartner本次魔力象限报告评估的20家供应商中&#xff0c;亚马逊云科技在纵轴“执行能力”和横轴“愿景完整性”两个维度分别处于最高、最右位置&#xff0c;这也是亚马…

ANTLR的IDE——ANTLRWorks2的安装及基本使用

1. ANTLRWorks2的简单介绍 ① ANTLR官网对ANTLRWorks2的介绍 ANTLRWorks 2.此IDE是ANTLR v3 / v4语法以及StringTemplate模板的复杂编辑器。 它可以运行ANTLR工具来生成识别器&#xff0c;并可以运行TestRig&#xff08;在命令行上运行&#xff09;来测试语法。 要将ANTLR生成…

Java内置队列和高性能队列Disruptor

一、队列简介 队列是一种特殊的线性表&#xff0c;遵循先入先出、后入后出&#xff08;FIFO&#xff09;的基本原则&#xff0c;一般来说&#xff0c;它只允许在表的前端进行删除操作&#xff0c;而在表的后端进行插入操作&#xff0c;但是java的某些队列运行在任何地方插入删…

EEGLAB处理运动想象脑电数据

最近在看论文时&#xff0c;经常看到作者处理数据的过程&#xff0c;之前都是一代而过&#xff0c;知道怎么处理就可以了&#xff0c;一直没有实践&#xff0c;最近需要一些特殊的数据&#xff0c;需要自己处理出来&#xff0c;这里尝试着自己用MATLAB处理数据&#xff0c;记录…

Kubernetes12:k8s集群安全机制 ***与证书生成***

Kubernetes12&#xff1a;k8s集群安全机制 1、概述 1&#xff09;访问一个k8s集群的时候&#xff0c;需要经过以下三个步骤才能完成具体操作 第一步&#xff1a;认证操作第二部&#xff1a;鉴权操作&#xff08;授权&#xff09;第三部&#xff1a;准入控制操作 2&#xff…

Java枚举详解

一.枚举 1.为什么有枚举&#xff1f; 如果我们的程序需要表示固定的几个值&#xff1a; 比如季节&#xff1a;spring (春)&#xff0c;summer(夏)&#xff0c;autumn(秋)&#xff0c;winter(冬) 用常量表示&#xff1a; public static final int SEASON_SPRING 1;public st…

记一次MySQL数据迁移到SQLServer全过程

为什么要做迁移&#xff1f; 由于系统版本、数据库的升级&#xff0c;导致测试流程阻塞&#xff0c;为了保证数据及系统版本的一致性&#xff0c;我又迫切需要想用这套环境做性能测试&#xff0c;所以和领导、开发请示&#xff0c;得到批准后&#xff0c;便有了这次学习的机会…

idea 安装JUnit单元测试框架

JUnit是一套专门用于java的单元测试框架&#xff0c;主要是测试方法 junit4官方网站&#xff1a; JUnit – About junit5官方网站&#xff1a;JUnit 5 框架依赖&#xff1a;junit-4.12.jar&#xff1b;hamcrest-core-1.3.jar 安装步骤&#xff1a; &#xff08;1&#xff…