
Study Notes --- Distributed Scheduling: Source Code Analysis of xxl-job Task Dispatch

2020-03-23 09:38

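The earlier notes "Study Notes: Distributed Scheduling, Source Analysis of xxl-job Executor Startup" and "Study Notes: Distributed Scheduling, Source Analysis of xxl-job Scheduling Center Startup" established the following. The executor has started a nettyServer that waits for dispatch requests, plus a background thread that watches the callback queue and, as soon as the queue holds data, calls the HTTP endpoint exposed by the scheduling center. The scheduling center, for its part, has started a background log thread that looks for failed execution logs; when it finds one, it builds a proxy object and opens a nettyClient to make the RPC call. It has also started a thread that watches each job's trigger time and handles jobs in tiers: jobs in the middle tier go through the same invocation logic as the failed-log path, while the rest either only have their next trigger time refreshed, or have it refreshed and are also placed into the time-wheel cache to wait for execution. Finally, the job's trigger-time information is updated in the database.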
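The tiered handling can be pictured as a small decision function plus a time wheel indexed by second-of-minute. The sketch below is modelled loosely on that description, not copied from xxl-job: the 5-second pre-read window (PRE_READ_MS), the 60-slot ring, and the names TierSketch, Action and pushToRing are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of tiered trigger handling; not xxl-job's actual scheduler code.
class TierSketch {
    // Assumed pre-read window in milliseconds.
    static final long PRE_READ_MS = 5000;

    enum Action { SKIP_AND_REFRESH, TRIGGER_NOW, PUSH_TO_RING }

    // Decide what to do with a job whose next trigger time is nextTime, observed at instant now.
    static Action decide(long now, long nextTime) {
        if (now > nextTime + PRE_READ_MS) {
            return Action.SKIP_AND_REFRESH;   // missed by more than the window: only refresh the next time
        } else if (now > nextTime) {
            return Action.TRIGGER_NOW;        // due but only slightly late: trigger immediately
        } else {
            return Action.PUSH_TO_RING;       // not yet due: park it in the time wheel
        }
    }

    // Time wheel: slot = second-of-minute (0..59), each slot holds job ids.
    static final Map<Integer, List<Integer>> ring = new HashMap<>();

    static int ringSlot(long triggerTimeMs) {
        return (int) ((triggerTimeMs / 1000) % 60);
    }

    static void pushToRing(long triggerTimeMs, int jobId) {
        ring.computeIfAbsent(ringSlot(triggerTimeMs), k -> new ArrayList<>()).add(jobId);
    }

    public static void main(String[] args) {
        long now = 100_000;
        System.out.println(decide(now, now - 10_000)); // SKIP_AND_REFRESH
        System.out.println(decide(now, now - 1_000));  // TRIGGER_NOW
        System.out.println(decide(now, now + 3_000));  // PUSH_TO_RING
        pushToRing(now + 3_000, 42);
        System.out.println(ring);                      // {43=[42]}
    }
}
```

A separate ring thread would then wake once per second and fire whatever sits in the current slot; that part is omitted here.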

Since the scheduling center always goes through the same dispatch logic, we can go straight to XxlJobTrigger.runExecutor().

The ExecutorBiz used there is a proxy object, so calling run on it lands in XxlRpcReferenceBean.
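The proxy idea can be illustrated with a plain JDK dynamic proxy: a call on the interface is turned into a request object recording the interface name, method name and arguments, which a transport would then send. DemoExecutorBiz, RpcRequest and the echoing "transport" are hypothetical stand-ins, not xxl-rpc's classes.

```java
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical interface standing in for ExecutorBiz.
interface DemoExecutorBiz {
    String run(String param);
}

// Hypothetical request object: what a real client would serialize and send.
class RpcRequest {
    final String className;
    final String methodName;
    final Object[] args;

    RpcRequest(String className, String methodName, Object[] args) {
        this.className = className;
        this.methodName = methodName;
        this.args = args;
    }
}

class ProxySketch {
    // Every call on the returned proxy is packaged into an RpcRequest and handed to transport.
    static <T> T createProxy(Class<T> iface, Function<RpcRequest, Object> transport) {
        Object proxy = Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[]{iface},
                (p, method, args) -> transport.apply(
                        new RpcRequest(iface.getName(), method.getName(), args)));
        return iface.cast(proxy);
    }

    public static void main(String[] args) {
        // Fake transport: echo back what would have gone over the network.
        DemoExecutorBiz biz = createProxy(DemoExecutorBiz.class,
                req -> req.methodName + Arrays.toString(req.args));
        System.out.println(biz.run("job-1")); // run[job-1]
    }
}
```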

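If the scheduling center is deployed as a cluster, this class does some extra handling first, and then goes straight on to sending the request.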

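After clientInstance.asyncSend(finalAddress, xxlRpcRequest); the caller still has to wait, because the call type used here is SYNC:
XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
This call blocks until the response arrives or the timeout expires. In my view it is the same blocking approach Dubbo uses (see "Study Notes: Dubbo Service Invocation (Consumer Side)"). Once the response comes back, the result is wrapped into the trigger log and persisted to the database.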
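The SYNC-over-async pattern can be sketched with a map from request id to CompletableFuture: the caller registers a future, fires the request, then blocks on get(timeout); the I/O side completes the matching future when the response arrives. SyncCallSketch, pending and onResponse are hypothetical names; xxl-rpc uses its own future type (the futureResponse in the line above).

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

// Hypothetical sketch of a synchronous call built on top of an asynchronous send.
class SyncCallSketch {
    // Outstanding requests, keyed by request id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final BiConsumer<String, String> asyncSend; // (requestId, payload): fire and forget

    SyncCallSketch(BiConsumer<String, String> asyncSend) {
        this.asyncSend = asyncSend;
    }

    // Caller side: register a future, send, then block until the response or the timeout.
    String call(String payload, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        String requestId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        try {
            asyncSend.accept(requestId, payload);
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            pending.remove(requestId); // clean up on success and on timeout alike
        }
    }

    // I/O side: a response arrived, complete the matching future if anyone is still waiting.
    void onResponse(String requestId, String result) {
        CompletableFuture<String> future = pending.get(requestId);
        if (future != null) {
            future.complete(result);
        }
    }

    public static void main(String[] args) throws Exception {
        SyncCallSketch[] holder = new SyncCallSketch[1];
        // Fake network: answer on another thread after a short delay.
        holder[0] = new SyncCallSketch((id, payload) -> new Thread(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) { }
            holder[0].onResponse(id, "ok:" + payload);
        }).start());
        System.out.println(holder[0].call("trigger", 1000)); // ok:trigger
    }
}
```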

On the executor side, just as in "Study Notes: Dubbo Service Invocation (Provider Side)", start reading from NettyHttpServerHandler.

As soon as the executor receives a dispatch request, it hands the request off to a thread pool for processing.
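Handing work to a pool keeps the Netty I/O thread free to read the next request. A minimal sketch with a plain ThreadPoolExecutor; the pool sizes, queue capacity, rejection policy and the onRequest/handle methods are assumptions, not the handler's actual code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: the I/O callback only submits work and returns immediately.
class OffloadSketch {
    // Assumed sizing: 2..4 workers, up to 100 queued requests, reject when full.
    private final ThreadPoolExecutor workers = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(100),
            new ThreadPoolExecutor.AbortPolicy());

    // Called on the I/O thread.
    void onRequest(String request, Consumer<String> reply) {
        workers.execute(() -> reply.accept(handle(request)));
    }

    // Business logic runs on a worker thread.
    private String handle(String request) {
        return "handled " + request + " on " + Thread.currentThread().getName();
    }

    void shutdown() {
        workers.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        OffloadSketch server = new OffloadSketch();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            server.onRequest("req-" + i, response -> {
                System.out.println(response);
                done.countDown();
            });
        }
        done.await(1, TimeUnit.SECONDS);
        server.shutdown();
    }
}
```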

The request here targets the run method of com.xxl.job.core.biz.ExecutorBiz; following it further, reflection takes us into ExecutorBizImpl.run.
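The reflective step boils down to: look up the service bean by the class name carried in the request, find the method by name and parameter types, then call Method.invoke. The registry map and the DemoBiz / DemoBizImpl classes are hypothetical stand-ins for ExecutorBiz and ExecutorBizImpl.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical service interface and implementation.
interface DemoBiz {
    String run(String param);
}

class DemoBizImpl implements DemoBiz {
    @Override
    public String run(String param) {
        return "run(" + param + ")";
    }
}

class ReflectDispatchSketch {
    // Service registry: interface name -> implementation bean.
    private final Map<String, Object> services = new HashMap<>();

    void register(Class<?> iface, Object bean) {
        services.put(iface.getName(), bean);
    }

    // Resolve bean and method from the request fields, then invoke reflectively.
    Object invoke(String className, String methodName, Class<?>[] paramTypes, Object[] args)
            throws Exception {
        Object bean = services.get(className);
        if (bean == null) {
            throw new IllegalArgumentException("no service: " + className);
        }
        Method method = bean.getClass().getMethod(methodName, paramTypes);
        method.setAccessible(true); // the implementation class itself is package-private here
        return method.invoke(bean, args);
    }

    public static void main(String[] args) throws Exception {
        ReflectDispatchSketch provider = new ReflectDispatchSketch();
        provider.register(DemoBiz.class, new DemoBizImpl());
        Object result = provider.invoke(DemoBiz.class.getName(), "run",
                new Class<?>[]{String.class}, new Object[]{"job-1"});
        System.out.println(result); // run(job-1)
    }
}
```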

We only care about the Bean registration mode here, so the rest of the code is skipped.

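At the executor layer, xxl-job gives each job its own JobThread (a Thread subclass) that wraps the job's JobHandler. This keeps dispatching efficient, but when there are many jobs, the number of threads can grow large.
With a breakpoint on the first dispatch of a job, you can watch the JobThread being created and started.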
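The one-thread-per-job model can be sketched with a ConcurrentHashMap from job id to a worker thread that is created lazily on the first trigger and reused afterwards. JobThreadRegistry, Worker, workerFor and trigger are hypothetical names, and the worker is far simpler than the real JobThread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of "one worker thread per job id".
class JobThreadRegistry {
    // Minimal worker: a daemon thread draining its own queue.
    static class Worker extends Thread {
        final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

        Worker(int jobId) {
            super("job-" + jobId);
            setDaemon(true);
        }

        @Override
        public void run() {
            try {
                while (true) {
                    String param = queue.take();
                    System.out.println(getName() + " executes " + param);
                }
            } catch (InterruptedException e) {
                // interrupted: leave the loop and let the thread end
            }
        }
    }

    private final Map<Integer, Worker> workers = new ConcurrentHashMap<>();

    // The first trigger for a job creates and starts its worker; later triggers reuse it.
    Worker workerFor(int jobId) {
        return workers.computeIfAbsent(jobId, id -> {
            Worker w = new Worker(id);
            w.start();
            return w;
        });
    }

    void trigger(int jobId, String param) {
        workerFor(jobId).queue.add(param);
    }

    int threadCount() {
        return workers.size();
    }

    public static void main(String[] args) throws InterruptedException {
        JobThreadRegistry registry = new JobThreadRegistry();
        registry.trigger(1, "a");
        registry.trigger(1, "b");
        registry.trigger(2, "c");
        Thread.sleep(100);
        System.out.println(registry.threadCount()); // 2
    }
}
```

Thread count grows with the number of distinct jobs, which is exactly the cost mentioned above; the real JobThread mitigates it by removing itself after more than 30 idle polls.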


Here is JobThread:

public class JobThread extends Thread{
    private static Logger logger = LoggerFactory.getLogger(JobThread.class);

    private int jobId;
    private IJobHandler handler;
    private LinkedBlockingQueue<TriggerParam> triggerQueue;
    private Set<Long> triggerLogIdSet;      // avoid repeat trigger for the same TRIGGER_LOG_ID

    private volatile boolean toStop = false;
    private String stopReason;

    private boolean running = false;    // if running job
    private int idleTimes = 0;          // idle times

    public JobThread(int jobId, IJobHandler handler) {
        this.jobId = jobId;
        this.handler = handler;
        this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
        this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());
    }
    public IJobHandler getHandler() {
        return handler;
    }

    /**
     * new trigger to queue
     *
     * @param triggerParam
     * @return
     */
    public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
        // avoid repeat
        if (triggerLogIdSet.contains(triggerParam.getLogId())) {
            logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
            return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
        }

        triggerLogIdSet.add(triggerParam.getLogId());
        triggerQueue.add(triggerParam);
        return ReturnT.SUCCESS;
    }

    /**
     * kill job thread
     *
     * @param stopReason
     */
    public void toStop(String stopReason) {
        /**
         * Thread.interrupt only breaks a thread out of blocking states (wait, join, sleep)
         * by throwing InterruptedException at the blocking point; it does not stop the running thread itself.
         * So to fully terminate this thread, a shared flag has to be used instead.
         */
        this.toStop = true;
        this.stopReason = stopReason;
    }

    /**
     * is running job
     * @return
     */
    public boolean isRunningOrHasQueue() {
        return running || triggerQueue.size()>0;
    }

    @Override
    public void run() {

        // init
        try {
            handler.init();
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }

        // execute
        while(!toStop){
            running = false;
            idleTimes++;

            TriggerParam triggerParam = null;
            ReturnT<String> executeResult = null;
            try {
                // to check the toStop signal we need to loop, so we cannot use queue.take(); use poll(timeout) instead
                triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
                if (triggerParam!=null) {
                    running = true;
                    idleTimes = 0;
                    triggerLogIdSet.remove(triggerParam.getLogId());

                    // log filename, like "logPath/yyyy-MM-dd/9999.log"
                    String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                    XxlJobFileAppender.contextHolder.set(logFileName);
                    ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));

                    // execute
                    XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());

                    if (triggerParam.getExecutorTimeout() > 0) {
                        // limit timeout
                        Thread futureThread = null;
                        try {
                            final TriggerParam triggerParamTmp = triggerParam;
                            FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
                                @Override
                                public ReturnT<String> call() throws Exception {
                                    return handler.execute(triggerParamTmp.getExecutorParams());
                                }
                            });
                            futureThread = new Thread(futureTask);
                            futureThread.start();

                            executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                        } catch (TimeoutException e) {

                            XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
                            XxlJobLogger.log(e);

                            executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
                        } finally {
                            futureThread.interrupt();
                        }
                    } else {
                        // just execute
                        executeResult = handler.execute(triggerParam.getExecutorParams());
                    }

                    if (executeResult == null) {
                        executeResult = IJobHandler.FAIL;
                    } else {
                        executeResult.setMsg(
                                (executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
                                        ?executeResult.getMsg().substring(0, 50000).concat("...")
                                        :executeResult.getMsg());
                        executeResult.setContent(null);    // limit obj size
                    }
                    XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);

                } else {
                    if (idleTimes > 30) {
                        if(triggerQueue.size() == 0) {    // avoid concurrent trigger causes jobId-lost
                            XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                        }
                    }
                }
            } catch (Throwable e) {
                if (toStop) {
                    XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
                }

                StringWriter stringWriter = new StringWriter();
                e.printStackTrace(new PrintWriter(stringWriter));
                String errorMsg = stringWriter.toString();
                executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);

                XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
            } finally {
                if(triggerParam != null) {
                    // callback handler info
                    if (!toStop) {
                        // common
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
                    } else {
                        // is killed
                        ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
                    }
                }
            }
        }

        // callback trigger request in queue
        while(triggerQueue !=null && triggerQueue.size()>0){
            TriggerParam triggerParam = triggerQueue.poll();
            if (triggerParam!=null) {
                // is killed
                ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
            }
        }

        // destroy
        try {
            handler.destroy();
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
        }

        logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
    }
}

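So each JobThread wraps exactly one JobHandler. The thread enters its loop, and whenever its private LinkedBlockingQueue<TriggerParam> triggerQueue holds data, it invokes the corresponding handler. On the very first dispatch the queue is still empty, but right after the JobThread is created, the caller invokes:
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
From then on the JobThread has work to do. Inside the loop it first builds the log file name from the trigger date and the log id (logPath/yyyy-MM-dd/{logId}.log). It then checks whether the request's executor timeout is greater than 0. If it is, the handler runs on yet another thread wrapped in a FutureTask, and futureTask.get(timeout) blocks for the result; otherwise handler.execute is called directly. Whether or not execution throws, the outcome is appended to the file log, and the result is finally pushed into TriggerCallbackThread's callBackQueue so that the triggerCallbackThread thread can report it back to the scheduling center. On a graceful shutdown, toStop becomes true and the thread leaves the loop.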
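The timeout branch can be reduced to a few lines: run the job on a helper thread wrapped in a FutureTask, wait with get(timeout), turn a timeout into a failure result, and interrupt the helper in finally. In this stripped-down sketch the Callable stands in for handler.execute(...) and plain strings replace ReturnT; unlike JobThread, it also converts an ExecutionException into a failure string instead of letting it propagate. TimeoutRunSketch and runWithTimeout are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of JobThread's "limit timeout" branch.
class TimeoutRunSketch {
    static String runWithTimeout(Callable<String> job, long timeoutSeconds) throws InterruptedException {
        FutureTask<String> task = new FutureTask<>(job);
        Thread runner = new Thread(task);
        runner.start();
        try {
            return task.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            return "FAIL: job execute timeout";
        } catch (ExecutionException e) {
            return "FAIL: " + e.getCause();
        } finally {
            // Only effective if the job blocks (sleep/wait/join) or checks its interrupt flag.
            runner.interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWithTimeout(() -> "SUCCESS", 1)); // SUCCESS
        System.out.println(runWithTimeout(() -> {
            Thread.sleep(5_000);
            return "SUCCESS";
        }, 1));                                                  // FAIL: job execute timeout
    }
}
```

As the comment in toStop already notes, interrupt cannot stop a job that is busy computing without checking its interrupt flag; such a job keeps running on its helper thread even after the timeout has been reported.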

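After leaving the loop, it takes whatever is still waiting in triggerQueue, pushes each entry into callBackQueue as a killed result, then destroys the handler and ends the thread.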
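The shutdown path relies on two details in the JobThread code: the loop uses poll(timeout) instead of take(), so it re-checks the volatile toStop flag regularly, and after the loop it drains triggerQueue so queued triggers are reported as killed rather than silently dropped. A compact sketch of that lifecycle; StoppableWorker, the 100 ms poll interval and the callbacks list (standing in for callBackQueue) are assumptions.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the stop-flag loop plus queue draining.
class StoppableWorker extends Thread {
    private final LinkedBlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
    // Stand-in for TriggerCallbackThread's callBackQueue.
    final List<String> callbacks = new CopyOnWriteArrayList<>();
    private volatile boolean toStop = false;
    private volatile String stopReason;

    void push(String param) {
        triggerQueue.add(param);
    }

    void toStop(String reason) {
        this.stopReason = reason; // written before the flag so readers of toStop see it
        this.toStop = true;
    }

    @Override
    public void run() {
        while (!toStop) {
            try {
                // poll(timeout) rather than take(), so toStop is re-checked regularly
                String param = triggerQueue.poll(100, TimeUnit.MILLISECONDS);
                if (param != null) {
                    // like JobThread's finally block: a trigger picked up after stop counts as killed
                    callbacks.add(toStop ? "killed:" + param + " (" + stopReason + ")" : "done:" + param);
                }
            } catch (InterruptedException e) {
                break; // treat interruption like a stop request
            }
        }
        // drain whatever never ran and report it as killed
        String leftover;
        while ((leftover = triggerQueue.poll()) != null) {
            callbacks.add("killed:" + leftover + " (" + stopReason + ")");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        StoppableWorker w = new StoppableWorker();
        w.push("a");
        w.start();
        Thread.sleep(300);   // give "a" time to run normally
        w.toStop("shutdown");
        w.push("b");         // arrives after the stop request
        w.join();
        System.out.println(w.callbacks);
    }
}
```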
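On the RPC path, the value returned by ExecutorBizImpl.run (the pushTriggerQueue result, i.e. whether the trigger was accepted) is wrapped into an XxlRpcResponse and returned to the scheduling center; the job's actual execution result travels back separately through the callback thread.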

Author: lbingk