Mybatis Select...for update用法
2017-10-13 15:58
399 查看
Mybatis Select…for update用法
最近有需求批量处理大量数据,由于数据量很大,如果加分布式锁让一个线程跑需要太长时间,所以考虑集群中二十几台机器并行执行,每次取1000条数据处理。选择了使用select…for update悲观锁,每次把取出来的1000条数据加锁之后更改状态字段再commit,从而保证所有线程不重复取数据。
很容易想到的用法就是把select for upate和之后的更新语句放在一个事务中:
SqlSession sqlSession = sqlSessionFactory.openSession(false); try{ List<TestObject> records=sqlSession.selectList("testForUpdate"); //testForUpdate的sql语句为: select * from test_table where status='0' and rownum<1000 order by create_date desc for update if(records!=null && records.size>0){ Map<String,Object> updateParam=new HashMap<>(); updateParam.put("records", records); updateParam.put("status", "01"); sqlSession.update("batchUpdate", updateParam); } }finally{ sqlSession.commit(true); sqlSession.close(); }
首先开一个session,参数传false表示autocommit=false,不自动提交事务
执行select for update
更新取出来的数据
commit并且close session
commit的参数必须为true,这样在没有数据更新时也可以commit。否则commit时会判断一个isDirty的参数,这个参数只有在更新或者插入是会为true,如果不为true就不会commit,
sqlSession.commit(true)
具体的源码如下,我们传入的true就是这个force参数。
private boolean isCommitOrRollbackRequired(boolean force) { return !this.autoCommit && this.dirty || force; }
这种做法看起来没有什么问题,但是当select for update查不到数据时,也会对表加行锁,在我这个情况下就是给status=0的数据加行锁,即使这些数据并不存在,这个锁会一直存在,如果此时插入status=0的数据则会报错。这个时候问题就暴露出来了,当查出的数据为空时,就不会执行下面的update语句,然后就发现这个事务不会被提交,行锁一直不会被释放。
按理说finally语句块里的commit是无论如何都会执行的,为什么最后事务没有提交呢?
Debug进去才发现了事情并不是我们一开始想象的这样。
太长不看版本:
由于我们使用的数据源是阿里的DruidPooledDataSource,这个数据源的autoCommit配置默认为true,导致我们开启session时设置的false并不起作用,实际上我们的事务都是自动提交的,最后finally块中的commit会判断如果自动提交已经开启它就不会执行。所以当select for update的数据为空时,由于不会执行update语句, 所以没有被自动commt,当我们想要手动commit时,由于已经开启了自动commit,所以手动commit也乜嘢执行,最红导致事务不能提交。详解
以DefaultSqlSession为例,Session实例中包含的属性有:private Configuration configuration; //配置信息 private Executor executor; //实际执行select的对象 private boolean autoCommit; //openSession时传入的参数,select之前为false private boolean dirty; //表示是否有脏数据,执行update或者insert时会为true
当sqlSession执行selectList方法时:
public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) { List var5; try { MappedStatement ms = this.configuration.getMappedStatement(statement); var5 = this.executor.query(ms, this.wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER); } catch (Exception var9) { throw ExceptionFactory.wrapException("Error querying database. Cause: " + var9, var9); } finally { ErrorContext.instance().reset(); } return var5; }
可以看到实际执行query方法进行查询的是Executor对象。
以BaseExecutor为例,属性有
protected Transaction transaction; protected Executor wrapper; protected ConcurrentLinkedQueue<BaseExecutor.DeferredLoad> deferredLoads; protected PerpetualCache localCache; protected PerpetualCache localOutputParameterCache; protected Configuration configuration; protected int queryStack = 0; private boolean closed;
可以看到transaction实际是在Executor对象中,再看transaction的属性,SpringManagedTransaction为例:
protected Connection connection; protected DataSource dataSource; protected TransactionIsolationLevel level; protected boolean autoCommmit;
可以看到connection实际是在transaction对象中获取的,这里也有一个autoCommit属性,这个属性的值在打开session时和我们传入的参数值相等,是false。
再看connection对象,以我们用的DruidPooledConnection为例
public static final int MAX_RECORD_SQL_COUNT = 10; protected Connection conn; protected volatile DruidConnectionHolder holder; protected TransactionInfo transactionInfo; private final boolean dupCloseLogEnable; private volatile boolean traceEnable = false; private boolean disable = false; private boolean closed = false; private final Thread ownerThread; private long connectedTimeNano; private volatile boolean running = false; private volatile boolean abandoned = false; private StackTraceElement[] connectStackTrace; private Throwable disableError = null;
其中有get和set autoCommit的方法,然而这个类里面并没有autoCommit属性,可以看到这里还有一个conn属性,这个get和set的autoCommit的值都是从这个conn里来的。
现在把主要的类大致屡清楚了,那么再看一下selectList的执行过程,以及我们openSession时配置的autoCommit是怎么被覆盖的。
前面说到了执行sqlSession的select方法实际执行的是Executor的query方法。执行时会先试图从缓存中找,如果没有对应的key则会执行queryFromDatabase方法,从DB中查询。
在从DB查询之前会执行prepareStatement方法生成一个Statement,在这个方法里
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException { Connection connection = this.getConnection(statementLog); Statement stmt = handler.prepare(connection); handler.parameterize(stmt); return stmt; }
可以看到首先是获取一个连接,最终调用的是Transaction的getConnection方法:
public Connection getConnection() throws SQLException { if (this.connection == null) { this.openConnection(); } return this.connection; }
如果当前没有连接则open一个,注意,这个时候transaction对象的autoCommit还是我们设置的false。
接下来再看openConnection方法
private void openConnection() throws SQLException { this.connection = DataSourceUtils.getConnection(this.dataSource); this.autoCommit = this.connection.getAutoCommit(); this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource); if (LOGGER.isDebugEnabled()) { LOGGER.debug("JDBC Connection [" + this.connection + "] will" + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring"); } }
首先由datasource获取connection,大致过程是dataSource从自己的连接池里面取一个连接返回过来,这里的连接都是根据dataSource的配置提前生成的,也就是说由于我们dataSource默认配置的autoCommit为true,所以这里拿到的所有connection的autoCommit属性都是true。
拿到connection后执行了this.autoCommit = this.connection.getAutoCommit();这样transaction的autoCommit属性就被设置成了true。
最后sqlSession执行commit时,实际执行commit的是Executor的commit,实际又是执行的transaction对象的commit:
public void commit() throws SQLException { if (this.connection != null && !this.isConnectionTransactional && !this.autoCommit) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Committing JDBC Connection [" + this.connection + "]"); } this.connection.commit(); } }
可以看到执行commit的条件是connection不为空,并且这个连接是事务性的,并且autoCommit=false,由于在获取连接时transaction的autoCommit属性已经被替换成了true,所以这里的commit不会被执行。
终于解释清楚了为什么commit没有执行,事务没有被提交。
实际上如果在openSession(false)之后执行一个update方法,在session.commit()之前这个事务就已经被自动提交了。由于select for update并不会触发事务自动提交所以它的锁不会被释放。
解决方法
这里整个过程中都是用的connection中的autoCommit属性,而不是sqlSession的这个属性,因此解决方法有:用sqlSession.getConnection().setAutoCommit(false);来设置autoCommit属性为false
或者提交时用直接调用connection的commit方法:sqlSession.getConnection().commit();
啊!好长啊,终于写完了。可能有理解不对的地方,还请多多指教哈!
相关文章推荐
- mybatis 获取更新(update)记录的id 之< selectKey > 用法
- select for update用法
- Oracle中select ... for update的用法
- 关于SELECT....FOR UPDATE OF [fields]用法
- Select For update语句浅析
- 数据库中Select For update语句的解析
- Select For update语句浅析
- 用锁实现SQLSERVER中并发控制--实现Oracle中select .... for update功能
- 关于mybatis 动态 sql 的一些陷阱:防止批量update,delete,select...
- Select for update, nowait, skip locked
- mybatis select、insert、update、delete标签
- select for update引发死锁分析
- MySQL中的行级锁SELECT FOR UPDATE 和LOCK IN SHARE MODE 区别
- SELECT * ...... FOR UPDATE 锁机制
- mysql锁SELECT FOR UPDATE【转】
- Oracle数据库封锁和select...[for update [of tab.col]]的研究
- Select For update语句浅析
- [习题]给初学者的范例,多重字段搜寻引擎 for GridView,兼论 SqlDataSource与SelectParameter的用法
- select...for update使用方法
- 数据库的update、delete、insert和select用法