您的位置:首页 > 数据库

hsqldb源码分析系列6之事务处理

2013-01-20 15:16 423 查看
在session的 public Result executeCompiledStatement(Statement cs, Object[] pvals)方法执行中会处理事务

boolean isTX = cs.isTransactionStatement();

if (!isTX) {
if (database.logger.getSqlEventLogLevel()
>= SimpleLog.LOG_NORMAL) {
sessionContext.setDynamicArguments(pvals);
database.logger.logStatementEvent(this, cs, pvals,
SimpleLog.LOG_NORMAL);
}

r                               = cs.execute(this);
sessionContext.currentStatement = null;

return r;
}

while (true) {
actionIndex = rowActionList.size();

database.txManager.beginAction(this, cs);

cs = sessionContext.currentStatement;

if (cs == null) {
return Result.newErrorResult(Error.error(ErrorCode.X_07502));
}

if (abortTransaction) {
rollback(false);

sessionContext.currentStatement = null;

return Result.newErrorResult(Error.error(ErrorCode.X_40001));
}

try {
latch.await();
} catch (InterruptedException e) {
abortTransaction = true;
}

if (abortTransaction) {
rollback(false);

sessionContext.currentStatement = null;

return Result.newErrorResult(Error.error(ErrorCode.X_40001));
}

database.txManager.beginActionResume(this);

//        tempActionHistory.add("sql execute " + cs.sql + " " + actionTimestamp + " " + rowActionList.size());
sessionContext.setDynamicArguments(pvals);

if (database.logger.getSqlEventLogLevel()
>= SimpleLog.LOG_NORMAL) {
database.logger.logStatementEvent(this, cs, pvals,
SimpleLog.LOG_NORMAL);
}

r             = cs.execute(this);
lockStatement = sessionContext.currentStatement;

//        tempActionHistory.add("sql execute end " + actionTimestamp + " " + rowActionList.size());
endAction(r);

if (abortTransaction) {
rollback(false);

sessionContext.currentStatement = null;

return Result.newErrorResult(Error.error(r.getException(),
ErrorCode.X_40001, null));
}

if (redoAction) {
redoAction = false;

try {
latch.await();
} catch (InterruptedException e) {
abortTransaction = true;
}
} else {
break;
}
}

if (sessionContext.depth == 0
&& (sessionContext.isAutoCommit.booleanValue()
|| cs.isAutoCommitStatement())) {
try {
if (r.mode == ResultConstants.ERROR) {
rollback(false);
} else {
commit(false);
}
} catch (Exception e) {
sessionContext.currentStatement = null;

return Result.newErrorResult(Error.error(ErrorCode.X_40001,
e));
}
}

sessionContext.currentStatement = null;

return r;


database.txManager.beginAction(this, cs); 事务开始前需要先对当前的statement cs需要操作的表进行加锁,读表加读锁,写的需要加写锁

这是两阶段提交协议的事务实现TransactionManager2PL的实现

public void beginAction(Session session, Statement cs) {

if (session.hasLocks(cs)) {
return;
}

writeLock.lock();

try {
if (cs.getCompileTimestamp()
< database.schemaManager.getSchemaChangeTimestamp()) {
cs = session.statementManager.getStatement(session, cs);
session.sessionContext.currentStatement = cs;

if (cs == null) {
return;
}
}

boolean canProceed = setWaitedSessionsTPL(session, cs);

if (canProceed) {
if (session.tempSet.isEmpty()) {
lockTablesTPL(session, cs);

// we don't set other sessions that would now be waiting for this one too
// next lock release will do it
} else {
setWaitingSessionTPL(session);
}
}
} finally {
writeLock.unlock();
}
}


session.hasLocks(cs) 判断是否当前的statement是否已经加了锁了

public boolean hasLocks(Statement statement) {

if (lockStatement == statement) {
if (isolationLevel == SessionInterface.TX_REPEATABLE_READ
|| isolationLevel == SessionInterface.TX_SERIALIZABLE) {
return true;
}

if (statement.getTableNamesForRead().length == 0) {
return true;
}
}

return false;
}


