Java 线程池调度周期性任务“异常“探究

news/2024/5/18 22:52:53/文章来源:https://blog.csdn.net/zhuoxiuwu/article/details/133994167

背景

在做性能监控需求的过程中,会存在很多监控任务需要定时执行。比如在卡顿监控的实现中,我们需要每50ms 执行一次主线程调用栈的任务,此时由于任务本身可能存在一些耗时,可能需要对任务下次调度的时间做一些调整,避免相差太大。
这里以 Handler 执行为例最终实现的代码可能是这样的

public class MyHandler extends Handler {private long lastExecTime = System.nanoTime();private long delayTime = 50 * 1000000; // 转换为纳秒@Overridepublic void handleMessage(Message msg) {long startTime = System.nanoTime();// 执行任务doTask();// 计算下一次任务执行的延迟时间long timeDiff = System.nanoTime() - lastExecTime;long nextDelayTime = Math.max(delayTime - timeDiff, 0);// 记录本次执行时间和延迟时间lastExecTime = System.nanoTime();delayTime = nextDelayTime;// 发送下一次任务消息sendEmptyMessageDelayed(0, delayTime / 1000000); // 转换为毫秒}private void doTask() {// 需要执行的任务}
}

由于Handler通常是挂载到某一个指定的线程上,如果每个不同的定时任务(比如Cpu采集、内存采集任务)都创建一个Thread,会造成线程浪费,而如果每个定时任务的Handler同附属在到同一个线程Looper上,又可能会因为某一个任务执行比较耗时,影响了其他任务的调度,因此我们希望不同的采集任务之间尽可能隔离。

此时就需要使用到线程池方式来动态分配或复用线程,Java本身提供 ScheduledExecutorService� 类,它相比 ExecutorService 多支持了延迟执行任务或定时执行任务的能力。

public interface ScheduledExecutorService extends ExecutorService {public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);}

最终在Apm项目中我创建了一个全局通用的ScheduledExecutorService 对象来支持内部各个监控任务定时执行的需求,这里我最终使用的是 **scheduleAtFixedRate **来进行任务注册调度的。

问题

在某个性能采集功能上线后,偶然发现一些用户采集的数据异常,跟踪异常用户采集的性能日志发现,这些性能日志上报的时间异常,本来是10S执行一次的任务,但从收集的日志上看,1S内执行了几百次(出现异常的概率较低,平均每天1个设备左右)。
image.png
从日志time上可以确认,这并不是重复的日志,因此只可能是任务调度的间隔发生了异常,再Review相关代码后,确认配置的任务间隔参数值 不可能出现几毫米的情况。

在线下进行了多次自测后,未复现该问题。

源码分析

问题在线下无法复现,那么此时只能猜测是不是线程池内部的调度逻辑存在什么问题,因此需要深入分析下线程池执行定时任务这块的代码实现。并且这里刚好借此问题,学习下ScheduledThreadPoolExecutor是如何实现延迟及定时任务实现

首先由于要支持任务延迟执行的能力,因此在SchedudledThreadPoolExecutor构造函数中使用的是一个特殊的Queue:DelayedWorkQueue

    public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue());}

DelayedWorkQueue 是一个支持优先级排序的Queue,为了提高新任务入队时的性能,内部并不是线性排序的,采取的是类似最小堆的方式存储。
当一个新的任务入队时,首先会调用DelayQueue.offer()方法,offer()函数先进行数组扩容的一些判断,如果数组长度为0,就直接插入到队首,如果不为0 则调用siftUp() ,siftUp()函数内部会基于最小堆的特性,将元素插入合适的位置, 这里判断优先级的方式是直接调用compare()函数

        public boolean offer(Runnable x) {if (x == null)throw new NullPointerException();RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;final ReentrantLock lock = this.lock;lock.lock();try {int i = size;if (i >= queue.length)grow();size = i + 1;if (i == 0) {queue[0] = e;setIndex(e, 0);} else {siftUp(i, e);}if (queue[0] == e) {leader = null;available.signal();}} finally {lock.unlock();}return true;}private void siftUp(int k, RunnableScheduledFuture<?> key) {while (k > 0) {int parent = (k - 1) >>> 1;RunnableScheduledFuture<?> e = queue[parent];//基于优先级排序if (key.compareTo(e) >= 0)break;//替换元素queue[k] = e;setIndex(e, k);k = parent;}queue[k] = key;setIndex(key, k);}

compare() 函数的实现是基于 ScheduledFutureTask�的 time属性进行排序,其中如果time 时间一样,则基于入队时间进行排序

        public int compareTo(Delayed other) {if (other == this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}

因此,我们接下来分析对于scheduleAtFixedRate调度的任务,它的time是如何设置的。
在构造ScheduledFutureTask 对象时,任务的初始化执行时间是通过 triggerTime()计算的, triggerTime() 内部以
System.nanoTime()为时间基线,计算下一次任务的执行时间。

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0L)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,// 调用triggerTime 设置任务下一次执行的时间triggerTime(initialDelay, unit),unit.toNanos(period),sequencer.getAndIncrement());RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}/*** Returns the nanoTime-based trigger time of a delayed action.*/long triggerTime(long delay) {return System.nanoTime() +((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));}

接下来我们分析 在任务插入到队列直到第一次被执行时的逻辑, 首先任务被执行时,会判断该任务是不是一个周期性任务,如果不是周期性任务,则执行后该任务就结束。 而如果是周期性执行的任务,会调用 runAndReset()函数。

        /*** Overrides FutureTask version so as to reset/requeue if periodic.*/public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic) //非周期性任务,直接调用super.run();else if (super.runAndReset()) { //周期性任务,调用runAndReset()//更新任务的time属性setNextRunTime();//重新入队,等待下次调度reExecutePeriodic(outerTask);}}protected boolean runAndReset() {if (state != NEW ||!RUNNER.compareAndSet(this, null, Thread.currentThread()))return false;boolean ran = false;int s = state;try {Callable<V> c = callable;if (c != null && s == NEW) {try {c.call(); // don't set resultran = true;} catch (Throwable ex) {setException(ex);}}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptss = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}return ran && s == NEW;}

runAndReset()函数内部,会执行原始的调度任务,这里的reset主要是判断和重置 的一些状态。比如如果发现目前的state不是NEW,则说明该任务已经被取消了,就不进行原始任务的执行。 另外由于是一个周期性任务,任务执行后,并不会把该Task的状态设置为 COMPLETING
在任务执行完成后,会调用 setNextRuntime() 重置任务下一次执行的时间,并且将任务重新offer到队列中。

    /*** Sets the next time to run for a periodic task.*/private void setNextRunTime() {long p = period;if (p > 0)time += p;elsetime = triggerTime(-p);}

这里period >0时,会用Task当前的time + period 计算下一次任务执行的时间,
而 period <0 时,是调用 triigerTime(-p) 计算时间的, triggerTime()函数内部计算时间上面讲过,是用
当前的System.naonoTime() 作为基准时间计算的。
image.png
初次看到这里时,肯定会疑惑 **period**为负数又是个什么情况,这里跟踪代码后发现,
原来 scheduleAtFixedRate()scheduleWithFIxedDelay() 2个函数实现时的唯一区别,就是
scheduleWithFixedDelay时会将传入的 period 设置为负数,也就说底层通过 period 的正负来判断开发者调用的是哪个函数,封装通用逻辑函数时,省去了一个参数。
image.png
代码分析到这里,我们也就理解了这2个函数计算任务下一次调度时间的不同点:

  • scheduleAtFixedRate�():计算任务下一次执行的时间,不是根据当前基准时间计算的,而是上一次任务设置的time的值
  • scheduleWithFIxedDelay():计算任务下一次执行的时间,是以当前基准时间计算的

此时再回到 系统对 shceudleAtFixRate函数的定义上,其实在函数注释上已经解释的很清楚了。

Submits a periodic action that becomes enabled first after the given initial delay, and
subsequently with the given period; that is, executions will commence after initialDelay,
then initialDelay + period, then initialDelay + 2 * period, and so on.

image.png
任务初始执行的时间是基于设置的延迟时间开始执行的,之后每一次执行的时间也是基于该初始时间调度的,
比如初始时间是 1000,周期是1000, 那么后续任务的执行时间正常情况下应该是 **2000 3000 4000**
而 shceduleAtFixedDelay 是以任务执行结束后,后System.nanotime()重新计算下次调度时间的。
scheduleAtFixedRate()** **这种以任务初始时间作为计算任务调度时间会有什么情况的现象出现呢?

问题复现

从上文中的实现中,可以发现Java线程池对于周期性任务,下一次的任务调度会依赖于上一次任务的执行结束,如果任务的执行时间,超过任务设置的间隔时间,那么后续任务执行的间隔,会变成任务的执行耗时为间隔

这里写了个Demo测试下,创建一个执行耗时为5S的任务,设置任务执行间隔为2S
image.png此时间戳上,可以发现 最终任务其实没有按2秒的时间执行。

不过这个案例和我在线上遇到的案例不一样,线上的场景所执行的任务通常只需要耗时几十毫秒,执行的间隔为10S。 但是我们要知道,线上环境和线下执行的环境可能是不一样,在线下这个任务可能只有几十毫秒,但线上会不会因为各种异常情况,导致这个任务在某一次执行的时候超时很多? 比如因为时间片分配不足,或者是其他特殊情况,比如GC暂停了线程?。如果是这种情况,理论上是会出现由于某一次超时,导致任务的 time 属性一直< System.nanoTime(), 然后又由于后续任务的执行耗时可能又恢复正常,导致任务无限被调度,当然每一次调度后 time = time + period, 直到任务的time属性值> System.nanoTime()才停止!

下面这个Demo,我默认任务调度时间为1S, 模拟第一次执行时耗时异常为15S, 可以发现后续十四次的任务调度都出现异常,次数~= 执行超时的时间/任务调度间隔。
从另一个角度来说,这保证了**shceudleAtFixRate**最终执行的次数 总是等于 时间间隔/任务调度间隔,不会因为某一次任务执行耗时较长,导致总次数变少。
image.png

解决方式

对我周期性性能采样的任务,我们可以接受某一次运行间隔异常,但无法接受 任务瞬间调度次数异常的情况,因为这可能会消耗大量的CPU或者其他资源。因此最终将任务调度改为使用 scheduleAtFixDelay()函数执行,并且记录任务执行时实际的间隔,如果间隔超过一定阈值,则可以根据任务的特性,选择丢弃本次采样的结果,或者是对数据结果进行一些校准。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_376414.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

有没有人声和背景音乐分离的神器?

做视频剪辑&#xff0c;二次创作的朋友&#xff0c;需要去除视频中的背景音乐&#xff0c;保留人声&#xff1b;或者去除人声&#xff0c;保留背景音乐。随着用户需求的增多&#xff0c;科技的发展&#xff0c;让我们能通过智能的人声分离软件来实现&#xff0c;下面就来看看如…

XPS的锂电池表征技巧-科学指南针

XPS作为一种表面表征手段&#xff0c;这是它一个很大的限制&#xff0c;同时也是它一个很大的优势&#xff0c;正所谓“其术专则其艺必精”。限于笔者专业背景&#xff0c;此次主要讲述XPS在锂离子电池中的应用。 而由于XPS测试是一种表面分析手段&#xff0c;故其在锂离子电池…

Qt中QFile、QByteArray QDataStream和QTextStream区别及示例

在Qt中&#xff0c;QFile、QByteArray、QDataStream和QTextStream是常用的文件和数据处理类。 主要功能和区别 QFile&#xff1a; QFile是用于读写文本和二进制文件以及资源的I/O设备。可以单独使用QFile&#xff0c;或者更方便地与QTextStream或QDataStream一起使用。 通常在…

最新2023版完美可用的聚合支付系统源码,全开源无后门,适合二开

最新2023版完美可用的聚合易支付系统源码&#xff0c;全开源无后门&#xff0c;真正安全可用。 更新日志&#xff1a; 1.新增微信公众号消息提醒功能 2.重构转账付款功能&#xff0c;支持通过插件扩展 3.商户后台新增代付功能 4.后台新增付款记录列表 5.支付宝插件新增预…

嵌入式养成计划-46----QT--简易版网络聊天室实现--QT如何连接数据库

一百一十九、简易版网络聊天室实现 119.1 QT实现连接TCP协议 119.1.1 基于TCP的通信流程 119.1.2 QT中实现服务器过程 使用QTcpServer实例化一个服务器对象设置监听状态&#xff0c;通过listen()函数&#xff0c;可以监听特定的主机&#xff0c;也可以监听所有客户端&#x…

C++23:多维视图(std::mdspan)

C23&#xff1a;多维视图&#xff08;std::mdspan&#xff09; 介绍 在 C23 中&#xff0c;std::mdspan 是一个非拥有的多维视图&#xff0c;用于表示连续对象序列。这个连续对象序列可以是一个简单的 C 数组、带有大小的指针、std::array、std::vector 或 std::string。 这…

外汇天眼:如何快速玩转外汇市场?这个技巧你必须知道!

在外汇市场中&#xff0c;决定交易成功与否的关键在于投资者的技能和知识扎不扎实&#xff0c;这对投资者获取利润至关重要。然而对于投资者来说&#xff0c;外汇交易市场又是一个复杂且多变的市场&#xff0c;要在外汇市场中获得成功并不容易&#xff0c;需要深入地了解、不断…

部署Vue项目到githubPage中

上传Vue项目到githubPage 例如: 看我发布的地址 前提条件 1. github上有一个仓库并且仓库下有两个分支(main 和 gh-pages) 1.1 main分支保存你的vue项目源码(react或者其他框架的都行) 1.2 gh-pages分支保存的是你项目打包之后的代码(如Vue项目打包完之后是个dist包,…

【Javascrpt】比较,逻辑运算符

目录 比较运算符 逻辑运算符 &&(与&#xff09; ||&#xff08;或&#xff09; 两真&#xff08;||左侧为真&#xff0c;||右侧为真&#xff09; 两假&#xff08;||左侧为假&#xff0c;右侧为假&#xff09; 一真一假&#xff08;||一侧为假&#xff0c;另一侧为…

数据集的特征提取

1、 特征提取 1.1、 将任意数据&#xff08;如文本或图像&#xff09;转换为可用于机器学习的数字特征 注&#xff1a;特征值化是为了计算机更好的去理解数据 字典特征提取(特征离散化)文本特征提取图像特征提取&#xff08;深度学习将介绍&#xff09; 2 特征提取API sklear…

物联网AI MicroPython传感器学习 之 GC7219点阵屏驱动模块

学物联网&#xff0c;来万物简单IoT物联网&#xff01;&#xff01; 一、产品简介 LED-8 * 32点阵屏显示板由 4 块单色 8x8 共阴红色点阵单元组成&#xff0c;通过 SPI 菊花链模式将多块显示屏连接后可以组成更大的分辨率显示屏幕&#xff0c;任意组合分辨率。可用于简单仪表显…

1、VMware虚拟机及网络配置

一、VMware虚拟网络编辑器 1、选择NAT模式并配置子网 2、进入NAT设置&#xff0c;配置网关 3、宿主机网络适配器设置 二、创建虚拟机 在这里插入图片描述 三、开启虚拟机&#xff0c;安装操作系统 在该网段内配置静态ip&#xff0c;指定网关为前面NAT配置的网关地址…

Java8 BiConsumer<T, U> 函数接口浅析分享(含示例,来戳!)

文章目录 Java8 BiConsumer<T, U> 函数接口浅析分享&#xff08;含示例&#xff0c;来戳&#xff01;&#xff09;源码accept 方法示例示例一示例二 andThen 方法示例示例一示例二 示例相关代码类dohandler 方法student.javaStudentScore.javaStudentScoreDto.java Java8…

运行报错(三)git bash报错fatal: detected dubious ownership in repository at

报错现象 在运行git 命令时&#xff0c;出现报错 “fatal: detected dubious ownership in repository at” 报错原因 文件夹的所有者和现在的用户不一致 栗子&#xff1a; 文件夹的所有者是root&#xff0c;而当前用户是admin 解决方案 方法一、 将文件夹的所有者替换成ad…

【JAVA学习一:基础语法】

记录学习过程和代码编写&#xff0c;小白纯属记录。 目录 一、运算符 二、数组 三、面向对象 一、运算符 赋值运算符 public class Demo01 { public static void main(String[] args){ System.out.println(11); System.out.println(1-1); System.out.printl…

【Javascript】运算符(赋值,算术,自增,自减)

目录 赋值 算术 单个变量&#xff1a; 多个变量&#xff1a; 在字符串&#xff0c;数组中充当连接符 自符串与字符串 数组与数组 数组与字符串 自增与自减 前置 自增 自减 后置 自增 自减 赋值 var a 1;算术 单个变量&#xff1a; var a 1;a 1;console.l…

【WCA-KELM预测】基于水循环算法优化核极限学习机回归预测研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

文心一言 VS 讯飞星火 VS chatgpt (119)-- 算法导论10.3 4题

四、用go语言&#xff0c;我们往往希望双向链表的所有元素在存储器中保持紧凑&#xff0c;例如&#xff0c;在多数组表示中占用前m 个下标位置。(在页式虚拟存储的计算环境下&#xff0c;即为这种情况。)假设除指向链表本身的指针外没有其他指针指向该链表的元素&#xff0c;试…

nginx安装详细步骤和使用说明

下载地址&#xff1a; https://download.csdn.net/download/jinhuding/88463932 详细说明和使用参考&#xff1a; 地址&#xff1a;http://www.gxcode.top/code 一 nginx安装步骤&#xff1a; 1.nginx安装与运行 官网 http://nginx.org/1.1安装gcc环境 # yum install gcc-c…

功能基础篇8——图形用户界面

图形用户界面 Graphics User Interface&#xff0c;GUI&#xff0c;图形用户界面 Ubuntu GUI Command Line Interface&#xff0c;CLI&#xff0c;命令行界面 Centos CLI tkinter GUI&#xff0c;Python标准库 from tkinter import ttk, Tkroot Tk() frm ttk.Frame(…