High Performance (2) Ringbuffer and Disruptor
1. Disruptor Introduction
The Disruptor is built around a fixed-size circular array that is used as a lock-free ring buffer.
Component - Ringbuffer
slot index = sequence number % buffer size, for example 12 % 10 = 2
The ring buffer keeps no end pointer of its own; each consumer tracks how far it has read.
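The index calculation above can be sketched in a few lines. This is only an illustration, not the Disruptor's own code: RingIndex and both method names are made up for this example. The second variant shows the bitmask shortcut that becomes possible when the buffer size is a power of 2.

```scala
// Hypothetical helper, for illustration only.
object RingIndex {
  // Plain modulo mapping: works for any positive buffer size.
  def slotByModulo(sequence: Long, size: Int): Int =
    (sequence % size).toInt

  // Bitmask mapping: gives the same result, but only when size is a power of 2.
  def slotByMask(sequence: Long, size: Int): Int = {
    require(size > 0 && (size & (size - 1)) == 0, "size must be a power of 2")
    (sequence & (size - 1)).toInt
  }
}
```

With a size of 10, slotByModulo(12, 10) gives slot 2, matching the example above. The sequence number itself keeps growing; only the slot index wraps around.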
Component - Sequence
A thread-safe counter that records a position in the ring buffer.
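To show what a thread-safe counter means here, the following sketch uses java.util.concurrent.atomic.AtomicLong as a stand-in. SimpleSequence and SimpleSequenceDemo are hypothetical names, and the initial value of -1 is an assumption of this sketch; the real Sequence class in the library is more elaborate.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for a sequence: a counter that is safe to share between threads.
class SimpleSequence(initial: Long = -1L) {
  private val value = new AtomicLong(initial)
  def get: Long = value.get()
  def set(v: Long): Unit = value.set(v)
  def incrementAndGet(): Long = value.incrementAndGet()
}

object SimpleSequenceDemo {
  // Four threads each increment the counter 1000 times; no update is lost.
  def demo(): Long = {
    val seq = new SimpleSequence()
    val threads = (1 to 4).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = (1 to 1000).foreach(_ => seq.incrementAndGet())
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    seq.get
  }
}
```

Starting from -1, the 4000 increments leave the counter at 3999, which a plain var without synchronization would not guarantee.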
Component - SequenceBarrier
Consumers wait on a SequenceBarrier until the sequence they need has been published.
Component - WaitStrategy
Decides how a consumer waits for the next sequence. Available strategies:
BusySpinWaitStrategy, BlockingWaitStrategy, SleepingWaitStrategy, YieldingWaitStrategy, PhasedBackoffWaitStrategy
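The difference between the strategies is what the consumer thread does while the sequence it wants is not yet available. The sketch below is a simplified model of that idea, not the library's WaitStrategy interface: SimpleWaitStrategy, BusySpin, Yielding and Sleeping are hypothetical names, and the cursor is modelled as a plain AtomicLong.

```scala
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.LockSupport

// Hypothetical, simplified interface: return once the cursor has reached the target.
trait SimpleWaitStrategy {
  def waitFor(target: Long, cursor: AtomicLong): Long
}

// Spin in a tight loop: lowest latency, but keeps a CPU core busy.
object BusySpin extends SimpleWaitStrategy {
  def waitFor(target: Long, cursor: AtomicLong): Long = {
    var available = cursor.get()
    while (available < target) available = cursor.get()
    available
  }
}

// Give other threads a chance to run between checks.
object Yielding extends SimpleWaitStrategy {
  def waitFor(target: Long, cursor: AtomicLong): Long = {
    var available = cursor.get()
    while (available < target) {
      Thread.`yield`()
      available = cursor.get()
    }
    available
  }
}

// Park briefly between checks: lower CPU use, higher latency.
object Sleeping extends SimpleWaitStrategy {
  def waitFor(target: Long, cursor: AtomicLong): Long = {
    var available = cursor.get()
    while (available < target) {
      LockSupport.parkNanos(1000L)
      available = cursor.get()
    }
    available
  }
}
```

All three return the highest sequence seen, which may be larger than the target, so a consumer can process several events in one batch.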
BatchEventProcessor
A consumer that processes events in batches and tracks its own sequence.
WorkProcessor
Several WorkProcessor consumers share one sequence, so each event is handled by only one of them.
WorkerPool
All the WorkProcessors that share the same Sequence are managed by one WorkerPool.
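The effect of sharing one sequence can be shown with a small stand-in that does not use the Disruptor classes. SharedSequenceDemo and handleCounts are hypothetical names; each worker thread claims the next index from a shared AtomicLong, so no index is taken twice.

```scala
import java.util.concurrent.atomic.{AtomicIntegerArray, AtomicLong}

// Hypothetical sketch of competing consumers that share one work sequence.
object SharedSequenceDemo {
  // Returns, for every event index, how many times it was handled.
  def handleCounts(eventCount: Int, workers: Int): Seq[Int] = {
    val workSequence = new AtomicLong(0L)          // shared by all workers
    val counts = new AtomicIntegerArray(eventCount)
    val threads = (1 to workers).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = {
          var next = workSequence.getAndIncrement() // claim one index
          while (next < eventCount) {
            counts.incrementAndGet(next.toInt)      // "handle" the event
            next = workSequence.getAndIncrement()
          }
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    (0 until eventCount).map(i => counts.get(i))
  }
}
```

Every entry in the result is 1: the work is split between the workers, in contrast to independent consumers, where each one sees every event.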
2. Simple Producer and Consumer
All the example code is in the easydisruptor project.
dependencies
libraryDependencies ++= Seq(
"org.scalatest" % "scalatest_2.10" % "1.9.1" % "test",
"org.specs2" %% "specs2" % "1.13" % "test",
"com.lmax" % "disruptor" % "3.3.0"
)
model
package com.sillycat.easydisruptor.model
class LongEvent(){
var value = Long.MinValue
}
factory
package com.sillycat.easydisruptor.factory
import com.lmax.disruptor.EventFactory
import com.sillycat.easydisruptor.model.LongEvent
class LongEventFactory extends EventFactory[LongEvent] {
// Called once per slot when the ring buffer is pre-allocated.
def newInstance(): LongEvent = new LongEvent()
}
translator
package com.sillycat.easydisruptor.translator
import com.sillycat.easydisruptor.model.LongEvent
import java.nio.ByteBuffer
import com.lmax.disruptor.EventTranslatorOneArg
class CustomEventTranslatorOneArg extends EventTranslatorOneArg[LongEvent,ByteBuffer]{
// Copy the payload from the ByteBuffer into the pre-allocated event.
def translateTo(event: LongEvent, sequence: Long, bb: ByteBuffer): Unit = {
event.value = bb.getLong(0)
}
}
consumer
package com.sillycat.easydisruptor.consumer
import com.lmax.disruptor.EventHandler
import com.sillycat.easydisruptor.model.LongEvent
class LongEventHandler extends EventHandler[LongEvent]{
def onEvent(event:LongEvent,sequence:Long,endOfBatch:Boolean): Unit ={
println("Event: " + event.value)
}
}
producer
package com.sillycat.easydisruptor.producer
import com.lmax.disruptor.RingBuffer
import java.nio.ByteBuffer
import com.sillycat.easydisruptor.translator.CustomEventTranslatorOneArg
import com.sillycat.easydisruptor.model.LongEvent
class LongEventProducer(ringBuffer: RingBuffer[LongEvent]) {
val translator = new CustomEventTranslatorOneArg()
def onData(bb:ByteBuffer) = {
ringBuffer.publishEvent(translator,bb)
}
}
App to run
package com.sillycat.easydisruptor
import java.util.concurrent.Executors
import com.sillycat.easydisruptor.factory.LongEventFactory
import com.lmax.disruptor.dsl.Disruptor
import com.sillycat.easydisruptor.consumer.LongEventHandler
import com.sillycat.easydisruptor.producer.LongEventProducer
import java.nio.ByteBuffer
object LongEventApp extends App{
val executor = Executors.newCachedThreadPool()
val factory = new LongEventFactory()
// Specify the size of the ring buffer, must be power of 2.
val bufferSize = 1024
val disruptor = new Disruptor(factory, bufferSize, executor)
//set consumer/handler
disruptor.handleEventsWith(new LongEventHandler())
disruptor.start()
val ringBuffer = disruptor.getRingBuffer()
val producer = new LongEventProducer(ringBuffer)
val bb = ByteBuffer.allocate(8)
for( a <- 1 to 10){
bb.putLong(0, a)
producer.onData(bb)
Thread.sleep(1000)
}
sys.exit(0)
}
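We can also define the Disruptor with an explicit producer type and wait strategy, as follows. This needs extra imports for com.lmax.disruptor.dsl.ProducerType, com.lmax.disruptor.SleepingWaitStrategy and com.sillycat.easydisruptor.model.LongEvent.
val disruptor = new Disruptor[LongEvent](factory, bufferSize, executor, ProducerType.SINGLE, new SleepingWaitStrategy())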
References:
http://developer.51cto.com/art/201306/399341.htm
http://liuxun.org/blog/disruptor-yi-ge-kai-yuan-de-gao-xiao-nei-cun-wu-suo-dui-lie/
http://ziyue1987.github.io/pages/2013/09/22/disruptor-use-manual.html
http://lmax-exchange.github.io/disruptor/
More examples:
https://code.google.com/p/disruptor/source/browse/trunk/code/src/perf/?r=421#perf%2Fcom%2Flmax%2Fdisruptor
Reposted from sillycat.iteye.com/blog/2101557