APScheduler 定时任务模块实现

news/2024/4/30 17:42:51/文章来源:https://blog.csdn.net/qq_25305833/article/details/127169521

目录

定时任务简介

安装

一、调度器(schedulers)

二、任务存储器(job stores)

三、触发器

四、执行器

五、定时任务 调度配置

六、任务操作

七、异常监听与日志

八、示例代码

九、管理定时任务,实现对任务的增、删、改、查操作


定时任务简介

    APScheduler使用起来十分方便。提供了基于日期、固定时间间隔以及 crontab类型的任务。还可以在程序运行过程中动态的新增任务和删除任务。在任务运行过程中,还可以把任务存储起来,下次启动运行依然保留之前的状态。另外最重要的一个特点是,因为他是基于 Python语言的库,所以是可以跨平台的,一段代码,处处运行。

 

安装

pip install apscheduler

一、调度器(schedulers)

    任务调度器是属于整个调度的总指挥官。他会合理安排作业存储器、执行器、触发器进行工作,并进行添加和删除任务等。调度器通常是只有一个的。开发人员很少直接操作触发器、存储器、执行器等。因为这些都由调度器自动来实现了。

  1. BlockingScheduler :适用于调度程序是进程中唯一运行的进程,调用 start 函数会阻塞当前线程,不能立即返回。
  2. BackgroundScheduler :适用于调度程序在应用程序的后台运行,调用 start 后主线程不会阻塞。
  3. AsyncIOScheduler :适用于使用了 asyncio 模块的应用程序。
  4. GeventScheduler :适用于使用 gevent 模块的应用程序。
  5. TwistedScheduler :适用于构建 Twisted 的应用程序。
  6. QtScheduler :适用于构建 Qt 的应用程序。

二、任务存储器(job stores)

    任务存储器是可以存储任务的地方,默认情况下任务保存在内存,也可将任务保存在各种数据库中。任务存储进去后,会进行序列化,然后也可以反序列化提取出来,继续执行。

  1. MemoryJobStore :没有序列化,任务存储在内存中,增删改查都是在内存中完成。
  2. SQLAlchemyJobStore :使用 SQLAlchemy 这个 ORM 框架作为存储方式。
  3. MongoDBJobStore :使用 mongodb 作为存储器。
  4. RedisJobStore :使用 redis 作为存储器。

    任务存储器的选择有两种。一是内存,也是默认的配置。二是数据库。使用内存的方式是简单高效,但是不好的是,一旦程序出现问题,重新运行的话,会把之前已经执行了的任务重新执行一遍。数据库则可以在程序崩溃后,重新运行可以从之前中断的地方恢复正常运行。

三、触发器

    触发器有3种,第一种是date,第二种是 interval,第三种是 crontab。 interval可以具体指定多少时间间隔执行一次。 crontab可以指定执行的日期策略。

1、date触发器:

    在某个日期时间只触发一次事件。

    示例:sched . add_job ( my_job , 'date' , run_date = date ( 2020 , 5 , 22 ), args =[ 'text' ])

2、interval触发器

    想要在固定的时间间隔触发事件。 interval的触发器可以设置以下的触发参数: 

  示例:scheduler . add_job ( tick , "cron" , hour = 11 , minute = 24 )

3、crontab触发器

    在某个确切的时间周期性的触发事件,定时任务cron表达式

 示例:scheduler . add_job ( tick , "cron" , day = "4th sun" , hour = 20 , minute = 1 )

四、执行器

  1. ThreadPoolExecutor :线程池执行器。
  2. ProcessPoolExecutor :进程池执行器。
  3. GeventExecutor : Gevent 程序执行器。
  4. TornadoExecutor : Tornado 程序执行器。
  5. TwistedExecutor : Twisted 程序执行器。
  6. AsyncIOExecutor : asyncio 程序执行器。

