Source code analysis: how transaction ordering is guaranteed
Let's look at the main implementation of the transaction coordinator class, TransactionalSpoutCoordinator.
First, the most important method, nextTuple:
@Override
public void nextTuple() {
    sync();
}
All it does is call sync():
private void sync() {
    // note that sometimes the tuples active may be less than max_spout_pending, e.g.
    // max_spout_pending = 3
    // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
    // and there won't be a batch for tx 4 because there's max_spout_pending tx active
    TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
    if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
        maybeCommit.status = AttemptStatus.COMMITTING;
        _collector.emit(TRANSACTION_COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
    }
    try {
        if(_activeTx.size() < _maxTransactionActive) {
            BigInteger curr = _currTransaction;
            for(int i=0; i<_maxTransactionActive; i++) {
                if((_coordinatorState.hasCache(curr) || _coordinator.isReady())
                        && !_activeTx.containsKey(curr)) {
                    TransactionAttempt attempt = new TransactionAttempt(curr, _rand.nextLong());
                    Object state = _coordinatorState.getState(curr, _initializer);
                    _activeTx.put(curr, new TransactionStatus(attempt));
                    _collector.emit(TRANSACTION_BATCH_STREAM_ID, new Values(attempt, state, previousTransactionId(_currTransaction)), attempt);
                }
                curr = nextTransactionId(curr);
            }
        }
    } catch(FailedException e) {
        LOG.warn("Failed to get metadata for a transaction", e);
    }
}
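Analysis of sync(): suppose we are emitting for the very first time. _activeTx is still empty, so maybeCommit is null and nothing is committed; execution goes straight to emitting on the transaction batch stream (the Batch stream), i.e. it runs this part of sync():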
if(_activeTx.size() < _maxTransactionActive) {
    BigInteger curr = _currTransaction;
    for(int i=0; i<_maxTransactionActive; i++) {
        if((_coordinatorState.hasCache(curr) || _coordinator.isReady())
                && !_activeTx.containsKey(curr)) {
            TransactionAttempt attempt = new TransactionAttempt(curr, _rand.nextLong());
            Object state = _coordinatorState.getState(curr, _initializer);
            _activeTx.put(curr, new TransactionStatus(attempt));
            _collector.emit(TRANSACTION_BATCH_STREAM_ID, new Values(attempt, state, previousTransactionId(_currTransaction)), attempt);
        }
        curr = nextTransactionId(curr);
    }
}
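The key call above is _collector.emit(), which sends the transaction attempt on the batch stream. The emitter bolt subscribes to this stream with all grouping (a broadcast, not global grouping), which means every transaction attempt tuple sent by the coordinator spout is received by every task of the emitter bolt.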
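The difference between all grouping and global grouping matters here, so below is a minimal plain-Java sketch with no Storm dependency. GroupingDemo and EmitterTask are hypothetical names used only for illustration. All grouping gives every task its own copy of each tuple, while global grouping, as Storm documents it, sends the whole stream to the single task with the lowest id.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo contrasting the two groupings; no Storm classes are used.
class GroupingDemo {
    static List<EmitterTask> newTasks(int n) {
        List<EmitterTask> tasks = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            tasks.add(new EmitterTask(i));
        }
        return tasks;
    }

    // All grouping: broadcast, every task receives a copy of the tuple.
    static void allGrouping(String tuple, List<EmitterTask> tasks) {
        for (EmitterTask t : tasks) {
            t.received.add(tuple);
        }
    }

    // Global grouping: the whole stream goes to the task with the lowest id.
    static void globalGrouping(String tuple, List<EmitterTask> tasks) {
        EmitterTask lowest = tasks.get(0);
        for (EmitterTask t : tasks) {
            if (t.taskId < lowest.taskId) {
                lowest = t;
            }
        }
        lowest.received.add(tuple);
    }

    public static void main(String[] args) {
        List<EmitterTask> tasks = newTasks(3);
        allGrouping("attempt for tx 1", tasks);
        for (EmitterTask t : tasks) {
            System.out.println("task " + t.taskId + " received " + t.received);
        }
    }
}

// Hypothetical stand-in for one task of the emitter bolt: it only records what it receives.
class EmitterTask {
    final int taskId;
    final List<String> received = new ArrayList<>();

    EmitterTask(int taskId) {
        this.taskId = taskId;
    }
}
```

With all grouping, a coordinator running three emitter tasks delivers three copies of each attempt, one per task.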
Next, let's look at the source code of the emitter bolt:
public void execute(Tuple tuple) {
    ...
    if(type==TupleType.ID) {
        synchronized(_tracked) {
            track.receivedId = true;
        }
        checkFinishId(tuple, type);
    } else if(type==TupleType.COORD) {
        int count = (Integer) tuple.getValue(1);
        synchronized(_tracked) {
            track.reportCount++;
            track.expectedTupleCount+=count;
        }
        checkFinishId(tuple, type);
    } else {
        synchronized(_tracked) {
            // look here: regular tuples are handed to the wrapped executor
            _delegate.execute(tuple);
        }
    }
}
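This is the code of the emitter bolt, which is a CoordinatedBolt. The important line in its execute method is _delegate.execute(tuple): ID and COORD tuples only update the wrapper's own tracking state, while every other tuple is passed on to _delegate. Note that the emitter bolt is really a wrapper around a TransactionalSpoutBatchExecutor instance; the runtime type of _delegate is TransactionalSpoutBatchExecutor.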
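The wrapping can be pictured with the minimal sketch below. It is plain Java, and the types WrappingBolt, TupleKind, SimpleTuple and Delegate are hypothetical: they only mimic the shape of the excerpt, where control tuples update the wrapper's counters and only regular tuples reach the wrapped object. It deliberately leaves out checkFinishId and the rest of what CoordinatedBolt really does.

```java
// Hypothetical wrapper tracking a single batch; the real CoordinatedBolt keeps one record per batch id.
class WrappingBolt {
    private final Delegate delegate;
    private final Object lock = new Object();
    boolean receivedId = false;
    int reportCount = 0;
    int expectedTupleCount = 0;

    WrappingBolt(Delegate delegate) {
        this.delegate = delegate;
    }

    void execute(SimpleTuple t) {
        if (t.kind == TupleKind.ID) {
            synchronized (lock) {
                receivedId = true;
            }
        } else if (t.kind == TupleKind.COORD) {
            // A COORD tuple's count is added to the number of tuples expected for the batch.
            int count = (Integer) t.value;
            synchronized (lock) {
                reportCount++;
                expectedTupleCount += count;
            }
        } else {
            // Only regular tuples reach the wrapped logic.
            synchronized (lock) {
                delegate.execute(t);
            }
        }
    }

    public static void main(String[] args) {
        WrappingBolt bolt = new WrappingBolt(t -> System.out.println("delegate got " + t.value));
        bolt.execute(new SimpleTuple(TupleKind.COORD, 1, 2));
        bolt.execute(new SimpleTuple(TupleKind.REGULAR, 1, "batch metadata"));
        System.out.println("expected tuples: " + bolt.expectedTupleCount);
    }
}

// Hypothetical tuple kinds mirroring the ID / COORD / regular split in the excerpt.
enum TupleKind { ID, COORD, REGULAR }

// Hypothetical minimal tuple: a kind, a batch id and one payload value.
class SimpleTuple {
    final TupleKind kind;
    final Object batchId;
    final Object value;

    SimpleTuple(TupleKind kind, Object batchId, Object value) {
        this.kind = kind;
        this.batchId = batchId;
        this.value = value;
    }
}

// The wrapped logic; in the real emitter bolt this role is played by TransactionalSpoutBatchExecutor.
interface Delegate {
    void execute(SimpleTuple t);
}
```

The design point is the decorator: the wrapped executor never sees the coordination tuples, so it can be written as if it were an ordinary bolt.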
Next, the execute method of TransactionalSpoutBatchExecutor:
@Override
public void execute(Tuple input) {
    TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
    ... (omitted)
    // look here: emit the batch
    _emitter.emitBatch(attempt, input.getValue(1), _collector);
    _activeTransactions.put(attempt.getTransactionId(), attempt);
    _collector.ack(input);
    BigInteger committed = (BigInteger) input.getValue(2);
    if(committed!=null) {
        // valid to delete before what's been committed since
        // those batches will never be accessed again
        _activeTransactions.headMap(committed).clear();
        _emitter.cleanupBefore(committed);
    }
}
... (omitted)
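The key line above is _emitter.emitBatch(attempt, input.getValue(1), _collector);, where input.getValue(1) is the batch metadata (state) that the coordinator attached in sync().
_emitter is declared as ITransactionalSpout.Emitter, the interface nested inside ITransactionalSpout, and at runtime it is an instance of our own implementation of that interface.
The call therefore invokes the emitBatch method we wrote, which emits the batch's tuples to the downstream bolts.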
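To show what a user-side emitter might look like, here is a minimal sketch under stated assumptions. BatchEmitter is a simplified, hypothetical stand-in for ITransactionalSpout.Emitter: it takes a plain BigInteger and a List instead of Storm's TransactionAttempt and BatchOutputCollector. The metadata is assumed to be an offset range into an in-memory list (Range and ListEmitter are also hypothetical). The sketch makes the batch depend only on its metadata, so a replayed attempt re-emits the same tuples, and its cleanupBefore drops older entries with TreeMap.headMap, the same call the executor applies to _activeTransactions.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical user emitter: each batch is a slice of an in-memory list.
class ListEmitter implements BatchEmitter<Range> {
    private final List<String> source;
    // Batches emitted so far, keyed (and sorted) by transaction id.
    final TreeMap<BigInteger, Range> emitted = new TreeMap<>();

    ListEmitter(List<String> source) {
        this.source = source;
    }

    @Override
    public void emitBatch(BigInteger txid, Range meta, List<String> out) {
        // The output depends only on the metadata, so replaying a transaction re-emits the same tuples.
        int end = Math.min(meta.start + meta.count, source.size());
        for (int i = meta.start; i < end; i++) {
            out.add(source.get(i));
        }
        emitted.put(txid, meta);
    }

    @Override
    public void cleanupBefore(BigInteger txid) {
        // headMap(txid) is a live view of all keys strictly less than txid; clearing it removes them.
        emitted.headMap(txid).clear();
    }

    public static void main(String[] args) {
        ListEmitter emitter = new ListEmitter(List.of("a", "b", "c", "d", "e"));
        List<String> out = new ArrayList<>();
        emitter.emitBatch(BigInteger.ONE, new Range(0, 2), out);
        System.out.println("tx 1 batch: " + out);
    }
}

// Simplified, hypothetical stand-in for ITransactionalSpout.Emitter (not Storm's real signatures).
interface BatchEmitter<M> {
    void emitBatch(BigInteger txid, M meta, List<String> out);

    void cleanupBefore(BigInteger txid);
}

// Hypothetical batch metadata: the slice of the source a transaction covers.
class Range {
    final int start;
    final int count;

    Range(int start, int count) {
        this.start = start;
        this.count = count;
    }
}
```

Because headMap excludes its argument, cleanupBefore(3) keeps transaction 3 itself and discards only 1 and 2.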
That completes the emitting side. So by what mechanism is a batch committed?
The commit is driven by Storm's ack mechanism.
Back in TransactionalSpoutCoordinator, let's look at its ack method:
@Override
public void ack(Object msgId) {
    TransactionAttempt tx = (TransactionAttempt) msgId;
    TransactionStatus status = _activeTx.get(tx.getTransactionId());
    if(status!=null && tx.equals(status.attempt)) {
        if(status.status==AttemptStatus.PROCESSING) {
            status.status = AttemptStatus.PROCESSED;
        } else if(status.status==AttemptStatus.COMMITTING) {
            _activeTx.remove(tx.getTransactionId());
            _coordinatorState.cleanupBefore(tx.getTransactionId());
            _currTransaction = nextTransactionId(tx.getTransactionId());
            _state.setData(CURRENT_TX, _currTransaction);
        }
        sync();
    }
}
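From ack() we can see that if the acked attempt's status is PROCESSING (its batch has been fully processed), the transaction is moved to PROCESSED; this is what allows the commit branch of sync() to run. If the status is COMMITTING (the commit tuple has been acked), the transaction is removed from _activeTx, the coordinator state before it is cleaned up, _currTransaction advances to the next id and is saved with _state.setData, and sync() is called again.
Here is the relevant part of sync():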
private void sync() {
    TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
    if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
        maybeCommit.status = AttemptStatus.COMMITTING;
        _collector.emit(TRANSACTION_COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
    }
    // ... (batch-emitting part omitted)
}
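As shown, sync() checks the status and, if the transaction at _currTransaction is PROCESSED, emits on the transaction commit stream (the Commit stream) to commit it. So how is ordering guaranteed? It relies on the _currTransaction field. Across all methods of TransactionalSpoutCoordinator, this field is updated in only two places. The first is open(), which initializes it to the first transaction. The second is the ack method just shown: once a transaction has been committed, nextTransactionId() advances _currTransaction and the transaction's entry is removed from _activeTx. Because sync() only ever commits the transaction stored under _currTransaction, and _currTransaction only advances after that commit is acked, a transaction can commit only after the previous one has committed.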
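The ordering argument can be checked with a small single-threaded model. This is a hypothetical simplification, not Storm code: CommitOrderModel uses long ids instead of BigInteger, the commit stream is just a list, and, as in the real ack(), a batch ack and a commit ack for the same transaction are told apart only by the current status. In the scenario, three batches finish in reverse order, yet commits still come out as 1, 2, 3.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded model of the commit half of TransactionalSpoutCoordinator.
class CommitOrderModel {
    enum St { PROCESSING, PROCESSED, COMMITTING }

    final Map<Long, St> activeTx = new HashMap<>();
    long currTransaction = 1;
    // Stand-in for the commit stream: records the order of commit emits.
    final List<Long> commitStream = new ArrayList<>();

    // Like the first part of sync(): only the transaction at currTransaction may start committing.
    void sync() {
        if (activeTx.get(currTransaction) == St.PROCESSED) {
            activeTx.put(currTransaction, St.COMMITTING);
            commitStream.add(currTransaction);
        }
    }

    // Like ack(): the current status decides whether this is a batch ack or a commit ack.
    void ack(long tx) {
        St s = activeTx.get(tx);
        if (s == null) {
            return;
        }
        if (s == St.PROCESSING) {
            activeTx.put(tx, St.PROCESSED);
        } else if (s == St.COMMITTING) {
            activeTx.remove(tx);
            currTransaction = tx + 1;
        }
        sync();
    }

    public static void main(String[] args) {
        CommitOrderModel m = new CommitOrderModel();
        for (long tx = 1; tx <= 3; tx++) {
            m.activeTx.put(tx, St.PROCESSING);
        }
        // Batches finish out of order: 3, then 2, then 1.
        m.ack(3);
        m.ack(2);
        m.ack(1);
        // Each commit ack lets the next transaction commit.
        m.ack(1);
        m.ack(2);
        m.ack(3);
        System.out.println("commit order: " + m.commitStream);
    }
}
```

Acking transactions 3 and 2 first only marks them PROCESSED; nothing is committed until transaction 1 is PROCESSED, which mirrors the comment at the top of the real sync().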
The following part of sync() is just as important:
if(_activeTx.size() < _maxTransactionActive) {
    BigInteger curr = _currTransaction;
    for(int i=0; i<_maxTransactionActive; i++) {
        if((_coordinatorState.hasCache(curr) || _coordinator.isReady())
                && !_activeTx.containsKey(curr)) {
            TransactionAttempt attempt = new TransactionAttempt(curr, _rand.nextLong());
            Object state = _coordinatorState.getState(curr, _initializer);
            _activeTx.put(curr, new TransactionStatus(attempt));
            _collector.emit(TRANSACTION_BATCH_STREAM_ID, new Values(attempt, state, previousTransactionId(_currTransaction)), attempt);
        }
        curr = nextTransactionId(curr);
    }
}
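In other words, batches are only opened for ids in a window that starts at _currTransaction and spans at most _maxTransactionActive transactions, and an id that is already in _activeTx is skipped. Several batches can therefore be in flight at once, but transactions beyond the window cannot be opened until the transaction at _currTransaction has committed and been removed from _activeTx, which slides the window forward.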
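The sliding window can be modelled the same way. Again this is a hypothetical simplification: BatchWindowModel assumes _coordinatorState.hasCache() or _coordinator.isReady() is always true, uses long ids, and its commitCurrent() collapses the whole commit round trip into one call. With a limit of 3, transactions 1 to 3 are opened at once, and transaction 4 is only opened after transaction 1 commits.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the batch-emitting half of sync().
// Assumes hasCache()/isReady() are always true, so only the window and activeTx matter.
class BatchWindowModel {
    final int maxTransactionActive;
    long currTransaction = 1;
    final Set<Long> activeTx = new HashSet<>();
    // Stand-in for the batch stream: records the order of batch emits.
    final List<Long> batchStream = new ArrayList<>();

    BatchWindowModel(int maxTransactionActive) {
        this.maxTransactionActive = maxTransactionActive;
    }

    void emitBatches() {
        if (activeTx.size() < maxTransactionActive) {
            long curr = currTransaction;
            // Candidates are the ids currTransaction .. currTransaction + max - 1.
            for (int i = 0; i < maxTransactionActive; i++) {
                if (!activeTx.contains(curr)) {
                    activeTx.add(curr);
                    batchStream.add(curr);
                }
                curr++;
            }
        }
    }

    // Collapses the commit round trip: retire currTransaction, advance it, then run the emit loop again.
    void commitCurrent() {
        activeTx.remove(currTransaction);
        currTransaction++;
        emitBatches();
    }

    public static void main(String[] args) {
        BatchWindowModel w = new BatchWindowModel(3);
        w.emitBatches();
        w.emitBatches(); // window full: nothing new is opened
        System.out.println("before commit: " + w.batchStream);
        w.commitCurrent();
        System.out.println("after committing tx 1: " + w.batchStream);
    }
}
```

The second emitBatches() call opens nothing because the window is full; only the commit of transaction 1 frees a slot for transaction 4.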
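So the in-order commit of the previous transaction and the controlled opening of the next one work together to guarantee transaction ordering.
The next article will analyze how Storm's CoordinatedBolt ensures that the finishBatch method is called correctly.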
Reposted from: https://www.jianshu.com/p/7b4892965dac