学习笔记---分布式调度之xxlJob执行器的启动源码解析
可以实现调度任务的手段蛮多的,比如说spring本身的定时任务SpringTask,或者是jdk的Timer、时间线程池ScheduledExecutorService,Quartz等,
如果要实现分布式调度的话,则以上方式需要分布式锁的机制,加之如果需要做到统一管理这些调度任务的以及可配置的话,则上面的方式需要额外的逻辑处理才行,
许雪晴老师的xxlJob以及当当网开发的elasticJob可以做到解决以上的问题;
先看下官网的这张图,xxJob主要分为两大组件,调度中心以及执行器;
- 调度中心不参与业务处理,只负责配置对应的执行器以及任务,记录日志,生成报表显示;
- 执行器对应的是每台真正执行业务的逻辑 , 对应多个调度任务,真正执行业务逻辑,执行完后,记录日志并发送日志给调度中心
因此,这两个组件是紧紧结合在一起的,缺一不可;目前xxlJob已经可以集群化,这个基本保证调度中心这一层面的保障;
先来探讨下xxl-job-core的代码实现:
通过使用方式可以知道,入口:XxlJob 注解入口,全局搜索得到:XxlJobSpringExecutor :
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, InitializingBean, DisposableBean { // start @Override public void afterPropertiesSet() throws Exception { // init JobHandler Repository initJobHandlerRepository(applicationContext); // init JobHandler Repository (for method) initJobHandlerMethodRepository(applicationContext); // refresh GlueFactory GlueFactory.refreshInstance(1); // super start super.start(); } // destroy @Override public void destroy() { super.destroy(); } private void initJobHandlerRepository(ApplicationContext applicationContext) { if (applicationContext == null) { return; } // init job handler action Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class); if (serviceBeanMap != null && serviceBeanMap.size() > 0) { for (Object serviceBean : serviceBeanMap.values()) { if (serviceBean instanceof IJobHandler) { String name = serviceBean.getClass().getAnnotation(JobHandler.class).value(); IJobHandler handler = (IJobHandler) serviceBean; if (loadJobHandler(name) != null) { throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts."); } registJobHandler(name, handler); } } } } private void initJobHandlerMethodRepository(ApplicationContext applicationContext) { if (applicationContext == null) { return; } // init job handler from method String[] beanDefinitionNames = applicationContext.getBeanDefinitionNames(); for (String beanDefinitionName : beanDefinitionNames) { Object bean = applicationContext.getBean(beanDefinitionName); Method[] methods = bean.getClass().getDeclaredMethods(); for (Method method: methods) { XxlJob xxlJob = AnnotationUtils.findAnnotation(method, XxlJob.class); if (xxlJob != null) { // name String name = xxlJob.value(); if (name.trim().length() == 0) { throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#"+ method.getName() +"] ."); } if (loadJobHandler(name) != null) { throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts."); } // execute method if (!(method.getParameterTypes()!=null && method.getParameterTypes().length==1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) { throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#"+ method.getName() +"] , " + "The correct method format like \" public ReturnT<String> execute(String param) \" ."); } if (!method.getReturnType().isAssignableFrom(ReturnT.class)) { throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#"+ method.getName() +"] , " + "The correct method format like \" public ReturnT<String> execute(String param) \" ."); } method.setAccessible(true); // init and destory Method initMethod = null; Method destroyMethod = null; if(xxlJob.init().trim().length() > 0) { try { initMethod = bean.getClass().getDeclaredMethod(xxlJob.init()); initMethod.setAccessible(true); } catch (NoSuchMethodException e) { throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#"+ method.getName() +"] ."); } } if(xxlJob.destroy().trim().length() > 0) { try { destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy()); destroyMethod.setAccessible(true); } catch (NoSuchMethodException e) { throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#"+ method.getName() +"] ."); } } // registry jobhandler registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod)); } } } } // ---------------------- applicationContext ---------------------- private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } public static ApplicationContext getApplicationContext() { return applicationContext; } }
可以看出来,通过利用spring容器以及注解获取对应的任务:JobHandler,调用:
XxlJobExecutor:
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) { logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); return jobHandlerRepository.put(name, jobHandler); }
存储所有的JobHandler,如果是方法级别的,则采取类似包装类的方法,封装一个:MethodJobHandler实现类,调用时候,采用反射即可;
在这里就已经完成了调度任务JobHandler的本地注册;
接着调用: super.start();
XxlJobExecutor:
// ---------------------- start + stop ---------------------- public void start() throws Exception { // init logpath XxlJobFileAppender.initLogPath(logPath); // init invoker, admin-client initAdminBizList(adminAddresses, accessToken); // init JobLogFileCleanThread JobLogFileCleanThread.getInstance().start(logRetentionDays); // init TriggerCallbackThread TriggerCallbackThread.getInstance().start(); // init executor-server port = port > 0 ? port : NetUtil.findAvailablePort(9999); ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp(); initRpcProvider(ip, port, appName, accessToken); }
第一个是:XxlJobFileAppender.initLogPath(logPath); 根据配置的本地配置路径来创建日志文件夹;
第二个是:initAdminBizList(adminAddresses, accessToken); 则是根据配置来创建与调度中心管理台的对应的AdminBizClient实现类;
第三个是:JobLogFileCleanThread.getInstance().start(logRetentionDays); 则是根据配置的实效天数来生成后台线程并启动清除超过有效期的日志;
第四个是:TriggerCallbackThread.getInstance().start(); 则是开始与调度中心打交道了,
public class TriggerCallbackThread { private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class); private static TriggerCallbackThread instance = new TriggerCallbackThread(); public static TriggerCallbackThread getInstance(){ return instance; } /** * job results callback queue */ private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>(); public static void pushCallBack(HandleCallbackParam callback){ getInstance().callBackQueue.add(callback); logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId()); } /** * callback thread */ private Thread triggerCallbackThread; private Thread triggerRetryCallbackThread; private volatile boolean toStop = false; public void start() { // valid if (XxlJobExecutor.getAdminBizList() == null) { logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null."); return; } // callback triggerCallbackThread = new Thread(new Runnable() { @Override public void run() { // normal callback while(!toStop){ try { HandleCallbackParam callback = getInstance().callBackQueue.take(); if (callback != null) { // callback list param List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); callbackParamList.add(callback); // callback, will retry if error if (callbackParamList!=null && callbackParamList.size()>0) { doCallback(callbackParamList); } } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } } // last callback try { List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); if (callbackParamList!=null && callbackParamList.size()>0) { doCallback(callbackParamList); } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory."); } }); triggerCallbackThread.setDaemon(true); triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread"); triggerCallbackThread.start(); // retry triggerRetryCallbackThread = new Thread(new Runnable() { @Override public void run() { while(!toStop){ try { retryFailCallbackFile(); } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } try { TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory."); } }); triggerRetryCallbackThread.setDaemon(true); triggerRetryCallbackThread.start(); } public void toStop(){ toStop = true; // stop callback, interrupt and wait if (triggerCallbackThread != null) { // support empty admin address triggerCallbackThread.interrupt(); try { triggerCallbackThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } // stop retry, interrupt and wait if (triggerRetryCallbackThread != null) { triggerRetryCallbackThread.interrupt(); try { triggerRetryCallbackThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } } /** * do callback, will retry if error * @param callbackParamList */ private void doCallback(List<HandleCallbackParam> callbackParamList){ boolean callbackRet = false; // callback, will retry if error for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { ReturnT<String> callbackResult = adminBiz.callback(callbackParamList); if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish."); callbackRet = true; break; } else { callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult); } } catch (Exception e) { callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage()); } } if (!callbackRet) { appendFailCallbackFile(callbackParamList); } } /** * callback log */ private void callbackLog(List<HandleCallbackParam> callbackParamList, String logContent){ for (HandleCallbackParam callbackParam: callbackParamList) { String logFileName = XxlJobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTim()), callbackParam.getLogId()); XxlJobFileAppender.contextHolder.set(logFileName); XxlJobLogger.log(logContent); } } // ---------------------- fail-callback file ---------------------- private static String failCallbackFilePath = XxlJobFileAppender.getLogPath().concat(File.separator).concat("callbacklog").concat(File.separator); private static String failCallbackFileName = failCallbackFilePath.concat("xxl-job-callback-{x}").concat(".log"); private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){ // valid if (callbackParamList==null || callbackParamList.size()==0) { return; } // append file byte[] callbackParamList_bytes = XxlJobExecutor.getSerializer().serialize(callbackParamList); File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()))); if (callbackLogFile.exists()) { for (int i = 0; i < 100; i++) { callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) )); if (!callbackLogFile.exists()) { break; } } } FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes); } private void retryFailCallbackFile(){ // valid File callbackLogPath = new File(failCallbackFilePath); if (!callbackLogPath.exists()) { return; } if (callbackLogPath.isFile()) { callbackLogPath.delete(); } if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) { return; } // load and clear file, retry for (File callbaclLogFile: callbackLogPath.listFiles()) { byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile); List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) XxlJobExecutor.getSerializer().deserialize(callbackParamList_bytes, HandleCallbackParam.class); callbaclLogFile.delete(); doCallback(callbackParamList); } } }
来具体看下这个类:这个类是专门来处理日志回调的,单调度中心触发调度任务执行后生成执行结果到:callBackQueue 队列里面,然后由:triggerCallbackThread 线程回调,并记录回调结果,并且开启:triggerRetryCallbackThread 线程来检查有没有失败记录产生,重复上面的操作;
在这里许雪晴线程优雅停机的方式:1.采用共享可变变量:toStop 来首次执行停机逻辑;2.采用线程的: interrupt 来设置 triggerRetryCallbackThread 以及 triggerCallbackThread 标志位为false,它们执行到睡眠阻塞的时候,则到:InterruptedException 里面,如此来处理的话,则不会影响到原来正在执行的逻辑,整个xxl-job框架多出使用这种方式来处理;
第五是:initRpcProvider(ip, port, appName, accessToken); 这个就最最关键的了:
可以看到这里是要初始化NettyServer等相关类的操作了:
xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());
创建ExecutorBizImpl 实现类,并且用集合Map存储,这个类相当关键:
public interface ExecutorBiz { /** * beat * @return */ public ReturnT<String> beat(); /** * idle beat * * @param jobId * @return */ public ReturnT<String> idleBeat(int jobId); /** * kill * @param jobId * @return */ public ReturnT<String> kill(int jobId); /** * log * @param logDateTim * @param logId * @param fromLineNum * @return */ public ReturnT<LogResult> log(long logDateTim, long logId, int fromLineNum); /** * run * @param triggerParam * @return */ public ReturnT<String> run(TriggerParam triggerParam); }
可以看出这个是通讯,业务执行以及记录日志的入口;
接着:xxlRpcProviderFactory.start();
这里默认的是走:Hessian序列化方式,并且底层通讯框架是Netty, 在:serverInstance.start(this); 这里是创建NettyServer 里面的逻辑跟一般的使用并没有什么区别,回到回调方法:
setStartedCallback:serviceRegistryInstance 这个是 ExecutorServiceRegistry 的实体对象,主要是
在调用 start 方法时创建线程并且循环(当然停机时采用停机的方式)调用:adminBiz.registry 方式来实现将本地的执行器注册到调度中心上,可以看下代码:
AdminBizClient:
@Override public ReturnT<String> registry(RegistryParam registryParam) { return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, registryParam, 3); }
XxlJobRemotingUtil 里面封装了:java.net.HttpURLConnection 类,通过这个类,调用了调度中心的:controller 层的暴露的对外接口,实现了调度任务以及执行器的远程注册,所以这里采用的是http的方式进行调用的;同样的,优雅停机时调用了:registryRemove的方法,实现的逻辑基本一致;小下面的:
if (serviceData.size() > 0) {
serviceRegistryInstance.registry(serviceData.keySet(), serviceAddress);
}
这段尚未理解,暂且略过;
而在:NettyHttpServer 创建的时候调用的了: onStarted(); 接着调用了这个回调 startedCallback.run(); 而在下面的:startedCallback 就在优雅停机的时候回调用的;
到目前为止,执行器这端的已经:
- 开启了nettyServer服务,时刻等待调度中心的调度任务通知;
- 创建并启动后台线程来处理日志的回调;
- 创建并启动后台线程来处理JobHandler以及执行器的注册以;
- 点赞
- 收藏
- 分享
- 文章举报
- 学习笔记---分布式调度之xxlJob调度中心的启动源码解析
- 轻量级分布式任务调度框架XXL-JOB 学习笔记
- 分布式任务调度平台XXL-JOB配置笔记
- 使用xxl-job调度平台时,启动执行器工程报错:unknown code for readObject at 0x3c (<)
- 分布式任务调度系统xxl-job源码探究(一、客户端)
- 学习笔记---分布式调度之xxlJob的调度任务源码分析
- 分布式任务调度系统xxl-job源码探究(二、服务中心)
- JUC.Condition学习笔记[附详细源码解析]
- Jsoup学习笔记4:Jsoup 解析Html源码实例
- 作业调度框架 Quartz 学习笔记(六) -- job生病了(抛出异常)时的处理
- AngularJS学习笔记--002--Angular JS路由插件ui.router源码解析
- 分布式任务调度平台XXL-JOB
- ROS学习笔记------ROS深度解析----- day 8 2019/3/16 帅某(Cartographer源码阅读(2):Node和MapBuilder对象)
- Jsoup学习笔记3:Jsoup 解析Html源码实例
- HBase-1.0.1学习笔记(八)启动脚本解析
- Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结
- 第31课: Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结
- 分布式任务调度平台XXL-JOB
- Hadoop源码学习笔记之NameNode启动流程分析一:源码环境搭建和项目模块及NameNode结构简单介绍
- 一文读懂分布式任务调度平台XXL-JOB