使用redis 实现分布式锁,处理并发问题
2017-12-14 15:03
801 查看
处理逻辑,为了防止同一个订单,产生并发的问题,这里设置了一个锁,向redis 中加入以订单号为key的键值对,
每次执行订单处理时,会先判断redis 缓存中是否有这个key,
已存在的话,就挂起一段之间,重试5次,
如果在业务逻辑处理完,会删除redis 中的关于该订单的数据
callback 接口
oneByone类
每次执行订单处理时,会先判断redis 缓存中是否有这个key,
已存在的话,就挂起一段之间,重试5次,
如果在业务逻辑处理完,会删除redis 中的关于该订单的数据
import java.util.ArrayList; import java.util.List; import java.util.Set; import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.suning.fsp.common.exception.AppException; /** * 一个接一个业务处理模版默认实现<br> * * <p> * 用一个业务处理表记录,在处理前对锁状态进行判断 ,判断逻辑参见{@link #beforeInvoke}方法<br> * * 业务处理表: 业务类型 PK|业务ID PK|方法|创建时间<br> * * */ @Service("redisOneByOneTemplate") public class RedisOneByOneTemplateImpl implements RedisOneByOneTemplate { /** 业务正在处理中 */ public static final String ERROR_BIZ_PROCESSING = "error.biz.processing"; /** logger */ private static final Logger LOGGER = LoggerFactory.getLogger(RedisOneByOneTemplateImpl.class); private static final String CACHE_PREFIX_KEY = "EPP:FSP:ONEBYONE:CACHE:"; private static final Long REDIS_OPT_RES_FAIL = 0L; /** * 并发时循环等待的最大次数 */ private static final int REPEAT_MAX_COUNT = 5; @Autowired RedisOneByOneCacheDao redisOneByOneCacheDao; /** * {@inheritDoc} */ public <T> T execute(OneByOne oneByOne, CallBack<T> callBack) { oneByOne.setDescription(oneByOne.getBizType() + "-" + oneByOne.getBizId() + "-" + oneByOne.getMethod()); try { this.beforeInvoke(oneByOne); return callBack.invoke(); } finally { this.afterInvoke(oneByOne); } } /** * 查询所有的存在的OneByOne锁 * @return OneByOne锁列表 */ @Override public List<String> queryAllOneByOne() { Set<String> values = redisOneByOneCacheDao.smembers(CACHE_PREFIX_KEY); return new ArrayList<String>(values); } /** * 清除所有的OneByOne锁 */ @Override public void clearOneByOne(List<String> values) { if (CollectionUtils.isNotEmpty(values)) { for (String value : values) { redisOneByOneCacheDao.srem(CACHE_PREFIX_KEY, value); } } } /** * 回调前置 * * @param oneByOne 一个接一个处理记录 */ private void beforeInvoke(final OneByOne oneByOne) { int count = 0; do { try { oneByOne.setInsertSuccess(true); // 插入处理记录 Long addRes = redisOneByOneCacheDao.sadd(CACHE_PREFIX_KEY, oneByOne.getBizType() + "|" + oneByOne.getBizId()); if (REDIS_OPT_RES_FAIL == addRes) { oneByOne.setInsertSuccess(false); LOGGER.info(oneByOne.getDescription() + "插入处理记录发生错误!"); } } catch (Throwable t) { oneByOne.setInsertSuccess(false); LOGGER.error(oneByOne.getDescription() + "插入处理记录发生异常!t:{}", t); } if (!oneByOne.isInsertSuccess()) { LOGGER.info(oneByOne.getDescription() + "插入处理记录失败!"); // 重试次数累加 count++; if (count >= REPEAT_MAX_COUNT) { // 如果插入失败,抛出AppException throw new AppException(ERROR_BIZ_PROCESSING, oneByOne.getDescription() + "业务正在处理中"); } // 等待一段时间 try { // 每次等待时间拉长 Thread.sleep(2000); } catch (InterruptedException e) { LOGGER.info(oneByOne.getDescription() + " occur InterruptedException while wait."); } } } while (!oneByOne.isInsertSuccess()); } /** * 回调后置 * * @param oneByOne 一个接一个处理记录 */ private void afterInvoke(final OneByOne oneByOne) { // 插入失败,不删除处理记录 if (!oneByOne.isInsertSuccess()) { return; } try { // 删除处理记录 Long res = redisOneByOneCacheDao.srem(CACHE_PREFIX_KEY, oneByOne.getBizType() + "|" + oneByOne.getBizId()); if (res > 0) { LOGGER.debug("Remove oneByOne success, the bizType is: {}, the bizId is: {}", oneByOne.getBizType(), oneByOne.getBizId()); } else { LOGGER.debug("No value exist in redis, the bizType is: {}, the bizId is: {}", oneByOne.getBizType(), oneByOne.getBizId()); } } catch (Throwable t) { LOGGER.error(oneByOne.getDescription() + "删除处理记录失败!t:{}", t); } } }
callback 接口
public interface CallBack<T> { /** * 调用 * @return 结果 */ T invoke(); }
oneByone类
public class OneByOne { /** 业务类型 */ private String bizType; /** 业务ID */ private String bizId; /** 方法 */ private String method; /** 创建时间 */ private Date createTime; private String description; private boolean insertSuccess; /** * 同时最大等待执行的个数(默认为10个) */ private int maxWaitCount = 10; /** * 创建一个接一个处理记录 * * @param bizType 业务类型 * @param bizId 业务ID * @param method 方法 */ public OneByOne(String bizType, String bizId, String method) { this.bizType = bizType; this.bizId = bizId; this.method = method; } /** * 创建一个接一个处理记录 * * @param bizType 业务类型 * @param bizId 业务ID * @param method 方法 * @param maxWaitCount 最大等待数 */ public OneByOne(String bizType, String bizId, String method, int maxWaitCount) { this.bizType = bizType; this.bizId = bizId; this.method = method; this.setMaxWaitCount(maxWaitCount); } /** * 获取业务类型 * * @return 业务类型 */ public String getBizType() { return bizType; } /** * 设置业务类型 * * @param bizType 业务类型 */ public void setBizType(String bizType) { this.bizType = bizType; } /** * 获取业务ID * * @return 业务ID */ public String getBizId() { return bizId; } /** * 设置业务ID * * @param bizId 业务ID */ public void setBizId(String bizId) { this.bizId = bizId; } /** * 获取方法 * * @return 方法 */ public String getMethod() { return method; } /** * 设置方法 * * @param method 方法 */ public void setMethod(String method) { this.method = method; } /** * 获取创建时间 * * @return 创建时间 */ public Date getCreateTime() { return createTime; } /** * 设置创建时间 * * @param createTime 创建时间 */ public void setCreateTime(Date createTime) { this.createTime = createTime; } /** * @return the description */ String getDescription() { return description; } /** * @param description the description to set */ void setDescription(String description) { this.description = description; } /** * @return the insertSuccess */ boolean isInsertSuccess() { return insertSuccess; } /** * @param insertSuccess the insertSuccess to set */ void setInsertSuccess(boolean insertSuccess) { this.insertSuccess = insertSuccess; } public void setMaxWaitCount(int maxWaitCount) { this.maxWaitCount = maxWaitCount; } public int getMaxWaitCount() { return maxWaitCount; } }
相关文章推荐
- MVC 4中使用ServiceStack.Redis实现Redis队列【错误日志并发处理】
- Java之——redis并发读写锁,使用Redisson实现分布式锁
- 这是一个秒杀系统,即大量用户抢有限的商品,先到先得 用户并发访问流量非常大,需要分布式的机器集群处理请求 系统实现使用Java
- Redis总结(五)缓存雪崩和缓存穿透等问题 Web API系列(三)统一异常处理 C#总结(一)AutoResetEvent的使用介绍(用AutoResetEvent实现同步) C#总结(二)事件Event 介绍总结 C#总结(三)DataGridView增加全选列 Web API系列(二)接口安全和参数校验 RabbitMQ学习系列(六): RabbitMQ 高可用集群
- API接口非幂等性问题及使用redis实现简单的分布式锁
- redis使用乐观锁时处理竞争问题,高并发时失败率高如何解决
- 使用EF6和MVC5实现一个简单的选课系统--使用EF6处理并发操作(10/12)
- 使用scrapy,redis, mongodb实现的一个分布式网络爬虫
- Linux网络编程之socket:使用select函数实现并发处理
- Rails中实现后台处理:Redis, Sidekiq 使用总结
- 使用mc-tool实现empathy的自动登录与处理密钥环问题
- 分布式中使用Redis实现Session共享(二)
- 使用消息队列+js实现分布式服务器热切换业务处理功能
- (转)分布式中使用Redis实现Session共享(二)
- 使用 Redis 实现分布式系统轻量级协调技术
- 使用 Redis 实现分布式锁
- 使用dom4j工具包实现对xml文件的增删改查和乱码问题的处理
- 如何使用WCF服务实现分布式处理数据?
- 使用Spring实现异常统一处理【三】--java.lang.IllegalStateException: STREAM问题的解决
- 使用dom4j工具包实现对xml文件的增删改查和乱码问题的处理