前言:
在上一篇文章中,我们分析了client与mysql-server连接的过程。在连接建立完成后,后续就要在连接上执行CRUD操作了。本篇文章我们来分析下最复杂的查询操作。
1.代码准备
环境与上一篇文章一样,下面是使用JDBC进行查询的示例
/**
 * Runs {@code select * from student} and prints the result to standard output:
 * first one tab-prefixed line with the column names, then one tab-prefixed line
 * per row, every value followed by a single space.
 *
 * <p>The statement and result set are closed automatically when the query
 * finishes or fails. The connection is stored in the enclosing class's
 * {@code conn} field and is left open, because this method does not own it.
 * Any {@link SQLException} is reported via {@code printStackTrace()} and not rethrown.
 */
public static void query() {
    // Obtain a connection through the helper provided by DBConn
    conn = DBConn.conn();
    String sql = "select * from student";
    // try-with-resources closes the ResultSet first, then the Statement
    try (java.sql.Statement stmt = conn.createStatement();
         ResultSet res = stmt.executeQuery(sql)) {
        ResultSetMetaData meta = res.getMetaData();
        int columnCount = meta.getColumnCount();
        StringBuilder line = new StringBuilder();
        // Header line: column names (JDBC column indexes are 1-based)
        for (int i = 1; i <= columnCount; i++) {
            line.append(meta.getColumnName(i)).append(" ");
        }
        System.out.println("\t" + line);
        // Iterate over the result set, one output line per row
        while (res.next()) {
            line.setLength(0);
            for (int i = 1; i <= columnCount; i++) {
                // A SQL NULL is appended as the text "null", same as string concatenation
                line.append(res.getString(i)).append(" ");
            }
            System.out.println("\t" + line);
        }
    } catch (SQLException e) {
        e.printStackTrace();
    }
}
2.查询过程分析
2.1 client发送查询请求
// 1.StatementImpl.executeQuery()
// Excerpt: entry point of the query path. While holding the connection mutex it
// forwards the SQL text to the connection's execSQL(...) and stores the outcome in this.results.
public ResultSet executeQuery(String sql) throws SQLException {
synchronized(this.checkClosed().getConnectionMutex()) {
MySQLConnection locallyScopedConn = this.connection;
...
locallyScopedConn.setSessionMaxRows(this.maxRows);
this.statementBegins();
// Execute the SQL through the connection
this.results = locallyScopedConn.execSQL(this, sql, this.maxRows, (Buffer)null, this.resultSetType, this.resultSetConcurrency, doStreaming, this.currentCatalog, cachedFields);
...
}
}
// 2.ConnectionImpl.execSQL()
// Excerpt: when no pre-built packet is supplied (packet == null), picks the character
// encoding (only if getUseUnicode() is true) and hands the SQL string to MysqlIO.sqlQueryDirect(...).
public ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows, Buffer packet, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata, boolean isBatch) throws SQLException {
synchronized(this.getConnectionMutex()) {
long queryStartTime = 0L;
...
if (packet == null) {
// encoding stays null unless the connection is configured to use Unicode
String encoding = null;
if (this.getUseUnicode()) {
encoding = this.getEncoding();
}
// Delegate execution to MysqlIO
ResultSetInternalMethods var36 = this.io.sqlQueryDirect(callingStatement, sql, encoding, (Buffer)null, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, cachedMetadata);
return var36;
}
}
}
// 3.MysqlIO.sqlQueryDirect()
// Excerpt (request half): writes the query text into this.sendPacket, then issues the
// command through sendCommand(...).
final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query, String characterEncoding, Buffer queryPacket, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata) throws Exception {
++this.statementExecutionDepth;
if (characterEncoding != null) {
if (this.platformDbCharsetMatches) {
this.sendPacket.writeStringNoNull(query, characterEncoding, this.connection.getServerCharset(), this.connection.parserKnowsUnicode(), this.connection);
} else if (StringUtils.startsWithIgnoreCaseAndWs(query, "LOAD DATA")) {
// LOAD DATA statements are written as raw bytes, with no charset arguments
this.sendPacket.writeBytesNoNull(StringUtils.getBytes(query));
} else {
// Assemble the outgoing packet payload (same call as the first branch in this excerpt)
this.sendPacket.writeStringNoNull(query, characterEncoding, this.connection.getServerCharset(), this.connection.parserKnowsUnicode(), this.connection);
}
} else {
// No explicit encoding: write the query with the single-argument overload
this.sendPacket.writeStringNoNull(query);
}
...
// Send the command. NOTE(review): command code 3 is presumably COM_QUERY (see the
// COM_QUERY protocol page referenced below in this article) - confirm against MysqlDefs.
Buffer resultPacket = this.sendCommand(3, (String)null, queryPacket, false, (String)null, 0);
...
}
// NOTE(review): the brace below has no matching opener in this excerpt
}
// 4.MysqlIO.sendCommand()
// Excerpt (send half): resets both packet sequence counters to -1, then sends the
// query packet up to its current write position.
final Buffer sendCommand(int command, String extraData, Buffer queryPacket, boolean skipCheck, String extraDataCharEncoding, int timeoutMillis) throws SQLException {
...
this.packetSequence = -1;
this.compressedPacketSequence = -1;
// Call send(...) to perform the actual transmission; the length passed is the buffer's current position
this.send(queryPacket, queryPacket.getPosition());
}
// 5.MysqlIO.send()
// Excerpt: fills in the packet header at the start of the buffer and writes
// packetLen bytes to the output stream.
private final void send(Buffer packet, int packetLen) throws SQLException {
...
Buffer packetToSend = packet;
// Rewind to offset 0 so the header fields are written at the front of the buffer
packet.setPosition(0);
// Length field is packetLen - 4; presumably the 4 accounts for the header itself - confirm
packet.writeLongInt(packetLen - 4);
// Sequence number byte follows the length field
packet.writeByte(this.packetSequence);
...
// Finally write the packet to the OutputStream and flush it so it goes out on the wire
this.mysqlOutput.write(packetToSend.getByteBuffer(), 0, packetLen);
this.mysqlOutput.flush();
}
客户端发送请求协议可以参考:dev.mysql.com/doc/internals/en/com-query.html
2.2 客户端解析服务端返回结果集
client发送完请求后,服务端处理该请求并将结果集返回给client。client对响应的处理过程如下:
// 1.MysqlIO.sendCommand()
// Excerpt (receive half): after sending the request, reads the server's reply
// unless the caller asked to skip the check.
final Buffer sendCommand(int command, String extraData, Buffer queryPacket, boolean skipCheck, String extraDataCharEncoding, int timeoutMillis) throws SQLException {
...
// Request-sending code analysed in section 2.1
this.send(queryPacket, queryPacket.getPosition());
// Receive the response
Buffer returnPacket = null;
if (!skipCheck) {
// For command codes 23 and 26 the read sequence counter is reset first.
// NOTE(review): these look like prepared-statement commands - confirm against MysqlDefs.
if (command == 23 || command == 26) {
this.readPacketSequence = 0;
this.packetSequenceReset = true;
}
// The reply packet is read (and checked for errors) here
returnPacket = this.checkErrorPacket(command);
}
}
// 2.MysqlIO.checkErrorPacket()
// Excerpt: reads one reply packet into the reusable buffer and then inspects it for an error.
private Buffer checkErrorPacket(int command) throws SQLException {
// Read the reply (single-argument call; presumably an overload of the two-argument
// reuseAndReadPacket shown next - confirm)
resultPacket = this.reuseAndReadPacket(this.reusablePacket);
// Check the packet for error information
this.checkErrorPacket(resultPacket);
}
// 3.MysqlIO.reuseAndReadPacket
// Excerpt: reads one packet from the input stream - a 4-byte header first,
// then as many bytes as the header's length field announces - into the supplied buffer.
private final Buffer reuseAndReadPacket(Buffer reuse, int existingPacketLength) throws SQLException {
// Read the 4-byte packet header first
int lengthRead = this.readFully(this.mysqlInput, this.packetHeaderBuf, 0, 4);
// A short header read means the stream ended early: close the connection and fail
if (lengthRead < 4) {
this.forceClose();
throw new IOException(Messages.getString("MysqlIO.43"));
}
// Decode the packet length from the first three header bytes (least-significant byte first)
packetLength = (this.packetHeaderBuf[0] & 255) + ((this.packetHeaderBuf[1] & 255) << 8) + ((this.packetHeaderBuf[2] & 255) << 16);
...
// Read exactly packetLength bytes into the reusable Buffer's backing array
int numBytesRead = this.readFully(this.mysqlInput, reuse.getByteBuffer(), 0, packetLength);
...
}
// What sendCommand() mainly returns is a Buffer holding the raw response bytes;
// we now go back up to sendCommand's caller.
// 4.MysqlIO.sqlQueryDirect
// Excerpt (response half): turns the reply Buffer into a result set object.
final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query, String characterEncoding, Buffer queryPacket, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata) throws Exception {
...
Buffer resultPacket = this.sendCommand(3, (String)null, queryPacket, false, (String)null, 0);
...
// The response bytes are parsed into a result set here
ResultSetInternalMethods rs = this.readAllResults(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, resultPacket, false, -1L, cachedMetadata);
}
// 5.MysqlIO.readAllResults -> MysqlIO.readResultsForQueryOrUpdate
// NOTE(review): the method shown is readResultsForQueryOrUpdate; presumably readAllResults
// delegates to it - confirm against the driver source.
// Excerpt: reads the column count from the first reply packet and builds the result set.
protected final ResultSetImpl readResultsForQueryOrUpdate(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache) throws SQLException {
// Number of columns in the result, read from the reply packet
long columnCount = resultPacket.readFieldLength();
// Parse the rest of the response into a result set
ResultSetImpl results = this.getResultSet(callingStatement, columnCount, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, isBinaryEncoded, metadataFromCache);
}
// 6.MysqlIO.getResultSet
// Excerpt: reads one field packet per column, then obtains the row data (fully read or
// streaming), and wraps fields + rows into a ResultSetImpl.
protected ResultSetImpl getResultSet(StatementImpl callingStatement, long columnCount, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, boolean isBinaryEncoded, Field[] metadataFromCache) throws SQLException {
// NOTE(review): the allocation of this array is not shown in the excerpt - as written,
// fields[i] below would dereference null; presumably it is sized to columnCount in the full source.
Field[] fields = null;
// First read and unpack one field (column definition) packet per column
for(i = 0; (long)i < columnCount; ++i) {
Buffer fieldPacket = null;
fieldPacket = this.readPacket();
fields[i] = this.unpackField(fieldPacket, false);
}
// Read the packet that follows the field packets and take the server status from it.
// NOTE(review): presumably the end-of-metadata marker rather than row content - confirm against the protocol doc.
Buffer packet = this.reuseAndReadPacket(this.reusablePacket);
this.readServerStatusForResultSets(packet);
...
prepStmt = null;
Object rowData;
if (!streamResults) {
// Non-streaming: obtain the row data via readSingleRowSet (cached metadata wins over freshly parsed fields)
rowData = this.readSingleRowSet(columnCount, maxRows, resultSetConcurrency, isBinaryEncoded, metadataFromCache == null ? fields : metadataFromCache);
} else {
// Streaming: create a RowDataDynamic and remember it as the active streaming data
rowData = new RowDataDynamic(this, (int)columnCount, metadataFromCache == null ? fields : metadataFromCache, isBinaryEncoded);
this.streamingData = (RowData)rowData;
}
// Wrap the row data and field metadata into the result set (per the article: JDBC4ResultSet.rowData)
ResultSetImpl rs = this.buildResultSetWithRows(callingStatement, catalog, metadataFromCache == null ? fields : metadataFromCache, (RowData)rowData, resultSetType, resultSetConcurrency, isBinaryEncoded);
return rs;
}
总结:
针对ResultSet结果集,我们先将column的定义信息(Field)解析出来,然后再解析每一行(row)的具体数据,为什么会这样做呢?这是因为服务端返回结果集时就是按照"列数 → 各列定义 → 各行数据"的顺序发送数据包的。
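具体可以参考:dev.mysql.com/doc/internals/en/binary-protocol-resultset.html(该页描述的是二进制协议的结果集;本文示例通过Statement发送COM_QUERY,代码中isBinaryEncoded为false,对应的是文本协议结果集,可同时参考:dev.mysql.com/doc/internals/en/com-query-response.html)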