来源:
工作需要实现消息路由的中间层模块
测试结果:
2W客户, 每客户10 Request -> MQEngine -> 本地tomcat
消息转发并发量: 6.7K 零丢包
设计图
1. MQEngine
package lightmq; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class MQEngine<E, P extends MessageHandler<E>> { /** * 消息队列 * TODO 优化改为 LightQueue(内部实现为queue组) */ final Queue<E> queue = new ConcurrentLinkedQueue<E>(); //final Queue<E> queue1 = new LightQueue<E>(10); /** * handler class */ private Class<? extends MessageHandler<E>> handlerClass; /** * 消费者线程池 */ ExecutorService consumerES; /** * 消费者数量 */ private int consumerSize = 1; private Runnable[] consumers; /** * 构造函数 * * @param c 处理器类 */ public MQEngine(Class<? extends MessageHandler<E>> c){ this(1, 1, c); } /** * 构�?函数 * * @param threadPoolSize 线程池大�? * @param consumerSize 消息者数�? * @param c 处理器类�? */ public MQEngine(int threadPoolSize, int consumerSize, Class<? extends MessageHandler<E>> c){ consumerES = Executors.newFixedThreadPool(threadPoolSize); this.handlerClass = c; this.consumerSize = consumerSize; } /** * 启动消费者线�? * @param consumerSize * @param c */ public void start() { final Class<? extends MessageHandler<E>> c = this.handlerClass; class ConsumerTask implements Runnable{ @Override public void run() { MessageHandler<E> p = null; try { p = c.newInstance(); } catch (InstantiationException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } catch (IllegalAccessException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } // int i = 0; while(true){ try { if (!queue.isEmpty()) { p.consume(queue.poll()); } i++; } catch (Exception e) { e.printStackTrace(); } // 每执行100次 if(10==i){ synchronized (this) { try { i = 0; wait(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } this.consumers = new Runnable[this.consumerSize]; for (int i = 0; i < this.consumers.length; i++) { consumers[i] = new ConsumerTask(); } for (int i = 0; i < consumers.length; i++) { consumerES.execute(consumers[i]); } } /** * * @param e */ public void push(E e){ queue.add(e); for (int i = 0; i < this.consumers.length; i++) { synchronized (consumers[i]) { consumers[i].notify(); } } } /** * */ public void destory(){ this.queue.clear(); this.consumerES.shutdown(); } }
2. MessageHandler
package lightmq; /** * 消息处理器 * 由子类派生 * @author kevin * * @param <E> */ public abstract class MessageHandler<E> { public abstract void consume(E e); }
3.MyMessageHandler
package lightmq; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClients; import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager; import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; import org.apache.http.nio.conn.NHttpClientConnectionManager; /** * 业务相关消息处理器 * @author kevin * */ public class MyMessageHandler extends MessageHandler<String>{ static AtomicLong sentCount = new AtomicLong(0); static NHttpClientConnectionManager connMgr; @Override public void consume(String e) { sendToTomcat(e); } private CloseableHttpAsyncClient httpclient = null; public MyMessageHandler(){ try { connMgr = new PoolingNHttpClientConnectionManager(new DefaultConnectingIOReactor()); httpclient = HttpAsyncClients.createMinimal(connMgr); httpclient.start(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 发给sc * @param message */ private void sendToTomcat(String message){ long startTime = System.currentTimeMillis(); try { // http[GET]请求, final HttpGet request1 = new HttpGet("http://localhost"); Future<HttpResponse> future = httpclient.execute(request1, null); // and wait until a response is received HttpResponse response1; response1 = future.get(); System.out.println("message " + message + ":" + request1.getRequestLine() + "->" + response1.getStatusLine()); System.out.println(message + " Sent; Cost:" + (System.currentTimeMillis() - startTime) + "; Succeed Sent: " + sentCount.incrementAndGet()); } catch (Exception e1) { System.err.println(e1.getMessage()); } finally{ // 关闭链接 if(null!=httpclient){ try { //httpclient.close(); } catch (Exception e1) { //e1.printStackTrace(); System.err.println(e1.getMessage()); } } } } }
4.M2Queue
package lightmq; import java.util.Collection; import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; /** * * @author kevin.xu * * @param <V> */ public class M2Queue<V> implements Queue<V>{ /** * 队列数组 */ private Queue<V> queues[]; /** * * @param initQueueSize */ public M2Queue(int initQueueSize) { queues = new Queue[initQueueSize]; for (int i = 0; i < queues.length; i++) { queues[i] = new ConcurrentLinkedQueue<V>(); } } @Override public int size() { return 0; } @Override public boolean isEmpty() { // TODO Auto-generated method stub return false; } @Override public boolean contains(Object o) { // TODO Auto-generated method stub return false; } @Override public Iterator<V> iterator() { // TODO Auto-generated method stub return null; } @Override public Object[] toArray() { // TODO Auto-generated method stub return null; } @Override public <T> T[] toArray(T[] a) { // TODO Auto-generated method stub return null; } @Override public boolean remove(Object o) { // TODO Auto-generated method stub return false; } @Override public boolean containsAll(Collection<?> c) { // TODO Auto-generated method stub return false; } @Override public boolean addAll(Collection<? extends V> c) { // TODO Auto-generated method stub return false; } @Override public boolean removeAll(Collection<?> c) { // TODO Auto-generated method stub return false; } @Override public boolean retainAll(Collection<?> c) { // TODO Auto-generated method stub return false; } @Override public void clear() { // TODO Auto-generated method stub } @Override public boolean add(V e) { return offer(e); } /** * 添加到元素最少的队列中 */ @Override public boolean offer(V e) { return queues[getSmallestQueueIndex()].offer(e); } /** * 从元素最大的队列中remove */ @Override public V remove() { return queues[getLargestQueueIndex()].remove(); } /** * 从元素最大的队列中poll */ @Override public V poll() { return queues[getLargestQueueIndex()].poll(); } /** * 从元素最大的队列中element */ @Override public V element() { return queues[getLargestQueueIndex()].element(); } /** * 先从记录数最多的queue里peek */ @Override public V peek() { return queues[getLargestQueueIndex()].peek(); } /** * 返回最少记录数的queue * * @return */ private int getSmallestQueueIndex(){ int index = 0; if (queues.length > 1) { for (int i = index; i < queues.length; i++) { if(queues[i].size() > queues[i+1].size()){ index = i+1; } } } return index; } /** * 返回最多记录数的queue * * @return */ private int getLargestQueueIndex(){ int index = 0; if (queues.length > 1) { for (int i = index; i < queues.length; i++) { if(queues[i].size() < queues[i+1].size()){ index = i+1; } } } return index; } }
5.TestMQ
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import lightmq.MQEngine; import lightmq.MyMessageHandler; /** * MQEngine测试类 * @author kevin * */ public class TestMQ { public static void main(String[] args) { final AtomicLong l = new AtomicLong(0); // final MQEngine<String, MyMessageHandler> mq = new MQEngine<String, MyMessageHandler>(10, 50, MyMessageHandler.class); mq.start(); // 模拟客户并发数 final int PRODUCER_SIZE = 200000; // 模拟每个客户平均请求次数 final int REQUEST_TIME = 10; ExecutorService es = Executors.newFixedThreadPool(10); for (int i = 0; i < PRODUCER_SIZE; i++) { es.execute(new Runnable() { @Override public void run() { for (int i = 0; i < REQUEST_TIME; i++) { mq.push(String.valueOf(l.incrementAndGet())); } } }); } try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(mq.size()); } }