boolean canProceed = setWaitedSessionsTPL(session, cs); 判断下当前Statement对读写的表是否能够加锁成功,temSet保存每个session依赖的其他session的锁,checkDeadlock(session, session.tempSet)检测下session依赖的锁是否会出现死锁,如果加锁失败则设置session.abortTransaction = true;,后面就直接回滚事务。

boolean setWaitedSessionsTPL(Session session, Statement cs) {

session.tempSet.clear();

if (cs == null) {
return true;
}

if (session.abortTransaction) {
return false;
}

HsqlName[] nameList = cs.getTableNamesForWrite();

for (int i = 0; i < nameList.length; i++) {
HsqlName name = nameList[i];

if (name.schema == SqlInvariants.SYSTEM_SCHEMA_HSQLNAME) {
continue;
}

Session holder = (Session) tableWriteLocks.get(name);

if (holder != null && holder != session) {
session.tempSet.add(holder);
}

Iterator it = tableReadLocks.get(name);

while (it.hasNext()) {
holder = (Session) it.next();

if (holder != session) {
session.tempSet.add(holder);
}
}
}

nameList = cs.getTableNamesForRead();

if (txModel == TransactionManager.MVLOCKS && session.isReadOnly()) {
nameList = catalogNameList;
}

for (int i = 0; i < nameList.length; i++) {
HsqlName name = nameList[i];

if (name.schema == SqlInvariants.SYSTEM_SCHEMA_HSQLNAME) {
continue;
}

Session holder = (Session) tableWriteLocks.get(name);

if (holder != null && holder != session) {
session.tempSet.add(holder);
}
}

if (session.tempSet.isEmpty()) {
return true;
}

if (checkDeadlock(session, session.tempSet)) {
return true;
}

session.tempSet.clear();

session.abortTransaction = true;

return false;
}


boolean checkDeadlock(Session session, OrderedHashSet newWaits) {

int size = session.waitingSessions.size();

for (int i = 0; i < size; i++) {
Session current = (Session) session.waitingSessions.get(i);

if (newWaits.contains(current)) {
return false;
}

if (!checkDeadlock(current, newWaits)) {
return false;
}
}

return true;
}


lockTablesTPL(session, cs);就是对读的表加读锁,对写的表加写锁, setWaitingSessionTPL(session); 把依赖当前session的锁的session加到当前session的等待session队列中,

void lockTablesTPL(Session session, Statement cs) {

if (cs == null || session.abortTransaction) {
return;
}

HsqlName[] nameList = cs.getTableNamesForWrite();

for (int i = 0; i < nameList.length; i++) {
HsqlName name = nameList[i];

if (name.schema == SqlInvariants.SYSTEM_SCHEMA_HSQLNAME) {
continue;
}

tableWriteLocks.put(name, session);
}

nameList = cs.getTableNamesForRead();

for (int i = 0; i < nameList.length; i++) {
HsqlName name = nameList[i];

if (name.schema == SqlInvariants.SYSTEM_SCHEMA_HSQLNAME) {
continue;
}

tableReadLocks.put(name, session);
}
}


加锁失败就执行

if (abortTransaction) {
rollback(false);

sessionContext.currentStatement = null;

return Result.newErrorResult(Error.error(ErrorCode.X_40001));
}


database.txManager.beginActionResume(this);设置当前session的事务执行时间和事务数

public void beginActionResume(Session session) {

session.actionTimestamp = nextChangeTimestamp();

if (!session.isTransaction) {
session.transactionTimestamp = session.actionTimestamp;
session.isTransaction        = true;

transactionCount++;
}

return;
}


cs.execute(this);执行完之后执行endAction(r); 如果执行失败就rollback,成功就commit

public void endAction(Result result) {

//        tempActionHistory.add("endAction " + actionTimestamp);
sessionData.persistentStoreCollection.clearStatementTables();

if (result.mode == ResultConstants.ERROR) {
sessionData.persistentStoreCollection.clearResultTables(
actionTimestamp);
database.txManager.rollbackAction(this);
} else {
sessionContext
.diagnosticsVariables[ExpressionColumn.idx_row_count] =
result.mode == ResultConstants.UPDATECOUNT
? Integer.valueOf(result.getUpdateCount())
: ValuePool.INTEGER_0;

database.txManager.completeActions(this);
}

//        tempActionHistory.add("endAction ends " + actionTimestamp);
}


