声明:以下关于“JTA规范事务模型”、“Spring JTA分布式事务的实现”等内容均来源于其他大佬的博客内容,并已经表明出处。
1、JTA规范事务模型
Java Transaction API,通常称为JTA,是用于管理 Java中的事务的API 。它允许我们以资源无关的方式启动,提交和回滚事务。
JTA为J2EE平台提供了分布式事务服务(distributed transaction)的能力。 某种程度上,可以认为JTA规范是XA规范的Java版,其把XA规范中规定的DTP模型交互接口抽象成Java接口中的方法,并规定每个方法要实现什么样的功能。
在DTP模型中,规定了模型的五个组成元素:应用程序(Application)、资源管理器(Resource Manager)、事务管理器(Transaction Manager)、通信资源管理器(Communication Resource Manager)、 通信协议(Communication Protocol)。
而在JTA规范中,模型中又多了一个元素Application Server,如下所示:
-
事务管理器(transaction manager)
处于图中最为核心的位置,其他的事务参与者都是与事务管理器进行交互。事务了管理器提供事务声明,事务资源管理,同步,事务上下文传播等功能。JTA规范定义了事务管理器与其他事务参与者交互的接口,而JTS规范定义了事务管理器的实现要求,因此我们看到事务管理器底层是基于JTS的。
-
应用服务器(application server):
顾名思义,是应用程序运行的容器。JTA规范规定,事务管理器的功能应该由application server提供,如上图中的EJB Server。一些常见的其他web容器,如:jboss、weblogic、websphere等,都可以作为application server,这些web容器都实现了JTA规范。特别需要注意的是,并不是所有的web容器都实现了JTA规范,如tomcat并没有实现JTA规范,因此并不能提供事务管理器的功能。
-
应用程序(application):
简单来说,就是我们自己编写的应用,部署到了实现了JTA规范的application server中,之后我们就可以我们JTA规范中定义的UserTransaction类来声明一个分布式事务。通常情况下,application server为了简化开发者的工作量,并不一定要求开发者使用UserTransaction来声明一个事务,开发者可以在需要使用分布式事务的方法上添加一个注解,就像spring的声明式事务一样,来声明一个分布式事务。
特别需要注意的是,JTA规范规定事务管理器的功能由application server提供。但是如果我们的应用不是一个web应用,而是一个本地应用,不需要被部署到application server中,无法使用application server提供的事务管理器功能。又或者我们使用的web容器并没有事务管理器的功能,如tomcat。对于这些情况,我们可以直接使用一些第三方的事务管理器类库,如JOTM和Atomikos。将事务管理器直接整合进应用中,不再依赖于application server。
-
资源管理器(resource manager):
理论上任何可以存储数据的软件,都可以认为是资源管理器RM。最典型的RM就是关系型数据库了,如mysql,另外一种比较常见的资源管理器是消息中间件,如ActiveMQ、RabbitMQ等, 这些都是真正的资源管理器。
事实上,将资源管理器(resource manager)称为资源适配器(resource adapter)似乎更为合适。因为在java程序中,我们都是通过client来于RM进行交互的,例如:我们通过mysql-connector-java-x.x.x.jar驱动包,获取Conn、执行sql,与mysql服务端进行通信;通过ActiveMQ、RabbitMQ等的客户端,来发送消息等。
正常情况下,一个数据库驱动供应商只需要实现JDBC规范即可,一个消息中间件供应商只需要实现JMS规范即可。 而引入了分布式事务的概念后,DB、MQ等在DTP模型中的作用都是RM,二者是等价的,需要由TM统一进行协调。
为此,JTA规范定义了一个XAResource接口,其定义RM必须要提供给TM调用的一些方法。之后,不管这个RM是DB,还是MQ,TM并不关心,因为其操作的是XAResource接口。而其他规范(如JDBC、JMS)的实现者,同时也对此接口进行实现。如MysqlXAConnection,就实现了XAResource接口。
-
通信资源管理器(Communication Resource Manager):
这个是DTP模型中就已经存在的概念,对于需要跨应用的分布式事务,事务管理器彼此之间需要通信,这是就是通过CRM这个组件来完成的。JTA规范中,规定CRM需要实现JTS规范定义的接口。
下图更加直观的演示了JTA规范中各个模型组件之间是如何交互的:
上述内容主要来自《3.0 JTA规范》这一篇博客中。
2、Spring JTA分布式事务的实现
根据用于管理事务的底层实现,Spring中的事务策略可以分为两个主要部分:
- 单连接器策略(相当于本地事务管理器) - 底层技术使用单连接器。例如,JDBC使用连接级事务、Hibernate以及JDO使用会话级事务。可以应用使用AOP和拦截器的声明式事务管理。
- 多连接器策略(相当于全局事务管理器) - 底层技术具有使用多个连接器的能力。当有这方面需求时,JTA是最好的选择。此策略需要启用JTA的数据源实例。JBossTS、Atomikos、Bitronix都是开源的JTA实现。
JTA的真正强大之处在于它能够在单个事务中管理多个资源(如数据库,消息服务)。
上面内容来自《Spring JTA分布式事务实现》。
3、Atomikos简介
Atomikos是一个非常流行的开源事务管理器,并且可以嵌入到你的Spring Boot应用中。
在前面我们提到了application server,而Tomcat应用服务器没有实现JTA规范,所以当使用Tomcat作为应用服务器的时候,需要使用第三方的事务管理器类来作为全局的事务管理器,而Atomikos框架就是这个作用,即将事务管理整合到应用中,而不依赖于application server。
关于Atomikos可以参考《atomikos JTA/XA全局事务》。
4、基于Atomikos实现分布式事务
我们这里主要使用Spring Boot + Mybatis + MySql + Atomikos实现一个分布式事务的示例,具体实现过程如下:
4.1、pom.xml文件
引入Atomikos依赖(mybatis、mysql依赖省略了),如下所示:
<!--atomikos 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
<version>2.4.1</version>
</dependency>
4.2、application.properties文件
application.properties文件,定义数据源信息,后续通过指定的配置类进行读取配置参数。
server.port=8080
#配置first数据源
spring.datasource.first.username=root
spring.datasource.first.password=123456
spring.datasource.first.url=jdbc:mysql://192.168.1.8:3306/db_8?useSSL=true&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai
#配置second数据源
spring.datasource.second.username=root
spring.datasource.second.password=123456
spring.datasource.second.url=jdbc:mysql://192.168.1.9:3306/db_9?useSSL=true&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai
logging.level.com.qriver.distributed.transaction=debug
4.3、配置文件读取类FirstDbProperties、SecondDbProperties
这两个配置类的参数一样,只是扫描配置文件参数的前缀不一样,所以我们先定义一个基类DbProperties,具体如下:
public class DbProperties {
private String type;
private String driverClassName;
private String url;
private String username;
private String password;
//省略setter 和 getter
}
FirstDbProperties、SecondDbProperties两个类定义如下:
//读取first数据源的配置
@Configuration
@ConfigurationProperties(prefix = "spring.datasource.first")
public class FirstDbProperties extends DbProperties{
}
//读取second数据源的配置
@Configuration
@ConfigurationProperties(prefix = "spring.datasource.second")
public class SecondDbProperties extends DbProperties{
}
4.4、配置XA数据源和mybatis的SqlSessionFactoryBean
每个数据源都需要配置,我们这里分别配置first、second两个数据源。这里主要以first为例进行分析。
@Configuration
//basePackages 定义了扫描的包路径。sqlSessionFactoryRef 主要引入SqlSessionFactoryBean 对象
@MapperScan(basePackages = "com.qriver.distributed.transaction.mapper.first", sqlSessionFactoryRef = "firstSqlSessionFactoryBean")
public class FirstDataSourceConfig {
/**
* 创建数据源
* @param firstDbProperties
* @return
*/
@Bean("firstDataSource")
public DataSource firstDataSource(FirstDbProperties firstDbProperties){
//创建Mysql的xa数据源
MysqlXADataSource xaDataSource = new MysqlXADataSource();
xaDataSource.setURL(firstDbProperties.getUrl());
xaDataSource.setUser(firstDbProperties.getUsername());
xaDataSource.setPassword(firstDbProperties.getPassword());
//创建atomikos数据源
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSource(xaDataSource);
return atomikosDataSourceBean;
}
@Bean("firstSqlSessionFactoryBean")
public SqlSessionFactoryBean sqlSessionFactoryBean(@Qualifier("firstDataSource") DataSource dataSource) throws IOException {
//创建SqlSessionFactoryBean对象
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSource);
//设置mapper映射文件路径
ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
sqlSessionFactoryBean.setMapperLocations(resourceResolver.getResources("classpath:/mappings/first/*Mapper.xml"));
return sqlSessionFactoryBean;
}
}
4.5、全局事务管理器配置 JtaTransactionConfig
前面配置的多个数据源相当于XA模型中的RM,我们还需要配置一个全局事务管理器,即TM,进行协调处理全局事务,配置如下:
@Configuration
public class JtaTransactionConfig {
@Bean("jtaTransaction")
public JtaTransactionManager jtaTransactionManager(){
UserTransaction userTransaction = new UserTransactionImp();
UserTransactionManager userTransactionManager = new UserTransactionManager();
return new JtaTransactionManager(userTransaction,userTransactionManager);
}
}
上述UserTransaction 接口,是jta规范中提供的, UserTransactionImp实现类由Atomikos提供,UserTransactionManager 也是由Atomikos提供。
4.6、实现业务逻辑
前面已经完成了Atomikos的配置,后续我们按照前面的配置创建对应的数据库操作逻辑即可。
业务代码的编写和普通的spring boot + mybatis的方式没有区别,这里只需要注意以下两点即可:
- mapper配置文件的路径,因为我们在配置数据源的时候,在代码中设置了读取映射文件的位置,所以业务代码中的文件路径应该相匹配。
- 再一个就是注解@MapperScan中定义了基础扫描路径,所以Mapper接口包名也应该匹配。
4.7、测试
完成了业务代码的编写,我们这里编写一个测试的Service类,在insertData()方法中,我们操作了两个数据库,注意,这里需要添加@Transactional注解,引入使用的transactionManager 对象。
@Service
public class DemoTestService {
@Autowired
private FirstMapper firstMapper;
@Autowired
private SecondMapper secondMapper;
@Transactional(transactionManager = "jtaTransaction")
public void insertData(){
//插入first
FirstEntity firstEntity = new FirstEntity();
firstEntity.setId(1);
firstEntity.setName("test");
firstMapper.insert(firstEntity);
//插入second
SecondEntity secondEntity = new SecondEntity();
secondEntity.setId(1);
secondEntity.setName("test2");
secondMapper.insert(secondEntity);
}
}
然后,再编写单元测试类,如下:
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = DistributedTransactionAtomikosApplication.class)
public class DemoTest {
@Autowired
private DemoTestService demoTestService;
@Test
public void insertTest(){
demoTestService.insertData();
}
}
我们可以通过执行单元测试方法进行测试,通过修改数据库表的字段让插入操作失败,从而验证Atomikos跨数据的分布式事务。这里不再演示。