mysql jdbc驱动源码分析(Statement的executeQuery 和executeUpdate方法)
2016-01-03 20:10
1461 查看
在前面的章节中我们获取了Statement对象,下面我们来看看Statement的executeQuery() 和executeUpdate() 方法来执行相关操作。
首先来看看StatementImpl对象的executeQuery() 方法,源码如下:
在上面我们看到了具体的实现是在ConnectionImpl这个类中。下面这个这个类中的execSQL()方法,源码如下:
通过注释和参数我们也能大体了解到这个方法所实现的功能。就是向服务发送查询语句,而获取一个ResultSet object
而通过上面的方法我们看到,具体的实现是调用了下面的方法,即方法重载,源码如下:
在上面的方法中我们看到了其调用了 this.io.sqlQueryDirect() 方法进行查询,而这个io对象就是ConnectionImpl 类的一个变量,类型是MysqlIO类型具体代码如下:
在方法中我们看到了,调用readAllResults() 这个方法来获取ResultSet 对象,readAllResults() 方法源码如下:
在这方法中我们看到了,是调用了readResultsForQueryOrUpdate() 方法进行查询获取ResultSet 对象的。下面我们看看这个方法源码:
这个方法中我们看到了 调用getResultSet() 方法获取ResultSet实例,源码如下:
我们又看到 他是调用buildResultSetWithRows 这个方法来获取 ResultSetImpl 实例的。具体源码如下:
我们看到这里直接调用到了com.mysql.jdbc.ResultSetImpl.getInstance() 方法来获取实例。具体源码如下:
ResultSetImpl 类的构造函数如下:
OK 到这里我们也知道,其实什么也没看到就是调用了ResultSetImpl的构造函数创建了一个对象。
简单总结:
我们从整个过程中看到,所有的基础都是Connection 即客户端和服务端的链接也就是和服务器Socket的链接,如果socket断了,就不能进行沟通了。而当有了Socket之后我们可以执行查询方法或更新方法来获取结果集对象,但是在获取结果集对象的时候我们要设置结果集的类型,因为设置不同的结果集类型,我们对获得的结果集对象ResultSet对象会有不同的操作即有的可以向结果集前端移动动,有的可以向结果集后端移动等等,这些都要在生成ResultSet 对象前进行设置。
其实看源码的目的是更清楚,明白的了解jdbc的整个过程但是,在看的过程中发现,能力有限,不能静下心来一个一个的看懂,而只能草草的结束,从中了解了大体过程,但是提升不大,希望下次来一次的时候能够有提升,不过学习的时候还是有些收获在会后的一篇中会进行总结。
在写这文章之前搜了一下,其中分析jdbc源码的文章很少,而自己水平有限,所以如果有好的文章或建议还请指教,这里先谢谢!
首先来看看StatementImpl对象的executeQuery() 方法,源码如下:
/** * Execute a SQL statement that returns a single ResultSet * * @param sql * typically a static SQL SELECT statement * * @return a ResulSet that contains the data produced by the query * * @exception SQLException * if a database access error occurs */ // 查询方法,执行给定的sql获取返回结果集 public java.sql.ResultSet executeQuery(String sql) throws SQLException { synchronized (checkClosed().getConnectionMutex()) { // mysql 服务的链接 MySQLConnection locallyScopedConn = this.connection; this.retrieveGeneratedKeys = false; resetCancelledState(); // 检测sql是不是null,或者sql.length=0,如果为null 或为0 则抛出异常 checkNullOrEmptyQuery(sql); // 设置超时 setupStreamingTimeout(locallyScopedConn); if (this.doEscapeProcessing) { // 开始执行,执行sql语句,获取结果 Object escapedSqlResult = EscapeProcessor.escapeSQL(sql, locallyScopedConn.serverSupportsConvertFn(), this.connection); if (escapedSqlResult instanceof String) { sql = (String) escapedSqlResult; } else { sql = ((EscapeProcessorResult) escapedSqlResult).escapedSql; } } char firstStatementChar = StringUtils.firstAlphaCharUc(sql, findStartOfStatement(sql)); if (sql.charAt(0) == '/') { if (sql.startsWith(PING_MARKER)) { doPingInstead(); return this.results; } } checkForDml(sql, firstStatementChar); implicitlyCloseAllOpenResults(); CachedResultSetMetaData cachedMetaData = null; if (useServerFetch()) { this.results = createResultSetUsingServerFetch(sql); return this.results; } CancelTask timeoutTask = null; String oldCatalog = null; try { if (locallyScopedConn.getEnableQueryTimeouts() && this.timeoutInMillis != 0 && locallyScopedConn.versionMeetsMinimum(5, 0, 0)) { timeoutTask = new CancelTask(this); locallyScopedConn.getCancelTimer().schedule(timeoutTask, this.timeoutInMillis); } if (!locallyScopedConn.getCatalog().equals(this.currentCatalog)) { oldCatalog = locallyScopedConn.getCatalog(); locallyScopedConn.setCatalog(this.currentCatalog); } // // Check if we have cached metadata for this query... // // 检测这个查询语句有没有缓存,如果有则直接取缓存 Field[] cachedFields = null; if (locallyScopedConn.getCacheResultSetMetadata()) { cachedMetaData = locallyScopedConn.getCachedMetaData(sql); if (cachedMetaData != null) { cachedFields = cachedMetaData.fields; } } locallyScopedConn.setSessionMaxRows(this.maxRows); statementBegins(); // 调用 MySQLConnection的方法execSQL() 方法来执行给定的sql语句,返回结构集 // 我们发现MySQLConnection是一个接口,而他的具体实现是ConnectionImpl这个类 this.results = locallyScopedConn.execSQL(this, sql, this.maxRows, null, this.resultSetType, this.resultSetConcurrency, createStreamingResultSet(), this.currentCatalog, cachedFields); if (timeoutTask != null) { if (timeoutTask.caughtWhileCancelling != null) { throw timeoutTask.caughtWhileCancelling; } timeoutTask.cancel(); locallyScopedConn.getCancelTimer().purge(); timeoutTask = null; } synchronized (this.cancelTimeoutMutex) { if (this.wasCancelled) { SQLException cause = null; if (this.wasCancelledByTimeout) { cause = new MySQLTimeoutException(); } else { cause = new MySQLStatementCancelledException(); } resetCancelledState(); throw cause; } } } finally { this.statementExecuting.set(false); if (timeoutTask != null) { timeoutTask.cancel(); locallyScopedConn.getCancelTimer().purge(); } if (oldCatalog != null) { locallyScopedConn.setCatalog(oldCatalog); } } this.lastInsertId = this.results.getUpdateID(); if (cachedMetaData != null) { locallyScopedConn.initializeResultsMetadataFromCache(sql, cachedMetaData, this.results); } else { if (this.connection.getCacheResultSetMetadata()) { locallyScopedConn.initializeResultsMetadataFromCache(sql, null /* will be created */, this.results); } } return this.results; } }
在上面我们看到了具体的实现是在ConnectionImpl这个类中。下面这个这个类中的execSQL()方法,源码如下:
/** * Send a query to the server. Returns one of the ResultSet objects. This is * synchronized, so Statement's queries will be serialized. * * @param callingStatement * @param sql * the SQL statement to be executed * @param maxRows * @param packet * @param resultSetType * @param resultSetConcurrency * @param streamResults * @param queryIsSelectOnly * @param catalog * @param unpackFields * @return a ResultSet holding the results * @exception SQLException * if a database error occurs */ // ResultSet execSQL(Statement callingStatement, String sql, // int maxRowsToRetreive, String catalog) throws SQLException { // return execSQL(callingStatement, sql, maxRowsToRetreive, null, // java.sql.ResultSet.TYPE_FORWARD_ONLY, // DEFAULT_RESULT_SET_CONCURRENCY, catalog); // } // ResultSet execSQL(Statement callingStatement, String sql, int maxRows, // int resultSetType, int resultSetConcurrency, boolean streamResults, // boolean queryIsSelectOnly, String catalog, boolean unpackFields) throws // SQLException { // return execSQL(callingStatement, sql, maxRows, null, resultSetType, // resultSetConcurrency, streamResults, queryIsSelectOnly, catalog, // unpackFields); // } // 查询方法开始 // 方法参数 statementImp实例,查询sql语句,查询最大行数,buffer缓存区,result结果的一些设置,result结果并发设置,结果流,日志,缓存属性 public ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows, Buffer packet, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata) throws SQLException { return execSQL(callingStatement, sql, maxRows, packet, resultSetType, resultSetConcurrency, streamResults, catalog, cachedMetadata, false); }
通过注释和参数我们也能大体了解到这个方法所实现的功能。就是向服务发送查询语句,而获取一个ResultSet object
而通过上面的方法我们看到,具体的实现是调用了下面的方法,即方法重载,源码如下:
//方法重载 public ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows, Buffer packet, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata, boolean isBatch) throws SQLException { // 同步方法 synchronized (getConnectionMutex()) { // // Fall-back if the master is back online if we've issued queriesBeforeRetryMaster queries since we failed over // //查询开始时间 long queryStartTime = 0; // 查询末尾位置 int endOfQueryPacketPosition = 0; // 这个那边第一次调用的时候传递就是null if (packet != null) { //查询结束数据包位置 endOfQueryPacketPosition = packet.getPosition(); } if (getGatherPerformanceMetrics()) { // 设置时间为当前系统时间 queryStartTime = System.currentTimeMillis(); } this.lastQueryFinishedTime = 0; // we're busy! if ((getHighAvailability()) && (this.autoCommit || getAutoReconnectForPools()) && this.needsPing && !isBatch) { try { pingInternal(false, 0); this.needsPing = false; } catch (Exception Ex) { createNewIO(true); } } try { // packet 就是查询结果数据包 if (packet == null) { String encoding = null; // 获取使用的编码 if (getUseUnicode()) { // 设置编码方法 encoding = getEncoding(); } // 直接执行查询 return this.io.sqlQueryDirect(callingStatement, sql, encoding, null, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, cachedMetadata); } // 从缓存中获取数据 return this.io.sqlQueryDirect(callingStatement, null, null, packet, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, cachedMetadata); } catch (java.sql.SQLException sqlE) { // don't clobber SQL exceptions if (getDumpQueriesOnException()) { String extractedSql = extractSqlFromPacket(sql, packet, endOfQueryPacketPosition); StringBuilder messageBuf = new StringBuilder(extractedSql.length() + 32); messageBuf.append("\n\nQuery being executed when exception was thrown:\n"); messageBuf.append(extractedSql); messageBuf.append("\n\n"); sqlE = appendMessageToException(sqlE, messageBuf.toString(), getExceptionInterceptor()); } if ((getHighAvailability())) { this.needsPing = true; } else { String sqlState = sqlE.getSQLState(); if ((sqlState != null) && sqlState.equals(SQLError.SQL_STATE_COMMUNICATION_LINK_FAILURE)) { cleanup(sqlE); } } throw sqlE; } catch (Exception ex) { if (getHighAvailability()) { this.needsPing = true; } else if (ex instanceof IOException) { cleanup(ex); } SQLException sqlEx = SQLError.createSQLException(Messages.getString("Connection.UnexpectedException"), SQLError.SQL_STATE_GENERAL_ERROR, getExceptionInterceptor()); sqlEx.initCause(ex); throw sqlEx; } finally { if (getMaintainTimeStats()) { this.lastQueryFinishedTime = System.currentTimeMillis(); } if (getGatherPerformanceMetrics()) { long queryTime = System.currentTimeMillis() - queryStartTime; registerQueryExecutionTime(queryTime); } } } }
在上面的方法中我们看到了其调用了 this.io.sqlQueryDirect() 方法进行查询,而这个io对象就是ConnectionImpl 类的一个变量,类型是MysqlIO类型具体代码如下:
/** The I/O abstraction interface (network conn to MySQL server */ private transient MysqlIO io = null;而从中我们看到具体的查询方法是在MysqlIO 这个类中实现的。调用方法的源码如下:
/** * Send a query stored in a packet directly to the server. * 将一个查询存储在一个数据包中,直接发送到服务器。 * @param callingStatement 调用查询语句 * @param resultSetConcurrency ResultSet结果并发参数设置 * @param characterEncoding 查询字符编码 * @param queryPacket 查询报文 * @param maxRows 最大行数 * @param conn 和server的链接 * @param resultSetType ResultSet结果集类型 * @param resultSetConcurrency ResultSet结果集并发设置 * @param streamResults 流结果集 * @param catalog * @param unpackFieldInfo * should we read MYSQL_FIELD info (if available)? * * @throws Exception */ // 这是一个finall 方法我们不行从写也不能重载 final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query, String characterEncoding, Buffer queryPacket, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata) throws Exception { this.statementExecutionDepth++; try { if (this.statementInterceptors != null) { ResultSetInternalMethods interceptedResults = invokeStatementInterceptorsPre(query, callingStatement, false); if (interceptedResults != null) { return interceptedResults; } } long queryStartTime = 0; long queryEndTime = 0; String statementComment = this.connection.getStatementComment(); if (this.connection.getIncludeThreadNamesAsStatementComment()) { statementComment = (statementComment != null ? statementComment + ", " : "") + "java thread: " + Thread.currentThread().getName(); } if (query != null) { // We don't know exactly how many bytes we're going to get from the query. Since we're dealing with Unicode, the max is 2, so pad it // (2 * query) + space for headers int packLength = HEADER_LENGTH + 1 + (query.length() * 3) + 2; byte[] commentAsBytes = null; if (statementComment != null) { commentAsBytes = StringUtils.getBytes(statementComment, null, characterEncoding, this.connection.getServerCharset(), this.connection.parserKnowsUnicode(), getExceptionInterceptor()); packLength += commentAsBytes.length; packLength += 6; // for /*[space] [space]*/ } if (this.sendPacket == null) { this.sendPacket = new Buffer(packLength); } else { this.sendPacket.clear(); } this.sendPacket.writeByte((byte) MysqlDefs.QUERY); if (commentAsBytes != null) { this.sendPacket.writeBytesNoNull(Constants.SLASH_STAR_SPACE_AS_BYTES); this.sendPacket.writeBytesNoNull(commentAsBytes); this.sendPacket.writeBytesNoNull(Constants.SPACE_STAR_SLASH_SPACE_AS_BYTES); } if (characterEncoding != null) { if (this.platformDbCharsetMatches) { this.sendPacket.writeStringNoNull(query, characterEncoding, this.connection.getServerCharset(), this.connection.parserKnowsUnicode(), this.connection); } else { if (StringUtils.startsWithIgnoreCaseAndWs(query, "LOAD DATA")) { this.sendPacket.writeBytesNoNull(StringUtils.getBytes(query)); } else { this.sendPacket.writeStringNoNull(query, characterEncoding, this.connection.getServerCharset(), this.connection.parserKnowsUnicode(), this.connection); } } } else { this.sendPacket.writeStringNoNull(query); } queryPacket = this.sendPacket; } byte[] queryBuf = null; int oldPacketPosition = 0; if (this.needToGrabQueryFromPacket) { queryBuf = queryPacket.getByteBuffer(); // save the packet position // 获取查询报文的位置,并保存。 oldPacketPosition = queryPacket.getPosition(); queryStartTime = getCurrentTimeNanosOrMillis(); } if (this.autoGenerateTestcaseScript) { String testcaseQuery = null; if (query != null) { if (statementComment != null) { testcaseQuery = "/* " + statementComment + " */ " + query; } else { testcaseQuery = query; } } else { testcaseQuery = StringUtils.toString(queryBuf, 5, (oldPacketPosition - 5)); } StringBuilder debugBuf = new StringBuilder(testcaseQuery.length() + 32); this.connection.generateConnectionCommentBlock(debugBuf); debugBuf.append(testcaseQuery); debugBuf.append(';'); this.connection.dumpTestcaseQuery(debugBuf.toString()); } // Send query command and sql query string // 发送查询命令和sql语句,获取查询报文 Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket, false, null, 0); long fetchBeginTime = 0; long fetchEndTime = 0; String profileQueryToLog = null; boolean queryWasSlow = false; if (this.profileSql || this.logSlowQueries) { queryEndTime = getCurrentTimeNanosOrMillis(); boolean shouldExtractQuery = false; if (this.profileSql) { shouldExtractQuery = true; } else if (this.logSlowQueries) { long queryTime = queryEndTime - queryStartTime; boolean logSlow = false; if (!this.useAutoSlowLog) { logSlow = queryTime > this.connection.getSlowQueryThresholdMillis(); } else { logSlow = this.connection.isAbonormallyLongQuery(queryTime); this.connection.reportQueryTime(queryTime); } if (logSlow) { shouldExtractQuery = true; queryWasSlow = true; } } if (shouldExtractQuery) { // Extract the actual query from the network packet boolean truncated = false; int extractPosition = oldPacketPosition; if (oldPacketPosition > this.connection.getMaxQuerySizeToLog()) { extractPosition = this.connection.getMaxQuerySizeToLog() + 5; truncated = true; } profileQueryToLog = StringUtils.toString(queryBuf, 5, (extractPosition - 5)); if (truncated) { profileQueryToLog += Messages.getString("MysqlIO.25"); } } fetchBeginTime = queryEndTime; } // 查询的结果集 ResultSetInternalMethods rs = readAllResults(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, resultPacket, false, -1L, cachedMetadata); if (queryWasSlow && !this.serverQueryWasSlow /* don't log slow queries twice */) { StringBuilder mesgBuf = new StringBuilder(48 + profileQueryToLog.length()); mesgBuf.append(Messages.getString( "MysqlIO.SlowQuery", new Object[] { String.valueOf(this.useAutoSlowLog ? " 95% of all queries " : this.slowQueryThreshold), this.queryTimingUnits, Long.valueOf(queryEndTime - queryStartTime) })); mesgBuf.append(profileQueryToLog); ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection); eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, this.connection.getId(), (callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(), (int) (queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), mesgBuf .toString())); if (this.connection.getExplainSlowQueries()) { if (oldPacketPosition < MAX_QUERY_SIZE_TO_EXPLAIN) { explainSlowQuery(queryPacket.getBytes(5, (oldPacketPosition - 5)), profileQueryToLog); } else { this.connection.getLog().logWarn(Messages.getString("MysqlIO.28") + MAX_QUERY_SIZE_TO_EXPLAIN + Messages.getString("MysqlIO.29")); } } } if (this.logSlowQueries) { ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection); if (this.queryBadIndexUsed && this.profileSql) { eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, this.connection.getId(), (callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(), (queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), Messages .getString("MysqlIO.33") + profileQueryToLog)); } if (this.queryNoIndexUsed && this.profileSql) { eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, this.connection.getId(), (callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(), (queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), Messages .getString("MysqlIO.35") + profileQueryToLog)); } if (this.serverQueryWasSlow && this.profileSql) { eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, this.connection.getId(), (callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(), (queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), Messages .getString("MysqlIO.ServerSlowQuery") + profileQueryToLog)); } } if (this.profileSql) { fetchEndTime = getCurrentTimeNanosOrMillis(); ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection); eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_QUERY, "", catalog, this.connection.getId(), (callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(), (queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), profileQueryToLog)); eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_FETCH, "", catalog, this.connection.getId(), (callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(), (fetchEndTime - fetchBeginTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), null)); } if (this.hadWarnings) { scanForAndThrowDataTruncation(); } if (this.statementInterceptors != null) { ResultSetInternalMethods interceptedResults = invokeStatementInterceptorsPost(query, callingStatement, rs, false, null); if (interceptedResults != null) { rs = interceptedResults; } } return rs; } catch (SQLException sqlEx) { if (this.statementInterceptors != null) { invokeStatementInterceptorsPost(query, callingStatement, null, false, sqlEx); // we don't do anything with the result set in this case } if (callingStatement != null) { synchronized (callingStatement.cancelTimeoutMutex) { if (callingStatement.wasCancelled) { SQLException cause = null; if (callingStatement.wasCancelledByTimeout) { cause = new MySQLTimeoutException(); } else { cause = new MySQLStatementCancelledException(); } callingStatement.resetCancelledState(); throw cause; } } } throw sqlEx; } finally { this.statementExecutionDepth--; } }
在方法中我们看到了,调用readAllResults() 这个方法来获取ResultSet 对象,readAllResults() 方法源码如下:
// 获取结果解 ResultSetImpl readAllResults(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache) throws SQLException { resultPacket.setPosition(resultPacket.getPosition() - 1); // 调用方法获取ResultSet对象 ResultSetImpl topLevelResultSet = readResultsForQueryOrUpdate(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, resultPacket, isBinaryEncoded, preSentColumnCount, metadataFromCache); ResultSetImpl currentResultSet = topLevelResultSet; boolean checkForMoreResults = ((this.clientParam & CLIENT_MULTI_RESULTS) != 0); boolean serverHasMoreResults = (this.serverStatus & SERVER_MORE_RESULTS_EXISTS) != 0; // // TODO: We need to support streaming of multiple result sets // if (serverHasMoreResults && streamResults) { //clearInputStream(); // //throw SQLError.createSQLException(Messages.getString("MysqlIO.23"), //SQLError.SQL_STATE_DRIVER_NOT_CAPABLE); if (topLevelResultSet.getUpdateCount() != -1) { tackOnMoreStreamingResults(topLevelResultSet); } reclaimLargeReusablePacket(); return topLevelResultSet; } boolean moreRowSetsExist = checkForMoreResults & serverHasMoreResults; while (moreRowSetsExist) { Buffer fieldPacket = checkErrorPacket(); fieldPacket.setPosition(0); ResultSetImpl newResultSet = readResultsForQueryOrUpdate(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, fieldPacket, isBinaryEncoded, preSentColumnCount, metadataFromCache); currentResultSet.setNextResultSet(newResultSet); currentResultSet = newResultSet; moreRowSetsExist = (this.serverStatus & SERVER_MORE_RESULTS_EXISTS) != 0; } if (!streamResults) { clearInputStream(); } reclaimLargeReusablePacket(); return topLevelResultSet; }
在这方法中我们看到了,是调用了readResultsForQueryOrUpdate() 方法进行查询获取ResultSet 对象的。下面我们看看这个方法源码:
/** * Reads one result set off of the wire, if the result is actually an * update count, creates an update-count only result set. * 读取一个结果,如果结果是实际的 * 更新计数,创建一个更新计数结果集。 * @param callingStatement * @param maxRows * the maximum rows to return in the result set. * 结果集合中返回的最大行 * @param resultSetType * scrollability * @param resultSetConcurrency * updatability * @param streamResults * should the driver leave the results on the wire, * and read them only when needed? * 驱动应该把结果放在电报上,只有在需要时才看出来吗? * @param catalog * the catalog in use * @param resultPacket * the first packet of information in the result set * 结果集合中的第一个数据包 * @param isBinaryEncoded * is this result set from a prepared statement? * @param preSentColumnCount * do we already know the number of columns? * 我们已经知道的列数吗? * @param unpackFieldInfo * should we unpack the field information? * * @return a result set that either represents the rows, or an update count * * @throws SQLException * if an error occurs while reading the rows */ // 这里是执行sql查询或update的具体方法。而各个参数具体意义通过注释也能了解一二 protected final ResultSetImpl readResultsForQueryOrUpdate(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache) throws SQLException { // 从查询的结果的数据包中获取总行数 long columnCount = resultPacket.readFieldLength(); // 如果是0 则执行updates方法来执行 if (columnCount == 0) { // 调用方法执行保存获取ResultSet方法 return buildResultSetWithUpdates(callingStatement, resultPacket); } else if (columnCount == Buffer.NULL_LENGTH) { String charEncoding = null; if (this.connection.getUseUnicode()) { charEncoding = this.connection.getEncoding(); } String fileName = null; if (this.platformDbCharsetMatches) { fileName = ((charEncoding != null) ? resultPacket.readString(charEncoding, getExceptionInterceptor()) : resultPacket.readString()); } else { fileName = resultPacket.readString(); } // 向服务器发送信息 return sendFileToServer(callingStatement, fileName); } else { // 调用方法获取resultset com.mysql.jdbc.ResultSetImpl results = getResultSet(callingStatement, columnCount, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, isBinaryEncoded, metadataFromCache); return results; } }
这个方法中我们看到了 调用getResultSet() 方法获取ResultSet实例,源码如下:
/** * Build a result set. Delegates to buildResultSetWithRows() to build a * JDBC-version-specific ResultSet, given rows as byte data, and field * information. * * @param callingStatement * @param columnCount * the number of columns in the result set * @param maxRows * the maximum number of rows to read (-1 means all rows) * @param resultSetType * (TYPE_FORWARD_ONLY, TYPE_SCROLL_????) * @param resultSetConcurrency * the type of result set (CONCUR_UPDATABLE or * READ_ONLY) * @param streamResults * should the result set be read all at once, or * streamed? * @param catalog * the database name in use when the result set was created * @param isBinaryEncoded * is this result set in native encoding? * @param unpackFieldInfo * should we read MYSQL_FIELD info (if available)? * * @return a result set * * @throws SQLException * if a database access error occurs */ protected ResultSetImpl getResultSet(StatementImpl callingStatement, long columnCount, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, boolean isBinaryEncoded, Field[] metadataFromCache) throws SQLException { Buffer packet; // The packet from the server Field[] fields = null; // Read in the column information if (metadataFromCache == null /* we want the metadata from the server */) { fields = new Field[(int) columnCount]; for (int i = 0; i < columnCount; i++) { Buffer fieldPacket = null; fieldPacket = readPacket(); fields[i] = unpackField(fieldPacket, false); } } else { for (int i = 0; i < columnCount; i++) { skipPacket(); } } packet = reuseAndReadPacket(this.reusablePacket); readServerStatusForResultSets(packet); // // Handle cursor-based fetch first // if (this.connection.versionMeetsMinimum(5, 0, 2) && this.connection.getUseCursorFetch() && isBinaryEncoded && callingStatement != null && callingStatement.getFetchSize() != 0 && callingStatement.getResultSetType() == ResultSet.TYPE_FORWARD_ONLY) { ServerPreparedStatement prepStmt = (com.mysql.jdbc.ServerPreparedStatement) callingStatement; boolean usingCursor = true; // // Server versions 5.0.5 or newer will only open a cursor and set this flag if they can, otherwise they punt and go back to mysql_store_results() // behavior // if (this.connection.versionMeetsMinimum(5, 0, 5)) { usingCursor = (this.serverStatus & SERVER_STATUS_CURSOR_EXISTS) != 0; } if (usingCursor) { RowData rows = new RowDataCursor(this, prepStmt, fields); ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, fields, rows, resultSetType, resultSetConcurrency, isBinaryEncoded); if (usingCursor) { rs.setFetchSize(callingStatement.getFetchSize()); } return rs; } } RowData rowData = null; if (!streamResults) { rowData = readSingleRowSet(columnCount, maxRows, resultSetConcurrency, isBinaryEncoded, (metadataFromCache == null) ? fields : metadataFromCache); } else { rowData = new RowDataDynamic(this, (int) columnCount, (metadataFromCache == null) ? fields : metadataFromCache, isBinaryEncoded); this.streamingData = rowData; } ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, (metadataFromCache == null) ? fields : metadataFromCache, rowData, resultSetType, resultSetConcurrency, isBinaryEncoded); return rs; }
我们又看到 他是调用buildResultSetWithRows 这个方法来获取 ResultSetImpl 实例的。具体源码如下:
// 获取resultset 类型的对象 private com.mysql.jdbc.ResultSetImpl buildResultSetWithRows(StatementImpl callingStatement, String catalog, com.mysql.jdbc.Field[] fields, RowData rows, int resultSetType, int resultSetConcurrency, boolean isBinaryEncoded) throws SQLException { ResultSetImpl rs = null; // 按照结果集并发的设置来执行获取。 switch (resultSetConcurrency) { case java.sql.ResultSet.CONCUR_READ_ONLY: rs = com.mysql.jdbc.ResultSetImpl.getInstance(catalog, fields, rows, this.connection, callingStatement, false); if (isBinaryEncoded) { rs.setBinaryEncoded(); } break; case java.sql.ResultSet.CONCUR_UPDATABLE: rs = com.mysql.jdbc.ResultSetImpl.getInstance(catalog, fields, rows, this.connection, callingStatement, true); break; default: return com.mysql.jdbc.ResultSetImpl.getInstance(catalog, fields, rows, this.connection, callingStatement, false); } rs.setResultSetType(resultSetType); rs.setResultSetConcurrency(resultSetConcurrency); return rs; }
我们看到这里直接调用到了com.mysql.jdbc.ResultSetImpl.getInstance() 方法来获取实例。具体源码如下:
protected static ResultSetImpl getInstance(String catalog, Field[] fields, RowData tuples, MySQLConnection conn, StatementImpl creatorStmt, boolean isUpdatable) throws SQLException { if (!Util.isJdbc4()) { if (!isUpdatable) { return new ResultSetImpl(catalog, fields, tuples, conn, creatorStmt); } return new UpdatableResultSet(catalog, fields, tuples, conn, creatorStmt); } if (!isUpdatable) { return (ResultSetImpl) Util.handleNewInstance(JDBC_4_RS_5_ARG_CTOR, new Object[] { catalog, fields, tuples, conn, creatorStmt }, conn.getExceptionInterceptor()); } return (ResultSetImpl) Util.handleNewInstance(JDBC_4_UPD_RS_5_ARG_CTOR, new Object[] { catalog, fields, tuples, conn, creatorStmt }, conn.getExceptionInterceptor()); }
ResultSetImpl 类的构造函数如下:
/** * Creates a new ResultSet object. * * @param catalog * the database in use when we were created * @param fields * an array of Field objects (basically, the ResultSet MetaData) * @param tuples * actual row data * @param conn * the Connection that created us. * @param creatorStmt * * @throws SQLException * if an error occurs */ public ResultSetImpl(String catalog, Field[] fields, RowData tuples, MySQLConnection conn, StatementImpl creatorStmt) throws SQLException { this.connection = conn; this.retainOwningStatement = false; if (this.connection != null) { this.exceptionInterceptor = this.connection.getExceptionInterceptor(); this.useStrictFloatingPoint = this.connection.getStrictFloatingPoint(); this.connectionId = this.connection.getId(); this.useFastDateParsing = this.connection.getUseFastDateParsing(); this.profileSql = this.connection.getProfileSql(); this.retainOwningStatement = this.connection.getRetainStatementAfterResultSetClose(); this.jdbcCompliantTruncationForReads = this.connection.getJdbcCompliantTruncationForReads(); this.useFastIntParsing = this.connection.getUseFastIntParsing(); this.serverTimeZoneTz = this.connection.getServerTimezoneTZ(); this.padCharsWithSpace = this.connection.getPadCharsWithSpace(); } this.owningStatement = creatorStmt; this.catalog = catalog; this.fields = fields; this.rowData = tuples; this.updateCount = this.rowData.size(); if (NonRegisteringDriver.DEBUG) { System.out.println(Messages.getString("ResultSet.Retrieved__1") + this.updateCount + " rows"); } this.reallyResult = true; // Check for no results if (this.rowData.size() > 0) { if (this.updateCount == 1) { if (this.thisRow == null) { this.rowData.close(); // empty result set this.updateCount = -1; } } } else { this.thisRow = null; } this.rowData.setOwner(this); if (this.fields != null) { initializeWithMetadata(); } // else called by Connection.initializeResultsMetadataFromCache() when cached this.useLegacyDatetimeCode = this.connection.getUseLegacyDatetimeCode(); this.useColumnNamesInFindColumn = this.connection.getUseColumnNamesInFindColumn(); setRowPositionValidity(); }
OK 到这里我们也知道,其实什么也没看到就是调用了ResultSetImpl的构造函数创建了一个对象。
简单总结:
我们从整个过程中看到,所有的基础都是Connection 即客户端和服务端的链接也就是和服务器Socket的链接,如果socket断了,就不能进行沟通了。而当有了Socket之后我们可以执行查询方法或更新方法来获取结果集对象,但是在获取结果集对象的时候我们要设置结果集的类型,因为设置不同的结果集类型,我们对获得的结果集对象ResultSet对象会有不同的操作即有的可以向结果集前端移动动,有的可以向结果集后端移动等等,这些都要在生成ResultSet 对象前进行设置。
其实看源码的目的是更清楚,明白的了解jdbc的整个过程但是,在看的过程中发现,能力有限,不能静下心来一个一个的看懂,而只能草草的结束,从中了解了大体过程,但是提升不大,希望下次来一次的时候能够有提升,不过学习的时候还是有些收获在会后的一篇中会进行总结。
在写这文章之前搜了一下,其中分析jdbc源码的文章很少,而自己水平有限,所以如果有好的文章或建议还请指教,这里先谢谢!
相关文章推荐
- iOS UI-UIScrollView控件实现图片轮播 (UIPageControl-分页指示器)
- [置顶]【Mood 20】DailyBuild 4月
- 【开源项目13】Volley框架 以及 设置request超时时间
- 【Android 界面效果15】Android UI 之一步步教你自定义控件(自定义属性、合理设计onMeasure、合理设计onDraw等)
- 【Android 界面效果3】Android_UI_点击按钮切换背景效果实现
- iOS UI-UIScrollView控件实现图片缩放功能
- HUD4010 Query on The Trees
- HDU - 5036 Operation the Sequence
- iOS UI 沙盒路径的获取及文件的简单存储
- 【LWJGL2 WIKI】【辅助库篇】Slick-Util库:第三部分-读取TrueType字体
- easyui datagrid使用(好)
- UIAlertController在8以下不支持
- UIAlertController在8以下不支持
- 熔断器模式(CircuitBreaker)
- Android开发之Buidler模式初探结合AlertDialog.Builder讲解
- Easyui-DataGrid 的增删查改
- request对象
- 解决 UITableViewCell的点击事件和手势的冲突问题
- 并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
- HDFS2—SequenceFile(小文件的解决方案)