您的位置:首页 > 数据库 > Redis

Redis系列四 - 在springboot中通过Lua脚本在redis中实现定时任务

2017-07-27 08:02 1401 查看

1. 概述

有时我们需要在特定时间执行特定的任务,然而一般的定时任务又不满足我们的需求。如

重推任务:我们向第三方发送话单,但是有可能推送失败,此时我们需要隔一段时间再重推。重推N次后,仍然失败,则不重推,标志无法推送

程序需要在N秒后执行特定任务,但是任务的参数由当前决定。

本文演示使用redis,lua和spring boot实现如上的功能。

2. redis+lua实现基本的定时任务主功能

2.1. ITimedTaskService

此接口定义服务的基本方法:添加,删除和获取需要执行的定时任务

public interface ITimedTaskService{
/**
* 添加需要定时执行的任务
* @param keySuffix
* @param executeTime 执行的时间
* @param value
*/
<T extends ITimedTaskModel> T add(String keySuffix, final Date executeTime,final T value);

/**
* 批量删除已经执行的定时任务
* @param keySuffix
* @param relationValues
*/
void bathDel(String keySuffix, final String... ids);

/**
* 获取当前需要执行的定时任务
* @param keySuffix
* @return
*/
<T extends ITimedTaskModel> List<T> getTimedTaskContent(String keySuffix, Class<T> cls);
}


2.2. TimedTaskService

定时任务的主服务类,ITimedTaskService的实现类

具体实现原理说明

1. 变量定义

unique_keySuffi:任务的定时任务可以被多种定时任务共用,为了区分不同定时任务,所以不同任务的key后缀不同。每个不同的定时任务,需要定义唯一的后缀,如”cdrs”,”repush”

id = UUID; //将ZSet和Hash里相应记录关联起来的值

2. redis定义两个key来保存定时任务的信息,2个key通过id值进行关联

A. ZSet: 核心是保存所有的定时任务计划将要执行的时间和hash关联的id值。不同类型的定时任务unique_keySuffix不同。相同类型的定时任务存储在相同的key,不同的同类型的任务通过member值区分,score存储将要执行的时间。通过zset的对score的排序功能,可以获取已经达到执行时间点的任务

key各个参数值的说明

key:timedTask_#{unique_keySuffix}

member:#{id}

score: 执行时间

B. Hash:保存每个定时任务的详细信息。相同类型的任务zset和hash的key的unique_keySuffix相同。从zset获取id后和hash的field进行匹配,从而获得fieldValue。fieldValue存储任务的详细信息,目前使用json字符串存储信息。

各个参数值的说明

key:timedTaskContent_#{unique_keySuffix}

field: #{id}

fieldValue: 执行定时任务所需要的参数

3. 关键方法说明:

添加任务:

• 一个任务需要同时在zset和hash中添加一条记录,两条记录通过id值关联在一起
• 在ZSet和Hash里根据以上规则各自添加1条新的记录


获取需要执行的任务:

• ZSet使用score保存任务执行时间,先从ZSet里面获取所有score <= 当前时间 的记录,
• 逐个根据zset的member值从hash中获取field和zset的member相同的fieldValue值(member和fieldValue都是id值),fieldValue存储本次需要执行任务的详细内容


删除记录

• 根据传入id值,从ZSet和Hash删除记录


使用lua脚本:

由于同时操作两个key,为了需要保证事物性,需要使用脚本

详细的实现Lua脚本如下:

add.lua:添加任务

-- save
-- hash info
local hashKey = KEYS[1]
local hashField = KEYS[2]
local hashFieldValue = KEYS[3]
-- zset info
local zSetKey = KEYS[4]
local zSetScore = KEYS[5]
local zSetMember = KEYS[6]

-- save hash
local result_1 = redis.call('HSET', hashKey, hashField, hashFieldValue)
-- save zset
local result_2 = redis.call('ZADD', zSetKey, zSetScore, zSetMember)
return result_1 + result_2


querycontents.lua :获取需要执行的任务

-- querycontents

-- ZSET key
local zSetKey = KEYS[1]
local zSetMin = KEYS[2]
local zSetMax = KEYS[3]
-- hash
local hashKey = KEYS[4]

