您的位置:首页 > 数据库 > Redis

使用redis 实现分布式锁,处理并发问题

2017-12-14 15:03 801 查看
处理逻辑,为了防止同一个订单,产生并发的问题,这里设置了一个锁,向redis 中加入以订单号为key的键值对,

每次执行订单处理时,会先判断redis 缓存中是否有这个key,

已存在的话,就挂起一段之间,重试5次,

如果在业务逻辑处理完,会删除redis 中的关于该订单的数据

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.suning.fsp.common.exception.AppException;

/**
* 一个接一个业务处理模版默认实现<br>
*
* <p>
* 用一个业务处理表记录,在处理前对锁状态进行判断 ,判断逻辑参见{@link #beforeInvoke}方法<br>
*
* 业务处理表: 业务类型 PK|业务ID PK|方法|创建时间<br>
*
*
*/
@Service("redisOneByOneTemplate")
public class RedisOneByOneTemplateImpl implements RedisOneByOneTemplate {

/** 业务正在处理中 */
public static final String ERROR_BIZ_PROCESSING = "error.biz.processing";

/** logger */
private static final Logger LOGGER = LoggerFactory.getLogger(RedisOneByOneTemplateImpl.class);

private static final String CACHE_PREFIX_KEY = "EPP:FSP:ONEBYONE:CACHE:";

private static final Long REDIS_OPT_RES_FAIL = 0L;

/**
* 并发时循环等待的最大次数
*/
private static final int REPEAT_MAX_COUNT = 5;

@Autowired
RedisOneByOneCacheDao redisOneByOneCacheDao;

/**
* {@inheritDoc}
*/
public <T> T execute(OneByOne oneByOne, CallBack<T> callBack) {
oneByOne.setDescription(oneByOne.getBizType() + "-" + oneByOne.getBizId() + "-" + oneByOne.getMethod());

try {
this.beforeInvoke(oneByOne);

return callBack.invoke();
} finally {
this.afterInvoke(oneByOne);
}
}

/**
* 查询所有的存在的OneByOne锁
* @return OneByOne锁列表
*/
@Override
public List<String> queryAllOneByOne() {
Set<String> values = redisOneByOneCacheDao.smembers(CACHE_PREFIX_KEY);
return new ArrayList<String>(values);
}

/**
* 清除所有的OneByOne锁
*/
@Override
public void clearOneByOne(List<String> values) {
if (CollectionUtils.isNotEmpty(values)) {
for (String value : values) {
redisOneByOneCacheDao.srem(CACHE_PREFIX_KEY, value);
}
}
}

/**
* 回调前置
*
* @param oneByOne 一个接一个处理记录
*/
private void beforeInvoke(final OneByOne oneByOne) {
int count = 0;
do {
try {
oneByOne.setInsertSuccess(true);

// 插入处理记录
Long addRes = redisOneByOneCacheDao.sadd(CACHE_PREFIX_KEY,
oneByOne.getBizType() + "|" + oneByOne.getBizId());
if (REDIS_OPT_RES_FAIL == addRes) {
oneByOne.setInsertSuccess(false);
LOGGER.info(oneByOne.getDescription() + "插入处理记录发生错误!");
}

} catch (Throwable t) {
oneByOne.setInsertSuccess(false);
LOGGER.error(oneByOne.getDescription() + "插入处理记录发生异常!t:{}", t);
}

if (!oneByOne.isInsertSuccess()) {

LOGGER.info(oneByOne.getDescription() + "插入处理记录失败!");

// 重试次数累加
count++;
if (count >= REPEAT_MAX_COUNT) {
// 如果插入失败,抛出AppException
throw new AppException(ERROR_BIZ_PROCESSING, oneByOne.getDescription() + "业务正在处理中");
}

// 等待一段时间
try {
// 每次等待时间拉长
Thread.sleep(2000);
} catch (InterruptedException e) {
LOGGER.info(oneByOne.getDescription() + " occur InterruptedException while wait.");
}
}
}
while (!oneByOne.isInsertSuccess());

}

/**
* 回调后置
*
* @param oneByOne 一个接一个处理记录
*/
private void afterInvoke(final OneByOne oneByOne) {
// 插入失败,不删除处理记录
if (!oneByOne.isInsertSuccess()) {
return;
}

try {
// 删除处理记录
Long res = redisOneByOneCacheDao.srem(CACHE_PREFIX_KEY,
oneByOne.getBizType() + "|" + oneByOne.getBizId());
if (res > 0) {
LOGGER.debug("Remove oneByOne success, the bizType is: {}, the bizId is: {}",
oneByOne.getBizType(), oneByOne.getBizId());
} else {
LOGGER.debug("No value exist in redis, the bizType is: {}, the bizId is: {}",
oneByOne.getBizType(), oneByOne.getBizId());
}

} catch (Throwable t) {
LOGGER.error(oneByOne.getDescription() + "删除处理记录失败!t:{}", t);
}
}
}


callback 接口

public interface CallBack<T> {

/**
* 调用
* @return 结果
*/
T invoke();

}


oneByone类

public class OneByOne {

/** 业务类型 */
private String bizType;

/** 业务ID */
private String bizId;

/** 方法 */
private String method;

/** 创建时间 */
private Date createTime;

private String description;

private boolean insertSuccess;

/**
* 同时最大等待执行的个数(默认为10个)
*/
private int maxWaitCount = 10;

/**
* 创建一个接一个处理记录
*
* @param bizType 业务类型
* @param bizId 业务ID
* @param method 方法
*/
public OneByOne(String bizType, String bizId, String method) {
this.bizType = bizType;
this.bizId = bizId;
this.method = method;
}

/**
* 创建一个接一个处理记录
*
* @param bizType 业务类型
* @param bizId 业务ID
* @param method 方法
* @param maxWaitCount 最大等待数
*/
public OneByOne(String bizType, String bizId, String method, int maxWaitCount) {
this.bizType = bizType;
this.bizId = bizId;
this.method = method;
this.setMaxWaitCount(maxWaitCount);
}

/**
* 获取业务类型
*
* @return 业务类型
*/
public String getBizType() {
return bizType;
}

/**
* 设置业务类型
*
* @param bizType 业务类型
*/
public void setBizType(String bizType) {
this.bizType = bizType;
}

/**
* 获取业务ID
*
* @return 业务ID
*/
public String getBizId() {
return bizId;
}

/**
* 设置业务ID
*
* @param bizId 业务ID
*/
public void setBizId(String bizId) {
this.bizId = bizId;
}

/**
* 获取方法
*
* @return 方法
*/
public String getMethod() {
return method;
}

/**
* 设置方法
*
* @param method 方法
*/
public void setMethod(String method) {
this.method = method;
}

/**
* 获取创建时间
*
* @return 创建时间
*/
public Date getCreateTime() {
return createTime;
}

/**
* 设置创建时间
*
* @param createTime 创建时间
*/
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}

/**
* @return the description
*/
String getDescription() {
return description;
}

/**
* @param description the description to set
*/
void setDescription(String description) {
this.description = description;
}

/**
* @return the insertSuccess
*/
boolean isInsertSuccess() {
return insertSuccess;
}

/**
* @param insertSuccess the insertSuccess to set
*/
void setInsertSuccess(boolean insertSuccess) {
this.insertSuccess = insertSuccess;
}

public void setMaxWaitCount(int maxWaitCount) {
this.maxWaitCount = maxWaitCount;
}

public int getMaxWaitCount() {
return maxWaitCount;
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