五、定时任务 调度配置

    1、执行器:配置 default 执行器为 ThreadPoolExecutor ,并且设置最多的线程数是x个。

    2、存储器:本系统采用默认存储在运行内存中管理

    3、任务配置:

  • 3.1 coalesce = True,设置这个目的是,比如由于某个原因导致某个任务积攒了很多次没有执行(比如有一个任务是1分钟跑一次,但是系统原因断了5分钟),如果 coalesce = True ,那么下次恢复运行的时候会只执行一次,而如果设置 coalesce = False ,那么就不会合并会5次全部执行。
  • 3.2 max_instances = 5,同一个任务同一时间最多只能有5个实例在运行。比如一个耗时10分钟的job,被指定每分钟运行1次,如果我 max_instance 值5,那么在第 6 ~ 10 分钟上,新的运行实例不会被执行,因为已经有5个实例在跑了。

六、任务操作

 1. 添加任务:scheduler.add_job(job_obj,args,id,trigger,**trigger_kwargs)。
 2. 删除任务:scheduler.remove_job(job_id,jobstore=None)。
 3. 暂停任务:scheduler.pause_job(job_id,jobstore=None)。
 4. 恢复任务:scheduler.resume_job(job_id,jobstore=None)。
 5. 修改某个任务属性信息:scheduler.modify_job(job_id,jobstore=None,**changes)。
 6. 修改单个作业的触发器并更新下次运行时间:scheduler.reschedule_job(job_id,jobstore=None,trigger=None,**trigger_args)
 7. 输出作业信息:scheduler.print_jobs(jobstore=None,out=sys.stdout)

七、异常监听与日志

    当我们的任务抛出异常后,我们可以监听到,然后把错误信息进行记录。

八、示例代码

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_MISSED
from apscheduler.triggers.cron import CronTrigger
import logging
logger = logging.getLogger('job')
logging.basicConfig(level=logging.INFO,format = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',datefmt = '%Y-%m-%d %H:%M:%S',filename = 'log/TaskChedulerLog.txt',filemode = 'a',# encoding = 'utf-8', # Python 低版本不支持该配置
)# 重写Cron表达式 提供6个参数进行处理,不支持?
class my_CronTrigger(CronTrigger):@classmethoddef my_from_crontab(cls, expr, timezone="Asia/Shanghai"):values = expr.split()if len(values) != 6:raise ValueError('Wrong number of fields; got {}, expected 7'.format(len(values)))return cls(second=values[0], minute=values[1], hour=values[2], day=values[3], month=values[4],day_of_week=values[5], timezone=timezone)# 定时任务 任务监听器
def my_listener(event):job = scheduler.get_job(event.job_id)if event.exception:logger.info("#### job_error Task error!!!!!!")logger.info(event.exception)logger.info("#### job_id=%s|obname=%s|jobtrigger=%s|jobtime=%s|retval=%s", event.job_id, job.name, job.trigger,event.scheduled_run_time, event.retval)else:logger.info('#### jon_normal Task Running...')logger.info("#### job_id=%s|jobname=%s|jobtrigger=%s|jobtime=%s|retval=%s", event.job_id,job.name, job.trigger, event.scheduled_run_time, event.retval)# 定义定时 调度配置
executors = {'default': ThreadPoolExecutor(20)
}
job_defaults = {'coalesce': True,'max_instances': 1
}# 定时任务 初始化
scheduler = BackgroundScheduler(executors = executors, job_defaults=job_defaults,timezone="Asia/Shanghai")
scheduler.add_listener(my_listener, EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_EXECUTED)
scheduler._logger = logger# Cron 6位:* * * * * * 秒、分、时、天、月、周
# project_id,project_name,branch,path,run_env = "1","国内酒店","testSute","/test","dev-test"
# interval_task 为调用Jenkins的函数方法,或也可为调试定时任务执行情况# scheduler.add_job(interval_task ,my_CronTrigger.my_from_crontab("*/2 * * * * *"),args=[project_name,branch,path,run_env],id=project_id)
# scheduler.add_job(getProJectPlanInfo ,my_CronTrigger.my_from_crontab("*/2 * * * * *"))# 定义每间隔15分钟更新一次配置,getProJectPlanInfo函数方法为查询数据库定时任务配置筛选启用的定时任务,并对定时任务进行 增、删、改、查操作
scheduler.add_job(getProJectPlanInfo ,my_CronTrigger.my_from_crontab("0 */15 * * * *"))# 显示timing 任务列表
# scheduler.print_jobs(jobstore=None,out=sys.stdout)scheduler.start()

