您的位置:首页 > 其它

基于session的事务管理组件实现与应用

2017-02-05 10:39 579 查看

背景

服务器开发中经常会遇到一些多个步骤连续处理的场景,而这种场景中只要有一个失败就需要整个流程失败以及进行相应的回滚操作,例如找回密码就是一个典型的场景,找回密码分多步骤进行,每个步骤失败预示着整个找回密码流程的失败,以及二维码扫描动作,一键注册动作等都是多步骤连环相扣的场景。基于此场景的考虑实现一个基于session的事务管理组件,他可以做到:事务状态记录,事务状态从某一处回滚,事务提交以及事务完成操作。

实现流程



代码实现

首先定义一个事务管理接口,包含事务管理的一些基本操作,例如:事务提交、回滚、完成等等
/**
* @author flykinghg
* @date 2015-1-5
* @time 上午11:35:54
*/
public interface TransactionManager {
/**
* 事务的提交
* @author flykinghg
* @date 2015-1-6
* @time 上午11:11:02
* @param status
*/
void commit(TransactionStatus status,boolean isSuccess);

/**
* 事务回滚
* @author flykinghg
* @date 2015-1-6
* @time 上午11:11:09
* @param status
*/
void rollback(TransactionStatus status);

/**
* 事务的完成
* @author flykinghg
* @date 2015-1-6
* @time 上午11:11:17
* @param status
*/
void complete(TransactionStatus status);
}
接下来就是它的一个实现类,事务的状态是其中非常关键的一个接口,下面再具体介绍
public class MzRedisTransactionManager implements TransactionManager {

@Override
public void commit(TransactionStatus transactionStatus,boolean isSuccess){
if(transactionStatus.isCommit()){
transactionStatus.setCurrentTransactionCellStatus(isSuccess);
}
}

@Override
public void rollback(TransactionStatus transactionStatus){
if(transactionStatus.isRollbackOnly()){
List<String> currentTransactionChain = transactionStatus.getCurrentTransactionChain();
if(currentTransactionChain == null){
return;
}

for(String transactionCell : currentTransactionChain){
transactionStatus.removeTransactionCell(transactionCell);
}
transactionStatus.removeCurrentTransactionChain();
}
}

@Override
public void complete(TransactionStatus transactionStatus) {
if(transactionStatus.isCompleted()){
transactionStatus.setRollbackOnly();
this.rollback(transactionStatus);
}
}

}
事务状态定义了状态的一些基本操作,例如状态设置,移除事务单元,查询当前状态等等,接口如下
import java.util.List;