-- run ZRANGEBYSCORE  : 获取所有已经到了需要执行的定时任务
local zSetValues = redis.call('ZRANGEBYSCORE', zSetKey, zSetMin, zSetMax)
local rtnContentTables = {}
for k, v in pairs(zSetValues) do
-- run HGET : 获取定时任务的内容值
local hashField = v
local hashValue = redis.call('HGET', hashKey, hashField)
table.insert(rtnContentTables,hashValue)
redis.log(redis.LOG_DEBUG,hashField)
end
return rtnContentTables


batchdel.lua: 删除记录

-- del key

local result = 0
-- 参数的传入的规律:4个一组
for k, v in pairs(KEYS) do
if(k % 4 == 1 ) then
-- hash
local hashKey = KEYS[k];
local hashField = KEYS[k+1]
-- zset
local zSetKey = KEYS[k+2]
local zSetMember = KEYS[k+3]
-- run del hash
local result_1 = redis.call('HDEL', hashKey, hashField)
-- run del zset
local result_2 = redis.call('ZREM', zSetKey, zSetMember)
result = result_1 + result_2
end
end
return result


TimedTaskService:具体实现

@Service
public class TimedTaskService implements ITimedTaskService{
private static final Logger logger = LoggerFactory.getLogger(TimedTaskService.class);
private final String TIMED_TASK_KEY_PREFIX = "timedTask_"; // 所有定时任务的前缀都是此值
private final String TIMED_TASK_KEY_CONTENT_PREFIX = "timedTaskContent_"; // 所有定时任务的具体内容的前缀

@Autowired
private StringRedisTemplate redisTemplate;

// 添加操作
private DefaultRedisScript<Long> addScript;

// 删除操作
private DefaultRedisScript<Long> batchDelScript;

// 查询
private DefaultRedisScript<List> querycontentsScript;

@PostConstruct
public void init() {
// Lock script
addScript = new DefaultRedisScript<Long>();
addScript.setScriptSource(
new ResourceScriptSource(new ClassPathResource("com/hry/spring/redis/timedtask/add.lua")));
addScript.setResultType(Long.class);
// unlock script
batchDelScript = new DefaultRedisScript<Long>();
batchDelScript.setScriptSource(
new ResourceScriptSource(new ClassPathResource("com/hry/spring/redis/timedtask/batchdel.lua")));
batchDelScript.setResultType(Long.class);
// query script
querycontentsScript = new DefaultRedisScript<List>();
querycontentsScript.setScriptSource(
new ResourceScriptSource(new ClassPathResource("com/hry/spring/redis/timedtask/querycontents.lua")));
querycontentsScript.setResultType(List.class);
}

@Override
public <T extends ITimedTaskModel> T add(String keySuffix, final Date executeTime,final T value){
Assert.notNull(keySuffix,"keySuffix can't be null!");
Assert.notNull(executeTime, "executeTime can't be null!");
Assert.notNull(value, "value can't be null!");
// 生成zset和hash的key值
final String zSetKey = generateTimedTaskZsetKey(keySuffix);
final String hashKey = generateTimedTaskHashContentKey(keySuffix);
// keyId将zset和hash关联起来,此值作为zset里的value,但是作为hash里的key值
final String id = UUID.randomUUID().toString() ;
value.setId(id);
// 封装参数
List<String> keyList = new ArrayList<String>();
// hash的操作参数
keyList.add(hashKey); // hash key
keyList.add(id); // hash Field
keyList.add(JSON.toJSONString(value)); // hash Field Value
// zset的操作参数
keyList.add(zSetKey); // zSetKey
keyList.add(String.valueOf(executeTime.getTime())); // zSetScore
keyList.add(id); // zSetMember
Long result = redisTemplate.execute(addScript, keyList);
logger.info("add 执行[{}],返回[{}]", JSON.toJSONString(value), result);
return value;
}

@Override
public void bathDel(String keySuffix, final String... ids){
final String zSetKey = generateTimedTaskZsetKey(keySuffix);
final String hashKey = generateTimedTaskHashContentKey(keySuffix);

List<String> keyList = new ArrayList<String>();
for(String id : ids){
// hash
keyList.add(hashKey);
keyList.add(id);
// zset
keyList.add(zSetKey);
keyList.add(id);
}
if(keyList.size() > 0){
Long result = redisTemplate.execute(batchDelScript, keyList);
logger.info("bathDel 执行keySuffix[{}],value[{}],返回[{}]", keySuffix, Arrays.toString(ids), result);
}
}

@Override
public <T extends ITimedTaskModel> List<T> getTimedTaskContent(String keySuffix, Class<T> cls){
List<T> rtnList = new ArrayList<T>();
final String zSetKey = generateTimedTaskZsetKey(keySuffix);
final String hashKey = generateTimedTaskHashContentKey(keySuffix);
// 获取所有已经到了需要执行的定时任务
List<String> keyList = new ArrayList<String>();
// zset
keyList.add(zSetKey);
keyList.add(String.valueOf(Long.MIN_VALUE));
keyList.add(String.valueOf(System.currentTimeMillis()));
// hashkey
keyList.add(hashKey);

if(keyList.size() > 0){
List resultList = redisTemplate.execute(querycontentsScript, keyList);
for(Object o : resultList){
logger.info("read content = {}", o.toString());
rtnList.add(JSON.parseObject(o.toString(), cls));
}
}
return rtnList;
}

/**
* 获取定时任务排序的key值
* @param keySuffix
* @return
*/
private String generateTimedTaskZsetKey(String keySuffix){
StringBuilder sb = new StringBuilder();
sb.append(TIMED_TASK_KEY_PREFIX);
sb.append(keySuffix);
return sb.toString();
}

/**
* 获取定时任务排序的保存内容的值
* @param keySuffix
* @return
*/
private String generateTimedTaskHashContentKey(String keySuffix){
StringBuilder sb = new StringBuilder();
sb.append(TIMED_TASK_KEY_CONTENT_PREFIX);
sb.append(keySuffix);
return sb.toString();
}

}


