Study Notes --- Distributed Scheduling: Source Analysis of the xxlJob Scheduling Center Startup
The scheduling center's startup begins at the XxlJobAdminConfig entry point.
From there, go straight into xxlJobScheduler.init();
First: initI18n() sets up internationalization.
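Second: JobRegistryMonitorHelper.getInstance().start(); creates and starts a background thread that maintains the machine list of each online executor group. From the previous note ("Study Notes: Distributed Scheduling, xxlJob Executor Startup Source Analysis") we know that executors register over HTTP and keep a heartbeat with the scheduling center, so here the scheduling center is the side that cleans up dead executor nodes. The main logic is to load the registry records from the database and compare their last update time against the timeout (90 seconds by default, i.e. three missed 30-second heartbeats): expired entries are removed, the rest are left alone.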
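A minimal in-memory sketch of that cleanup rule. Everything here (RegistrySketch, beat, removeDead, alive) is hypothetical: the real admin stores heartbeats in a database table and runs the check in a loop on its monitor thread. Only the 30 s heartbeat / 90 s timeout relationship is taken from the rule described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the registry table: executor address -> last heartbeat time (ms).
class RegistrySketch {
    static final long BEAT_TIMEOUT_MS = 30_000L;             // executors beat every 30 s
    static final long DEAD_TIMEOUT_MS = BEAT_TIMEOUT_MS * 3; // 90 s without a beat = dead

    private final Map<String, Long> lastBeat = new ConcurrentHashMap<>();

    // Called when an executor registers or sends a heartbeat.
    void beat(String address, long nowMs) {
        lastBeat.put(address, nowMs);
    }

    // One pass of the monitor loop: drop every address whose last beat is older than the timeout.
    List<String> removeDead(long nowMs) {
        List<String> removed = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastBeat.entrySet()) {
            if (nowMs - e.getValue() > DEAD_TIMEOUT_MS) {
                removed.add(e.getKey());
            }
        }
        for (String address : removed) {
            lastBeat.remove(address);
        }
        return removed;
    }

    // Addresses still considered online.
    List<String> alive() {
        return new ArrayList<>(lastBeat.keySet());
    }
}
```

In the real system the surviving addresses are what ends up in each executor group's machine list, which processTrigger later reads via group.getRegistryList().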
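Third: JobFailMonitorHelper.getInstance().start(); creates and starts a background thread that retries jobs whose trigger or execution failed.
Looking at its code, the retry works by loading execution logs that have already been persisted to the database and re-triggering them, the same idea many MQs use to avoid losing messages. Making the retries idempotent can only be handled by the business itself, for example by checking the order number.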
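Because a retried job may run again after a partial success, the job handler itself has to tolerate duplicates. A minimal sketch of de-duplicating by order number; IdempotentOrderHandler is a hypothetical business class, and the in-memory set stands in for what would normally be a unique key or status column in the business database, updated in the same transaction as the real work.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical business handler that tolerates retries by de-duplicating on the order number.
class IdempotentOrderHandler {
    // Illustration only: in production, use a unique index / status column in the business DB.
    private final Set<String> processedOrders = ConcurrentHashMap.newKeySet();
    private final AtomicInteger sideEffects = new AtomicInteger();

    // Returns true if the order was processed now, false if it was a duplicate and skipped.
    boolean handle(String orderNo) {
        // add() returns false when an earlier attempt already recorded this order number
        if (!processedOrders.add(orderNo)) {
            return false; // duplicate delivery (e.g. a retry): skip the side effects
        }
        sideEffects.incrementAndGet(); // placeholder for the real work (deduct stock, notify, ...)
        return true;
    }

    int sideEffectCount() {
        return sideEffects.get();
    }
}
```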
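The SQL that loads these logs filters on the log status, so we can boldly guess that somewhere there must be logic that persists the execution log and updates its status. If the remaining retry count of a failed log is greater than 0, the monitor calls the trigger logic with the count reduced by one: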
```java
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount() - 1), log.getExecutorShardingParam(), log.getExecutorParam());
```
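The retry decision can be sketched as one pass over the failed logs. FailRetrySketch and its JobLog type are hypothetical simplifications: the `handled` flag stands in for whatever the real monitor uses to avoid picking up the same failed log twice, and the `triggered` list stands in for the call to JobTriggerPoolHelper.trigger shown above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of one pass of the fail monitor.
class FailRetrySketch {
    // Stand-in for a row of the execution-log table.
    static final class JobLog {
        final int jobId;
        final boolean failed;      // the real table derives this from trigger/handle codes
        final int failRetryCount;  // retries still allowed for this attempt
        boolean handled;           // marks the log as processed so it is retried only once

        JobLog(int jobId, boolean failed, int failRetryCount) {
            this.jobId = jobId;
            this.failed = failed;
            this.failRetryCount = failRetryCount;
        }
    }

    // Records {jobId, remainingRetries} instead of calling JobTriggerPoolHelper.trigger(...).
    final List<int[]> triggered = new ArrayList<>();

    void scanOnce(List<JobLog> logs) {
        for (JobLog log : logs) {
            if (!log.failed || log.handled) {
                continue;              // only unprocessed failures are considered
            }
            log.handled = true;        // claim the log before acting on it
            if (log.failRetryCount > 0) {
                // re-trigger with one retry fewer; the new attempt writes its own log row
                triggered.add(new int[]{log.jobId, log.failRetryCount - 1});
            }
        }
    }
}
```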
Next, into JobTriggerPoolHelper:
```java
// JDK + SLF4J imports; XxlJobAdminConfig, TriggerTypeEnum and XxlJobTrigger are xxl-job-admin classes
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobTriggerPoolHelper {
    private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);

    // ---------------------- trigger pool ----------------------

    // fast/slow thread pool
    private ThreadPoolExecutor fastTriggerPool = null;
    private ThreadPoolExecutor slowTriggerPool = null;

    public void start() {
        fastTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                    }
                });

        slowTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                    }
                });
    }

    public void stop() {
        //triggerPool.shutdown();
        fastTriggerPool.shutdownNow();
        slowTriggerPool.shutdownNow();
        logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
    }

    // job timeout count
    private volatile long minTim = System.currentTimeMillis() / 60000;     // ms > min
    private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();

    /**
     * add trigger
     */
    public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {

        // choose thread pool: the fast pool by default
        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
            // switch to the slow pool
            triggerPool_ = slowTriggerPool;
        }

        // trigger
        triggerPool_.execute(new Runnable() {
            @Override
            public void run() {
                long start = System.currentTimeMillis();
                try {
                    // do trigger
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {
                    // check timeout-count-map: reset all counters when a new minute starts
                    long minTim_now = System.currentTimeMillis() / 60000;
                    if (minTim != minTim_now) {
                        minTim = minTim_now;
                        jobTimeoutCountMap.clear();
                    }

                    // incr timeout-count-map
                    long cost = System.currentTimeMillis() - start;
                    if (cost > 500) {       // job-timeout threshold 500ms
                        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                        if (timeoutCount != null) {
                            timeoutCount.incrementAndGet();
                        }
                    }
                }
            }
        });
    }

    // ---------------------- helper ----------------------

    private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();

    public static void toStart() {
        helper.start();
    }

    public static void toStop() {
        helper.stop();
    }

    /**
     * @param jobId
     * @param triggerType
     * @param failRetryCount
     *          >=0: use this param
     *          <0: use param from job info config
     * @param executorShardingParam
     * @param executorParam
     *          null: use job param
     *          not null: cover job param
     */
    public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
        helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
    }
}
```
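Inside addTrigger, the pool is chosen per jobId based on its recent timeouts: the fast pool is used by default, but if triggers of this job took longer than 500 ms more than 10 times within the current minute, the slow pool is used instead. Personally I think there is a problem here: the fast and slow pools are only created later in init() (JobTriggerPoolHelper.toStart() runs after JobFailMonitorHelper has already started its thread), so if the fail monitor submits a retry before toStart() has run, fastTriggerPool is still null and execute() throws a NullPointerException. The window is small, but the startup order does not rule it out. The submitted task then calls XxlJobTrigger.trigger.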
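The pool-selection rule can be pulled out into a small class with the clock passed in, which makes it testable. SlowJobTracker is a hypothetical re-statement of the logic in addTrigger (same 500 ms threshold, same "more than 10 per minute" limit); it uses computeIfAbsent in place of the original putIfAbsent-then-increment, and both count the same way.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical re-statement of addTrigger's fast/slow routing rule with an injectable clock.
class SlowJobTracker {
    static final long SLOW_TRIGGER_MS = 500; // a trigger slower than this counts as a timeout
    static final int SLOW_LIMIT = 10;        // more than 10 timeouts in the current minute -> slow pool

    private volatile long currentMinute;
    private final ConcurrentMap<Integer, AtomicInteger> timeoutCounts = new ConcurrentHashMap<>();

    SlowJobTracker(long nowMs) {
        currentMinute = nowMs / 60_000;
    }

    // Decision made before submitting: fast pool unless the job timed out too often this minute.
    boolean useSlowPool(int jobId) {
        AtomicInteger c = timeoutCounts.get(jobId);
        return c != null && c.get() > SLOW_LIMIT;
    }

    // Called after each trigger finishes (the finally block in addTrigger).
    void record(int jobId, long costMs, long nowMs) {
        long minute = nowMs / 60_000;
        if (minute != currentMinute) { // a new minute started: reset all counters
            currentMinute = minute;
            timeoutCounts.clear();
        }
        if (costMs > SLOW_TRIGGER_MS) {
            timeoutCounts.computeIfAbsent(jobId, k -> new AtomicInteger()).incrementAndGet();
        }
    }
}
```

Counting per minute rather than over a sliding window keeps the bookkeeping to a single map clear per minute, at the cost of a job's history being forgotten at every minute boundary.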
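XxlJobTrigger.trigger mainly checks whether the routing strategy is sharding broadcast, which decides whether processTrigger is called in a loop (once per registered executor address) or just once. Sharding broadcast means that when several identical executors are registered, each of them processes only its own part of the work; for example, 1000 orders are split evenly among the executors.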
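On the executor side, a sharding-broadcast handler receives its shard index and the shard total (processTrigger fills them in via setBroadcastIndex/setBroadcastTotal) and picks its own slice of the data. A minimal sketch using a modulo split on order ids; ShardingSketch.myOrders is hypothetical, and modulo is just one common way to partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical executor-side helper: every executor gets (shardIndex, shardTotal)
// and keeps only the orders that belong to its shard.
class ShardingSketch {
    static List<Long> myOrders(List<Long> orderIds, int shardIndex, int shardTotal) {
        List<Long> mine = new ArrayList<>();
        for (long id : orderIds) {
            // simple modulo split; assumes non-negative ids
            if (id % shardTotal == shardIndex) {
                mine.add(id);
            }
        }
        return mine;
    }
}
```

With 3 executors and orders 1 to 1000, the shards get 333, 334 and 333 orders, and every order lands in exactly one shard.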
```java
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

    // param
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

    // 1. save log-id: persist the execution log first
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

    // 2. init trigger-param
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);

    // 3. init address
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

    // 4. trigger remote executor
    ReturnT<String> triggerResult = null;
    if (address != null) {
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

    // 5. collection trigger info
    StringBuffer triggerMsgSb = new StringBuffer();
    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
            .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
    if (shardingParam != null) {
        triggerMsgSb.append("("+shardingParam+")");
    }
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

    triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
            .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

    // 6. save log trigger-info
    jobLog.setExecutorAddress(address);
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    jobLog.setExecutorShardingParam(shardingParam);
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    jobLog.setTriggerCode(triggerResult.getCode());
    jobLog.setTriggerMsg(triggerMsgSb.toString());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
```
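So this code: 1. persists the execution log first (to obtain the log id), 2. builds the trigger parameters, 3. picks an executor address according to the configured routing (load-balancing) strategy (analyzed later), 4. calls the executor, 5. collects the trigger info and updates the log with the result.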
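Executing the trigger (runExecutor):
The key line is ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address); Think about it: up to this point the code has not opened any remote-call I/O, and this is where that is handled: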
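Inside it, note executorBiz = (ExecutorBiz) referenceBean.getObject(); which obtains a proxy object (analyzed later, in the note on job invocation). The proxy is then cached:
executorBizRepository.put(address, executorBiz); this ensures that each executor address maps to exactly one client object.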
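The "one client object per address" idea is a per-key cache. A minimal sketch, assuming a factory that builds the client for an address; ClientCache is hypothetical. It uses ConcurrentHashMap.computeIfAbsent so that, even when several threads ask for the same new address at once, the factory runs at most once for that address, which a separate get followed by put does not guarantee.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache in the spirit of executorBizRepository: one client object per executor address.
class ClientCache<C> {
    private final Map<String, C> byAddress = new ConcurrentHashMap<>();
    private final Function<String, C> factory; // e.g. builds an RPC proxy for the address

    ClientCache(Function<String, C> factory) {
        this.factory = factory;
    }

    C get(String address) {
        if (address == null || address.trim().isEmpty()) {
            return null;
        }
        // computeIfAbsent creates the client at most once per address, even under concurrency
        return byAddress.computeIfAbsent(address.trim(), factory);
    }
}
```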
After that, runResult = executorBiz.run(triggerParam); invokes the method on the proxy, which sends the trigger request to the executor.
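What the proxy buys is that executorBiz.run(...) looks like a local call while actually being a remote request. A minimal JDK dynamic-proxy sketch of that idea; RemoteExecutor and ProxySketch are hypothetical, and the "transport" just formats a string instead of serializing the call and sending it over the network as a real RPC client would.

```java
import java.lang.reflect.Proxy;

// Hypothetical executor interface; the real one takes a TriggerParam and returns ReturnT<String>.
interface RemoteExecutor {
    String run(String triggerParam);
}

class ProxySketch {
    // Builds a proxy whose calls are turned into "requests" to the given address.
    static RemoteExecutor clientFor(String address) {
        return (RemoteExecutor) Proxy.newProxyInstance(
                RemoteExecutor.class.getClassLoader(),
                new Class<?>[]{RemoteExecutor.class},
                (proxy, method, args) -> {
                    switch (method.getName()) {
                        case "run":
                            // a real client would serialize method + args and send them to address
                            return "sent " + method.getName() + "(" + args[0] + ") to " + address;
                        case "toString":
                            return "RemoteExecutor proxy for " + address;
                        case "hashCode":
                            return System.identityHashCode(proxy);
                        case "equals":
                            return proxy == args[0];
                        default:
                            throw new UnsupportedOperationException(method.getName());
                    }
                });
    }
}
```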
Finally the result is returned, and processTrigger uses it to update the execution log.
Back in XxlJobScheduler.init():
Fourth: JobTriggerPoolHelper.toStart(); only now are the fast and slow thread pools initialized.
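Fifth: JobLogReportHelper.getInstance().start(); starts generating the log report. It derives the statistics backwards by querying the execution-log records; the logic is fairly simple.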
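The report is essentially a group-by over execution logs. A minimal sketch that counts running, successful and failed logs per trigger day; LogReportSketch and its three-state Status are hypothetical simplifications of how the real report classifies logs from their trigger and handle codes.

```java
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-day report built from execution logs.
class LogReportSketch {
    enum Status { RUNNING, SUCCESS, FAIL }

    static final class Log {
        final LocalDate triggerDay;
        final Status status;

        Log(LocalDate triggerDay, Status status) {
            this.triggerDay = triggerDay;
            this.status = status;
        }
    }

    // Day -> counts indexed by Status.ordinal(): {running, success, fail}; days sorted ascending.
    static Map<LocalDate, int[]> dailyReport(List<Log> logs) {
        Map<LocalDate, int[]> report = new TreeMap<>();
        for (Log log : logs) {
            int[] counts = report.computeIfAbsent(log.triggerDay, d -> new int[Status.values().length]);
            counts[log.status.ordinal()]++;
        }
        return report;
    }
}
```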
Last: JobScheduleHelper.getInstance().start(); this is, unsurprisingly, where jobs actually get scheduled.
First, the loop logic:
```java
while (!scheduleThreadToStop) {

    // Scan Job
    long start = System.currentTimeMillis();

    Connection conn = null;
    Boolean connAutoCommit = null;
    PreparedStatement preparedStatement = null;

    boolean preReadSuc = true;
    try {

        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
        connAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false);

        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
        preparedStatement.execute();

        // tx start

        // 1. pre read
        long nowTime = System.currentTimeMillis();
        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
        if (scheduleList!=null && scheduleList.size()>0) {
            // 2. push time-ring
            for (XxlJobInfo jobInfo: scheduleList) {

                // time-ring jump
                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                    // 2.1 trigger-expire > 5s: pass && make next-trigger-time
                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                    // fresh next
                    refreshNextValidTime(jobInfo, new Date());

                } else if (nowTime > jobInfo.getTriggerNextTime()) {
                    // 2.2 trigger-expire < 5s: direct-trigger && make next-trigger-time

                    // 1. trigger
                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);
                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                    // 2. fresh next
                    refreshNextValidTime(jobInfo, new Date());

                    // next-trigger-time in 5s, pre-read again
                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                        // 1. make ring second
                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                        // 2. push time ring
                        pushTimeRing(ringSecond, jobInfo.getId());

                        // 3. fresh next
                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                    }

                } else {
                    // 2.3 trigger-pre-read: time-ring trigger && make next-trigger-time

                    // 1. make ring second
                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                    // 2. push time ring
                    pushTimeRing(ringSecond, jobInfo.getId());

                    // 3. fresh next
                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                }
            }

            // 3. update trigger info
            for (XxlJobInfo jobInfo: scheduleList) {
                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
            }

        } else {
            preReadSuc = false;
        }

        // tx stop

    } catch (Exception e) {
        if (!scheduleThreadToStop) {
            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
        }
    } finally {

        // commit
        if (conn != null) {
            try {
                conn.commit();
            } catch (SQLException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            try {
                conn.setAutoCommit(connAutoCommit);
            } catch (SQLException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            try {
                conn.close();
            } catch (SQLException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
        }

        // close PreparedStatement
        if (null != preparedStatement) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }
    long cost = System.currentTimeMillis()-start;

    // Wait seconds, align second
    if (cost < 1000) {  // scan-overtime, not wait
        try {
            // pre-read period: success > scan each second; fail > skip this period;
            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
        } catch (InterruptedException e) {
            if (!scheduleThreadToStop) {
                logger.error(e.getMessage(), e);
            }
        }
    }
}
```
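A database-based distributed lock is used here: inside a transaction, the thread takes a row lock on table xxl_job_lock with SELECT ... FOR UPDATE, so at any moment only one scheduling-center node runs the scan. The lock is released when the transaction commits.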
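The locking pattern can be isolated into a helper. A minimal JDBC sketch, assuming a lock table with a single row named schedule_lock as in the SQL above; ScheduleLock.runLocked is hypothetical. Unlike the loop above, which commits in finally even after an exception, this sketch rolls back on failure; either way, ending the transaction releases the row lock. Closing the connection is left to the caller.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.function.Supplier;

// Hypothetical helper for the "SELECT ... FOR UPDATE" lock used by the schedule loop.
class ScheduleLock {
    static final String LOCK_SQL =
            "select * from xxl_job_lock where lock_name = 'schedule_lock' for update";

    static <T> T runLocked(Connection conn, Supplier<T> work) throws SQLException {
        boolean oldAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false);                       // start a transaction
        try (PreparedStatement ps = conn.prepareStatement(LOCK_SQL)) {
            ps.execute();                                // blocks until this node holds the row lock
            T result = work.get();                       // only one admin node runs this at a time
            conn.commit();                               // commit releases the row lock
            return result;
        } catch (SQLException | RuntimeException e) {
            conn.rollback();                             // rollback also releases the lock
            throw e;
        } finally {
            conn.setAutoCommit(oldAutoCommit);           // restore the connection's original mode
        }
    }
}
```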
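The loop first queries enabled jobs whose next trigger time is at most now + PRE_READ_MS, where PRE_READ_MS is 5000 ms (5 s), and then handles each job in one of three tiers:
- more than 5 s overdue: treated as a misfire, skipped, and its next trigger time recomputed from now;
- overdue by at most 5 s: triggered immediately and its next trigger time recomputed; if the new next time again falls within the 5 s window, the job is also pushed onto the time ring;
- not yet due (due within the next 5 s): pushed onto the time ring at slot (triggerNextTime / 1000) % 60, and its next trigger time computed from that time.
Finally, the updated trigger times of all scanned jobs are written back with scheduleUpdate inside the same transaction.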
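The three tiers can be restated as a pure function over the current time and the job's next trigger time. PreReadSketch is hypothetical; PRE_READ_MS and the ring-slot formula are copied from the loop above.

```java
// Hypothetical restatement of the three branches of the pre-read loop.
class PreReadSketch {
    static final long PRE_READ_MS = 5000; // same 5 s window as the schedule loop

    enum Action { MISFIRE_SKIP, TRIGGER_NOW, PUSH_TO_RING }

    static Action classify(long nowMs, long triggerNextTimeMs) {
        if (nowMs > triggerNextTimeMs + PRE_READ_MS) {
            return Action.MISFIRE_SKIP;  // more than 5 s late: skip, only compute the next time
        } else if (nowMs > triggerNextTimeMs) {
            return Action.TRIGGER_NOW;   // late by at most 5 s: trigger immediately
        } else {
            return Action.PUSH_TO_RING;  // due within the next 5 s: park it on the time ring
        }
    }

    // Slot on the 60-slot time ring: the second-of-minute of the trigger time.
    static int ringSecond(long triggerNextTimeMs) {
        return (int) ((triggerNextTimeMs / 1000) % 60);
    }
}
```

For example, at now = 100000 ms a job due at 90000 ms is a misfire, one due at 97000 ms is triggered at once, and one due at 103000 ms goes to ring slot 43.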
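Jobs on the time ring are handled by a separate ring thread: combined with the second-aligned sleep, it wakes at the start of each second, takes the jobs in that second's slot off the ring, and triggers them.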
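A minimal sketch of the time ring itself: 60 slots keyed by second-of-minute, each holding the job ids due in that second. TimeRingSketch is hypothetical; reading the previous slot as well as the current one mirrors, as far as I can tell, what the upstream ring thread does so that a tick delayed past a second boundary does not drop jobs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical 60-slot time ring: slot = second of the minute, value = job ids due in that second.
class TimeRingSketch {
    private final Map<Integer, List<Integer>> ringData = new HashMap<>();

    // Called by the scan thread for jobs due within the pre-read window.
    synchronized void push(int ringSecond, int jobId) {
        ringData.computeIfAbsent(ringSecond, k -> new ArrayList<>()).add(jobId);
    }

    // Called by the ring thread once per second: take the current slot and the previous one
    // (in case the last tick was late), removing them so each job fires only once.
    synchronized List<Integer> take(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ringData.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }
}
```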
Xu Xueli (许雪里), the author of XXL-JOB, wrote this part with great care; I have not fully digested it yet.
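So far, the startup of the scheduling center has:
- started maintaining the executor machine lists (removing dead executors);
- created and started a thread that looks for unsuccessful execution logs and re-triggers those jobs;
- initialized the fast and slow trigger thread pools;
- created and started a thread that generates the log report;
- created and started a thread that queries jobs due within a time window, splits them by timing tier, and then pushes them onto the time ring, skips them, or triggers them directly.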