/**
* 本事务操作组件支持基于session的事务管理
* <br/>事务单元:就是一个session中的多个操作步骤流程(例如验证手机号码就是找回密码里面的一个事务单元)
* <br/>事务:多个事务单元组成了一个事务(例如找回密码整个过程就是一个事务)
* @author flykinghg
* @date 2015-1-5
* @time 上午11:37:14
*/
public interface TransactionStatus extends CurrentPointContainer{

/**
* 设置回滚状态标记
* @author flykinghg
* @date 2015-1-6
* @time 上午11:11:47
*/
void setRollbackOnly();

/**
* 判断是否是回滚
* @author flykinghg
* @date 2015-1-6
* @time 上午11:11:57
* @return
*/
boolean isRollbackOnly();

/**
* 判断是否是事务完成
* @author flykinghg
* @date 2015-1-6
* @time 上午11:12:07
* @return
*/
boolean isCompleted();

/**
* 判断是否是事务提交
* @author flykinghg
* @date 2015-1-6
* @time 上午11:12:24
* @return
*/
boolean isCommit();

/**
* 设置事务提交
* @author flykinghg
* @date 2015-1-6
* @time 上午11:12:33
*/
void setCommit();

/**
* 设置当前事务链
* @author flykinghg
* @date 2015-1-6
* @time 上午11:12:42
*/
void setCurrentTransactionChain();

/**
* 获取当前事务链
* @author flykinghg
* @date 2015-1-6
* @time 上午11:12:53
* @return
*/
List<String> getCurrentTransactionChain();

/**
* 获取前一个事务单元状态
* @author flykinghg
* @date 2015-1-6
* @time 上午11:13:03
* @return
*/
boolean getPreTransactionCellStatus();

/**
* 设置当前事务单元状态
* @author flykinghg
* @date 2015-1-6
* @time 上午11:13:18
* @param status
*/
void setCurrentTransactionCellStatus(boolean status);

/**
* 移除事务单元
* @author flykinghg
* @date 2015-1-6
* @time 上午11:13:32
* @param transactionCell
*/
void removeTransactionCell(String transactionCell);

/**
* 移除事务链
* @author flykinghg
* @date 2015-1-6
* @time 上午11:13:46
*/
void removeCurrentTransactionChain();

/**
* 获取当前transactionStatusKey
* @author flykinghg
* @date 2015-1-6
* @time 下午12:44:39
*/
String getTransactionStatusKey();

/**
* 事务是否启动
* @author flykinghg
* @date 2015-1-7
* @time 下午1:03:07
* @return
*/
boolean isStart();

/**
* 当前事务是否和之前事务名称一致
* @author flykinghg
* @date 2015-4-24
* @time 下午12:48:23
* @param currentTansactionName
* @return
*/
boolean equalsWithPreTransaction(String currentTansactionName);
}
它的具体实现如下,其中状态链的设置是通过redis来实现的,而当前状态量是放在threadLocal里面的,供线程上下文使用
public class MzTransactionStatus implements TransactionStatus {
private final ThreadLocal<Object> currentPoint = new ThreadLocal<Object>();

private final String transactionStatusKey;

private final boolean start;

private final boolean completed;

private boolean rollbackOnly = false;

private boolean commit = false;

public MzTransactionStatus(String transactionStatusKey,boolean isStart, boolean isComplete){
this.transactionStatusKey = transactionStatusKey;
this.start = isStart;
this.completed = isComplete;
}

@Override
public void setCurrentPoint(Object currentPoint) {
this.currentPoint.set(currentPoint);
}

@Override
public Object getCurrentPoint() {
return this.currentPoint.get();
}

@Override
public void setRollbackOnly() {
this.rollbackOnly = true;
}

@Override
public boolean isRollbackOnly() {
return this.rollbackOnly;
}

@Override
public boolean isCompleted() {
return this.completed;
}

@Override
public void setCurrentTransactionChain() {
String key = TransactionKeyUtil.getCurrentTransactionChainKey(transactionStatusKey);
String currentTransactionChain = ActionLockContainer.getActionLockService().doGet(key);
if(currentTransactionChain == null){
currentTransactionChain = String.valueOf(this.getCurrentPoint());
}else{
currentTransactionChain = currentTransactionChain + "," + String.valueOf(this.getCurrentPoint());
}
ActionLockContainer.getActionLockService().doSet(key, currentTransactionChain);
}

@Override
public List<String> getCurrentTransactionChain() {
String key = TransactionKeyUtil.getCurrentTransactionChainKey(transactionStatusKey);
String currentTransactionChain = ActionLockContainer.getActionLockService().doGet(key);
if(currentTransactionChain == null){
return null;
}

String[] transactionChain = currentTransactionChain.split(",");
return Arrays.asList(transactionChain);
}

@Override
public boolean getPreTransactionCellStatus() {
List<String> currentTransactionChain = this.getCurrentTransactionChain();
if(currentTransactionChain == null){
return true;
}
String preTransactionCell = currentTransactionChain.get(currentTransactionChain.size() - 1);
String key = TransactionKeyUtil.getTransactionCellStatusKey(transactionStatusKey,preTransactionCell);
String transactionCellStatus = ActionLockContainer.getActionLockService().doGet(key);
return Boolean.parseBoolean(transactionCellStatus);
}

@Override
public void setCurrentTransactionCellStatus(boolean status) {
String currentTransactionCell = String.valueOf(this.getCurrentPoint());
String key = TransactionKeyUtil.getTransactionCellStatusKey(transactionStatusKey,currentTransactionCell);
ActionLockContainer.getActionLockService().doSet(key, String.valueOf(status));
}

@Override
public void removeTransactionCell(String transactionCell) {
String key = TransactionKeyUtil.getTransactionCellStatusKey(transactionStatusKey,transactionCell);
ActionLockContainer.getActionLockService().doDel(key);
}

@Override
public void removeCurrentTransactionChain() {
String key = TransactionKeyUtil.getCurrentTransactionChainKey(transactionStatusKey);
ActionLockContainer.getActionLockService().doDel(key);
}

@Override
public boolean isCommit() {
return this.commit;
}

@Override
public void setCommit() {
this.commit = true;
}

@Override
public String getTransactionStatusKey() {
return this.transactionStatusKey;
}

@Override
public boolean isStart() {
return this.start;
}

@Override
public boolean equalsWithPreTransaction(String currentTansactionName) {
List<String> currentTransactionChain = this.getCurrentTransactionChain();
if(currentTransactionChain == null){
return false;
}
String preTransactionCell = currentTransactionChain.get(currentTransactionChain.size() - 1);
return currentTansactionName.equals(preTransactionCell);
}

}