2.3. ITimedTaskModel

定时任务的mode必须实现此接口

public interface ITimedTaskModel {

String getId();

void setId(String id);
}


2.4. TimedTaskEnum

我们的定时任务需要处理不同类型的任务,这里通过枚举类定义不同的任务类型。类包括类型id,名称以及存储到redis中key的后缀

public enum TimedTaskEnum {
ONCE_RUN(1, "OnceRun", "cdrs"), REPUSH(2, "Repush", "repush");

private int id; // 类型id
private String name; // 名称
private String keySuffix; // 存储到redis中key的后缀

private TimedTaskEnum(int id, String name, String keySuffix){
this.id = id;
this.name = name;
this.keySuffix = keySuffix;
}

public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getKeySuffix() {
return keySuffix;
}

public void setKeySuffix(String keySuffix) {
this.keySuffix = keySuffix;
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}
}


2.5. 测试类TimedTaskServiceTest

此类除了演示我们的功能外,还演示的如何简单应用TimedTaskService。

TestModel:

// 实现 ITimedTaskModel 定时我们的任务
class TestModel implements ITimedTaskModel{
private String id;

@Override
public String getId() {
return id;
}

@Override
public void setId(String id) {
this.id = id;
}

}


TimedTaskServiceTest : 测试类,模拟3个方法的调用

@RunWith(SpringRunner.class)
@SpringBootTest(classes=TimedTaskSimpleApplication.class)
public class TimedTaskServiceTest {
@Autowired
private ITimedTaskService timedTaskService;

private String keySuffix = "test";
private Date executeTime = new Date();
private TestModel testModel = new TestModel();

@Test
public void fullProcess(){
// add
timedTaskService.add(keySuffix, executeTime, testModel);
// query
List<TestModel> testModelList = timedTaskService.getTimedTaskContent(keySuffix, TestModel.class);
// del
for(TestModel model : testModelList){
System.out.println("--" + JSON.toJSONString(model));
timedTaskService.bathDel(keySuffix, model.getId());
}
}
}


经测试,符合我们的要求,打印信息略

3. 应用一:使用TimedTaskService实现一次的定时任务

本节应用TimedTaskService,实现“程序需要在N秒后执行特定任务,但是任务的参数由现在决定”

3.1. OnceRunModel

定义一次任务的model

public class OnceRunModel implements ITimedTaskModel {
private String id;
private String content;
// set/get
}


OnceRunService

