我们常常说Batching(批量增加、批量操作...),那么Batching会增加延迟性,特别针对于本身延迟比较low的系统。
但是从我跟人角度来说,如果Batching的算法如果做得好,不仅会带来吞吐量的增加,而且也降低整个系统的延迟性。
在我们的网络传输时候,经常采用把messages/even捆绑在一起形成数据包,然后提高网络传输的吞吐量,同样我们也会类似的方式在系统中IOPS来帮助我们提高性能。
如果上图1中场景非常容易产生竞争,而下面图是因为通过Queue,但是最终是有单一线程完成批量的操作。
The Algorithm
public final class NetworkBatcher implements Runnable { private final NetworkFacade network; private final Queue<Message> queue; private final ByteBuffer buffer; public NetworkBatcher(final NetworkFacade network, final int maxPacketSize, final Queue<Message> queue) { this.network = network; buffer = ByteBuffer.allocate(maxPacketSize); this.queue = queue; } public void run() { while (!Thread.currentThread().isInterrupted()) { while (null == queue.peek()) { employWaitStrategy(); // block, spin, yield, etc. } Message msg; while (null != (msg = queue.poll())) { if (msg.size() > buffer.remaining()) { sendBuffer(); } buffer.put(msg.getBytes()); } sendBuffer(); } } private void sendBuffer() { buffer.flip(); network.send(buffer); buffer.clear(); } }
伪代码大致地想法可能如上所述。
在现实中,并发下的同步以及性能指标基本会采用CAS的队列ConcurrentLinkedQueue,但是都有问题就是Queue都是无边界,当超出可控的范围,那么性能指标可能会多损耗50%+,这个也许大家对于ArrayList都有比较多的了解。
算法都是基于 single writer principle 的原则而进行编写设计的。
Batching with the Disruptor
在爆发业务场景,我们也会采用 EventHandler 用来进行加载
public final class NetworkBatchHandler implements EventHander<Message> { private final NetworkFacade network; private final ByteBuffer buffer; public NetworkBatchHandler(final NetworkFacade network, final int maxPacketSize) { this.network = network; buffer = ByteBuffer.allocate(maxPacketSize); } public void onEvent(Message msg, long sequence, boolean endOfBatch) throws Exception { if (msg.size() > buffer.remaining()) { sendBuffer(); } buffer.put(msg.getBytes()); if (endOfBatch) { sendBuffer(); } } private void sendBuffer() { buffer.flip(); network.send(buffer); buffer.clear(); } }
Separation of IO from Work Processing