1.相关实体
package com.royalnu.psis.interfaces.psp.api.model;

import com.royalnu.core.module.com.Identifiable;

import lombok.Getter;
import lombok.Setter;

/**
 * Message entity holding an XML content string and a server IP.
 *
 * <p>Accessors for both fields are generated by Lombok.
 */
@Getter
@Setter
public class PspMsg extends Identifiable {

    private static final long serialVersionUID = 1L;

    /** XML content. */
    private String xmlContent;

    /** Server IP. */
    private String serverIp;
}
2.将队列和线程池定义为静态常量，避免通过构造器传递
package com.royalnu.psis.interfaces.psp.provider.threads;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import com.royalnu.psis.interfaces.psp.api.model.PspMsg;

/**
 * Holder for the statically shared thread pool and message queue, so that
 * they do not have to be passed through constructors.
 *
 * <p>This class only exposes constants and is not meant to be instantiated.
 */
public final class ThreadConstants {

    /** Shared cached thread pool on which tasks are executed. */
    public static final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

    /** Shared blocking queue of {@link PspMsg}; created without a capacity bound. */
    public static final BlockingQueue<PspMsg> PspMsgQueue = new LinkedBlockingQueue<>();

    /** Prevents instantiation of this constants holder. */
    private ThreadConstants() {
    }
}
3.生产者
package com.royalnu.psis.interfaces.psp.provider.threads; import java.util.List; import javax.annotation.Resource; import com.royalnu.psis.interfaces.psp.api.model.PspMsg; import com.royalnu.psis.interfaces.psp.api.service.PspMsgService; import lombok.extern.log4j.Log4j2; @Log4j2 public class PspProduceDealThread implements Runnable { private List<PspMsg> pspMsgList; private boolean isData = true; public PspProduceDealThread() { } public PspProduceDealThread(List<PspMsg> pspMsgList) { this.pspMsgList = pspMsgList; } public void run() { while (isData) { try { for (PspMsg pspMsg : pspMsgList) { ThreadConstants.PspMsgQueue.put(pspMsg); log.info("生产" + pspMsg.getServerIp()); Thread.sleep(500L); } } catch (Exception e) { e.printStackTrace(); } } } }
4.消费者
package com.royalnu.psis.interfaces.psp.provider.threads;

import com.royalnu.psis.interfaces.psp.api.model.PspMsg;

import lombok.extern.log4j.Log4j2;

/**
 * Consumer task: blocks on {@link ThreadConstants#PspMsgQueue} and logs the
 * server IP of every message it takes.
 *
 * <p>The task runs until the executing thread is interrupted.
 */
@Log4j2
public final class PspConsumerDealThread implements Runnable {

    /**
     * Takes and logs messages until the thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                PspMsg take = ThreadConstants.PspMsgQueue.take();
                log.info("消费" + take.getServerIp());
            } catch (InterruptedException e) {
                // Restore the interrupt status and leave; swallowing it would
                // make the task impossible to cancel.
                Thread.currentThread().interrupt();
                log.warn("Consumer interrupted; exiting", e);
                return;
            }
        }
    }
}
5.生产者/消费者处理类中的主要调用代码
// Build nine sample messages whose server IP is the sequence number "1".."9".
List<PspMsg> pspMsgList = new LinkedList<>();
int serverNo = 1;
while (serverNo < 10) {
    PspMsg sample = new PspMsg();
    sample.setServerIp(String.valueOf(serverNo));
    pspMsgList.add(sample);
    serverNo++;
}

// Submit the producer first, then the consumer, to the shared thread pool.
PspProduceDealThread produceDealThread = new PspProduceDealThread(pspMsgList);
ThreadConstants.cachedThreadPool.execute(produceDealThread);

PspConsumerDealThread consumerDealThread = new PspConsumerDealThread();
ThreadConstants.cachedThreadPool.execute(consumerDealThread);