先看看completeActions(this);就是释放锁,然后唤醒等待该session的锁的其他session

void endActionTPL(Session session) {

if (session.isolationLevel == SessionInterface.TX_REPEATABLE_READ
|| session.isolationLevel
== SessionInterface.TX_SERIALIZABLE) {
return;
}

if (session.sessionContext.currentStatement == null) {

// after java function / proc with db access
return;
}

if (session.sessionContext.depth > 0) {

// routine or trigger
return;
}

HsqlName[] readLocks =
session.sessionContext.currentStatement.getTableNamesForRead();

if (readLocks.length == 0) {
return;
}

writeLock.lock();

try {
unlockReadTablesTPL(session, readLocks);

final int waitingCount = session.waitingSessions.size();

if (waitingCount == 0) {
return;
}

boolean canUnlock = false;

// if write lock was used for read lock
for (int i = 0; i < readLocks.length; i++) {
if (tableWriteLocks.get(readLocks[i]) != session) {
canUnlock = true;

break;
}
}

if (!canUnlock) {
return;
}

canUnlock = false;

for (int i = 0; i < waitingCount; i++) {
Session current = (Session) session.waitingSessions.get(i);

if (current.abortTransaction) {
canUnlock = true;

break;
}

Statement currentStatement =
current.sessionContext.currentStatement;

if (currentStatement == null) {
canUnlock = true;

break;
}

if (ArrayUtil.containsAny(
readLocks, currentStatement.getTableNamesForWrite())) {
canUnlock = true;

break;
}
}

if (!canUnlock) {
return;
}

resetLocks(session);
resetLatchesMidTransaction(session);
} finally {
writeLock.unlock();
}
}


我们看看rollbackAction的回滚操作,之前每个insert,update,delete操作都会有一个action加到当前session的actionlist,回滚的时候根据这个actionlist来对每个action进行回滚,还需要根据当前事务的时间戳找到需要执行操作的acion列表,

public void rollbackAction(Session session) {
rollbackPartial(session, session.actionIndex, session.actionTimestamp);
endActionTPL(session);
}


void rollbackPartial(Session session, int start, long timestamp) {

Object[] list  = session.rowActionList.getArray();
int      limit = session.rowActionList.size();

if (start == limit) {
return;
}

for (int i = limit - 1; i >= start; i--) {
RowAction action = (RowAction) list[i];

if (action == null || action.type == RowActionBase.ACTION_NONE
|| action.type == RowActionBase.ACTION_DELETE_FINAL) {
continue;
}

Row row = action.memoryRow;

if (row == null) {
row = (Row) action.store.get(action.getPos(), false);
}

if (row == null) {
continue;
}

action.rollback(session, timestamp);

int type = action.mergeRollback(session, timestamp, row);

action.store.rollbackRow(session, row, type, txModel);
}

session.rowActionList.setSize(start);
}


public void rollbackRow(Session session, Row row, int changeAction,
int txModel) {

switch (changeAction) {

case RowAction.ACTION_DELETE :
if (txModel == TransactionManager.LOCKS) {
((RowAVL) row).setNewNodes(this);
indexRow(session, row);
}
break;

case RowAction.ACTION_INSERT :
if (txModel == TransactionManager.LOCKS) {
delete(session, row);
remove(row.getPos());
}
break;

case RowAction.ACTION_INSERT_DELETE :

// INSERT + DELETE
if (txModel == TransactionManager.LOCKS) {
remove(row.getPos());
}
break;
}
}


mergeRollback还会对同一个行如果有插入再删除则会把两个action做合并之类的。

事务隔离级别在2PL的处理

If a table is read-only, it will not be locked by any transaction.
The READ UNCOMMITTED isolation level can be used in 2PL modes for read-only operations. It is the same as
READ COMMITTED plus read only.
The READ COMMITTED isolation level is the default. It keeps write locks on tables until commit, but releases the
read locks after each operation.
The REPEATABLE READ level is upgraded to SERIALIZABLE. These levels keep both read and write locks on
tables until commit.
It is possible to perform some critical operations at the SERIALIZABLE level, while the rest of the operations are
performed at the READ COMMITTED level.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: