您的位置:首页 > 数据库 > Redis

任务调度在分布式部署环境下保证task的正确运行

2016-09-26 20:21 756 查看
在项目开发中,会经常使用的到定时任务。一般对与定时任务根据需求的不同、程序员的编码习惯,通常会有2种部署方式:

使用统一的任务调度系统进行管理。在分布式环境下,通过任务调度系统使用锁机制来保证同时只有一条服务器在运行定时任务。

在项目中直接配置task执行。在单服务器的情况下,此种方式是没有问题的,当在分布式环境下,就会存在多台服务器同时出发task。此时就需要单进行特殊的设置,以保证其同时只有一个服务器在运行task。

本文以讨论第2中情况下怎么解决在分布式环境下出现的问题。讨论的解决方法如下:

分别请求rpc服务,在服务中通过锁机制,来判断获得执行task权限的服务器

通过zk创建节点是否成功来进行判断

通过redis的Hsetnx 、Setnx的特性进行判断。

假设有同一个Task不是在3台服务器中,分别是A、B、C,远程服务S

RPC服务

描述:A、B、C三台服务器在执行task时同时请求远程服务S,在S中,通过一定的机制判定某台服务器具有执行task的权限,其他几台服务器则不具备。

在S服务中,判断机制特性为:先请求的具有执行权限。A\B\C请求时传输task唯一标识名,在S服务中接收到请求时,判断当前task标识是否进行记录,如已记录,则返回flase,表示不具备执行条件;反之不存在记录,对先对task标识进行存储,返回true,表示具有执行条件。在判断时,需要使用同步锁机制,保证同时只有一个请求在处于判断条件中。在请求执行完成后,再次S服务的刷新方法,释放已选择标识。

Zookeeper创建节点的方式(Java举例)

描述:A\B\C三台服务器在执行task时同时通过zk客户端创建同一node,创建成功的服务器则具有执行task的权限,否则不具备。

zk的判断机制为:zk客户端在创建节点时,会判断节点是否存在,如不存在则创建,反之出现异常。因此在执行task时,同时创建节点成功,则具有执行权限。伪代码如下:

/**
* 使用zk在分布式部署情况下确保并发情况中只有一台服务在执行
* 利用zk的创建node机制:当节点存在,则返回异常,否则创建成功。
* @author aiyungui
* @create 2016-09-27-11:24
**/
public class ZkDistributedUtil {

private CuratorFramework client;

private String defaultName = "ink";
private boolean isExecute = true;

public ZkDistributedUtil(String connectStr){
this(connectStr,null);
}

/**
* 初始化zk
* @param connectStr
* @param namespace
*/
public ZkDistributedUtil(String connectStr,String namespace){
if (StringUtils.isBlank(namespace)){
namespace = defaultName;
}

CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
client = builder.connectString(connectStr).sessionTimeoutMs(30000).connectionTimeoutMs(30000)
.canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
.defaultData(null).build();

client.start();
}

/**
* 创建节点
* @param nodePath
* @throws Exception
*/
public void createNode(String nodePath) throws Exception {
client.create().creatingParentsIfNeeded().forPath(nodePath);
}

/**
* 删除节点
* @param nodePath
* @throws Exception
*/
public void deleteNode(String nodePath) throws Exception {
client.delete().forPath(nodePath);
}

public static void main(String args[]){
//调用示例
try {
ZkDistributedUtil watcherService = new ZkDistributedUtil("10.1.5.217:2181");
watcherService.createNode("/timer/node");
System.out.println("execute module ....");
watcherService.deleteNode("/timer/node");

} catch (Exception e) {
System.out.println("创建节点失败");
}
}
}

redis判断方式

描述:A\B\C三台服务器在执行task时同时请求远程服务器S,在S中,通过redis的Hsetnx、setnx命令的特性,返回1则具有执行task的权限,否则不具备

redis的判断机制特性:redis的Hsetnx、setnx命令会在执行时判断是否存在对应的key,如存在则返回0,不存在则创建key,并进行赋值操作,返回1。此时可根据返回值为1或0来判定是否具有执行权限。伪代码如下:

/**
*使用Redis在分布式部署情况下确保并发情况中只有一台服务在执行
*利用redis的hsetnx命令特性:当key存在则返回0,反之创建成功并返回1
* @author aiyungui
* @create 2016-09-27-10:44
**/
public class RedisDistributedUtil {

private JedisPool jedisPool;

/**
* 初始化redis pool
*/
public void init(){
if (jedisPool == null)
jedisPool = (JedisPool) SpringApplicationContext.getBean("jedisPool");
}

/**
*是否获取执行权限 true为是,false为否
* @param key
* @param fieldId
* @param value
* @return
*/
public boolean isCanExecute(String key,String fieldId,String value){
boolean isExecute = true;
Jedis jedis = null;
try{
if (jedisPool == null){
init();
}
jedis = jedisPool.getResource();

value = value==null?"1":value;
Long result = jedis.hsetnx(key,fieldId,value);
if (result == 0){
isExecute = false;
}

}catch (Exception e){
e.printStackTrace();
}finally {
if (jedis != null){
jedis.close();
}
}

return isExecute;
}

/**
* 释放锁定的key信息
* @param key
* @param fieldId
*/
public void refreshKey(String key,String fieldId){
Jedis jedis = null;
try{
if (jedisPool == null){
init();
}
jedis = jedisPool.getResource();
jedis.hdel(key,fieldId);
}catch (Exception e){
e.printStackTrace();
}finally {
if (jedis != null){
jedis.close();
}
}
}

public static void main(String args[]){
RedisDistributedUtil distributedUtil = new RedisDistributedUtil();
boolean result = distributedUtil.isCanExecute("task","ink.test","1");
if (result) {
System.out.println("execute module ....");

distributedUtil.refreshKey("task","ink.test");
}

}
}


上述3种方式能保证在分布式部署的环境下同时只有一台服务器具有task执行权限。但task的执行又主要分为一些两种:

每隔多长时间执行

在某个时间点执行

对于2,则上述方式能完全满足,对于1,则会因服务器启动时间的不同,出现在不同的时间点执行task,此时需要根据业务再进行区分,如A\B\C服务执行的task是否在不同时间执行都对业务数据的操作是否具有冥等性。如果具有冥等性,则不受影响。否则需要根据业务进行特殊处理。处理方式可有:

在获得task执行权限的服务器在task任务执行完成后,休眠一段时间再通知zookeeper删除节点、redis删除key,RPC服务刷新,已保证task的执行是有效的

对第一个获得执行task权限的服务器进行记录,下次执行依然使用该服务器,直至此服务器出现异常。出现异常后,再次选择最先获得task执行权限的服务器
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息