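To get closer to real-world usage, I ran a performance test of Disruptor with one producer and multiple consumers, where each event is consumed only once (no duplicate consumption). Here I mainly present the code I used and describe what I observed. Some of the behavior may be caused by my own code, so this also serves as a record for myself; if you disagree with anything, please let me know. For a general introduction to Disruptor, see http://357029540.iteye.com/blog/2395677, which I wrote with reference to other people's articles.
Add the required jars with Maven: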
<dependencies>
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.7</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.20</version>
    </dependency>
</dependencies>
The consumer class implements the WorkHandler interface. The consumer code is as follows:
package com.demo.disruptor.consumer;

import com.demo.disruptor.dto.LogEvent;
import com.demo.disruptor.test.BlockingQueueTest;
import com.lmax.disruptor.WorkHandler;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/9
 * @description Custom consumer
 */
public class LogEventWorkHandlerConsumer implements WorkHandler<LogEvent> {

    private long startTime;

    public LogEventWorkHandlerConsumer() {
        this.startTime = System.currentTimeMillis();
    }

    @Override
    public void onEvent(LogEvent logEvent) throws Exception {
        // Convert the content to upper case to simulate some work for the timing test
        logEvent.setContent(logEvent.getContent().toUpperCase());
        // Check whether the event already carries a start time
        if (logEvent.getStartTime() == null || logEvent.getStartTime() == 0) {
            logEvent.setStartTime(startTime);
        } else {
            startTime = logEvent.getStartTime();
        }
        // Check whether this is the last event
        if (logEvent.getLogId() + 1 == BlockingQueueTest.eventNum) {
            long endTime = System.currentTimeMillis();
            System.out.println(" costTime1 = " + (endTime - startTime) + "ms");
        }
        //System.out.println("consumer1-seq logEvent:" + logEvent.toString());
    }
}

LogEventWorkHandlerConsumer2 and LogEventWorkHandlerConsumer3 are identical to LogEventWorkHandlerConsumer.
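To make "no duplicate consumption" concrete, here is a minimal conceptual sketch in plain Java. It is not Disruptor code: the class `WorkPoolSketch`, its `run` method, and the `nextSequence` counter are hypothetical names used only for illustration. The idea it shows is that several workers share one sequence counter, so each sequence number is claimed by exactly one worker.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;

/** Conceptual sketch only, NOT Disruptor code. All names here are hypothetical. */
final class WorkPoolSketch {

    /**
     * Starts `workers` threads that share one sequence counter and returns,
     * for every event index, how many times that event was handled.
     */
    static AtomicIntegerArray run(int eventCount, int workers) {
        AtomicIntegerArray handled = new AtomicIntegerArray(eventCount);
        AtomicLong nextSequence = new AtomicLong(0); // shared by all workers

        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            threads[w] = new Thread(() -> {
                while (true) {
                    // Atomically claim the next unprocessed sequence number
                    long seq = nextSequence.getAndIncrement();
                    if (seq >= eventCount) {
                        return; // nothing left to claim
                    }
                    handled.incrementAndGet((int) seq); // "process" the event
                }
            });
            threads[w].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }

    public static void main(String[] args) {
        AtomicIntegerArray handled = run(1_000, 3);
        boolean exactlyOnce = true;
        for (int i = 0; i < handled.length(); i++) {
            exactlyOnce &= handled.get(i) == 1;
        }
        System.out.println("every event handled exactly once: " + exactlyOnce);
    }
}
```

One consequence worth keeping in mind for the timing code above: with several workers, the worker that receives the last logId may finish while other workers are still processing earlier events, so the printed cost time is only an approximation of the total.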
The producer class is as follows:
package com.demo.disruptor.producer;

import com.demo.disruptor.dto.LogEvent;
import com.lmax.disruptor.EventTranslatorVararg;
import com.lmax.disruptor.RingBuffer;

import java.util.Date;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/9
 * @description Event producer that publishes events through a translator; this is the approach normally used
 */
public class LogEventProducerWithTranslator {

    // Copies the published arguments (logId, content, date) into the pre-allocated event
    private EventTranslatorVararg<LogEvent> eventTranslatorVararg = new EventTranslatorVararg<LogEvent>() {
        @Override
        public void translateTo(LogEvent logEvent, long l, Object... objects) {
            logEvent.setLogId((Long) objects[0]);
            logEvent.setContent((String) objects[1]);
            logEvent.setDate((Date) objects[2]);
        }
    };

    private RingBuffer<LogEvent> ringBuffer;

    public LogEventProducerWithTranslator(RingBuffer<LogEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(long logId, String content, Date date) {
        //System.out.println("producer: " + logId);
        ringBuffer.publishEvent(eventTranslatorVararg, logId, content, date);
    }
}
The event entity class:
package com.demo.disruptor.dto;

import com.alibaba.fastjson.JSONObject;

import java.util.Date;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/9
 * @description Custom event object
 */
public class LogEvent {

    private long logId;
    private String content;
    private Date date;
    private Long startTime;

    public long getLogId() {
        return logId;
    }

    public void setLogId(long logId) {
        this.logId = logId;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public Date getDate() {
        return date;
    }

    public void setDate(Date date) {
        this.date = date;
    }

    public Long getStartTime() {
        return startTime;
    }

    public void setStartTime(Long startTime) {
        this.startTime = startTime;
    }

    @Override
    public String toString() {
        return JSONObject.toJSONString(this);
    }
}

A class that defines the number of test events:
package com.demo.disruptor.test;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/11
 * @description
 */
public class BlockingQueueTest {

    public static int eventNum = 5000000;
}

The exception handler:
package com.demo.disruptor.exception;

import com.demo.disruptor.dto.LogEvent;
import com.lmax.disruptor.ExceptionHandler;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/12
 * @description
 */
public class LogEventExceptionHandler implements ExceptionHandler<LogEvent> {

    @Override
    public void handleEventException(Throwable throwable, long l, LogEvent logEvent) {
        System.out.println("handleEventException....");
    }

    @Override
    public void handleOnStartException(Throwable throwable) {
        System.out.println("handleOnStartException....");
    }

    @Override
    public void handleOnShutdownException(Throwable throwable) {
        System.out.println("handleOnShutdownException....");
    }
}
Below is the main program used to measure the elapsed time:
package com.demo.disruptor.test;

import com.demo.disruptor.consumer.*;
import com.demo.disruptor.dto.LogEvent;
import com.demo.disruptor.exception.LogEventExceptionHandler;
import com.demo.disruptor.factory.LogEventFactory;
import com.demo.disruptor.producer.LogEventProducer;
import com.demo.disruptor.producer.LogEventProducerWithTranslator;
import com.lmax.disruptor.*;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.util.Date;
import java.util.concurrent.*;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/11
 * @description
 */
public class DisruptorTest {

    // Elapsed time in ms for 10 runs, followed by the average:
    // 5000000:  554,627,545,602,550,578,675,626,587,692       average 604
    // 10000000: 1657,1471,1234,1231,1302,1083,1186,1064,1044,1073  average 1235
    // 50000000: 5017,5255,5048,5009,5410,4609,5979,5184,5060,4771  average 5134
    public static void main(String[] args) throws TimeoutException, InterruptedException, InsufficientCapacityException {
        LogEventFactory factory = new LogEventFactory();
        //ExecutorService executor = Executors.newCachedThreadPool(); // thread pool
        int ringBufferSize = 65536;
        final Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(factory, ringBufferSize,
                DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new YieldingWaitStrategy());
        // Worker pool: each event is handled by exactly one of the three consumers
        EventHandlerGroup<LogEvent> eventHandlerGroup = disruptor.handleEventsWithWorkerPool(
                new LogEventWorkHandlerConsumer(),
                new LogEventWorkHandlerConsumer2(),
                new LogEventWorkHandlerConsumer3());
        final RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
        disruptor.setDefaultExceptionHandler(new LogEventExceptionHandler());
        disruptor.start();

        /*new Thread(new Runnable() {
            @Override
            public void run() {
                RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
                // Publish the events
                LogEventProducer logEventProducer = new LogEventProducer(ringBuffer);
                for(int i = 0; i < BlockingQueueTest.eventNum; i++){
                    logEventProducer.onData(i, "c" + i, new Date());
                }
            }
        }).start();*/

        // Publish from a separate thread
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                // Publish the events
                LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer);
                for (int i = 0; i < BlockingQueueTest.eventNum; i++) {
                    producerWithTranslator.onData(i, "c" + i, new Date());
                }
            }
        });
        thread.start();

        // Spin until the slowest consumer has reached the last published sequence
        while (true) {
            long sequence = ringBuffer.getMinimumGatingSequence();
            //System.out.println(sequence);
            if (sequence + 1 == BlockingQueueTest.eventNum) {
                break;
            }
        }

        // Inspect the currently running threads
        //Thread.currentThread().getThreadGroup().list();
        // From reading the code, this should only stop the consumer threads
        disruptor.halt();
        //Thread.currentThread().getThreadGroup().list();

        // If a new producer thread is started after the disruptor has been halted, that thread
        // never finishes: no consumer is left to consume the data, so the producer blocks
        // and the thread hangs forever
        /*Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                // Publish the events
                LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer);
                for(int i = 0; i < BlockingQueueTest.eventNum; i++){
                    producerWithTranslator.onData(i, "d" + i, new Date());
                }
            }
        });
        thread1.setName("thread1");
        thread1.start();
        while (true){
            long sequence = ringBuffer.getMinimumGatingSequence();
            System.out.println(sequence);
            if(sequence + 1 == BlockingQueueTest.eventNum){
                break;
            }
        }
        Thread.currentThread().getThreadGroup().list();*/

        System.out.println(eventHandlerGroup.asSequenceBarrier().getCursor());
    }
}
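Because this version adds the shutdown step, it runs somewhat slower than the version without the shutdown step and the completion check. The timings listed above were measured without the shutdown step.
Below is the BlockingQueue code: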
import com.demo.disruptor.dto.LogEvent;
import com.demo.disruptor.test.BlockingQueueTest;

import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author liaoyubo
 * @version 1.0 2017/10/11
 * @description
 */
public class BlockingQueueMultiTest {

    // Elapsed time in ms for 10 runs, followed by the average:
    // 5000000:  1195,1189,1147,1181,1135,1174,1216,1133,1145,1093            average 1161
    // 10000000: 2633,1972,2417,2330,2255,2429,2354,2178,2235,2402            average 2321
    // 50000000: 11841,11902,13048,12262,9769,10531,12721,12229,12283,13595   average 12018
    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<LogEvent>(65536);
        //System.out.println("start time: " + startTime);

        // Producer thread
        new Thread(new Runnable() {
            @Override
            public void run() {
                int i = 0;
                while (i < BlockingQueueTest.eventNum) {
                    LogEvent logEvent = new LogEvent();
                    logEvent.setLogId(i);
                    logEvent.setContent("c" + i);
                    logEvent.setDate(new Date());
                    try {
                        queue.put(logEvent);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    i++;
                }
            }
        }).start();

        // Create the thread pool for the consumers
        final ExecutorService executorService = Executors.newCachedThreadPool();
        final long startTime = System.currentTimeMillis();

        // Consumer 1
        executorService.execute(new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        LogEvent logEvent = queue.take();
                        logEvent.setContent(logEvent.getContent().toUpperCase());
                        //System.out.println("BlockingQueue1 received data: " + (countDownLatch.getCount()));
                        // Only the consumer that takes the last event leaves the loop
                        if (logEvent.getLogId() + 1 == BlockingQueueTest.eventNum) {
                            break;
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                long endTime = System.currentTimeMillis();
                System.out.println("BlockingQueue1 elapsed time: " + (endTime - startTime) + "ms");
            }
        }));

        // Consumer 2
        executorService.execute(new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        LogEvent logEvent = queue.take();
                        logEvent.setContent(logEvent.getContent().toUpperCase());
                        //System.out.println("BlockingQueue1 received data: " + (countDownLatch.getCount()));
                        if (logEvent.getLogId() + 1 == BlockingQueueTest.eventNum) {
                            break;
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                long endTime = System.currentTimeMillis();
                System.out.println("BlockingQueue2 elapsed time: " + (endTime - startTime) + "ms");
            }
        }));

        // Consumer 3
        executorService.execute(new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        LogEvent logEvent = queue.take();
                        logEvent.setContent(logEvent.getContent().toUpperCase());
                        //System.out.println("BlockingQueue1 received data: " + (countDownLatch.getCount()));
                        if (logEvent.getLogId() + 1 == BlockingQueueTest.eventNum) {
                            break;
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                long endTime = System.currentTimeMillis();
                System.out.println("BlockingQueue3 elapsed time: " + (endTime - startTime) + "ms");
            }
        }));
    }
}
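Both the Disruptor and the BlockingQueue versions above use 3 threads to consume the produced data. I ran each with 5 million, 10 million, and 50 million events, 10 runs per size, and compared the average elapsed times: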
Events | Disruptor (avg ms) | BlockingQueue (avg ms) | Ratio (BlockingQueue / Disruptor) |
5 million | 604 | 1161 | 1.9 |
10 million | 1235 | 2321 | 1.9 |
50 million | 5134 | 12018 | 2.3 |
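The averages and ratios in the table can be reproduced from the per-run timings recorded in the code comments. The helper below is only an illustrative sketch; the class name `BenchmarkSummary` and its methods are hypothetical and not part of the test project.

```java
import java.util.Arrays;

/** Illustrative helper (hypothetical name) that recomputes the table values. */
final class BenchmarkSummary {

    /** Average of the run times in ms, rounded to the nearest integer. */
    static long average(long... runsMs) {
        return Math.round(Arrays.stream(runsMs).average().orElse(Double.NaN));
    }

    /** How many times slower the queue is than Disruptor, rounded to one decimal. */
    static double ratio(long disruptorMs, long queueMs) {
        return Math.round(10.0 * queueMs / disruptorMs) / 10.0;
    }

    public static void main(String[] args) {
        // Per-run timings for 5,000,000 events, copied from the code comments
        long disruptor = average(554, 627, 545, 602, 550, 578, 675, 626, 587, 692);
        long queue = average(1195, 1189, 1147, 1181, 1135, 1174, 1216, 1133, 1145, 1093);
        System.out.println(disruptor);               // prints 604
        System.out.println(queue);                   // prints 1161
        System.out.println(ratio(disruptor, queue)); // prints 1.9
    }
}
```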
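The comparison shows that in the single-producer, multi-consumer mode Disruptor still performs better than ArrayBlockingQueue, although the result may also depend on the code I used (test machine: Windows 10 64-bit, i5-6500 CPU, 8 GB RAM, JDK 1.8).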
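One code-related factor in the BlockingQueue test is that only the consumer that takes the last event leaves its loop, so the other two consumers stay blocked in `take()` and the printed time does not wait for them. A possible alternative, sketched here under my own assumptions, is to send one "poison pill" per consumer and wait on a `CountDownLatch`. The class `PoisonPillBenchmarkSketch`, the `POISON` marker, and the use of plain `String` events instead of `LogEvent` are all hypothetical simplifications, and the producer runs on the calling thread rather than on its own thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch with hypothetical names; uses String events instead of LogEvent. */
final class PoisonPillBenchmarkSketch {

    // Unique marker object, compared by identity, that tells a consumer to stop
    private static final String POISON = new String("POISON");

    /** Returns {number of events consumed, elapsed time in ms}. */
    static long[] run(int eventCount, int consumers) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(65536);
        CountDownLatch done = new CountDownLatch(consumers);
        AtomicLong consumed = new AtomicLong();

        long start = System.nanoTime();
        for (int c = 0; c < consumers; c++) {
            new Thread(() -> {
                try {
                    while (true) {
                        String content = queue.take();
                        if (content == POISON) {
                            break; // this consumer is finished
                        }
                        // Same simulated work as in the test: upper-case the content
                        if (!content.toUpperCase().isEmpty()) {
                            consumed.incrementAndGet();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }

        try {
            for (int i = 0; i < eventCount; i++) {
                queue.put("c" + i);
            }
            // One pill per consumer so that every consumer terminates
            for (int c = 0; c < consumers; c++) {
                queue.put(POISON);
            }
            done.await(); // all events are processed once every consumer has exited
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        return new long[] {consumed.get(), elapsedMs};
    }

    public static void main(String[] args) {
        long[] result = run(1_000_000, 3);
        System.out.println("consumed=" + result[0] + ", elapsed=" + result[1] + "ms");
    }
}
```

The shared `AtomicLong` counter adds some contention of its own, so timings from this sketch are not directly comparable with the figures in the table.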