需求
- 服务使用集群部署(多Pod)
- 基础服务提供调度任务注册,删除,查看的功能
- 尽可能减少客户端的使用成本
- 开发工作量尽可能少,成本尽可能小
基于以上的需求,设计如下,调度中心非独立部署,集成在base服务中。客户端目前属于同一个项目,直接使用公共模块的代码,非sdk使用。
客户端接入调度中心,只需2步。
- 使用公共模块的服务,调度任务注册
- 实现公共模块的job接口,注册中心会按照客户端提供信息,触发任务
- 服务端与客户端的交互使用http通讯,由k8s提供的域名调用,路由逻辑由k8s ingress 提供(默认多pod,循环调用)
- 服务端不保证客户端执行结束,只保证调度任务正确触发,客户端任务为异步执行
实现demo如下
- springboot,依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
yaml 配置
server:port: 8082# 应用名称
spring:application:name: quartz-demoquartz:job-store-type: jdbcjdbc:# 是否自动初始化quartz的表结构initialize-schema: never#相关属性配置properties:org:quartz:jobStore:# 使用的数据源名称dataSource: quartzDataSource# 设置为“true”以打开群集功能,多个quartz实力必须打开isClustered: trueclass: org.quartz.impl.jdbcjobstore.JobStoreTXtablePrefix: QRTZ_# 标准jdbc数据库代理driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegatedatasource:driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=falseusername: rootpassword: 123456
客户端主要使用以下的代码
job 接口
package com.lq.quartzdemo.controller;/*** @author seven* @version 1.0* @description 公共job* @date 2022/9/11 14:33*/
public interface CommonJob {/*** JOB 的名字** @return*/String jobName();/*** 执行job** @param param* @return*/Object exec(Object param);
}
job 工厂
package com.lq.quartzdemo.controller;import com.google.common.util.concurrent.ThreadFactoryBuilder;
import jdk.internal.org.objectweb.asm.tree.TryCatchBlockNode;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;/*** @author seven* @version 1.0* @description job factory* @date 2022/9/11 14:37*/
@Component
@Log4j2
public class CommonJobFactory implements ApplicationContextAware {// 任务映射private static Map<String, CommonJob> jobNameMapping = new HashMap<>();// 线程池private static ExecutorService threadPool;@Value("${schedule.core_pool_size:5}")private Integer CORE_POOL_SIZE;@Value("${schedule.max_pool_size:20}")private Integer MAX_POOL_SIZE;@Value("${schedule.queue_size:10000}")private Integer QUEUE_SIZE;public static String run(String jobName, Object param) {CompletableFuture.supplyAsync(() -> {final CommonJob commonJob = jobNameMapping.get(jobName);if (null == commonJob) {throw new RuntimeException("job not exist,please check jobName");}return commonJob.exec(param);}, threadPool).exceptionally(e -> {log.error("job exec fail,jobName={},param={},e={}", jobName, param == null ? "" : param.toString(), e);return null;});return "ack ok";}public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.initJobNameMapping(applicationContext);this.initThreadPool();}public void initThreadPool() {log.info("start CommonJobFactory.initThreadPool...");ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("batch-save-nodes-%d").build();threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE), threadFactory, new ThreadPoolExecutor.AbortPolicy());log.info("end CommonJobFactory.initThreadPool...core_size={},max_size={},queue_size={}",CORE_POOL_SIZE, MAX_POOL_SIZE, QUEUE_SIZE);}public void initJobNameMapping(ApplicationContext applicationContext) {log.info("start CommonJobFactory.initJobNameMapping...");final Collection<CommonJob> values = applicationContext.getBeansOfType(CommonJob.class).values();for (CommonJob commonJob : values) {if (StringUtils.isEmpty(commonJob.jobName())) {log.error("job name is not null");throw new JobNameException("");}if (jobNameMapping.containsKey(commonJob.jobName())) {log.error("job name is repeat,{} and {} has the same job name [{}]",commonJob.getClass().getName(), jobNameMapping.get(commonJob.jobName()).getClass().getName(), commonJob.jobName());throw new JobNameException("");}jobNameMapping.put(commonJob.jobName(), commonJob);}log.info("end CommonJobFactory.initJobNameMapping...job_size={},jobName={}", jobNameMapping.size(), jobNameMapping.keySet());}
}
调度执行controller
package com.lq.quartzdemo.controller;import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author seven* @version 1.0* @description 公共执行逻辑模块* @date 2022/9/11 14:16*/
@RestController
@RequestMapping("common-task")
public class CommonScheController {/*** 任务执行* @param taskName* @param param*/@PostMapping("{taskName}")public String taskExec(@PathVariable("taskName") String taskName, Object param) {return CommonJobFactory.run(taskName,param);}
}
任务提交、修改、删除
package com.lq.quartzdemo.task;import lombok.Data;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;import java.util.Date;/*** @author seven* @version 1.0* @description 定时任务提交器* @date 2022/9/11 14:06*/
@Component
public class ScheTaskCommit {private RestTemplate restTemplate = new RestTemplate();@Datapublic static class ScheTaskDto {// 系统名private String appname;// 任务名private String taskName;// corn 表达式private String cronExpression;// corn 开始时间private Date startTime;// corn 结束时间private Date endTime;// 触发地址 域名private String triggerAddr;// 触发参数private String triggerParam;}public void commit(ScheTaskDto scheTaskDto) {final String s = restTemplate.postForObject("http://localhost:8082/taskReceiver/register", scheTaskDto, String.class);System.out.println(s);}public void remove(String appName, String taskName) {restTemplate.delete("http://localhost:8082/taskReceiver/remove/" + appName + "/" + taskName);}
}
服务端的代码
任务接收
package com.lq.quartzdemo.controller;import com.lq.quartzdemo.task.CommonTask;
import com.lq.quartzdemo.task.QuartzManager;
import com.lq.quartzdemo.task.ScheTaskCommit;
import org.quartz.JobDataMap;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;/*** @author seven* @version 1.0* @description base 的调度任务接收controller* @date 2022/9/11 15:57*/
@RestController
@RequestMapping("taskReceiver")
public class BaseTaskReceiver {@Resourceprivate QuartzManager quartzManager;@PutMapping("register")public void registerTask(ScheTaskCommit.ScheTaskDto request){JobDataMap jobDataMap=new JobDataMap();jobDataMap.put("beginTime",request.getStartTime());jobDataMap.put("endTime",request.getEndTime());quartzManager.addJob(request.getAppname(), request.getTaskName(),CommonTask.class,request.getCronExpression(),jobDataMap);}@DeleteMapping("{appName}/{taskName}")public void deleteTask(@PathVariable("appName") String appName,@PathVariable("taskName") String taskName){quartzManager.removeJob(taskName,appName);}@GetMapping("{appName}")public Object tasks(@PathVariable("appName") String appName){return null;}
}
任务管理器
package com.lq.quartzdemo.task;import org.quartz.*;
import org.quartz.impl.triggers.CronTriggerImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.HashSet;
import java.util.Set;import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;/*** @author seven* @version 1.0* @description 任务管理器* @date 2022/9/11 11:03*/
@Component
public class QuartzManager {@Autowiredprivate Scheduler sched;/*** 添加任务** @param groupName 组名* @param jobName 任务名* @param cls class文件*/@SuppressWarnings("unchecked")public void addJob(String groupName, String jobName, @SuppressWarnings("rawtypes") Class cls, String time,JobDataMap jobDataMap) {try {//任务JobDetail jobDetail = newJob(cls).withIdentity(jobName, groupName).storeDurably().usingJobData(jobDataMap).build();// 触发器TriggerBuilder<Trigger> triggerTriggerBuilder = newTrigger();if (jobDataMap.get("beginTime") != null && jobDataMap.get("endTime") != null) {Date beginTime = (Date) jobDataMap.get("beginTime");Date endTime = (Date) jobDataMap.get("endTime");triggerTriggerBuilder.startAt(beginTime).endAt(endTime);}CronTriggerImpl trigger = (CronTriggerImpl) triggerTriggerBuilder.withIdentity(groupName + jobName, groupName).withSchedule(cronSchedule(time).withMisfireHandlingInstructionDoNothing()).build();Set<Trigger> set = new HashSet();set.add(trigger);sched.scheduleJob(jobDetail, set,true);// 启动if (!sched.isShutdown()) {sched.start();}} catch (SchedulerException e) {e.printStackTrace();}}/*** @param jobName* @param jobGroupName* @Description: 移除一个任务*/public void removeJob(String jobName, String jobGroupName) {try {TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);sched.pauseTrigger(triggerKey);// 停止触发器sched.unscheduleJob(triggerKey);// 移除触发器sched.deleteJob(JobKey.jobKey(jobName, jobGroupName));// 删除任务} catch (Exception e) {throw new RuntimeException(e);}}
}
在数据库中创建quartz提供的表
#
# Quartz seems to work best with the driver mm.mysql-2.0.7-bin.jar
#
# PLEASE consider using mysql with innodb tables to avoid locking issues
#
# In your Quartz properties file, you'll need to set
# org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;CREATE TABLE QRTZ_JOB_DETAILS(SCHED_NAME VARCHAR(120) NOT NULL,JOB_NAME VARCHAR(200) NOT NULL,JOB_GROUP VARCHAR(200) NOT NULL,DESCRIPTION VARCHAR(250) NULL,JOB_CLASS_NAME VARCHAR(250) NOT NULL,IS_DURABLE VARCHAR(1) NOT NULL,IS_NONCONCURRENT VARCHAR(1) NOT NULL,IS_UPDATE_DATA VARCHAR(1) NOT NULL,REQUESTS_RECOVERY VARCHAR(1) NOT NULL,JOB_DATA BLOB NULL,PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
);CREATE TABLE QRTZ_TRIGGERS(SCHED_NAME VARCHAR(120) NOT NULL,TRIGGER_NAME VARCHAR(200) NOT NULL,TRIGGER_GROUP VARCHAR(200) NOT NULL,JOB_NAME VARCHAR(200) NOT NULL,JOB_GROUP VARCHAR(200) NOT NULL,DESCRIPTION VARCHAR(250) NULL,NEXT_FIRE_TIME BIGINT(13) NULL,PREV_FIRE_TIME BIGINT(13) NULL,PRIORITY INTEGER NULL,TRIGGER_STATE VARCHAR(16) NOT NULL,TRIGGER_TYPE VARCHAR(8) NOT NULL,START_TIME BIGINT(13) NOT NULL,END_TIME BIGINT(13) NULL,CALENDAR_NAME VARCHAR(200) NULL,MISFIRE_INSTR SMALLINT(2) NULL,JOB_DATA BLOB NULL,PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
);CREATE TABLE QRTZ_SIMPLE_TRIGGERS(SCHED_NAME VARCHAR(120) NOT NULL,TRIGGER_NAME VARCHAR(200) NOT NULL,TRIGGER_GROUP VARCHAR(200) NOT NULL,REPEAT_COUNT BIGINT(7) NOT NULL,REPEAT_INTERVAL BIGINT(12) NOT NULL,TIMES_TRIGGERED BIGINT(10) NOT NULL,PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);CREATE TABLE QRTZ_CRON_TRIGGERS(SCHED_NAME VARCHAR(120) NOT NULL,TRIGGER_NAME VARCHAR(200) NOT NULL,TRIGGER_GROUP VARCHAR(200) NOT NULL,CRON_EXPRESSION VARCHAR(200) NOT NULL,TIME_ZONE_ID VARCHAR(80),PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);CREATE TABLE QRTZ_SIMPROP_TRIGGERS(SCHED_NAME VARCHAR(120) NOT NULL,TRIGGER_NAME VARCHAR(200) NOT NULL,TRIGGER_GROUP VARCHAR(200) NOT NULL,STR_PROP_1 VARCHAR(512) NULL,STR_PROP_2 VARCHAR(512) NULL,STR_PROP_3 VARCHAR(512) NULL,INT_PROP_1 INT NULL,INT_PROP_2 INT NULL,LONG_PROP_1 BIGINT NULL,LONG_PROP_2 BIGINT NULL,DEC_PROP_1 NUMERIC(13,4) NULL,DEC_PROP_2 NUMERIC(13,4) NULL,BOOL_PROP_1 VARCHAR(1) NULL,BOOL_PROP_2 VARCHAR(1) NULL,PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);CREATE TABLE QRTZ_BLOB_TRIGGERS(SCHED_NAME VARCHAR(120) NOT NULL,TRIGGER_NAME VARCHAR(200) NOT NULL,TRIGGER_GROUP VARCHAR(200) NOT NULL,BLOB_DATA BLOB NULL,PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);CREATE TABLE QRTZ_CALENDARS(SCHED_NAME VARCHAR(120) NOT NULL,CALENDAR_NAME VARCHAR(200) NOT NULL,CALENDAR BLOB NOT NULL,PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
);CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS(SCHED_NAME VARCHAR(120) NOT NULL,TRIGGER_GROUP VARCHAR(200) NOT NULL,PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
);CREATE TABLE QRTZ_FIRED_TRIGGERS(SCHED_NAME VARCHAR(120) NOT NULL,ENTRY_ID VARCHAR(95) NOT NULL,TRIGGER_NAME VARCHAR(200) NOT NULL,TRIGGER_GROUP VARCHAR(200) NOT NULL,INSTANCE_NAME VARCHAR(200) NOT NULL,FIRED_TIME BIGINT(13) NOT NULL,SCHED_TIME BIGINT(13) NOT NULL,PRIORITY INTEGER NOT NULL,STATE VARCHAR(16) NOT NULL,JOB_NAME VARCHAR(200) NULL,JOB_GROUP VARCHAR(200) NULL,IS_NONCONCURRENT VARCHAR(1) NULL,REQUESTS_RECOVERY VARCHAR(1) NULL,PRIMARY KEY (SCHED_NAME,ENTRY_ID)
);CREATE TABLE QRTZ_SCHEDULER_STATE(SCHED_NAME VARCHAR(120) NOT NULL,INSTANCE_NAME VARCHAR(200) NOT NULL,LAST_CHECKIN_TIME BIGINT(13) NOT NULL,CHECKIN_INTERVAL BIGINT(13) NOT NULL,PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
);CREATE TABLE QRTZ_LOCKS(SCHED_NAME VARCHAR(120) NOT NULL,LOCK_NAME VARCHAR(40) NOT NULL,PRIMARY KEY (SCHED_NAME,LOCK_NAME)
);commit;