模拟一个多生产者(比如100个)生产订单(每个生产者生产100个订单),3个消费者消费订单的场景
Order:
public class Order { private String id;//ID private String name; private double price;//金额 public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } }
Producer
import com.lmax.disruptor.RingBuffer; public class Producer { private final RingBuffer<Order> ringBuffer; public Producer(RingBuffer<Order> ringBuffer){ this.ringBuffer = ringBuffer; } /** * onData用来发布事件,每调用一次就发布一次事件 * 它的参数会用过事件传递给消费者 */ public void onData(String data){ //可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽 long sequence = ringBuffer.next(); try { //用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象) Order order = ringBuffer.get(sequence); //获取要通过事件传递的业务数据 order.setId(data); } finally { //发布事件 //注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。 ringBuffer.publish(sequence); } } }
Consumer
import java.util.concurrent.atomic.AtomicInteger; import com.lmax.disruptor.WorkHandler; public class Consumer implements WorkHandler<Order>{ private String consumerId; private static AtomicInteger count = new AtomicInteger(0); public Consumer(String consumerId){ this.consumerId = consumerId; } @Override public void onEvent(Order order) throws Exception { System.out.println("当前消费者: " + this.consumerId + ",消费信息:" + order.getId()); count.incrementAndGet(); } public int getCount(){ return count.get(); } }
消费者定义了一个静态的AutomicInteger来计数。
//创建ringBuffer RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI, new EventFactory<Order>() { @Override public Order newInstance() { return new Order(); } }, 1024 * 1024, new YieldingWaitStrategy());
创建Ringbuffer时指定ProducerType.MULTI,表示多生产者。
SequenceBarrier barriers = ringBuffer.newBarrier(); Consumer[] consumers = new Consumer[3]; for(int i = 0; i < consumers.length; i++){ consumers[i] = new Consumer("c" + i); } WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer, barriers, new IntEventExceptionHandler(), consumers);
创建3个消费者,提交给WorkerPool,且将序列协调者SequenceBarrier,ringbuffer传递进去。
ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
将消费者的位置信息传递给生产者,启动WorkerPool。
final CountDownLatch latch = new CountDownLatch(1); for (int i = 0; i < 100; i++) { final Producer p = new Producer(ringBuffer); new Thread(new Runnable() { @Override public void run() { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } for(int j = 0; j < 100; j ++){ p.onData(UUID.randomUUID().toString()); } } }).start(); } Thread.sleep(2000); System.out.println("---------------开始生产-----------------"); latch.countDown(); Thread.sleep(5000); System.out.println("总数:" + consumers[0].getCount() );
创建100个Producer,为每一个Producer设置一个线程去onData向ringbuffer填充100次数据,也就是生产了100个订单,100个生产者就填充了ringbuffer一共1万个订单。
打印结果,无非是消费者1、消费者2、消费者3消耗了这10000个订单。