九、管理定时任务,实现对任务的增、删、改、查操作

    1、定时任务信息dict,用户存储现有定时任务的信息如:项目id、项目定时计划cron表达式、项目运行目录、运行环境等

    2、定时任务信息dict-keyList,现有运行定时任务的keyList,方便管理运行中的job_id

    3、每15分钟获取一次已设置定时任务的project_id List,获取当前可运行的定时任务。通过获取步骤1的信息“查”询判断定时任务 增、删、改操作

    4、对比 步骤2、步骤3 的数据对,完成对定时任务信息dict、定时任务信息dict-keyList完成数据更新,同时移除已关闭的定时任务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_18679.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java中synchronized关键字到底怎么用,这个例子一定要看!

在平时开发中,synchronized关键字经常遇到,你知道synchronized怎么用吗?本文给大家介绍一下。 我们有两种方法使用同步: 使用同步方法使用同步语句或块 使用同步方法 要使方法同步,只需将synchronized关键字添加到其…

重识Nginx - 18 网络收发与Nginx事件间的对应关系

文章目录概述网络传输TCP流与报文TCP 协议与非阻塞接口读事件写事件WireShark抓包分析WireShark设置抓包分析概述 Nginx是一个事件驱动的框架, 所谓事件即网络事件。 Nginx每个连接自然对应两个网络事件,即 读事件和写事件。 要想理解Nginx的原理&…

【牛客刷题--SQL篇】SQL9查找除复旦大学的用户信息SQL10用where过滤空值练习

个人主页:与自己作战 大数据领域创作者 牛客刷题系列篇:【SQL篇】【Python篇】【Java篇】 推荐刷题网站注册地址:【牛客网–SQL篇】 推荐理由:从0-1起步,循序渐进 网址注册地址:【牛客网–注册地址】 文章目录一、条件…

算法得到3次(2次、3次、4次、5次、n次)B样条曲线公式( Matlab)

Matlab得到2次B样条曲线公式 Matlab得到3次B样条曲线公式 Matlab得到4次B样条曲线公式 Matlab得到5次B样条曲线公式 Matlab得到6次B样条曲线公式 Matlab得到。。。。曲线公式 Matlab得到n次B样条曲线公式 B样条曲线公式 %% %%%%%%%%%%%%%%%%%%%%%%%B样条曲线测试%%%%%%%…

grep练习题

找出有关root的行 grep root pwd.txt找出root用户的行 grep ^root /etc/passwd匹配以root开头的行或者以dwj开头的行 grep ^(root|dwj) /etc/passwd过滤出bin开头的行,且显示行号 grep -n ^bin /etc/passwd过滤掉root开头的行 grep -v ^root /etc/passwd统计dwj用户…

【问题解决】大佬亲授的姿势——PlatformIO生成bin文件方法

微信关注公众号 “DLGG创客DIY”设为“星标”,重磅干货,第一时间送达。最近写创客项目程序基本都用PlatformIO(以后简称PIO),PIO在很多方面都优于arduino IDE,今天就不展开了啊,回头专门起一篇文…

Probabilistic Case-based Reasoning forOpen-World Knowledge Graph Completion

摘要 基于案例的推理(CBR)系统通过检索与给定问题相似的“案例”来解决新问题。如果这样一个系统能够达到很高的精度,它就会因为它的简单性、可解释性和可扩展性而具有吸引力。 在本文中,我们证明了这样一个系统是可实现的推理知识库(KBs)。我们的方法…

大数据讲课笔记4.2 HDFS架构和原理

文章目录零、学习目标一、导入新课二、新课讲解(一)HDFS存储架构(二)HDFS文件读写原理1、HDFS写数据原理2、HDFS读数据原理三、归纳总结四、上机操作零、学习目标 了解HDFS存储架构理解HDFS文件读写原理 一、导入新课 通过上次…

[Linux-文件I/O] 文件函数系统文件接口缓冲区文件描述符dup2inode软硬链接动静态库

