分布式调度 2022-01-19 程序之旅,记录 暂无评论 541 次阅读 ## 分布式调度 > 本博文可以分三部分进行学习,分布式调度的了解,Quartz 使用,Elastic-Job 使用。 ### 分布式调度定义 分布式任务调度有两层含义 1. 运行在分布式集群环境下的调度任务(同一个定时任务部署多分,只应该有一个定时任务在执行) 2. 分布式调度--> 定时任务的分布式 --> 定时任务的拆分 (把一个大的作业任务拆分为多个小的作业) ### 定时任务的场景 每个一定时间,特定某一时刻执行 - 订单审核、出库 - 订单超时⾃动取消、⽀付退款 - 礼券同步、⽣成、发放作业 - 物流信息推送、抓取作业、退换货处理作业 - 数据积压监控、⽇志监控、服务可⽤性探测作业 - 定时备份数据 - ⾦融系统每天的定时结算 - 数据归档、清理作业 - 报表、离线数据分析作业 ### 定时任务的实现方式 定时任务的实现方式有多种,早期没有定时任务框架的时候,我们会使用 JDK 中的 Timer 机制和多线程机制(Runnable + 线程休眠 sleep)来实现定时或者间隔一段时间执行某一段程序,后来有了定时任务框架 Quartz,使用 cron 表达式来进行定时任务。这里简单的描述 Quartz 持久化定时任务的实现,与本文的分布式调度关系不大,可以直接跳到[分布式调度框架Elastic-Job](#分布式调度框架Elastic-Job) Maven 引入 jar 包 ```xml 1.1.18 8.0.15 3.4.0 3.6.0 org.springframework.boot spring-boot-starter-quartz 2.3.8.RELEASE com.baomidou mybatis-plus-boot-starter ${mybatis.plus.version} ``` 定时任务作业主要调度程序 ```java @Service public class ScheduleTaskServiceImpl extends ServiceImpl implements IScheduleTaskService, InitializingBean{ // 定时任务管理 private Scheduler scheduler; @Autowired private SchedulerFactoryBean schedulerFactoryBean; @Override public boolean scheduleJob(ScheduleTask task) { try { JobKey jobKey = JobKey.jobKey(task.getTaskName(), task.getTaskGroup()); Job job = (Job) ApplicationContextUtil.getBean(task.getJobClass()); String cron = task.getCronExpression(); Map jobMap = task.getJobDataMap(); JobDetail jobDetail = QuartzUtil.getJobDetail(jobKey, task.getDescription(), QuartzUtil.getJobDataMap(jobMap), job); // //前端不能传cron完整表达式,补充一位秒的占位 // cron = "0/30 " + cron; Trigger trigger = QuartzUtil.getTrigger(jobKey, task.getDescription(), QuartzUtil.getJobDataMap(jobMap), cron); // 开启定时任务 if(scheduler.checkExists(jobKey)) { scheduler.deleteJob(jobKey); } scheduler.scheduleJob(jobDetail, trigger); if (scheduler.isShutdown()){ scheduler.start(); } return true; } catch (Exception e) { log.error("【定时器业务处理】添加定时任务失败,{}",ExceptionUtil.getMessage(e)); return false; } } @Override public boolean startup(int id) { ScheduleTask scheduleTask = getById(id); return scheduleJob(scheduleTask); } @Override public void deleteJob(Integer id) { stop(id); baseMapper.deleteById(id); } @Override public void insertJob(ScheduleTask task) { if (task != null) { baseMapper.insert(task); } } @Override public List getScheduleJobList() { try { List result = baseMapper.selectList(null); return result; } catch (Exception e) { log.error("【定时器业务处理】查询任务失败\n{}", ExceptionUtil.getMessage(e)); return new ArrayList(); } } @Override public ScheduleTask getTaskByGroupAndName(String group, String name) { return baseMapper.getTaskByGroupAndName(group, name); } @Override public boolean stop(ScheduleTask task) { String name = task.getTaskName(); String group = task.getTaskGroup(); JobKey jobKey = JobKey.jobKey(task.getTaskName(), task.getTaskGroup()); try { scheduler.pauseJob(jobKey); return scheduler.deleteJob(JobKey.jobKey(name, group)); } catch (SchedulerException e) { log.error("【定时器业务处理】关闭定时任务失败,name={}, group={}", name, group); throw new ServiceException("关闭定时任务失败"); } } @Override public boolean stop(int id) { ScheduleTask scheduleTask = getById(id); return stop(scheduleTask); } @Override public List getByPlatformIdAndJobClass(int platformId, String jobClass) { QueryWrapper query = Wrappers.query(); query.eq("platform_id", platformId) .eq("job_class", jobClass); return baseMapper.selectList(query); } @Override public void afterPropertiesSet() throws Exception { log.info("【定时器业务处理】初始化定时任务工厂"); // 项目启动 scheduler = schedulerFactoryBean.getScheduler(); List taskList = baseMapper.selectList(null); try { for (ScheduleTask task : taskList) { if (task.getStatus() != 0) { this.scheduleJob(task); } } } catch (Exception e) { log.error("【定时器业务处理】初始化任务失败\n{}", ExceptionUtil.getMessage(e)); } } @Override public boolean saveOrUpdate(ScheduleTask task) { if (task.getId() != null && task.getId() != 0) { ScheduleTask task2 = getById(task.getId()); task.setTaskName(task2.getTaskName()); task.setTaskGroup(task2.getTaskGroup()); super.saveOrUpdate(task); } else { if (getByPlatformIdAndJobClass(task.getPlatformId(), task.getJobClass()).size() > 0 ) { throw new ServiceException("该平台定时器添加过多"); } task.setTaskGroup("crawler"); task.setTaskName(StringUtil.getUUID(6)); task.setJobData(JSONObject.toJSONString(new HashMap() {{ put("platformId", task.getPlatformId()); }})); baseMapper.insert(task); } if (task.getStatus().equals(1)) { startup(task.getId()); } else { stop(task.getId()); } return true; } } ``` 主要的流程 - 通过 schedulerFactoryBean 创建一个调度器 Scheduler,用于调度任务 - 创建任务 Job 与触发器 Trigger - 使用调度器 Scheduler 来调度作业 定时任务持久化对象 ScheduleTask ```java /** * * 定时任务表 * * * @author liurui * @since 2021-01-20 */ @Data @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) @ApiModel(value = "ScheduleTask对象", description = "定时任务表") public class ScheduleTask implements Serializable { private static final long serialVersionUID = 1L; @TableId(value = "id", type = IdType.AUTO) private Integer id; @ApiModelProperty(value = "任务名称") private String taskName; @ApiModelProperty(value = "组名") private String taskGroup; @ApiModelProperty(value = "描述") private String description; @ApiModelProperty(value = "cron表达式") private String cronExpression; @ApiModelProperty(value = "任务类型") private String jobClass; @ApiModelProperty(value = "备注") private String remark; @ApiModelProperty(value = "任务状态 (0-关闭,1-开启)") private Integer status; @ApiModelProperty(value = "任务延迟") private Integer delay; @ApiModelProperty(value = "修改时间") private LocalDateTime modifyTime; @ApiModelProperty(value = "创建时间") private LocalDateTime createTime; @ApiModelProperty(value = "平台主键") private Integer platformId; @ApiModelProperty(value = "任务所需参数,jsonstring 格式") private String jobData; @JsonIgnore public Map, ?> getJobDataMap() { Map result = new HashMap<>(); result.put("delayTime", this.delay); JSONObject jobJson = JSONObject.parseObject(jobData); result.putAll(jobJson.getInnerMap()); return result; } public String extRemark() { String[] cronTime = new String[]{"每%s秒执行一次", "每%s分钟执行一次", "每%s小时执行一次", "每%s天的随机点执行一次", "每%s月的1号随机点执行一次", "每天%s点执行一次", "默认每%s秒执行一次"}; int cronRate = CronUtil.getCronRate(cronExpression); int cronCycle = CronUtil.getCronCycle(cronExpression); String format = String.format(cronTime[cronRate], cronCycle); return format; } } ``` Quartz 的任务 Job 与触发器 Trigger 的创建工具 ```java public class QuartzUtil { /** * 获取定时任务的定义 * JobDetail是任务的定义,Job是任务的执行逻辑 * @Author liurui * @Description * @Date 10:07 2020/9/22 * @param jobKey 定时任务的名称 组名 * @param description 定时任务的 描述 * @param jobDataMap 定时任务的 元数据 * @param jobClass {@link org.quartz.Job} 定时任务的 真正执行逻辑定义类 * @return org.quartz.JobDetail **/ public static JobDetail getJobDetail(JobKey jobKey, String description, JobDataMap jobDataMap, Job jobClass) { return JobBuilder.newJob(jobClass.getClass()) .withIdentity(jobKey) .withDescription(description) .setJobData(jobDataMap) .usingJobData(jobDataMap) .requestRecovery() .storeDurably() .build(); } /** * 获取Trigger (Job的触发器,执行规则) * @Author liurui * @Description * @Date 10:07 2020/9/22 * @param jobKey 定时任务的名称 组名 * @param description 定时任务的 描述 * @param jobDataMap 定时任务的 元数据 * @param cronExpression 定时任务的 执行cron表达式 * @return org.quartz.Trigger **/ public static Trigger getTrigger(JobKey jobKey, String description, JobDataMap jobDataMap, String cronExpression) { return TriggerBuilder.newTrigger() .withIdentity(jobKey.getName(), jobKey.getGroup()) .withDescription(description) .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)) .usingJobData(jobDataMap) .startNow() .build(); } /** * 获取任务元数据 * @Author liurui * @Description 获取任务元数据 * @Date 10:06 2020/9/22 * @param map * @return org.quartz.JobDataMap **/ public static JobDataMap getJobDataMap(Map, ?> map) { return map == null ? new JobDataMap() : new JobDataMap(map); } } ``` ### 分布式调度框架Elastic-Job #### Elastic-Job 介绍 Elastic-Job 的 github 地址:https://github.com/elasticjob 网上的介绍,Elastic_Job 是当当网开源的一个分布式调度解决方案,基于 Quartz 二次开发的,功能非常丰富强大,采用 zookeeper 实现分布式调度,实现任务分片以及高可用。目前由两个相互独立的子项目 Elatstic-Job-Lite 和 Elastic-Job-Cloud 组成。目前说的是 Elastic-Job-Lite 的轻量级解决方案,使用 jar 的形式提供分布式任务的调度服务,而 Elastic-Job-Cloud 是结合 Mesos 以及 Docker 在云环境下使用,后期博文会有提及。 **功能列表:** - **分布式调度协调:**在分布式环境中,任务能够按指定的调度策略执行,并能够避免同一任务多实例重复执行 - **丰富调度策略:**基于成熟的定时任务作业框架 Quartz cron 表达式执⾏定时任务 - **弹性扩容缩容:**当集群中增加某一个实例,它应当也能够被选举并执行任务;当集群减少一个实例时,它所执行的任务能被转移到别的实例来执行。 - **失效转移:**某实例在任务执行失败后,会被转移到其他实例执行。 - **错过执⾏作业重触发:**若因某种原因导致作业错过执行,自动记录错过执行的作业,并在上次作业完成后自动触发。 - **⽀持并⾏调度:**支持任务分片,任务分片是指将一个任务分为多个小任务项在多个实例同时执行。 - **作业分⽚⼀致性:**当任务被分⽚后,保证同⼀分⽚在分布式环境中仅⼀个执⾏实例。 - 支持作业生命周期操作:可以动态对任务进行开启及停止操作。 - 丰富的作业类型:支持 Simple、DataFlow、Script 三种作业类型,后续会有详细介绍。 - Spring 整合以及命名空间支持:对Spring支持良好的整合方式,支持 Spring 自定义命名空间,支持占位符。 - 运维平台:提供运维界面,可以管理作业和注册中心。 > 下边就围绕以上的功能进行实践使用。 #### Elastic-Job 使用 ##### Zookeeper 搭建使用 Elastic-Job 依赖于 Zookeeper 尽心分布式协调。需要安装 3.4.6 版本以上。我这里安装的是 3.7.0 版本。[下载地址](https://dlcdn.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz) > 目前使用的是单例 zk,默认端口 2181 ##### 解压后的目录 ``` bin/ conf/ docs/ lib/ LICENSE.txt NOTICE.txt README.md README_packaging.md ``` 在 conf 路径下创建配置文件 ```shell $ cp zoo_sample.cfg zoo.cfg ``` **基本命令:** - 启动 ./zkServer.sh start - 停⽌ ./zkServer.sh stop - 查看状态 ./zkServer.sh status > 推荐使用的[客户端工具](https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip) ##### maven ```xml com.dangdang elastic-job-lite-core 2.1.5 ``` ##### 程序开发 代码示例地址:https://gitee.com/teaegg/elastic-job-test.git 具体实现可以看代码,主要测试步骤是启动多个进程。 #### 轻量级去中心化 ![Zookeeper 去中心](https://mufeng-blog.oss-cn-beijing.aliyuncs.com/typecho/Zookeeper%20%E5%8E%BB%E4%B8%AD%E5%BF%83.png) **Elastic—job的两个特点** - 轻量级 - 所有的实现都在 jar 中,必须的依赖仅仅是 zookeeper - 并非独立部署的中间件,就像 jar 程序 - 去中心化 - 执行任务的节点对等,存在不一样的是分片 - 定时调度自触发,没有中心调度节点的分配 - 服务自发现,没有通过注册中心的服务发现 - 主节点非固定 #### 任务分片与失效转移 目前存在一个问题,需要处理一亿的数据,如果用一个作业点来处理需要花费很长的时间。Elastic-Job 可以把这个巨大的作业点划分成多个作业点,作业点的处理逻辑可以自行决定。划分的策略是平均区分,也可以定制。本次实现的是划分了3个任务 ```jave // 配置切片任务 (调度器、任务业务逻辑、触发器) JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration .newBuilder("elastic-job-name", "*/10 * * * * ?", 3) .shardingItemParameters("0=task0,1=task1,2=task2").build(); ``` 启动后的打印 ![image-20220119135246635](https://mufeng-blog.oss-cn-beijing.aliyuncs.com/typecho/image-20220119135246635.png) 如果其中一台机器或程序出现问题,会把任务转移到其他程序,例如下图: ![image-20220119135752999](https://mufeng-blog.oss-cn-beijing.aliyuncs.com/typecho/image-20220119135752999.png) 停止了`程序 3 `之后,任务被转移到`程序 2 `中。 ![image-20220119140255710](https://mufeng-blog.oss-cn-beijing.aliyuncs.com/typecho/image-20220119140255710.png) #### 弹性扩容 如果`程序 3`重新启动注册到 zk 中,注册中⼼会通知 Elastic-Job 进⾏重新分⽚,3个分片又会重新平均的分配到各个实例中去。 ![image-20220119140703579](https://mufeng-blog.oss-cn-beijing.aliyuncs.com/typecho/image-20220119140703579.png) 打赏: 微信, 支付宝 标签: 分布式, 集群, 定时任务 本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。