学习笔记---分布式调度之xxlJob的调度任务源码分析
在学习笔记—分布式调度之xxlJob执行器的启动源码解析以及学习笔记—分布式调度之xxlJob调度中心的启动源码解析中可以知道,执行器已经开始nettyServer时刻等待任务调度并且开启了后台线程时刻监视着回调队列是否有数据,有数据则立马执行http调用调度中心的对外暴露http接口,而调度中心也在后台开启了日志线程监视是否有失败执行日志,有则生成代理对象,并且打开nettyClient来走rpc,以及开启了监听job触发时间数据状态的线程,分梯度的执行对应的,在中间梯度下则走日志相同的调用逻辑,其余的直接刷新执行时间后者直接刷新执行时间以及放到时间轮缓存上等待执行,并且更新数据库的job触发时间信息;
既然调度中心走的是相同的调度任务逻辑,就直接:XxlJobTrigger.runExecutor()
这个:ExecutorBiz是个代理对象,直接跑到:run 后,就到:XxlRpcReferenceBean
如果调度中心集群的话,则在这个类中,会先:
直接:
而在:clientInstance.asyncSend(finalAddress, xxlRpcRequest);
后需要等待逻辑,因为这里采取的是 SYNC 的方式
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
这里采取的阻塞方式,个人认为是跟dubbo 一样的阻塞方式,可以参考:学习笔记—DUBBO的服务调用(消费端)接着就返回封装日志结果持久化数据库;
到执行器这层:与学习笔记—DUBBO的服务调用(服务端)一样,从 NettyHttpServerHandler 看起:
可以看出来执行器端接收到调度通知后立马转交给线程池来处理:
可以看到这里的请求是调用:com.xxl.job.core.biz.ExecutorBiz的 run , 继续到:
反射到了:ExecutorBizImpl.run 方法里面:
只关心Bean的注册方式,略过其余的代码
xxlJob在执行器这层,每一个JobHandler对应一个JobThread(Thread),这样保证了调度的高效,但是有可能存在大量的任务的时候,线程就会很多了;
断点在第一次执行调度任务的时候:
看下JobThread:
public class JobThread extends Thread{ private static Logger logger = LoggerFactory.getLogger(JobThread.class); private int jobId; private IJobHandler handler; private LinkedBlockingQueue<TriggerParam> triggerQueue; private Set<Long> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID private volatile boolean toStop = false; private String stopReason; private boolean running = false; // if running job private int idleTimes = 0; // idel times public JobThread(int jobId, IJobHandler handler) { this.jobId = jobId; this.handler = handler; this.triggerQueue = new LinkedBlockingQueue<TriggerParam>(); this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>()); } public IJobHandler getHandler() { return handler; } /** * new trigger to queue * * @param triggerParam * @return */ public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) { // avoid repeat if (triggerLogIdSet.contains(triggerParam.getLogId())) { logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId()); return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId()); } triggerLogIdSet.add(triggerParam.getLogId()); triggerQueue.add(triggerParam); return ReturnT.SUCCESS; } /** * kill job thread * * @param stopReason */ public void toStop(String stopReason) { /** * Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep), * 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身; * 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式; */ this.toStop = true; this.stopReason = stopReason; } /** * is running job * @return */ public boolean isRunningOrHasQueue() { return running || triggerQueue.size()>0; } @Override public void run() { // init try { handler.init(); } catch (Throwable e) { logger.error(e.getMessage(), e); } // execute while(!toStop){ running = false; idleTimes++; TriggerParam triggerParam = null; ReturnT<String> executeResult = null; try { // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); if (triggerParam!=null) { running = true; idleTimes = 0; triggerLogIdSet.remove(triggerParam.getLogId()); // log filename, like "logPath/yyyy-MM-dd/9999.log" String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId()); XxlJobFileAppender.contextHolder.set(logFileName); ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal())); // execute XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams()); if (triggerParam.getExecutorTimeout() > 0) { // limit timeout Thread futureThread = null; try { final TriggerParam triggerParamTmp = triggerParam; FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() { @Override public ReturnT<String> call() throws Exception { return handler.execute(triggerParamTmp.getExecutorParams()); } }); futureThread = new Thread(futureTask); futureThread.start(); executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS); } catch (TimeoutException e) { XxlJobLogger.log("<br>----------- xxl-job job execute timeout"); XxlJobLogger.log(e); executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout "); } finally { futureThread.interrupt(); } } else { // just execute executeResult = handler.execute(triggerParam.getExecutorParams()); } if (executeResult == null) { executeResult = IJobHandler.FAIL; } else { executeResult.setMsg( (executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000) ?executeResult.getMsg().substring(0, 50000).concat("...") :executeResult.getMsg()); executeResult.setContent(null); // limit obj size } XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult); } else { if (idleTimes > 30) { if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); } } } } catch (Throwable e) { if (toStop) { XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason); } StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String errorMsg = stringWriter.toString(); executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg); XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------"); } finally { if(triggerParam != null) { // callback handler info if (!toStop) { // commonm TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult)); } else { // is killed ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]"); TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult)); } } } } // callback trigger request in queue while(triggerQueue !=null && triggerQueue.size()>0){ TriggerParam triggerParam = triggerQueue.poll(); if (triggerParam!=null) { // is killed ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]"); TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult)); } } // destroy try { handler.destroy(); } catch (Throwable e) { logger.error(e.getMessage(), e); } logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread()); } }
可以知道一个jobThread 对应一个JobHandler,接着进入循环方法内,并且如果该:jobThread 对应的 private LinkedBlockingQueue triggerQueue; 里面存在数据时,则执行反射调用应的handler,第一次调用是是没有数据的,但是生成JobThread的线程后紧接着调用:
ReturnT pushResult = jobThread.pushTriggerQueue(triggerParam);
那么 JobThread 就有数据操作了,在循环体内,首先生成以时间为后缀名的执行文件对象,接着判断请求参数的超时时间是否大于0,如果是 则进行再次创建线程异步执行并且利用futureTask.get(Timeout) 方法阻塞返回结果,如果否的话,直接反射调用;执行的过程中不管是否异常都要追加到文件日志里面,并且最后将结果推送到:TriggerCallbackThread 的:callBackQueue 里面,让:triggerCallbackThread 这个线程可以有日志推送回调度中心;当优雅停机时,则直接跳出这个循环体,进入:
将没有执行完的:triggerQueue 里面的数据取出来,推到 callBackQueue 接着销毁 handler ,结束该线程;
执行完后则将数据封装成 xxlRpcResponse 返回给调度中心;
- 点赞
- 收藏
- 分享
- 文章举报
- 学习笔记---分布式调度之xxlJob任务自动生成与注册的改造
- 轻量级分布式任务调度框架XXL-JOB 学习笔记
- TBSchedule源码学习笔记-线程组任务调度
- Ogre源码分析与学习笔记-2 纹理
- Quartz.NET 2.0 学习笔记(5) :实例创建Windows服务实现任务调度
- netty5源码分析(1)--学习笔记
- nginx 源码学习笔记(七)——内存分配相关源码分析
- Ui学习笔记---EasyUI的EasyLoader组件源码分析
- Spring 整合 Quartz 任务调度框架学习笔记
- Yarn源码分析之MapReduce作业中任务Task调度整体流程(一)
- Quartz.NET 2.0 学习笔记(5) :实例创建Windows服务实现任务调度
- μC/OS Ⅱ学习笔记--任务的调度
- Cocos2d-x学习笔记(19)(TestCpp源码分析-3)
- spring学习笔记(26)spring整合Quartz2持久化稳健任务调度
- Quartz.NET 2.0 学习笔记(3) :通过配置文件实现任务调度
- Java集合源码学习笔记(三)LinkedList分析
- UCOS_II学习笔记---任务管理之删除任务函数分析
- ucos-II 任务调度源码分析(三)
- OpenCV学习笔记(29)KAZE 算法原理与源码分析(三)特征检测与描述