您的位置:首页 > 编程语言

并发编程经历:同步加锁之业务锁

2016-03-30 12:06 357 查看
业务锁
在处理并发问题时,很多情况下需要用到业务锁来达到按照某个维度同步执行业务块。

以申请中心的venuscore后台申请报名为例:

@Override
@Transactional(rollbackFor = Exception.class, noRollbackFor = TerminateException.class)
public ApplyDO submitApply(ApplyDOapplyDO) {
LockResultEnum lockResultEnum =null;
String lockName =
new
StringBuffer().append(applyDO.getSite()).append("_").append(applyDO.getSiteMemId()).toString();
try {
//加锁
lockResultEnum =
lockManager.getLock(lockName, LockTypeEnum.APPLY_LOCK.getCode());
if (LockResultEnum.没有获取到锁.equals(lockResultEnum)){
throw
new
BizException(ErrorCode.LOCK_FAIL);
}

returnapplyDO;
} catch (TerminateExceptione) {
throwe;
} catch (BizExceptione) {
throw
new
BizException(e.getErrorCode(),e);
} catch (Exceptione) {
throw
new
BizException(ErrorCode.GENERIC_ERROR,e);
} finally {
//释放锁
lockManager.releaseLock(lockName, LockTypeEnum.APPLY_LOCK.getCode(),lockResultEnum);
}
}

LockManager的getLock方法实现如下:

@Override
public LockResultEnum getLock(StringlockName,StringlockType){
if(StringUtil.isEmpty(lockName)){
LOG.error("getLock()参数为空,param:" +lockName);
throw
new
BizException(ErrorCode.ILLEGAL_ARGUMENT,"参数为空!");

}
//只是生成一个数据库锁名,纯粹的字符串拼接过程
String lockName_ = getDBLockName(lockName,lockType);
booleanisGetDbLocked =lockDao.getDbLock(lockName_);
if (isGetDbLocked) {
LockDO lock =
lockDao.getRowLockByName(lockName);
if (lock !=null){
return LockResultEnum.获取锁成功;
} else {
return LockResultEnum.仅数据库锁;
}
} else {
LOG.warn("获取锁【" +lockName_+"】失败");
return LockResultEnum.没有获取到锁;
}
}

LockManager的releaseLock方法实现如下:

@Override
public
void
releaseLock(StringlockName,StringlockType,LockResultEnumlockResultEnum) {
String lockName_ = getDBLockName(lockName,lockType);
if (StringUtil.isEmpty(lockName)) {
LOG.error("releaseLock()参数为空,lockName:{}",lockName);
throw
new
BizException(ErrorCode.ILLEGAL_ARGUMENT,"参数为空!");
}
if (LockResultEnum.获取锁成功.equals(lockResultEnum)|| LockResultEnum.仅数据库锁.equals(lockResultEnum))
{
booleanisReleased =lockDao.releaseDbLock(lockName_);
if (!isReleased) {
LOG.warn("释放锁【" +lockName_+"】失败");
}
} else {
LOG.debug("不需要释放锁【" +lockName_+"】");
}
}

LockDao的实现如下:

@Override
public
boolean
getDbLock(String lockCode){
Long lock = (Long)super.getSqlMapClientTemplate().queryForObject("LockDO.getLockDbByCode",lockCode);
booleanresult = (lock !=null&&lock.longValue()==
1) ? true:false;
returnresult;
}
@Override
public
boolean
releaseDbLock(String lockCode) {
Long lock = (Long)super.getSqlMapClientTemplate().queryForObject("LockDO.releaseLockDbByCode",lockCode);
booleanresult = (lock !=null&&lock.longValue()==
1) ? true:false;
returnresult;
}
@Override
public LockDO getRowLockByName(Stringname) {
return (LockDO)super.getSqlMapClientTemplate().queryForObject("LockDO.selectForUpdateByLockName",name);
}

LockDao对应sqlMap文件里的执行sql如下:

<selectid="selectForUpdateByLockName"resultMap="jobLockMap" parameterClass="java.lang.String" >
select
ID, NAME, REMARK, IS_ENABLED
from VENUS_LOCK
where NAME = #value# and IS_ENABLED = 'y'
FOR UPDATE
</select>
<!-- 通过指定的代码取得操作数据锁-->
<selectid="getLockDbByCode"resultClass="java.lang.Long"parameterClass="string">
<![CDATA[
select get_lock(#value#, 0) as tolock;
]]>
</select>
<!-- 通过指定的代码释放操作数据锁-->
<selectid="releaseLockDbByCode"resultClass="java.lang.Long"parameterClass="string">
<![CDATA[
select release_lock(#value#) as torelease;
]]>
</select>

通过以上代码可以很清楚的看出原理了。贷款申请提交时,为了防止一个人同时提交多笔,要按照以人维度进行业务锁的加锁处理。加锁逻辑就是锁名和人直接挂钩(就是锁名里有可以直接区分人的字段),通过执行sql:select get_lock(#锁名#, 0) as tolock;来获取数据库锁,如果获取成功,返回1。这里还去获取了一下行锁,获取的行锁它锁住的是venus_lock表的符合where条件的那些行,执行sql: select ID, NAME, REMARK,IS_ENABLED
from VENUS_LOCK where NAME = #锁名#and IS_ENABLED = 'y' FOR UPDATE;这里行锁是否获取成功其实都没有关系。获取到锁之后就可以执行业务逻辑了,执行完一定要释放锁,执行sql:select release_lock(#锁名#) as torelease;为了保证释放锁操作一定执行,一般在finally子句中执行它即可。通过以上的步骤,当一个人同时申请多笔时,锁名是一样的,所以获取到锁后返回值就是1、2、3…具体看你是第几个获取的了,只有第一个获取的返回值是1,从lockDao
.getDbLock里的booleanresult = (lock !=null&&lock.longValue()==
1) ? true:false;就可以看出,只有第一个可以执行业务逻辑,其他就认为是没有获取到锁而抛出异常终止执行:if (LockResultEnum.没有获取到锁.equals(lockResultEnum)){thrownewBizException(ErrorCode.LOCK_FAIL);
}

还有一个例子:
下面的是任务分发器,它实现了Runnable接口,在任务分发器执行时会去获取各种异步任务类型的待执行任务列表,这里也用到了业务锁,调用的和上面的一样都是lockManager.getLock(...)方法。
public class JobDispatcher implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger("applyCenterJobLog");

/** 守护线程名称 */

private String name;

/** 一天秒数 */

private static final long ONE_DAY_SEC = 24 * 60 * 60;

/** 线程池队列长度 */

private int queueSize = 5;

/** 初始处理线程数 */

private int coreSize = 5;

/** 最大处理线程数 */

private int maxSize = 5;

/** 空闲线程最大闲置时间 */

private long keepAliveTime = ONE_DAY_SEC;

/** 线程池接收新任务阀值 */

private int hungrySize = 2;

/** 分发器运行状态标记 */

private boolean isRunning = true;

/** 无命令处理时休息时常(毫秒) */

private long noCmdSleepMillis = 1000;

/** 出现系统异常时休息时常(毫秒),防止把系统拖垮 */

private long errorCmdSleepMillis = 10000;

private JobManager jobManager;

/** handler产生工厂类 */

private JobHandlerFactory jobHandlerFactory;

private List<String> jobTypeList;

/**

* spring init

*/

public void init() {

LOG.info("分发器【" + name + "】init!!!!!");

jobTypeList = jobHandlerFactory.getJobTypeList();

}

/**

* spring destroy

*/

public void destroy() {

LOG.warn("收到分发器【" + name + "】停止通知!!!!!");

isRunning = false;

}

@Override

public void run() {

LOG.info("分发器【" + name + "】启动ing...");

BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(queueSize);

ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, keepAliveTime, TimeUnit.SECONDS, queue);

while (isRunning) {

try {

int i = 0;

if (queue.size() < hungrySize) {

for (String jobType : jobTypeList) {
List<JobDO> jobDOList = jobManager.assignJob(jobType, queueSize - queue.size());

for (JobDO jobDO : jobDOList) {

i++;

JobHandler<JobDO> tmpJobHandler = jobHandlerFactory.getHandler(jobDO);

ExecuteJobThread<JobDO> executeCmdThread = new ExecuteJobThread<JobDO>(jobDO, tmpJobHandler);

executor.execute(executeCmdThread);

}

}

} else {

ThreadUtil.sleep(noCmdSleepMillis, LOG);

}

if (i == 0) {

ThreadUtil.sleep(noCmdSleepMillis, LOG);

} else {

i = 0;

}

} catch (Exception e) {

LOG.error("dispacher 调度异常" + e.getMessage(), e);

ThreadUtil.sleep(errorCmdSleepMillis, LOG);

}

}

executor.shutdown();

}

/**

* 执行分发

*/

public void dispatcher() {

Thread thread = new Thread(this);

isRunning = true;

thread.start();

}

...//一些set方法
}

jobManager的assignJob方法如下:

public List<JobDO> assignJob(String jobType, int jobNum) {

if (StringUtil.isBlank(jobType) || jobNum <= 0) {

LOG.error("assignJob()参数非法jobType:{},jobNum:{}", jobType, jobNum);

throw new BizException(ErrorCode.ILLEGAL_ARGUMENT, "参数非法!");

}

LockResultEnum lockResultEnum = null;

try {

/** 1、获取业务锁 */
//这里调用的lockManager.getLock(...)就是之前例子里的

lockResultEnum = lockManager.getLock(jobType, LockTypeEnum.JOB_LOCK.getCode());

if (!LockResultEnum.获取锁成功.equals(lockResultEnum)) {//返回emptylist,dispatcher会sleep一定时间,可配置

return new ArrayList<JobDO>(0);

}

return doAssignJob(jobType, jobNum);

} catch (Exception e) {

LOG.warn("获取锁失败", e);

} finally {

lockManager.releaseLock(jobType, LockTypeEnum.JOB_LOCK.getCode(), lockResultEnum);

}

return new ArrayList<JobDO>(0);

}

从上可见,这次是要获取数据库锁和行锁都成功才行: if (!LockResultEnum.获取锁成功.equals(lockResultEnum)) {return new ArrayList<JobDO>(0);}
所以需要在venus_lock表中有对应任务类型的数据,才能使sql:select ID, NAME, REMARK,IS_ENABLED from VENUS_LOCK where NAME = #锁名#and IS_ENABLED = 'y' FOR UPDATE;执行成功,获取到行锁。
以上是一个我在并发编程上的经历。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: