Socket的连接创建
继续向下调试，进入 ConnectionImpl 类的 createNewIO() 方法：
public synchronized void createNewIO(boolean isForReconnect) throws SQLException { Properties mergedProps = exposeAsProperties(this.props); if (!getHighAvailability()) { //进入该方法 connectOneTryOnly(isForReconnect, mergedProps); return; } //??? connectWithRetries(isForReconnect, mergedProps); }
private void connectOneTryOnly(boolean isForReconnect,
Properties mergedProps) throws SQLException { Exception connectionNotEstablishedBecause = null; try { //IO创建主要在这个方法里 coreConnect(mergedProps); this.connectionId = this.io.getThreadId(); this.isClosed = false; // save state from old connection boolean oldAutoCommit = getAutoCommit(); int oldIsolationLevel = this.isolationLevel; boolean oldReadOnly = isReadOnly(); String oldCatalog = getCatalog(); this.io.setStatementInterceptors(this.statementInterceptors); // Server properties might be different // from previous connection, so initialize // again... initializePropsFromServer(); if (isForReconnect) { // Restore state from old connection setAutoCommit(oldAutoCommit); if (this.hasIsolationLevels) { setTransactionIsolation(oldIsolationLevel); } setCatalog(oldCatalog); setReadOnly(oldReadOnly); } return; } catch (Exception EEE) { if (this.io != null) { this.io.forceClose(); } connectionNotEstablishedBecause = EEE; if (EEE instanceof SQLException) { throw (SQLException)EEE; } SQLException chainedEx = SQLError.createSQLException( Messages.getString("Connection.UnableToConnect"), SQLError.SQL_STATE_UNABLE_TO_CONNECT_TO_DATASOURCE, getExceptionInterceptor()); chainedEx.initCause(connectionNotEstablishedBecause); throw chainedEx; } }
进入 coreConnect() 方法：
private void coreConnect(Properties mergedProps) throws SQLException, IOException { int newPort = 3306; String newHost = "localhost"; String protocol = mergedProps.getProperty(NonRegisteringDriver.PROTOCOL_PROPERTY_KEY); if (protocol != null) { // "new" style URL if ("tcp".equalsIgnoreCase(protocol)) { newHost = normalizeHost(mergedProps.getProperty(NonRegisteringDriver.HOST_PROPERTY_KEY)); newPort = parsePortNumber(mergedProps.getProperty(NonRegisteringDriver.PORT_PROPERTY_KEY, "3306")); } else if ("pipe".equalsIgnoreCase(protocol)) { setSocketFactoryClassName(NamedPipeSocketFactory.class.getName()); String path = mergedProps.getProperty(NonRegisteringDriver.PATH_PROPERTY_KEY); if (path != null) { mergedProps.setProperty(NamedPipeSocketFactory.NAMED_PIPE_PROP_NAME, path); } } else { // normalize for all unknown protocols newHost = normalizeHost(mergedProps.getProperty(NonRegisteringDriver.HOST_PROPERTY_KEY)); newPort = parsePortNumber(mergedProps.getProperty(NonRegisteringDriver.PORT_PROPERTY_KEY, "3306")); } } else { String[] parsedHostPortPair = NonRegisteringDriver .parseHostPortPair(this.hostPortPair); newHost = parsedHostPortPair[NonRegisteringDriver.HOST_NAME_INDEX]; newHost = normalizeHost(newHost); if (parsedHostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX] != null) { newPort = parsePortNumber(parsedHostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX]); } } this.port = newPort; this.host = newHost; //MysqlIO的创建,用来于服务器进行通信 this.io = new MysqlIO(newHost, newPort, mergedProps, getSocketFactoryClassName(), getProxy(), getSocketTimeout(), this.largeRowSizeThreshold.getValueAsInt()); this.io.doHandshake(this.user, this.password, this.database); }
MysqlIO构造器的创建
public MysqlIO(String host, int port, Properties props, String socketFactoryClassName, MySQLConnection conn, int socketTimeout, int useBufferRowSizeThreshold) throws IOException, SQLException { this.connection = conn; if (this.connection.getEnablePacketDebug()) { this.packetDebugRingBuffer = new LinkedList(); } this.traceProtocol = this.connection.getTraceProtocol(); this.useAutoSlowLog = this.connection.getAutoSlowLog(); this.useBufferRowSizeThreshold = useBufferRowSizeThreshold; this.useDirectRowUnpack = this.connection.getUseDirectRowUnpack(); this.logSlowQueries = this.connection.getLogSlowQueries(); this.reusablePacket = new Buffer(INITIAL_PACKET_SIZE); this.sendPacket = new Buffer(INITIAL_PACKET_SIZE); this.port = port; this.host = host; this.socketFactoryClassName = socketFactoryClassName; this.socketFactory = createSocketFactory(); this.exceptionInterceptor = this.connection.getExceptionInterceptor(); try {//创建得到一个socket this.mysqlConnection = this.socketFactory.connect(this.host, this.port, props); if (socketTimeout != 0) { try { this.mysqlConnection.setSoTimeout(socketTimeout); } catch (Exception ex) { /* Ignore if the platform does not support it */ } } this.mysqlConnection = this.socketFactory.beforeHandshake(); if (this.connection.getUseReadAheadInput()) { //创建input流 this.mysqlInput = new ReadAheadInputStream(this.mysqlConnection.getInputStream(), 16384, this.connection.getTraceProtocol(), this.connection.getLog()); } else if (this.connection.useUnbufferedInput()) { this.mysqlInput = this.mysqlConnection.getInputStream(); } else { this.mysqlInput = new BufferedInputStream(this.mysqlConnection.getInputStream(), 16384); } //创建output流 this.mysqlOutput = new BufferedOutputStream(this.mysqlConnection.getOutputStream(), 16384); this.isInteractiveClient = this.connection.getInteractiveClient(); this.profileSql = this.connection.getProfileSql(); this.autoGenerateTestcaseScript = this.connection.getAutoGenerateTestcaseScript(); this.needToGrabQueryFromPacket = 
(this.profileSql || this.logSlowQueries || this.autoGenerateTestcaseScript); if (this.connection.getUseNanosForElapsedTime() && Util.nanoTimeAvailable()) { this.useNanosForElapsedTime = true; this.queryTimingUnits = Messages.getString("Nanoseconds"); } else { this.queryTimingUnits = Messages.getString("Milliseconds"); } if (this.connection.getLogSlowQueries()) { calculateSlowQueryThreshold(); } } catch (IOException ioEx) { throw SQLError.createCommunicationsException(this.connection, 0, 0, ioEx, getExceptionInterceptor()); } }