目录
1 初始化DruidXADataSource
初始化DruidXADataSource由应用开发人员配置如下完成:
/**
* jdbc.ds1获取配置信息,初始化druidDataSource1
* @return
*/
@Bean(name="druidDataSource1")
@ConfigurationProperties(prefix = "jdbc.ds1")
public DruidXADataSource dataSource0(){
DruidXADataSource dataSource = new DruidXADataSource();
return dataSource;
}
点击DruidXADataSource源码发现继承了DruidDataSource,所以根据@ConfigurationProperties(prefix = "jdbc.ds1")将会读取application.properties配置文件下jdbc.ds1开头的配置信息并初始化DruidDataSource的连接配置,继承关系如下:
public class DruidXADataSource extends DruidDataSource implements XADataSource {
}
通过debug观察结果如下:
2 初始化AtomikosDataSourceBean
这里用X/Open DTP模型再做下说明。前面提到模型包括了应用程序(AP)、事务管理器(TM)、资源管理器(RM)、通信资源管理器(CRM)四部分。
- 应用程序(AP)则就是我们的客户端程序
- 事务管理器(TM)就是我们引入的Atomikos工具的作用,它将帮助我们协调RM的分布式事务的提交与回滚来保证分布式数据库数据的一致性
- 资源管理器(RM)则相当于是每一个本地dataSource的管理,Druid主要是这一块做了支持
- 通信资源管理器(CRM)我理解这里不涉及,不做说明
接下来继续查看下来初始化的代码,将new一个AtomikosDataSourceBean实例,并初始化DruidXADataSource作为成员对象管理。
/**
* 实例化AtomikosDataSourceBean,并且set Druid初始化的DruidXADataSource
* @param druidDataSource1
* @return
*/
@Primary
@Bean(name = "dataSource1")
public AtomikosDataSourceBean dataSource(@Qualifier("druidDataSource1") DruidXADataSource druidDataSource1){
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
try {
druidDataSource1.setFilters("stat");
xaDataSource.setXaDataSource(druidDataSource1);
xaDataSource.setMaxPoolSize(10);
xaDataSource.setMinPoolSize(5);
xaDataSource.setUniqueResourceName("dataSource1");
} catch (SQLException e) {
System.out.println("dataSource1 init error!"+e);
}
return xaDataSource;
}
查看AtomikosDataSourceBean源码如下,AtomikosDataSourceBean实现了InitializingBean接口,将会初始化调用afterPropertiesSet()方法:
public class AtomikosDataSourceBean extends com.atomikos.jdbc.AtomikosDataSourceBean implements BeanNameAware, InitializingBean, DisposableBean {
private String beanName;
public AtomikosDataSourceBean() {
}
public void setBeanName(String name) {
this.beanName = name;
}
// 该方法会在实现InitializingBean接口后,在对象初始化后调用
public void afterPropertiesSet() throws Exception {
if (!StringUtils.hasLength(this.getUniqueResourceName())) {
this.setUniqueResourceName(this.beanName);
}
// 调用初始化方法
this.init();
}
public void destroy() throws Exception {
this.close();
}
}
查看this.init()方法里的核心代码如下,将会new ConnectionPoolWithSynchronizedValidation(cf, this),继续往里面看:
ConnectionFactory cf = this.doInit();
if (this.enableConcurrentConnectionValidation) {
this.connectionPool = new ConnectionPoolWithConcurrentValidation(cf, this);
} else {
if (this.getTestQuery() != null) {
LOGGER.logWarning(this + ": testQuery set - pool may be slower / you might want to consider setting maxLifetime instead...");
}
// 初始化连接
this.connectionPool = new ConnectionPoolWithSynchronizedValidation(cf, this);
}
this.getReference();
查看调用栈发现ConnectionPool又调用了一个init()方法,继续往里边看
public ConnectionPoolWithConcurrentValidation(ConnectionFactory connectionFactory, ConnectionPoolProperties properties) throws ConnectionPoolException {
super(connectionFactory, properties);
}
public ConnectionPool(ConnectionFactory connectionFactory, ConnectionPoolProperties properties) throws ConnectionPoolException {
this.connectionFactory = connectionFactory;
this.properties = properties;
this.destroyed = false;
this.name = properties.getUniqueResourceName();
this.init();
}
从这个方法中可以看出开始初始化连接池,源码如下:
private void init() throws ConnectionPoolException {
if (LOGGER.isTraceEnabled()) {
LOGGER.logTrace(this + ": initializing...");
}
// 开始初始化连接池
this.addConnectionsIfMinPoolSizeNotReached();
this.launchMaintenanceTimer();
}
源码分析并查看如下:
private synchronized void addConnectionsIfMinPoolSizeNotReached() {
int connectionsToAdd = this.properties.getMinPoolSize() - this.totalSize();
for(int i = 0; i < connectionsToAdd; ++i) {
try {
// 创建XPooledConnection连接
XPooledConnection xpc = this.createPooledConnection();
// 加入到连接池
this.connections.add(xpc);
xpc.registerXPooledConnectionEventListener(this);
} catch (Exception var4) {
if (LOGGER.isTraceEnabled()) {
LOGGER.logTrace(this + ": could not establish initial connection", var4);
}
}
}
}
继续跟踪如下:
private XPooledConnection createPooledConnection() throws CreateConnectionException {
XPooledConnection xpc = this.connectionFactory.createPooledConnection();
EventPublisher.publish(new PooledConnectionCreatedEvent(this.properties.getUniqueResourceName(), xpc));
return xpc;
}
发现新大陆,找到this.xaDataSource.getXAConnection()调用,xaDataSource不就是我们初始化的Druid的数据源了吗?
public XPooledConnection createPooledConnection() throws CreateConnectionException {
try {
// 开始调用xaDataSource的getXAConnection方法
XAConnection xaConnection = this.xaDataSource.getXAConnection();
return new AtomikosXAPooledConnection(xaConnection, this.jdbcTransactionalResource, this.props);
} catch (SQLException var3) {
String msg = "XAConnectionFactory: failed to create pooled connection - DBMS down or unreachable?";
LOGGER.logWarning(msg, var3);
throw new CreateConnectionException(msg, var3);
}
}
分析Druid的getXAConnection如下:
@Override
public XAConnection getXAConnection() throws SQLException {
// 获取DruidPooledConnection,前面文章分析后这个就是暴露给客户端的连接对象
DruidPooledConnection conn = this.getConnection();
// 解封装获取真实的数据库连接对象
Connection physicalConn = conn.unwrap(Connection.class);
// 根据physicalConn获取XAConnection
XAConnection rawXAConnection = createPhysicalXAConnection(physicalConn);
// 封装一个DruidPooledXAConnection返回
return new DruidPooledXAConnection(conn, rawXAConnection);
}
查看createPhysicalXAConnection(physicalConn)获取XAConnection的核心代码逻辑处理如下:
if (utilClass == null && !utilClassError) {
try {
utilClass = Class.forName("com.mysql.jdbc.Util");
Method method = utilClass.getMethod("isJdbc4");
utilClass_isJdbc4 = (Boolean) method.invoke(null);
// 获取连接
class_5_connection = Class.forName("com.mysql.jdbc.Connection");
method_5_getPinGlobalTxToPhysicalConnection = class_5_connection.getMethod("getPinGlobalTxToPhysicalConnection");
class_5_suspendableXAConnection = Class.forName("com.mysql.jdbc.jdbc2.optional.SuspendableXAConnection");
constructor_5_suspendableXAConnection = class_5_suspendableXAConnection.getConstructor(class_5_connection);
class_5_JDBC4SuspendableXAConnection = Class.forName("com.mysql.jdbc.jdbc2.optional.JDBC4SuspendableXAConnection");
constructor_5_JDBC4SuspendableXAConnection = class_5_JDBC4SuspendableXAConnection.getConstructor(class_5_connection);
// 创建XA连接对象
class_5_MysqlXAConnection = Class.forName("com.mysql.jdbc.jdbc2.optional.MysqlXAConnection");
constructor_5_MysqlXAConnection = class_5_MysqlXAConnection.getConstructor(class_5_connection, boolean.class);
} catch (Exception ex) {
ex.printStackTrace();
utilClassError = true;
}
}
DruidPooledXAConnection则是对DruidPooledConnection对象和XAConnection对象的封装。
public DruidPooledXAConnection(DruidPooledConnection pooledConnection, XAConnection xaConnection){
this.pooledConnection = pooledConnection;
this.xaConnection = xaConnection;
}
继续跟踪createPooledConnection分析如下:
public XPooledConnection createPooledConnection() throws CreateConnectionException {
try {
// 获取XAConnection
XAConnection xaConnection = this.xaDataSource.getXAConnection();
// atomikos自行封装AtomikosXAPooledConnection返回
return new AtomikosXAPooledConnection(xaConnection, this.jdbcTransactionalResource, this.props);
} catch (SQLException var3) {
String msg = "XAConnectionFactory: failed to create pooled connection - DBMS down or unreachable?";
LOGGER.logWarning(msg, var3);
throw new CreateConnectionException(msg, var3);
}
}
根据调用栈会发现会调用到Druid的DruidPooledXAConnection.getXAResource()方法如下:
@Override
public XAResource getXAResource() throws SQLException {
return xaConnection.getXAResource();
}
debug截图如下,这里会发现Druid就调用了MysqlXAConnection
查看这个类是Mysql对Xa协议的支持的实现,对象关系如下:
public class MysqlXAConnection extends MysqlPooledConnection implements XAConnection, XAResource {}
到这里整个的事务管理器以及XA的对象的初始化就完成了,将根据配置的连接数量循环初始化
List<XPooledConnection> connections的连接池出来。
下一篇将分析初始化这么个连接池后,client执行多个数据库的操作后如何实现工作的。