ActiveMQ完全遵循jms规范,而jms是支持事务的,即要么全部成功,要么全部失败。很多时间,我们的JMS操作需要和数据库操作的事务一致,即要么jms和数据库操作都成功,要么jms和数据库操作都失败,这就是分布式事务(xa事务, 也就是所谓的两段式提交事务,在java中的编程接口为JTA)的用武之地。
完美实现j2ee规范的web server是提供对JTA的实现的,但是tomcat,jetty等轻量级的web server没有提供,在j2se环境也没有,幸运的是还有第三的实现。atomikos就是其中的一个,因其有完整的文档,活跃的社区,商业的支持,而成为第三方jta实现的佼佼者。本文就使用该实现来实践xa事务。
步骤:
为了让本demo有最大的可移植性,本文使用h2数据库,而h2虽然是嵌入式数据库,但是它实现了xa事务,这正是我需要的。
1、新建一个Maven 工程
2、在Pom.Xml中添加对应的依赖
主要有spring-jms,spring-jdbc,activemq-all,com.atomikos:transactions-jta,com.atomikos:transactions-jdbc,com.atomikos:transactions-jms,com.h2database:h2
3、新建Schema.Sql,用于创建测试用的表
1
2
3
4
5
6
7
8
9
|
DROP
TABLE
IF EXISTStest_user;
create
tabletest_user(
username
varchar(32),
password
varchar(32),
email
varchar(32),
phone
varchar(11),
sex
char(1),
primary key(email)
);
|
4、新建ApplicationContext.Xml
配置对应的bean,具体可以参考附件中的对应文件,注意xa.dataSource和xa.connectionFactory bean的定义,都指定了init-method="init" destroy-method="close"属性,因为在init方法中会将其加入Transaction Manager中去。
2)为了便于管理h2数据库,我声明了org.h2.tools.Server ,这样就可以将浏览器指向http://localhost:8082/来查看h2数据库的内容。
5、开发Service类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public
class
TestXaService
{
@Autowired
JdbcTemplate
jdbcTemplate
;
@Autowired
JmsTemplate
jmsTemplate
;
@Transactional
(
readOnly
=
false
,
rollbackFor
=
{
JdbcSQLException
.
class
,
RuntimeException
.
class
}
)
public
void
persistence
(
List
<User>
list
)
{
for
(
User
user
:
list
)
{
this
.
insertBatch
(
user
)
;
this
.
send_amq
(
user
)
;
}
}
public
void
insertBatch
(
User
user
)
{
String
sql
=
"insert into test_user(username, password, email, phone,sex) values "
;
String
pattern
=
" ('%s', '%s', '%s', '%s', '%c') "
;
sql
+=
String
.
format
(
pattern
,
user
.
getUsername
(
)
,
user
.
getPassword
(
)
,
user
.
getEmail
(
)
,
user
.
getPhone
(
)
,
user
.
getSex
(
)
)
;
System
.
out
.
println
(
sql
)
;
jdbcTemplate
.
execute
(
sql
)
;
}
public
void
send_amq
(
final
User
user
)
{
jmsTemplate
.
send
(
"test.queue"
,
new
MessageCreator
(
)
{
public
Message
createMessage
(
Session
session
)
throws
JMSException
{
return
session
.
createObjectMessage
(
user
)
;
}
}
)
;
}
}
|
代码很简单,将User对象同时插入数据库和ActiveMQ队列。只要有某一步操作异常,就会造成数据库和ActiveMQ操作都回滚。使用了Spring经典的声明式事务@Transactional,当发生JdbcSQLException或者RuntimeException时回滚。
6、添加Main方法来触发测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
public
static
void
main
(
String
[
]
args
)
{
User
user
=
new
User
(
)
;
user
.
setPassword
(
"123456"
)
;
user
.
setPhone
(
"123456"
)
;
user
.
setSex
(
'M'
)
;
user
.
setUsername
(
"javacoder.cn"
)
;
User
user1
=
new
User
(
)
;
user1
.
setPassword
(
"123456"
)
;
user1
.
setPhone
(
"123456"
)
;
user1
.
setSex
(
'M'
)
;
user1
.
setUsername
(
"javacoder.cn"
)
;
ApplicationContext
context
=
new
ClassPathXmlApplicationContext
(
"classpath:applicationContext.xml"
)
;
List
<User>
list
=
new
ArrayList
<User>
(
)
;
list
.
add
(
user
)
;
list
.
add
(
user1
)
;
list
.
add
(
user
)
;
//让数据库报主键重复的异常
TestXaService
testXaService
=
(
TestXaService
)
context
.
getBean
(
"testXaService"
)
;
testXaService
.
persistence
(
list
)
;
System
.
out
.
println
(
"send successfully, please visit http://localhost:8161/admin to see it"
)
;
}
|
代码很简单,我们构造了两个User对象,注意我们将user 两次加入到了list中,目的为了造成主键重复!!
从context中获取TestXaService对象,调用testXaService.persistence(list)来持久化该list中的对象。
7、测试:
1、启动ActiveMQ 服务器
2、运行程序,发现插入失败,ActiveMQ事务和数据库事务都回滚;将user对象只插入一次到list中,事务提交成功。
参考文档:
1、《ActiveMQ in action》
2、Configuring ActiveMQ transactions in Spring
3、atomikos官网
4、Atomikos TransactionsEssentials Guide.pdf
http://www.javacoder.cn/?p=428