一次任务的服务类

@Component
public class OnceRunService implements IOnceRunService {

private String keySuffix = TimedTaskEnum.ONCE_RUN.getKeySuffix();

@Autowired
private ITimedTaskService timedTaskService;

@Override
public void save(OnceRunModel model, Date executeTime) {
Assert.notNull(model, "model can't be null!");
Assert.notNull(executeTime, "executeTime can't be null!");
// 保存到缓存
timedTaskService.add(keySuffix, executeTime, model);
}

@Override
public void delete(String id) {
Assert.notNull(id, "id can't be null!");
timedTaskService.bathDel(keySuffix, id);
}

@Override
public List<OnceRunModel> queryAll() {
List<OnceRunModel> list = timedTaskService.getTimedTaskContent(keySuffix, OnceRunModel.class);
return list;
}

}


3.2. OneRunConsumerJob

定时任务:定时从redis中获取任务,并执行任务

@Scheduled:这里使用spring的定时任务,此类使用方法见这篇文章 Spring @Async异步线程池用法总结

@Component
public class OneRunConsumerJob {
private static final Logger logger = LoggerFactory.getLogger(OneRunConsumerJob.class);

@Autowired
private IOnceRunService onceRunService;

// 接受数量
private int receiveCount = 0;

@Scheduled(initialDelay=3000, fixedRate=5000)
public void consumer(){
List<OnceRunModel> list = onceRunService.queryAll();
for(OnceRunModel model : list){
int newReceiverCount = receiveCount++;
logger.info("{}, 处理请求 :{}", newReceiverCount, model);
// 处理完后,删除记录
onceRunService.delete(model.getId());
}
}
}


3.3. TimedTaskSimpleApplication启动类

@SpringBootApplication(scanBasePackages={"com.hry.spring.redis.timedtask","com.hry.spring.redis.timedtask.simple"})
@EnableScheduling // 启动定时任务
public class TimedTaskSimpleApplication {
private static final Logger log = LoggerFactory.getLogger(TimedTaskSimpleApplication.class);

public static void main(String[] args) {
log.info("Start FirstApplication.. ");
SpringApplication.run(TimedTaskSimpleApplication.class, args);
}
}


3.4. 测试类OneRunTes

启动TimedTaskSimpleApplication后,执行OneRunTes,则生成任务,并加入redis中,OneRunConsumerJob 会定时从redis中获取并消费任务

@RunWith(SpringRunner.class)
@SpringBootTest(classes=TimedTaskSimpleApplication.class)
public class OneRunTes {

@Autowired
private IOnceRunService onceRunService;

@Test
public void producer(){
// 发送数量
int sendMaxCount = 10;
int newCount = 0;
while(newCount++ < sendMaxCount){
Date executeTime = new Date();

OnceRunModel model = new OnceRunModel();
model.setId(String.valueOf(newCount));
model.setContent("这是第"+ newCount + "次执行定时任务!");

onceRunService.save(model, executeTime);
}

}

}


4. 应用二:使用TimedTaskService实现任务重推功能

重推任务:我们向第三方发送话单,但是有可能推送失败,此时我们需要隔一段时间再重推。重推N次后,仍然失败,则不重推,标志无法推送。因为同一项目中重推又分为不同的子类型,不同子类型的处理的逻辑也不同。可以在ITimedTaskModel的实现类中定义type来进行区分,使用枚举RetryPushEnum 定义子类型。

4.1. RetryPushModel和RetryPushEnum

public class RetryPushModel implements ITimedTaskModel{
private String id; //
private String url; // 推送地址
private String content; // 内容
private Date pushNextTime; // 下次推送时间
private Integer pushTime; // 已经推送次数,值从1开始
private Integer type ; // 类型
// set/get略
}


/**
* 重推:向第三方发送信息,如果推送失败,则需要重新推送。每次重推需要间隔一段时间,最后推送N次
*  这里定义了重推的几种类型
* @author Administrator
*
*/
public enum RetryPushEnum {
SMS(1, "sms"), CDRS(2, "cdrs");

private int type; // 类型
private String name; // 名称

private RetryPushEnum(int type, String name){
this.type = type;
this.name = name;
}

public int getType() {
return type;
}
public void setType(int type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}

public static RetryPushEnum getRetryPushEnum(int type){
switch(type){
case 1 : return SMS;
case 2 : return CDRS;
default : throw new IllegalArgumentException("unkow enum type = " + type);
}
}
}


