概述
- RingBuffer#next()这个方法干的事情是,生产者问RingBuffer要一个能“下蛋”的位置,具体怎么给生产者给出这个位置,是由Sequencer的实现类完成的;
- Disruptor这个无锁并行框架中的“无锁”,在这个方法中也体现出来了;
0)客户端代码
- 从这一句开始:long sequence = ringBuffer.next();
import java.nio.ByteBuffer;
import com.lmax.disruptor.RingBuffer;
public class OrderEventProducer {
private RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(ByteBuffer data) {
//1 在生产者发送消息的时候, 首先 需要从我们的ringBuffer里面 获取一个可用的序号
long sequence = ringBuffer.next(); //0
try {
//2 根据这个序号, 找到具体的 "OrderEvent" 元素 注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
OrderEvent event = ringBuffer.get(sequence);
//3 进行实际的赋值处理
event.setValue(data.getLong(0));
} finally {
//4 提交发布操作
ringBuffer.publish(sequence);
}
}
}
1)RingBuffer#next()
- sequencer是一个接口,所以要看其实现类,这里是单消费者模式,所以去SingleProducerSequencer中看next();
@Override
public long next()
{
return sequencer.next();
}
2)SingleProducerSequencer#next()
@Override
public long next()
{
return next(1);
}
3)SingleProducerSequencer#next(int)
其作为服务提供者,向RingBuffer提供服务,RingBuffer中的相关信息它都是有的,比如:bufferSize,并且维护了一些辅助信息,比如:nextValue,cachedValue;
this.nextValue
- 表示生产者最后一次投递的位置,在方法结束的时候,会向前走一位,追上nextSequence;
nextSequence
- 表示生产者这次要投递的位置;
wrapPoint
- 表示包裹点,当生产者还没有生产完一圈的时候,其值为负,没有和生产者中最慢的位置比较的意义,因为就算消费者一个都没消费,其最慢的消费位置为-1;就算wrapPoint为-1了,表示生产者要投递第一圈的最后一个位置了,也还没追上消费者,无需阻塞;
this.cachedValue
- 表示上一次投递,消费者最慢的位置,这个位置在最新一次投递时,有可能已经落后了,不过没关系,其作为第一道门槛还是有意义的:如果生产者这次的投递位置还没到上次最慢的消费者的位置,那么无需担心生产者阻塞的问题,直接投递,不会覆盖尚未消费的Event的;
minSequence
- 表示最慢的消费者的消费位置,如果消费者一个都没消费,其值为-1;minSequence和wrapPoint的关系是:如果生产者这次要投递的位置已经超过最慢的消费者的位置,那么生产者这次要投递的位置上有待消费的Event,生产者会阻塞 1 纳秒,直到生产者这次要投递的位置的Event被消费掉;
@Override
public long next(int n)
{
if (n < 1) // n = 1
{
throw new IllegalArgumentException("n must be > 0");
}
long nextValue = this.nextValue; // -1 0
/**
* nextValue 是生产者当前最后投递成功的位置;
* nextSequence 是生产者的下一个投递序号;
*/
long nextSequence = nextValue + n; // -1 + 1 = 0, 0+1=1
/**
* 如果生产者还没有投够一圈,wrapPoint < 0
* wrapPoint > 0,才有和 cachedGatingSequence 比较的意义;
* wrapPoint 理解成包裹点,生产者投了一圈了才能把entries包裹上;
* wrapPoint 也是生产者的投递sequence
*/
long wrapPoint = nextSequence - bufferSize; // 0 - 10 = -10, 1-10=-9
/**
* cachedValue 应该是上一次生产者获取sequence的时候,最慢的消费者的sequence
*/
long cachedGatingSequence = this.cachedValue; // -1
/**
* cachedGatingSequence > nextValue
* 上次生产者投递的时候,最慢的消费者的位置都比生产者最后投递成功的位置大,消费者领先于生产者;
* wrapPoint > cachedGatingSequence
* 生产者已经追上上次最慢的额消费者;
*/
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
/**
* 当前消费者中最慢的sequence;
*/
long minSequence;
/**
* 生产者的速度太快了,追上消费者了
*/
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
/**
* 生产者阻塞一丢丢,让消费者赶紧接着消费;
*/
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
/**
* 生产者的速度没那么快,记录下这次最慢的消费者;
* cachedValue 存的应该是消费者中最慢的
*/
this.cachedValue = minSequence;
}
/**
* nextValue:已经成功投递的最远的位置;
* nextSequence:本次要投递的位置;
* this.nextValue = nextSequence; 可以理解成投递成功;虽然新的Event还没有更新到entries中,但意思上已经投递成功了;
*/
this.nextValue = nextSequence; // 0, 1
/**
* 给生产者这次投递的位置;
*/
return nextSequence;
}
总结
- 生产者生产的时候,按顺序生产,能生产就生产,不能生产就等着;
- 生产者和消费者都没有对Event加锁;