并行转成串行(多生产者单消费者顺序消费)

package com.misrobot.exam;

import java.util.EventListener;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**

测试类

*/
public class QueueTest {
    
    private static BaseListener lis = new BaseListener();


    public static void main(String[] args) {
        try {
            Thread.sleep(5000L);
            BaseEvent event = new BaseEvent();
            event.setName("M4");
            lis.onEvent(event);
            lis.onEvent(event);
            Thread.sleep(10000000L);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

}

     class BaseListener implements EventListener{
        
        private LinkedBlockingQueue<BaseEvent> examMsgQueue = null;
        
        private Thread  workThread = null;
        
        public BaseListener(){
            examMsgQueue = new LinkedBlockingQueue<>();
            workThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    // TODO Auto-generated method stub
                    while(true){
//                        System.out.println("1111");
                        try {
                            
//                            if(examMsgQueue.size() > 0){
//                                LogUtil.hsLogDebug(String.format("当前线程队列消息数量 : %s", examMsgQueue.size()));
                                System.out.println(String.format("当前线程队列消息数量 : %s", examMsgQueue.size()));
//                                BaseEvent tempEvent = examMsgQueue.poll();
                                BaseEvent tempEvent = examMsgQueue.take();
                                //消息事件处理
                                msgProcess(tempEvent);
//                        }
                            //线程休眠1ms
//                            System.out.println("空转。。。");
//                            Thread.sleep(1);
                        
                        } catch (Exception e) {
                            try {
                                Thread.sleep(10);
                            } catch (InterruptedException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
//                                LogUtil.hsLogError(CommUtil.getErrorInfoFromException(e1));
                            }
                            e.printStackTrace();
                            // TODO: handle exception
//                            LogUtil.hsLogError(CommUtil.getErrorInfoFromException(e));
                        }
                    }
                }

import java.util.Enumeration;
import java.util.Vector;

import service.osce.base.BaseEvent;
import service.osce.base.BaseListener;
import service.osce.event.listener.OsceEventListener;
import service.osce.event.listener.Zyyk21EventListener;

public class CommonListenerManager {
    private static Vector<BaseListener> repository = new Vector<BaseListener>();

    public static void initListener(){
        if(repository.size() <=0)
        {
            BaseListener baseListener = new OsceEventListener();
            addListener(baseListener);
            BaseListener checkinListener = new Zyyk21EventListener();
            addListener(checkinListener);
        }
    }
    //注册监听器,如果这里没有使用Vector而是使用ArrayList那么要注意同步问题
    public static void addListener(BaseListener listener)
    {
        repository.addElement(listener);//这步要注意同步问题
    }
    //如果这里没有使用Vector而是使用ArrayList那么要注意同步问题
    public static void notifyEvent(BaseEvent event) {
        initListener();
        Enumeration<BaseListener> enumeration = repository.elements();//这步要注意同步问题
        while(enumeration.hasMoreElements())
        {
          BaseListener listener = (BaseListener)enumeration.nextElement();
          listener.onEvent(event);
        }
    }
    //删除监听器,如果这里没有使用Vector而是使用ArrayList那么要注意同步问题
    public static void removeListener(BaseListener dl)
    {
        repository.remove(dl);//这步要注意同步问题
    }
}
 

            });
            workThread.start();
        }
        
        public void onEvent(BaseEvent event){
            switch(event.getName())
            {
                case "NextExaminee":
                case "M4":
                case "M18":
                case "CheckIn":
                case "CheckOut":
                    examMsgQueue.offer(event);
//                    LogUtil.hsLogDebug("消息进入队列 : " + event.getName());
//                    LogUtil.hsLogDebug(String.format("%s消消息进入队列", event.getName()));
                    break;
            }
        }
        
        public void onNextExaminee(BaseEvent event)
        {
            
        }
        
        public void onMessageM4(BaseEvent event){
            System.out.println("deal M4 finished");
        }
        
        public void onMessageM18(BaseEvent event){
            
        }
        
        public void onCheckIn(BaseEvent event){
            
        }
        
        public void onCheckOut(BaseEvent event){
            
        }
        
        public void msgProcess(BaseEvent msgEvent){
            try {
                if(null != msgEvent){
                    switch(msgEvent.getName())
                    {
                        case "NextExaminee":
                            onNextExaminee(msgEvent);
                            break;
                        case "M4":
                            onMessageM4(msgEvent);
                            break;
                        case "M18":
                            onMessageM18(msgEvent);
                            break;
                        case "CheckIn":
                            onCheckIn(msgEvent);
                            break;
                        case "CheckOut":
                            onCheckOut(msgEvent);
                            break;
                    }
//                    LogUtil.hsLogDebug("处理消息 : " + msgEvent.getName());
//                    LogUtil.hsLogDebug(String.format("%s消息处理完成", msgEvent.getName()));
                }
            } catch (Exception e) {
//                LogUtil.hsLogError(e.getMessage());
            }
        }
    }

//通知消息有多个生产者,担心消费者需要保证顺序处理

CommonListenerManager.notifyEvent(new BaseEvent("OsceSchedule","NextExaminee",params));

猜你喜欢

转载自blog.csdn.net/liuqianduxin/article/details/90546870