事务状态和事务管理被包装在一个事务信息类里面,从这个类里面获取到当前事务状态和管理的一些基础操作接口
public class MzTransactionInfo {
private final TransactionManager transactionManager;

private TransactionStatus transactionStatus;

public MzTransactionInfo(TransactionManager transactionManager){
this.transactionManager = transactionManager;
}

public TransactionStatus getTransactionStatus() {
return transactionStatus;
}

public void setTransactionStatus(TransactionStatus transactionStatus) {
this.transactionStatus = transactionStatus;
}

public TransactionManager getTransactionManager() {
return transactionManager;
}
}


再往上一层就是一个filter的辅助类handler,对接filter的一些基本操作实现,底层对接MzTransactionInfo

/**
* @author flykinghg
* @date 2015-1-5
* @time 下午3:24:57
*/
public class TransactionFilterHandler {
private final static Logger logger = LoggerFactory
.getLogger(TransactionFilterHandler.class);

/**
* 保存token
* @author flykinghg
* @date 2015-1-6
* @time 上午11:05:12
* @param sessionId
* @param token
*/
private void setToken(String sessionId, String token){
String key = TransactionKeyUtil.getTransactionTokenKey(sessionId);
ActionLockContainer.getActionLockService().doSet(key, token);
}

/**
* 获取token
* @author flykinghg
* @date 2015-1-6
* @time 上午11:05:22
* @param sessionId
* @return
*/
private String getToken(String sessionId){
String key = TransactionKeyUtil.getTransactionTokenKey(sessionId);
return ActionLockContainer.getActionLockService().doGet(key);
}

/**
* 删除token,在rollback以及complete的时候才执行
* @author flykinghg
* @date 2015-1-6
* @time 下午12:42:02
* @param sessionId
*/
private void removeToken(String sessionId){
String key = TransactionKeyUtil.getTransactionTokenKey(sessionId);
ActionLockContainer.getActionLockService().doDel(key);
}

/**
* 事务回滚,从当前事务作为起始点回滚至最开始的状态
* @author flykinghg
* @date 2015-1-6
* @time 上午11:05:30
* @param mzTransactionInfo
*/
private void doRollBack(MzTransactionInfo mzTransactionInfo){
mzTransactionInfo.getTransactionStatus().setRollbackOnly();
mzTransactionInfo.getTransactionManager().rollback(mzTransactionInfo.getTransactionStatus());
this.removeToken(mzTransactionInfo.getTransactionStatus().getTransactionStatusKey());
this.removeTransactionStatus(mzTransactionInfo);
}

/**
* 事务回滚
* @author flykinghg
* @date 2015-1-7
* @time 下午1:21:55
* @param mzTransactionInfo
*/
public void rollBack(MzTransactionInfo mzTransactionInfo){
this.doRollBack(mzTransactionInfo);
}

/**
* 当事务启动时需要检查是否需要回滚之前的操作
* @author flykinghg
* @date 2015-1-6
* @time 下午7:41:05
* @param mzTransactionInfo
*/
private void doRollBackIfNecessary(MzTransactionInfo mzTransactionInfo){
this.doRollBack(mzTransactionInfo);
}

/**
* 获取sessionId
* @author flykinghg
* @date 2015-1-7
* @time 上午11:50:21
* @param request
* @return
*/
public String getSessionId(HttpServletRequest request){
Cookie[] cookies = request.getCookies();
String sessionid = "";
if(cookies == null){
sessionid = request.getSession().getId();
return sessionid;
}

for (int i = 0; i < cookies.length; i++) {
Cookie c = cookies[i];
if (c.getName().equalsIgnoreCase("JSESSIONID")) {
sessionid = c.getValue();
}
}
if(StringUtil.isEmpty(sessionid)){
sessionid = request.getSession().getId();
}else if(sessionid.contains(".")){
sessionid = StringUtils.substring(sessionid, 0, sessionid.indexOf("."));
}
return sessionid;

}

/**
* 提交事务
* @author flykinghg
* @date 2015-1-7
* @time 下午1:20:28
* @param mzTransactionInfo
* @param filter
* @param request
*/
public void commit(MzTransactionInfo mzTransactionInfo, AbstractTransactionFilter filter, boolean isSuccess
,HttpServletRequest request){
boolean isStart = mzTransactionInfo.getTransactionStatus().isStart();
boolean isComplete = mzTransactionInfo.getTransactionStatus().isCompleted();
String sessionId = mzTransactionInfo.getTransactionStatus().getTransactionStatusKey();
String currentTransactionName = String.valueOf(mzTransactionInfo.getTransactionStatus().getCurrentPoint());

if(!isStart){
if (isComplete) {
this.doComplete(mzTransactionInfo);
} else {
this.doCommit(mzTransactionInfo, isSuccess);
this.setToken(sessionId, filter.getTransactionToken(request, currentTransactionName, sessionId));
}
}else{
this.doRollBackIfNecessary(mzTransactionInfo);
}
}

/**
* 将当前请求事务单元进行提交
* @author flykinghg
* @date 2015-1-6
* @time 上午11:06:30
* @param mzTransactionInfo
*/
private void doCommit(MzTransactionInfo mzTransactionInfo, boolean isSuccess){
mzTransactionInfo.getTransactionStatus().setCommit();
mzTransactionInfo.getTransactionManager().commit(mzTransactionInfo.getTransactionStatus(), isSuccess);
this.removeTransactionStatus(mzTransactionInfo);
}

/**
* 将整条事务提交完成,实际上事务完成后仍然保持最初的状态,所以最终仍然使用回滚操作
* 它和回滚不同的是只有在事务链处理完成后才启用
* @author flykinghg
* @date 2015-1-6
* @time 上午11:06:50
* @param mzTransactionInfo
*/
private void doComplete(MzTransactionInfo mzTransactionInfo){
mzTransactionInfo.getTransactionManager().complete(mzTransactionInfo.getTransactionStatus());
this.removeToken(mzTransactionInfo.getTransactionStatus().getTransactionStatusKey());
this.removeTransactionStatus(mzTransactionInfo);
}

/**
* 创建事务状态,每次事务单元均会创建一次事务状态
* <br/>事务状态伴随事务的回滚、提交、完成而终结
* @author flykinghg
* @date 2015-1-6
* @time 上午11:08:22
* @param mzTransactionInfo
* @param transactionStatusKey
*/
public void createTransactionStatus(MzTransactionInfo mzTransactionInfo,String transactionStatusKey, boolean isStart, boolean isComplete){
mzTransactionInfo.setTransactionStatus(new MzTransactionStatus(transactionStatusKey, isStart, isComplete));
}

/**
* 获取前一个事务单元的状态,每次事务请求到来之前都需要判断前一次事务单元是否成功
* @author flykinghg
* @date 2015-1-6
* @time 上午11:09:11
* @param mzTransactionInfo
* @return
*/
private boolean getPreTransactionCellStatus(MzTransactionInfo mzTransactionInfo){
return mzTransactionInfo.getTransactionStatus().getPreTransactionCellStatus();
}

/**
* 设置当前事务点,以及添加到事务链
* @author flykinghg
* @date 2015-1-6
* @time 上午11:09:48
* @param mzTransactionInfo
* @param currentTransaction
*/
private void setCurrentTransaction(MzTransactionInfo mzTransactionInfo,String currentTransaction){
mzTransactionInfo.getTransactionStatus().setCurrentPoint(currentTransaction);
mzTransactionInfo.getTransactionStatus().setCurrentTransactionChain();
}

/**
* 移除事务状态
* @author flykinghg
* @date 2015-1-6
* @time 上午11:10:41
* @param mzTransactionInfo
*/
private void removeTransactionStatus(MzTransactionInfo mzTransactionInfo){
mzTransactionInfo.setTransactionStatus(null);
}

/**
* 事务执行前的校验
* @author flykinghg
* @date 2015-1-7
* @time 下午1:14:13
* @param mzTransactionInfo
* @param currentTransactionName
* @param filter
* @param request
* @throws MzTransactionException
*/
public void beforeTransactionVerify(MzTransactionInfo mzTransactionInfo, String currentTransactionName,
AbstractTransactionFilter filter, HttpServletRequest request) throws MzTransactionException{
boolean isStart = mzTransactionInfo.getTransactionStatus().isStart();
String sessionId = mzTransactionInfo.getTransactionStatus().getTransactionStatusKey();

if(!isStart){
String token = this.getToken(sessionId);
boolean isValidTransaction = filter.isValidTransaction(request, token, currentTransactionName);
if (!isValidTransaction) {
throw new MzTransactionException("not valid transaction");
}

boolean preTransactionCellStatus = this.getPreTransactionCellStatus(mzTransactionInfo);
if(!preTransactionCellStatus && !mzTransactionInfo.getTransactionStatus().equalsWithPreTransaction(currentTransactionName)){
throw new MzTransactionException("pre transaction status is not ok");
}

this.setCurrentTransaction(mzTransactionInfo, currentTransactionName);
}
}
}
最后再看对接应用层的filter实现
/**
* @author flykinghg
* @date 2015-1-5
* @time 上午11:05:23
*/
public abstract class AbstractTransactionFilter implements Filter {
private final static Logger logger = LoggerFactory
.getLogger(AbstractTransactionFilter.class);

private String cookieDomain = "";

private String cookiePath = "/";

private TransactionFilterHandler transactionFilterHandler;
private String redirectUrlOnException;
private String completeUri;
private String startUri;

@Override
public void init(FilterConfig config) throws ServletException {
transactionFilterHandler = new TransactionFilterHandler();
redirectUrlOnException = config
.getInitParameter("redirectUrlOnException");
completeUri = config.getInitParameter("completeUri");
startUri = config.getInitParameter("startUri");
cookieDomain = config.getInitParameter("cookieDomain");
}

@Override
public void doFilter(ServletRequest servletRequest,
ServletResponse servletResponse, FilterChain filterChain)
throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest) servletRequest;
HttpServletResponse response = (HttpServletResponse) servletResponse;
Cookie cookies[] = request.getCookies();

