mysql jdbc驱动源码分析(Statement的executeQuery 和executeUpdate方法)

2016-01-03
在前面的章节中我们获取了Statement对象,下面我们来看看Statement的executeQuery() 和executeUpdate() 方法来执行相关操作。

首先来看看StatementImpl对象的executeQuery() 方法,源码如下:

* Execute a SQL statement that returns a single ResultSet
* @param sql
*            typically a static SQL SELECT statement
* @return a ResulSet that contains the data produced by the query
* @exception SQLException
*                if a database access error occurs
// 查询方法,执行给定的sql获取返回结果集
public java.sql.ResultSet executeQuery(String sql) throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
// mysql 服务的链接
MySQLConnection locallyScopedConn = this.connection;

this.retrieveGeneratedKeys = false;

// 检测sql是不是null,或者sql.length=0,如果为null 或为0 则抛出异常
// 设置超时

if (this.doEscapeProcessing) {
// 开始执行,执行sql语句,获取结果
Object escapedSqlResult = EscapeProcessor.escapeSQL(sql, locallyScopedConn.serverSupportsConvertFn(), this.connection);

if (escapedSqlResult instanceof String) {
sql = (String) escapedSqlResult;
} else {
sql = ((EscapeProcessorResult) escapedSqlResult).escapedSql;

char firstStatementChar = StringUtils.firstAlphaCharUc(sql, findStartOfStatement(sql));

if (sql.charAt(0) == '/') {
if (sql.startsWith(PING_MARKER)) {

return this.results;

checkForDml(sql, firstStatementChar);


CachedResultSetMetaData cachedMetaData = null;

if (useServerFetch()) {
this.results = createResultSetUsingServerFetch(sql);

return this.results;

CancelTask timeoutTask = null;

String oldCatalog = null;

try {
if (locallyScopedConn.getEnableQueryTimeouts() && this.timeoutInMillis != 0 && locallyScopedConn.versionMeetsMinimum(5, 0, 0)) {
timeoutTask = new CancelTask(this);
locallyScopedConn.getCancelTimer().schedule(timeoutTask, this.timeoutInMillis);

if (!locallyScopedConn.getCatalog().equals(this.currentCatalog)) {
oldCatalog = locallyScopedConn.getCatalog();

// Check if we have cached metadata for this query...
// 检测这个查询语句有没有缓存,如果有则直接取缓存
Field[] cachedFields = null;

if (locallyScopedConn.getCacheResultSetMetadata()) {
cachedMetaData = locallyScopedConn.getCachedMetaData(sql);

if (cachedMetaData != null) {
cachedFields = cachedMetaData.fields;


// 调用 MySQLConnection的方法execSQL() 方法来执行给定的sql语句,返回结构集
// 我们发现MySQLConnection是一个接口,而他的具体实现是ConnectionImpl这个类
this.results = locallyScopedConn.execSQL(this, sql, this.maxRows, null, this.resultSetType, this.resultSetConcurrency,
createStreamingResultSet(), this.currentCatalog, cachedFields);

if (timeoutTask != null) {
if (timeoutTask.caughtWhileCancelling != null) {
throw timeoutTask.caughtWhileCancelling;



timeoutTask = null;

synchronized (this.cancelTimeoutMutex) {
if (this.wasCancelled) {
SQLException cause = null;

if (this.wasCancelledByTimeout) {
cause = new MySQLTimeoutException();
} else {
cause = new MySQLStatementCancelledException();


throw cause;
} finally {

if (timeoutTask != null) {


if (oldCatalog != null) {

this.lastInsertId = this.results.getUpdateID();

if (cachedMetaData != null) {
locallyScopedConn.initializeResultsMetadataFromCache(sql, cachedMetaData, this.results);
} else {
if (this.connection.getCacheResultSetMetadata()) {
locallyScopedConn.initializeResultsMetadataFromCache(sql, null /* will be created */, this.results);

return this.results;


* Send a query to the server. Returns one of the ResultSet objects. This is
* synchronized, so Statement's queries will be serialized.
* @param callingStatement
* @param sql
*            the SQL statement to be executed
* @param maxRows
* @param packet
* @param resultSetType
* @param resultSetConcurrency
* @param streamResults
* @param queryIsSelectOnly
* @param catalog
* @param unpackFields
* @return a ResultSet holding the results
* @exception SQLException
*                if a database error occurs

// ResultSet execSQL(Statement callingStatement, String sql,
// int maxRowsToRetreive, String catalog) throws SQLException {
// return execSQL(callingStatement, sql, maxRowsToRetreive, null,
// java.sql.ResultSet.TYPE_FORWARD_ONLY,
// }
// ResultSet execSQL(Statement callingStatement, String sql, int maxRows,
// int resultSetType, int resultSetConcurrency, boolean streamResults,
// boolean queryIsSelectOnly, String catalog, boolean unpackFields) throws
// SQLException {
// return execSQL(callingStatement, sql, maxRows, null, resultSetType,
// resultSetConcurrency, streamResults, queryIsSelectOnly, catalog,
// unpackFields);
// }
// 查询方法开始
// 方法参数 statementImp实例,查询sql语句,查询最大行数,buffer缓存区,result结果的一些设置,result结果并发设置,结果流,日志,缓存属性
public ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows, Buffer packet, int resultSetType,
int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata) throws SQLException {
return execSQL(callingStatement, sql, maxRows, packet, resultSetType, resultSetConcurrency, streamResults, catalog, cachedMetadata, false);

通过注释和参数我们也能大体了解到这个方法所实现的功能。就是向服务发送查询语句,而获取一个ResultSet object


public ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows, Buffer packet, int resultSetType,
int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata, boolean isBatch) throws SQLException {
// 同步方法
synchronized (getConnectionMutex()) {
// Fall-back if the master is back online if we've issued queriesBeforeRetryMaster queries since we failed over
long queryStartTime = 0;
// 查询末尾位置
int endOfQueryPacketPosition = 0;
// 这个那边第一次调用的时候传递就是null
if (packet != null) {
endOfQueryPacketPosition = packet.getPosition();

if (getGatherPerformanceMetrics()) {
// 设置时间为当前系统时间
queryStartTime = System.currentTimeMillis();

this.lastQueryFinishedTime = 0; // we're busy!

if ((getHighAvailability()) && (this.autoCommit || getAutoReconnectForPools()) && this.needsPing && !isBatch) {
try {
pingInternal(false, 0);

this.needsPing = false;
} catch (Exception Ex) {

try {
// packet 就是查询结果数据包
if (packet == null) {
String encoding = null;
// 获取使用的编码
if (getUseUnicode()) {
// 设置编码方法
encoding = getEncoding();
// 直接执行查询
return this.io.sqlQueryDirect(callingStatement, sql, encoding, null, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog,
// 从缓存中获取数据
return this.io.sqlQueryDirect(callingStatement, null, null, packet, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog,
} catch (java.sql.SQLException sqlE) {
// don't clobber SQL exceptions

if (getDumpQueriesOnException()) {
String extractedSql = extractSqlFromPacket(sql, packet, endOfQueryPacketPosition);
StringBuilder messageBuf = new StringBuilder(extractedSql.length() + 32);
messageBuf.append("\n\nQuery being executed when exception was thrown:\n");

sqlE = appendMessageToException(sqlE, messageBuf.toString(), getExceptionInterceptor());

if ((getHighAvailability())) {
this.needsPing = true;
} else {
String sqlState = sqlE.getSQLState();

if ((sqlState != null) && sqlState.equals(SQLError.SQL_STATE_COMMUNICATION_LINK_FAILURE)) {

throw sqlE;
} catch (Exception ex) {
if (getHighAvailability()) {
this.needsPing = true;
} else if (ex instanceof IOException) {

SQLException sqlEx = SQLError.createSQLException(Messages.getString("Connection.UnexpectedException"), SQLError.SQL_STATE_GENERAL_ERROR,

throw sqlEx;
} finally {
if (getMaintainTimeStats()) {
this.lastQueryFinishedTime = System.currentTimeMillis();

if (getGatherPerformanceMetrics()) {
long queryTime = System.currentTimeMillis() - queryStartTime;


在上面的方法中我们看到了其调用了 this.io.sqlQueryDirect() 方法进行查询,而这个io对象就是ConnectionImpl 类的一个变量,类型是MysqlIO类型具体代码如下:

/** The I/O abstraction interface (network conn to MySQL server */
private transient MysqlIO io = null;
而从中我们看到具体的查询方法是在MysqlIO 这个类中实现的。调用方法的源码如下:

* Send a query stored in a packet directly to the server.
* 将一个查询存储在一个数据包中,直接发送到服务器。
* @param callingStatement       调用查询语句
* @param resultSetConcurrency   ResultSet结果并发参数设置
* @param characterEncoding      查询字符编码
* @param queryPacket            查询报文
* @param maxRows                最大行数
* @param conn                   和server的链接
* @param resultSetType          ResultSet结果集类型
* @param resultSetConcurrency   ResultSet结果集并发设置
* @param streamResults          流结果集
* @param catalog
* @param unpackFieldInfo
*            should we read MYSQL_FIELD info (if available)?
* @throws Exception
// 这是一个finall 方法我们不行从写也不能重载
final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query, String characterEncoding, Buffer queryPacket, int maxRows,
int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata) throws Exception {

try {
if (this.statementInterceptors != null) {
ResultSetInternalMethods interceptedResults = invokeStatementInterceptorsPre(query, callingStatement, false);

if (interceptedResults != null) {
return interceptedResults;

long queryStartTime = 0;
long queryEndTime = 0;

String statementComment = this.connection.getStatementComment();

if (this.connection.getIncludeThreadNamesAsStatementComment()) {
statementComment = (statementComment != null ? statementComment + ", " : "") + "java thread: " + Thread.currentThread().getName();

if (query != null) {
// We don't know exactly how many bytes we're going to get from the query. Since we're dealing with Unicode, the max is 2, so pad it
// (2 * query) + space for headers
int packLength = HEADER_LENGTH + 1 + (query.length() * 3) + 2;

byte[] commentAsBytes = null;

if (statementComment != null) {
commentAsBytes = StringUtils.getBytes(statementComment, null, characterEncoding, this.connection.getServerCharset(),
this.connection.parserKnowsUnicode(), getExceptionInterceptor());

packLength += commentAsBytes.length;
packLength += 6; // for /*[space] [space]*/

if (this.sendPacket == null) {
this.sendPacket = new Buffer(packLength);
} else {

this.sendPacket.writeByte((byte) MysqlDefs.QUERY);

if (commentAsBytes != null) {

if (characterEncoding != null) {
if (this.platformDbCharsetMatches) {
this.sendPacket.writeStringNoNull(query, characterEncoding, this.connection.getServerCharset(), this.connection.parserKnowsUnicode(),
} else {
if (StringUtils.startsWithIgnoreCaseAndWs(query, "LOAD DATA")) {
} else {
this.sendPacket.writeStringNoNull(query, characterEncoding, this.connection.getServerCharset(),
this.connection.parserKnowsUnicode(), this.connection);
} else {

queryPacket = this.sendPacket;

byte[] queryBuf = null;
int oldPacketPosition = 0;

if (this.needToGrabQueryFromPacket) {
queryBuf = queryPacket.getByteBuffer();

// save the packet position
// 获取查询报文的位置,并保存。
oldPacketPosition = queryPacket.getPosition();

queryStartTime = getCurrentTimeNanosOrMillis();

if (this.autoGenerateTestcaseScript) {
String testcaseQuery = null;

if (query != null) {
if (statementComment != null) {
testcaseQuery = "/* " + statementComment + " */ " + query;
} else {
testcaseQuery = query;
} else {
testcaseQuery = StringUtils.toString(queryBuf, 5, (oldPacketPosition - 5));

StringBuilder debugBuf = new StringBuilder(testcaseQuery.length() + 32);

// Send query command and sql query string
// 发送查询命令和sql语句,获取查询报文
Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket, false, null, 0);

long fetchBeginTime = 0;
long fetchEndTime = 0;

String profileQueryToLog = null;

boolean queryWasSlow = false;

if (this.profileSql || this.logSlowQueries) {
queryEndTime = getCurrentTimeNanosOrMillis();

boolean shouldExtractQuery = false;

if (this.profileSql) {
shouldExtractQuery = true;
} else if (this.logSlowQueries) {
long queryTime = queryEndTime - queryStartTime;

boolean logSlow = false;

if (!this.useAutoSlowLog) {
logSlow = queryTime > this.connection.getSlowQueryThresholdMillis();
} else {
logSlow = this.connection.isAbonormallyLongQuery(queryTime);


if (logSlow) {
shouldExtractQuery = true;
queryWasSlow = true;

if (shouldExtractQuery) {
// Extract the actual query from the network packet
boolean truncated = false;

int extractPosition = oldPacketPosition;

if (oldPacketPosition > this.connection.getMaxQuerySizeToLog()) {
extractPosition = this.connection.getMaxQuerySizeToLog() + 5;
truncated = true;

profileQueryToLog = StringUtils.toString(queryBuf, 5, (extractPosition - 5));

if (truncated) {
profileQueryToLog += Messages.getString("MysqlIO.25");

fetchBeginTime = queryEndTime;
// 查询的结果集
ResultSetInternalMethods rs = readAllResults(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, resultPacket,
false, -1L, cachedMetadata);

if (queryWasSlow && !this.serverQueryWasSlow /* don't log slow queries twice */) {
StringBuilder mesgBuf = new StringBuilder(48 + profileQueryToLog.length());

new Object[] { String.valueOf(this.useAutoSlowLog ? " 95% of all queries " : this.slowQueryThreshold), this.queryTimingUnits,
Long.valueOf(queryEndTime - queryStartTime) }));

ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);

eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, this.connection.getId(),
(callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(),
(int) (queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), mesgBuf

if (this.connection.getExplainSlowQueries()) {
if (oldPacketPosition < MAX_QUERY_SIZE_TO_EXPLAIN) {
explainSlowQuery(queryPacket.getBytes(5, (oldPacketPosition - 5)), profileQueryToLog);
} else {
this.connection.getLog().logWarn(Messages.getString("MysqlIO.28") + MAX_QUERY_SIZE_TO_EXPLAIN + Messages.getString("MysqlIO.29"));

if (this.logSlowQueries) {

ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);

if (this.queryBadIndexUsed && this.profileSql) {
eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, this.connection.getId(),
(callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(),
(queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), Messages
.getString("MysqlIO.33") + profileQueryToLog));

if (this.queryNoIndexUsed && this.profileSql) {
eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, this.connection.getId(),
(callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(),
(queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), Messages
.getString("MysqlIO.35") + profileQueryToLog));

if (this.serverQueryWasSlow && this.profileSql) {
eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, this.connection.getId(),
(callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(),
(queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), Messages
.getString("MysqlIO.ServerSlowQuery") + profileQueryToLog));

if (this.profileSql) {
fetchEndTime = getCurrentTimeNanosOrMillis();

ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);

eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_QUERY, "", catalog, this.connection.getId(),
(callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(),
(queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), profileQueryToLog));

eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_FETCH, "", catalog, this.connection.getId(),
(callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl) rs).resultId, System.currentTimeMillis(),
(fetchEndTime - fetchBeginTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), null));

if (this.hadWarnings) {

if (this.statementInterceptors != null) {
ResultSetInternalMethods interceptedResults = invokeStatementInterceptorsPost(query, callingStatement, rs, false, null);

if (interceptedResults != null) {
rs = interceptedResults;

return rs;
} catch (SQLException sqlEx) {
if (this.statementInterceptors != null) {
invokeStatementInterceptorsPost(query, callingStatement, null, false, sqlEx); // we don't do anything with the result set in this case

if (callingStatement != null) {
synchronized (callingStatement.cancelTimeoutMutex) {
if (callingStatement.wasCancelled) {
SQLException cause = null;

if (callingStatement.wasCancelledByTimeout) {
cause = new MySQLTimeoutException();
} else {
cause = new MySQLStatementCancelledException();


throw cause;

throw sqlEx;
} finally {

在方法中我们看到了,调用readAllResults() 这个方法来获取ResultSet 对象,readAllResults() 方法源码如下:

// 获取结果解
ResultSetImpl readAllResults(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults,
String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache) throws SQLException {
resultPacket.setPosition(resultPacket.getPosition() - 1);
// 调用方法获取ResultSet对象
ResultSetImpl topLevelResultSet = readResultsForQueryOrUpdate(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog,
resultPacket, isBinaryEncoded, preSentColumnCount, metadataFromCache);

ResultSetImpl currentResultSet = topLevelResultSet;

boolean checkForMoreResults = ((this.clientParam & CLIENT_MULTI_RESULTS) != 0);

boolean serverHasMoreResults = (this.serverStatus & SERVER_MORE_RESULTS_EXISTS) != 0;

// TODO: We need to support streaming of multiple result sets
if (serverHasMoreResults && streamResults) {
//throw SQLError.createSQLException(Messages.getString("MysqlIO.23"),
if (topLevelResultSet.getUpdateCount() != -1) {


return topLevelResultSet;

boolean moreRowSetsExist = checkForMoreResults & serverHasMoreResults;

while (moreRowSetsExist) {
Buffer fieldPacket = checkErrorPacket();

ResultSetImpl newResultSet = readResultsForQueryOrUpdate(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog,
fieldPacket, isBinaryEncoded, preSentColumnCount, metadataFromCache);


currentResultSet = newResultSet;

moreRowSetsExist = (this.serverStatus & SERVER_MORE_RESULTS_EXISTS) != 0;

if (!streamResults) {


return topLevelResultSet;

在这方法中我们看到了,是调用了readResultsForQueryOrUpdate() 方法进行查询获取ResultSet 对象的。下面我们看看这个方法源码:

* Reads one result set off of the wire, if the result is actually an
* update count, creates an update-count only result set.
* 读取一个结果,如果结果是实际的
* 更新计数,创建一个更新计数结果集。
* @param callingStatement
* @param maxRows
*            the maximum rows to return in the result set.
* 结果集合中返回的最大行
* @param resultSetType
*            scrollability
* @param resultSetConcurrency
*            updatability
* @param streamResults
*            should the driver leave the results on the wire,
*            and read them only when needed?
* 驱动应该把结果放在电报上,只有在需要时才看出来吗?
* @param catalog
*            the catalog in use
* @param resultPacket
*            the first packet of information in the result set
* 结果集合中的第一个数据包
* @param isBinaryEncoded
*            is this result set from a prepared statement?
* @param preSentColumnCount
*            do we already know the number of columns?
* 我们已经知道的列数吗?
* @param unpackFieldInfo
*            should we unpack the field information?
* @return a result set that either represents the rows, or an update count
* @throws SQLException
*             if an error occurs while reading the rows
// 这里是执行sql查询或update的具体方法。而各个参数具体意义通过注释也能了解一二

protected final ResultSetImpl readResultsForQueryOrUpdate(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency,
boolean streamResults, String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache)
throws SQLException {
// 从查询的结果的数据包中获取总行数
long columnCount = resultPacket.readFieldLength();
// 如果是0 则执行updates方法来执行
if (columnCount == 0) {
// 调用方法执行保存获取ResultSet方法
return buildResultSetWithUpdates(callingStatement, resultPacket);
} else if (columnCount == Buffer.NULL_LENGTH) {
String charEncoding = null;

if (this.connection.getUseUnicode()) {
charEncoding = this.connection.getEncoding();

String fileName = null;

if (this.platformDbCharsetMatches) {
fileName = ((charEncoding != null) ? resultPacket.readString(charEncoding, getExceptionInterceptor()) : resultPacket.readString());
} else {
fileName = resultPacket.readString();
// 向服务器发送信息
return sendFileToServer(callingStatement, fileName);
} else {
// 调用方法获取resultset
com.mysql.jdbc.ResultSetImpl results = getResultSet(callingStatement, columnCount, maxRows, resultSetType, resultSetConcurrency, streamResults,
catalog, isBinaryEncoded, metadataFromCache);

return results;

这个方法中我们看到了 调用getResultSet() 方法获取ResultSet实例,源码如下:

* Build a result set. Delegates to buildResultSetWithRows() to build a
* JDBC-version-specific ResultSet, given rows as byte data, and field
* information.
* @param callingStatement
* @param columnCount
*            the number of columns in the result set
* @param maxRows
*            the maximum number of rows to read (-1 means all rows)
* @param resultSetType
*            (TYPE_FORWARD_ONLY, TYPE_SCROLL_????)
* @param resultSetConcurrency
*            the type of result set (CONCUR_UPDATABLE or
*            READ_ONLY)
* @param streamResults
*            should the result set be read all at once, or
*            streamed?
* @param catalog
*            the database name in use when the result set was created
* @param isBinaryEncoded
*            is this result set in native encoding?
* @param unpackFieldInfo
*            should we read MYSQL_FIELD info (if available)?
* @return a result set
* @throws SQLException
*             if a database access error occurs
protected ResultSetImpl getResultSet(StatementImpl callingStatement, long columnCount, int maxRows, int resultSetType, int resultSetConcurrency,
boolean streamResults, String catalog, boolean isBinaryEncoded, Field[] metadataFromCache) throws SQLException {
Buffer packet; // The packet from the server
Field[] fields = null;

// Read in the column information

if (metadataFromCache == null /* we want the metadata from the server */) {
fields = new Field[(int) columnCount];

for (int i = 0; i < columnCount; i++) {
Buffer fieldPacket = null;

fieldPacket = readPacket();
fields[i] = unpackField(fieldPacket, false);
} else {
for (int i = 0; i < columnCount; i++) {

packet = reuseAndReadPacket(this.reusablePacket);


// Handle cursor-based fetch first

if (this.connection.versionMeetsMinimum(5, 0, 2) && this.connection.getUseCursorFetch() && isBinaryEncoded && callingStatement != null
&& callingStatement.getFetchSize() != 0 && callingStatement.getResultSetType() == ResultSet.TYPE_FORWARD_ONLY) {
ServerPreparedStatement prepStmt = (com.mysql.jdbc.ServerPreparedStatement) callingStatement;

boolean usingCursor = true;

// Server versions 5.0.5 or newer will only open a cursor and set this flag if they can, otherwise they punt and go back to mysql_store_results()
// behavior

if (this.connection.versionMeetsMinimum(5, 0, 5)) {
usingCursor = (this.serverStatus & SERVER_STATUS_CURSOR_EXISTS) != 0;

if (usingCursor) {
RowData rows = new RowDataCursor(this, prepStmt, fields);

ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, fields, rows, resultSetType, resultSetConcurrency, isBinaryEncoded);

if (usingCursor) {

return rs;

RowData rowData = null;

if (!streamResults) {
rowData = readSingleRowSet(columnCount, maxRows, resultSetConcurrency, isBinaryEncoded, (metadataFromCache == null) ? fields : metadataFromCache);
} else {
rowData = new RowDataDynamic(this, (int) columnCount, (metadataFromCache == null) ? fields : metadataFromCache, isBinaryEncoded);
this.streamingData = rowData;

ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, (metadataFromCache == null) ? fields : metadataFromCache, rowData, resultSetType,
resultSetConcurrency, isBinaryEncoded);

return rs;

我们又看到 他是调用buildResultSetWithRows 这个方法来获取 ResultSetImpl 实例的。具体源码如下:

// 获取resultset 类型的对象
private com.mysql.jdbc.ResultSetImpl buildResultSetWithRows(StatementImpl callingStatement, String catalog, com.mysql.jdbc.Field[] fields, RowData rows,
int resultSetType, int resultSetConcurrency, boolean isBinaryEncoded) throws SQLException {
ResultSetImpl rs = null;
// 按照结果集并发的设置来执行获取。
switch (resultSetConcurrency) {
case java.sql.ResultSet.CONCUR_READ_ONLY:
rs = com.mysql.jdbc.ResultSetImpl.getInstance(catalog, fields, rows, this.connection, callingStatement, false);

if (isBinaryEncoded) {


case java.sql.ResultSet.CONCUR_UPDATABLE:
rs = com.mysql.jdbc.ResultSetImpl.getInstance(catalog, fields, rows, this.connection, callingStatement, true);


return com.mysql.jdbc.ResultSetImpl.getInstance(catalog, fields, rows, this.connection, callingStatement, false);


return rs;

我们看到这里直接调用到了com.mysql.jdbc.ResultSetImpl.getInstance() 方法来获取实例。具体源码如下:

protected static ResultSetImpl getInstance(String catalog, Field[] fields, RowData tuples, MySQLConnection conn, StatementImpl creatorStmt,
boolean isUpdatable) throws SQLException {
if (!Util.isJdbc4()) {
if (!isUpdatable) {
return new ResultSetImpl(catalog, fields, tuples, conn, creatorStmt);

return new UpdatableResultSet(catalog, fields, tuples, conn, creatorStmt);

if (!isUpdatable) {
return (ResultSetImpl) Util.handleNewInstance(JDBC_4_RS_5_ARG_CTOR, new Object[] { catalog, fields, tuples, conn, creatorStmt },

return (ResultSetImpl) Util.handleNewInstance(JDBC_4_UPD_RS_5_ARG_CTOR, new Object[] { catalog, fields, tuples, conn, creatorStmt },

ResultSetImpl 类的构造函数如下:

* Creates a new ResultSet object.
* @param catalog
*            the database in use when we were created
* @param fields
*            an array of Field objects (basically, the ResultSet MetaData)
* @param tuples
*            actual row data
* @param conn
*            the Connection that created us.
* @param creatorStmt
* @throws SQLException
*             if an error occurs
public ResultSetImpl(String catalog, Field[] fields, RowData tuples, MySQLConnection conn, StatementImpl creatorStmt) throws SQLException {
this.connection = conn;

this.retainOwningStatement = false;

if (this.connection != null) {
this.exceptionInterceptor = this.connection.getExceptionInterceptor();
this.useStrictFloatingPoint = this.connection.getStrictFloatingPoint();
this.connectionId = this.connection.getId();
this.useFastDateParsing = this.connection.getUseFastDateParsing();
this.profileSql = this.connection.getProfileSql();
this.retainOwningStatement = this.connection.getRetainStatementAfterResultSetClose();
this.jdbcCompliantTruncationForReads = this.connection.getJdbcCompliantTruncationForReads();
this.useFastIntParsing = this.connection.getUseFastIntParsing();
this.serverTimeZoneTz = this.connection.getServerTimezoneTZ();
this.padCharsWithSpace = this.connection.getPadCharsWithSpace();

this.owningStatement = creatorStmt;

this.catalog = catalog;

this.fields = fields;
this.rowData = tuples;
this.updateCount = this.rowData.size();

if (NonRegisteringDriver.DEBUG) {
System.out.println(Messages.getString("ResultSet.Retrieved__1") + this.updateCount + " rows");

this.reallyResult = true;

// Check for no results
if (this.rowData.size() > 0) {
if (this.updateCount == 1) {
if (this.thisRow == null) {
this.rowData.close(); // empty result set
this.updateCount = -1;
} else {
this.thisRow = null;


if (this.fields != null) {
} // else called by Connection.initializeResultsMetadataFromCache() when cached
this.useLegacyDatetimeCode = this.connection.getUseLegacyDatetimeCode();

this.useColumnNamesInFindColumn = this.connection.getUseColumnNamesInFindColumn();


OK 到这里我们也知道,其实什么也没看到就是调用了ResultSetImpl的构造函数创建了一个对象。


我们从整个过程中看到,所有的基础都是Connection 即客户端和服务端的链接也就是和服务器Socket的链接,如果socket断了,就不能进行沟通了。而当有了Socket之后我们可以执行查询方法或更新方法来获取结果集对象,但是在获取结果集对象的时候我们要设置结果集的类型,因为设置不同的结果集类型,我们对获得的结果集对象ResultSet对象会有不同的操作即有的可以向结果集前端移动动,有的可以向结果集后端移动等等,这些都要在生成ResultSet 对象前进行设置。