4.2. RetryPushService

重推主类

@Service
public class RetryPushService implements IRetryPushService{

private String keySuffix = TimedTaskEnum.REPUSH.getKeySuffix();

@Autowired
private ITimedTaskService timedTaskService;

@Override
public void save(RetryPushModel model) {
// 输入验证
Assert.notNull(model, "model can't be null!");
Assert.notNull(model.getUrl(), "url can't be null!");
Assert.notNull(model.getType(), "type cant't be null!");
Date executeTime = model.getPushNextTime();
Assert.notNull(executeTime, "executeTime can't be null!");
// 给一个默认值
model.setPushTime(model.getPushTime() == null ? 1 : model.getPushTime());
// 保存到缓存
model = timedTaskService.add(keySuffix, executeTime, model);
}

@Override
public void delete(String id) {
Assert.notNull(id, "id can't be null!");
timedTaskService.bathDel(keySuffix, id);
}

@Override
public List<RetryPushModel> queryAll() {
List<RetryPushModel> list = timedTaskService.getTimedTaskContent(
keySuffix, RetryPushModel.class);
// 按照时间排序
Collections.sort(list, new Comparator<RetryPushModel>() {
@Override
public int compare(RetryPushModel o1, RetryPushModel o2) {
if(o1 == null || o1.getPushNextTime() == null){
return -1;
}
if(o2 == null || o2.getPushNextTime() == null){
return 1;
}
return (int)(o1.getPushNextTime().getTime() - o1.getPushNextTime().getTime());
}
});
return list;
}

}


4.3. RepushConsumerMsg

在RetryPushService 的基础上,实现重推流程。在处理任务,模拟有一定概率推送失败,根据配置设置下次推送的时间,随着推送次数增加,推送的间隔也变长,且最多推送4次。

@Service
public class RepushConsumerMsg {
private static final Logger logger = LoggerFactory.getLogger(RepushRunConsumerJob.class);

@Autowired
private IRetryPushService retryPushService;

// 每次重推送的间隔:单位s
private List<Integer> retryIntervalSecondsList;

// 最大的重推次数
private Integer maxPushTime;

@PostConstruct
public void init(){
// 每次重推送的间隔:单位s
retryIntervalSecondsList = new ArrayList<Integer>();
retryIntervalSecondsList.add(5);
retryIntervalSecondsList.add(10);
retryIntervalSecondsList.add(15);

// 最大推送次数
maxPushTime = 4;
}

/**
* 模拟推送
*  1. 随机设置本次请求成功,如果推送成功,则此请求结束
*  2. 如果推送失败,则
*      a. 如果重推超过maxPushTime,则不在推送
*      b. 如果重推不超过maxPushTime,则设置下次推送时间,则储存到redis中,等待下次执行
* @param model
*/
public void execute(RetryPushModel model){
int time = model.getPushTime() == null ? 1: (model.getPushTime() < 1 ? 1 : model.getPushTime());
String url = model.getUrl();
// 这里使用随机函数进行判定如果 > 5,则认为发送到url成功
boolean isNeedRetry = true;
if(ThreadLocalRandom.current().nextInt(10) > 5){
// 模拟调用第三方url
logger.info("模拟调用[{}]成功,内容如下:[{}]", url, model);
isNeedRetry = false;
}else if(time > maxPushTime){
isNeedRetry = false;
logger.info("重推[{}]仍然失败,不再重推,接受者url[{}],内容信息如下[{}]", time, url, model);
}
if(isNeedRetry){
// 失败需要重推送
generateRetryPush(url, time, model.getContent(), model.getType());
}

}

/**
* 获取下次执行的时间
* @param time
* @return
*/
protected Date getPushNextTime(int time) {
long delayTime;
int size = retryIntervalSecondsList.size();
// 次数从1开始,索引从0开始
if(size >= time){
delayTime =  retryIntervalSecondsList.get(time-1) * 1000;
}else if(size > 0){
// 超过配置的次数,则延迟时间使用最后一次
delayTime = retryIntervalSecondsList.get(retryIntervalSecondsList.size() - 1) * 1000;
}else{
delayTime = 300 * 1000; // 延迟时间,默认5分钟
}
Date rtnDate = new Date(System.currentTimeMillis() + delayTime);
return rtnDate;
}

/**
* 生成重推消息
*  如果超过重推最大次数,则重推结束
* @param url
* @param time
* @param jsonBody
* @param sessionId
*/
protected void generateRetryPush(String url, int time, String jsonBody, int type) {
time++; //  // 推送次数+1
RetryPushModel model = new RetryPushModel();
model.setContent(time + "_" + jsonBody);
model.setType(type);
model.setUrl(url);
model.setPushTime(time+1);
model.setPushNextTime(getPushNextTime(time));
retryPushService.save(model);
}
}


4.4. RepushRunConsumerJob

使用RepushConsumerMsg 定时消费任务

@Component
public class RepushRunConsumerJob {
private static final Logger logger = LoggerFactory.getLogger(RepushRunConsumerJob.class);

@Autowired
private RepushConsumerMsg repushConsumerMsg;

@Autowired
private IRetryPushService retryPushService;

// 接受数量
private int receiveCount = 0;

@Scheduled(initialDelay=3000, fixedRate=5000)
public void consumer(){
List<RetryPushModel> list = retryPushService.queryAll();
for(RetryPushModel model : list){
int newReceiverCount = receiveCount++;
// 根据不同的重推对象,进行不同的逻辑处理
RetryPushEnum retryPushEnum = RetryPushEnum.getRetryPushEnum(model.getType());
switch (retryPushEnum){
case SMS: repushConsumerMsg.execute(model); break;
case CDRS:repushConsumerMsg.execute(model); break;
default :
logger.error("类型[{}]没有处理对象,内容如下[{}]", model.getType(), retryPushEnum);
}
// 处理完后,删除记录
retryPushService.delete(model.getId());
logger.info("已经处理[{}]条记录", newReceiverCount);
}
}

}


4.5. 启动类TimedTaskRepushApplication

@SpringBootApplication(scanBasePackages={"com.hry.spring.redis.timedtask","com.hry.spring.redis.timedtask.repush"})
@EnableScheduling // 启动定时任务
public class TimedTaskRepushApplication {
private static final Logger log = LoggerFactory.getLogger(TimedTaskRepushApplication.class);

public static void main(String[] args) {
log.info("Start FirstApplication.. ");
SpringApplication.run(TimedTaskRepushApplication.class, args);
}
}


4.6. 测试类RepushRunTest

启动TimedTaskRepushApplication后,执行producerCdrs和producerSms,则生成两种子类型任务,并加入redis中,OneRunConsumerJob 会定时从redis中获取并消费任务

@RunWith(SpringRunner.class)
@SpringBootTest(classes=TimedTaskRepushApplication.class)
public class RepushRunTest {

@Autowired
private IRetryPushService retryPushService;

/**
* 生成生推话单的任务
*/
@Test
public void producerCdrs(){
// 发送数量
int sendMaxCount = 10;
int newCount = 1;
while(newCount++ < sendMaxCount){
Date executeTime = new Date();

RetryPushModel model = new RetryPushModel();
model.setId(String.valueOf(newCount));
model.setContent("这是第"+ newCount + "次执行定时任务 CDRS!");
model.setPushNextTime(executeTime);
model.setType(RetryPushEnum.CDRS.getType());
model.setUrl("http://127.0.0.1:test");
model.setPushTime(0);

retryPushService.save(model);
}
}

/**
* 生成生推短信的任务
*
*/
@Test
public void producerSms(){
// 发送数量
int sendMaxCount = 10;
int newCount = 1;
while(newCount++ < sendMaxCount){
Date executeTime = new Date();

RetryPushModel model = new RetryPushModel();
model.setId(String.valueOf(newCount));
model.setContent("这是第"+ newCount + "次执行定时任务 CDRS!");
model.setPushNextTime(executeTime);
model.setType(RetryPushEnum.SMS.getType());
model.setUrl("http://127.0.0.1:test");
model.setPushTime(0);

retryPushService.save(model);
}
}

}


5. 代码

本文代码见这里
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: