目录
5. 事务和ReactiveMongoTransactionManager
从版本4开始,MongoDB支持 事务。事务是建立在 会话之上的,因此,需要一个活跃的 ClientSession。
除非你在你的应用程序上下文中指定一个 MongoTransactionManager,否则事务支持是 DISABLED(禁用的)。你可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非本地 MongoDB 事务。
为了获得对事务的完全程序化控制,你可能想在 MongoOperations 上使用会话回调。
下面的例子显示了在一个 SessionCallback 中的程序化事务控制。
Example 124. 程序性事务
ClientSession session = client.startSession(options);
template.withSession(session)
.execute(action -> {
session.startTransaction();
try {
Step step = // ...;
action.insert(step);
process(step);
action.update(Step.class).apply(Update.set("state", // ...
session.commitTransaction();
} catch (RuntimeException e) {
session.abortTransaction();
}
}, ClientSession::close)
获得一个新的 ClientSession。
开始事务。
如果一切按预期进行,就提交修改。
出现意外,所以要回滚一切。
完成后不要忘记关闭会话。
前面的例子让你完全控制事务行为,同时在回调中使用会话范围的 MongoOperations 实例,以确保会话被传递给每个服务器调用。为了避免这种方法带来的一些开销,你可以使用 TransactionTemplate 来消除手动事务流的一些噪音。
1. 事务和TransactionTemplate
Spring Data MongoDB事务支持一个 TransactionTemplate。下面的例子展示了如何创建和使用 TransactionTemplate。
Example 125. 事务和 TransactionTemplate
template.setSessionSynchronization(ALWAYS);
// ...
TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager);
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
Step step = // ...;
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
};
});
在 Template API 配置中启用事务同步。
使用提供的 PlatformTransactionManager 创建 TransactionTemplate。
在回调中,ClientSession 和事务已经被注册。
在运行期间改变 MongoTemplate 的状态(就像你可能认为在前面列表的第1项中可能发生的那样)会导致线程和可见性问题。
2. 事务和MongoTransactionManager
MongoTransactionManager 是通往众所周知的Spring事务支持的网关。它可以让应用程序使用 Spring的事务托管功能。MongoTransactionManager 将一个 ClientSession 绑定到线程上。MongoTemplate 会检测会话,并相应地对这些与事务相关的资源进行操作。MongoTemplate 也可以参与到其他正在进行的事务中。下面的例子展示了如何用 MongoTransactionManager 创建和使用事务。
Example 126. 事务和 MongoTransactionManager
@Configuration
static class Config extends AbstractMongoClientConfiguration {
@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {
return new MongoTransactionManager(dbFactory);
}
// ...
}
@Component
public class StateService {
@Transactional
void someBusinessFunction(Step step) {
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
};
});
在应用 application context 中注册 MongoTransactionManager。
将方法标记为事务性。
@Transactional(readOnly = true) 建议 MongoTransactionManager 也启动一个事务,将 ClientSession 添加到发出的请求中。
3. 响应式事务
与支持响应式 ClientSession 一样,ReactiveMongoTemplate 提供了专门的方法,用于在事务中进行操作,而不必担心根据操作结果提交或停止操作。
除非你在你的 application context 中指定一个 ReactiveMongoTransactionManager,否则事务支持是 DISABLED(禁用的)。你可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非本地MongoDB事务。
使用普通的MongoDB响应式驱动API,在一个事务性流程中的 delete 可能看起来像这样。
Example 127. 原生驱动的支持
Mono<DeleteResult> result = Mono
.from(client.startSession())
.flatMap(session -> {
session.startTransaction();
return Mono.from(collection.deleteMany(session, ...))
.onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e)))
.flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val)))
.doFinally(signal -> session.close());
});
首先,我们显然需要启动session。
一旦我们有了 ClientSession,就开始事务。
通过向操作传递 ClientSession,在事务中进行操作。
如果操作异常完成,我们需要停止事务并保留错误。
当然,也可以在成功的情况下提交更改。仍然保留操作结果。
最后,我们需要确保关闭会话。
上述操作的罪魁祸首是在保留 main flow DeleteResult,而不是通过 commitTransaction() 或 abortTransaction() 发布的事务结果,这导致了相当复杂的设置。
4. 事务和TransactionalOperator
Spring Data MongoDB事务支持一个 TransactionalOperator。下面的例子展示了如何创建和使用一个 TransactionalOperator。
Example 128. 事务和 TransactionalOperator
template.setSessionSynchronization(ALWAYS);
// ...
TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
new DefaultTransactionDefinition());
Step step = // ...;
template.insert(step);
Mono<Void> process(step)
.then(template.update(Step.class).apply(Update.set("state", …))
.as(rxtx::transactional)
.then();
为事务性参与启用事务同步。
使用提供的 ReactiveTransactionManager 创建 TransactionalOperator。
TransactionalOperator.transactional(…) 为所有上游操作提供事务管理。
5. 事务和ReactiveMongoTransactionManager
ReactiveMongoTransactionManager 是通往众所周知的 Spring事务支持 的网关。它允许应用程序利用Spring的管理事务功能。ReactiveMongoTransactionManager 将 ClientSession 绑定到 subscriber Context。ReactiveMongoTemplate 会检测会话,并对这些与事务相关的资源进行相应操作。 ReactiveMongoTemplate 也可以参与其他正在进行的事务。下面的例子展示了如何用 ReactiveMongoTransactionManager 创建和使用事务。
Example 129. 事务和 ReactiveMongoTransactionManager
@Configuration
public class Config extends AbstractReactiveMongoConfiguration {
@Bean
ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) {
return new ReactiveMongoTransactionManager(factory);
}
// ...
}
@Service
public class StateService {
@Transactional
Mono<UpdateResult> someBusinessFunction(Step step) {
return template.insert(step)
.then(process(step))
.then(template.update(Step.class).apply(Update.set("state", …));
};
});
在application context中注册 ReactiveMongoTransactionManager。
将方法标记为事务性的。
@Transactional(readOnly = true) 建议 ReactiveMongoTransactionManager 也启动一个事务,将 ClientSession 添加到发出的请求中。
6. 事务内部的特殊行为
在事务内部,MongoDB服务器有一个稍微不同的行为。
Connection Settings
MongoDB驱动提供了一个专门的副本集名称配置选项,使驱动进入自动检测模式。这个选项有助于识别主要的副本集节点和事务中的命令路由。
确保在MongoDB的URI中添加 replicaSet。请参考 连接字符串选项 以了解更多细节。
Collection Operations
MongoDB不支持集合操作,例如在事务中创建集合。这也会影响到第一次使用时发生的即时集合创建。因此,请确保所有需要的结构都已到位。
Transient Errors
MongoDB可以为在事务性操作中出现的错误添加特殊标签。这些标签可能表示暂时性的故障,这些故障可能通过重试操作而消失。我们强烈推荐 Spring Retry 用于这些目的。然而,我们可以覆写 MongoTransactionManager#doCommit(MongoTransactionObject),以实现MongoDB参考手册中所述的重试提交操作行为。
Count
MongoDB的 count 操作是基于集合统计的,可能无法反映事务中的实际情况。当在一个多文档事务中发出 count 命令时,服务器会响应 error 50851。一旦 MongoTemplate 检测到一个活动的事务,所有暴露的 count() 方法都会被转换,并使用 $match 和 $count 操作符委托给聚合框架,保留 Query 设置,如 collation。
在 aggregation count helper 中使用 geo 命令时,有一些限制。以下运算符不能使用,必须用不同的运算符代替。
- $where → $expr
- $near → $geoWithin with $center
- $nearSphere → $geoWithin with $centerSphere
使用 Criteria.near(…) 和 Criteria.nearSphere(…) 的查询必须改写为 Criteria.within(…) 各自的 Criteria.withinSphere(…)。同样适用于 repository 查询方法中的 near 查询关键字,必须改为 within。也请参见MongoDB JIRA ticket DRIVERS-518 以进一步参考。
下面的片段显示了会话绑定闭包内的 count 用法。
session.startTransaction();
template.withSession(session)
.execute(action -> {
action.count(query(where("state").is("active")), Step.class)
...
上面的片段具体化为以下命令。
db.collection.aggregate(
[
{ $match: { state: "active" } },
{ $count: "totalEntityCount" }
]
)
而不是。
db.collection.find( { state: "active" } ).count()