String transactionId = TransactionCookieUtil.getTransactionId(cookies);

if (transactionId == null || transactionId.length() == 0) {
transactionId = java.util.UUID.randomUUID().toString();
response.addHeader("Set-Cookie", TransactionCookieUtil.transactionIdName + "=" + transactionId
+ ";domain=" + this.cookieDomain + ";Path="
+ this.cookiePath + ";HTTPOnly");
}

String uri = request.getRequestURI();
boolean isStart = uri.equals(startUri);
boolean isComplete = uri.equals(completeUri);
String currentTransactionName = uri.substring(uri.lastIndexOf("/") + 1);
MzTransactionInfo mzTransactionInfo = new MzTransactionInfo(
new MzRedisTransactionManager());

try {
this.transactionFilterHandler.createTransactionStatus(mzTransactionInfo, transactionId, isStart, isComplete);

this.transactionFilterHandler.beforeTransactionVerify(mzTransactionInfo, currentTransactionName, this, request);

filterChain.doFilter(request, response);

//判断response的error status是否存在,如果存在说明有异常信息,此次步骤不能提交
if(response.containsHeader("est")){
this.transactionFilterHandler.commit(mzTransactionInfo, this, false, request);
return;
}

} catch (MzTransactionException e) {
logger.error("sessionId is "+ transactionId +" ,currentTransactionName is " + currentTransactionName + " ,error message is "+e.getMessage());
if(this.isRedirectOnExceptionForCurrentTransaction(currentTransactionName)){
this.transactionFilterHandler.rollBack(mzTransactionInfo);
response.sendRedirect(redirectUrlOnException);
}else{
this.transactionFilterHandler.commit(mzTransactionInfo, this, false, request);
}
return;
}

this.transactionFilterHandler.commit(mzTransactionInfo, this, true, request);

}

/**
* 验证该事务请求是否合法
* @author flykinghg
* @date 2015-1-6
* @time 上午11:04:20
* @param request
* @param token
* @param currentTransactionName
* @return
*/
public abstract boolean isValidTransaction(HttpServletRequest request,String token,String currentTransactionName);

/**
* 判断该节点是否适合重定向
* @author flykinghg
* @date 2015-4-25
* @time 下午3:33:46
* @param currentTransactionName
* @return
*/
public abstract boolean isRedirectOnExceptionForCurrentTransaction(String currentTransactionName);

/**
* 有客户端给出一个事务请求的token,供每次请求验证使用
* @author flykinghg
* @date 2015-1-6
* @time 上午11:04:34
* @param request
* @param currentTransactionName
* @param sessionId
* @return
*/
public abstract String getTransactionToken(HttpServletRequest request, String currentTransactionName, String sessionId);

@Override
public void destroy() {
this.completeUri = null;
this.redirectUrlOnException = null;
this.transactionFilterHandler = null;
}

}
一个基于session的事务管理器就如上所示,有问题大家可以留言交流。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  事务 session 分布式
相关文章推荐