[Linux-文件I/O] 文件函数&系统文件接口&缓冲区&文件描述符&dup2&inode&软硬链接&动静态库文件IOC语言文件操作系统接口文件操作文件描述符文件描述符分配进程和文件之间的对应关系是如何建立的?打开用openmode关闭用close读用read写用…

TEE OS中断篇(一):系统的中断处理

前面我学习了线程方面的东西,这个假期,空闲了来看看《手机安全和可信应用开发指南》这本书的中断篇。 中断处理一个完整的系统都会存在中断,ARMv7架构扩展出了Monitor模式而ARMv8使用EL的方式对ARM异常运行模式进行了重新定义,分为…

Spring 测试运行的时候提示 Unable to find a @SpringBootConfiguration 错误

Spring 进行测试的时候提示的错误信息如下: SEVERE: Caught exception while closing extension context: org.junit.jupiter.engine.descriptor.JupiterEngineExtensionContext@c63c11ed java.lang.IllegalStateException: Unable to find a @SpringBootConfiguration, you n…

Flink学习笔记(4)——Flink运行架构

目录 一、Flink运行时架构 1.1 系统架构 1.1.1 整体构成 1.1.2 作业管理器(JobManager) 1.1.3 任务管理器(TaskManager) 1.2 作业提交流程 1.2.1 高层级抽象视角 1.2.2 独立模式(Standalone) 1.2.…

SpringCloud 使用 Turbine 聚合监控 Hystrix 健康状态

Hystrix 的降级熔断,只是被迫的折中方案,并不是我们所期望的结果,我们还是期望系统能够永远健康运行。绝大多数情况下,一个系统有很多微服务组成,在高峰期很可能个别微服务会发生降级熔断,我们必须能够通过监控才行,这样才能快速发现并解决问题。 Hystrix 是 Netflix 的…

soc的核间通信机制-->mailbox

对于mailbox,这个东西其实看到了很多次,但是一直不知道是啥。这里大概看了一下,知道了为甚有这个玩意儿,以及这个玩意相关的有啥,至于具体怎么使用,以及详细的工作原因等着以后再说吧。 正文 目前很多芯片…

微信小程序开发实战(SM周期及WXS脚本)

作者 : SYFStrive 博客首页 : HomePage 📜: 微信小程序 📌:个人社区(欢迎大佬们加入) 👉:社区链接🔗 📌:觉得文章不错可以点点关注 &#x1f4…

webshell 提权

在我们使用cve或者其他方式获取shell 后 python -c import pty;pty.spawn("/bin/bash") 获取一个交互式的bash shell 使用id 命令可以查看当前的用户权限 查看当前的linux 系统版本 利用kali自带的漏洞检索库检索漏洞 searchsploit privilege | grep -i linux |…

【MySQL】数据库介绍以及MySQL数据库

目录 数据库介绍 数据库概述 数据表 MySql数据库 MySql安装 登录MySQL数据库 ​​​​​​​SQLyog(MySQL图形化开发工具) 数据库介绍 数据库概述 什么是数据库(DB:DataBase) 数据库就是存储数据的仓库,其本质是一个文件系统&#…

实训任务1:Linux基本操作

文章目录一、实训目的二、实训要求三、实训任务1、创建并配置三个虚拟机2、创建SSH连接3、实现IP地址与主机名的映射4、关闭和禁用防火墙5、创建目录结构6、压缩打包7、安装软件包8、创建脚本文件9、直接运行脚本10、虚拟机相互免密登录11、远程拷贝文件一、实训目的 通过实训…

代谢组学和宏基因组学研究不同添加剂对青贮品质的影响

​ 发表期刊:Bioresource Technology 影响因子:9.642 百趣生物提供服务:代谢组学宏基因组 研究背景 人口增长促进了全球肉类和牛奶消费量增加,养殖所需饲料用量也逐年上升,发酵后的饲料是进行农副产品处理更好的选…

大数据技术Spark3.0详解

一、Spark3.0 简介 Spark3.0版本包含了3400多个补丁程序,是开源社区做出巨大贡献的最高峰,带来了Python和SQL功能的重大进步,并着眼于探索和生产的易用性。 1、Spark3.0新功能 (1)通过自适应查询执行,动…