5. 通过ProcessingSequenceBarrier#waitFor来获取可以消费的下标。检查警告标志是否正常,
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
checkAlert();
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence)
{
return availableSequence;
}
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
通过阻塞策略获取下标BlockingWaitStrategy#waitFor,cursorSequence为提供端的下标,当没有消息产生时线程会等待mutex.wait(),被唤醒后也会检查依赖的其他线程的定序器是否已经消费过dependentSequence,否的话进入死循环。
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
if (cursorSequence.get() < sequence)
{
synchronized (mutex)
{
while (cursorSequence.get() < sequence)
{
barrier.checkAlert();
mutex.wait();
}
}
}
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
ThreadHints.onSpinWait();
}
return availableSequence;
}
选取下标和可用标志位的最大值。MultiProducerSequencer
public long getHighestPublishedSequence(long lowerBound, long availableSequence)
{
for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
{
if (!isAvailable(sequence))
{
return sequence - 1;
}
}
return availableSequence;
}
6. 保存本线程本次获取的可用下标最大值cachedAvailableSequence,WorkProcessor#run,第二次循环processedSequence已经是false,不获取workSequence值,cachedAvailableSequence >= nextSequence满足条件,获取对应的值
public E get(long sequence)
{
return elementAt(sequence);
}
protected final E elementAt(long sequence)
{
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
消费线程处理workHandler.onEvent(event);最后把processedSequence设置为true,这样下一次循环就重新从workSequence中获取互斥的下标。
7. 发布消息Disruptor#publishEvent=》RingBuffer#publishEvent
public void publishEvent(final EventTranslator<T> eventTranslator)
{
ringBuffer.publishEvent(eventTranslator);
}
public void publishEvent(EventTranslator<E> translator)
{
final long sequence = sequencer.next();
translateAndPublish(translator, sequence);
}
获取提供者定序器的下一个可用下标MultiProducerSequencer#next,获取当前下标以及下一个下标,wrapPoint为正数代表已经设置一圈数据了,获取gatingSequenceCache下标,初始值为-1,刚开始时设置成功cursor.compareAndSet(current, next)返回next下标,第二圈时,wrapPoint为0,cachedGatingSequence为-1,获取消费端的下标和当前提供者的下标的最小下标gatingSequence,当wrapPoint小于gatingSequence时,代表消费者还没有消费当前下标,所以不能覆盖当前位置的对象,所以就需要等待LockSupport.parkNanos(1);如果大于的话,就把那个最小位置设置到gatingSequenceCache保存下来。
public long next()
{
return next(1);
}
public long next(int n)
{
if (n < 1 || n > bufferSize)
{
throw new IllegalArgumentException("n must be > 0 and < bufferSize");
}
long current;
long next;
do
{
current = cursor.get();
next = current + n;
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence)
{
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
7. 转化设置发布,获取对应位置的值,EventTranslator是用来设置对应数组entries下标的值。
private void translateAndPublish(EventTranslator<E> translator, long sequence)
{
try
{
translator.translateTo(get(sequence), sequence);
}
finally
{
sequencer.publish(sequence);
}
}
设置完后发布对应的下标,设置该下标的可用标志,
public void publish(final long sequence)
{
setAvailable(sequence);
waitStrategy.signalAllWhenBlocking();
}
private void setAvailable(final long sequence)
{
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
private int calculateAvailabilityFlag(final long sequence)
{
return (int) (sequence >>> indexShift);
}
private int calculateIndex(final long sequence)
{
return ((int) sequence) & indexMask;
}
private void setAvailableBufferValue(int index, int flag)
{
long bufferAddress = (index * SCALE) + BASE;
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
最后唤醒所有在等待的消费线程。
public void signalAllWhenBlocking()
{
synchronized (mutex)
{
mutex.notifyAll();
}
}