package com.pcxm.blockqueue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A small producer/consumer demonstration built on a {@link BlockingQueue}.
 *
 * <p>One producer task generates numbered string items at a fixed interval and
 * puts them on a shared queue; one consumer task polls the queue and prints
 * each item. The producer stops after {@value #MAX_ITEMS} items, and the
 * consumer stops once a poll times out without receiving anything.
 */
public class QueueTest {

    /** Counter used to number produced items; incremented once per producer iteration. */
    public static AtomicInteger atomicInteger = new AtomicInteger(0);

    /** Producer loop guard: the producer keeps running while this is {@code true}. */
    public volatile static Boolean isRun = true;

    /** Consumer loop guard: the consumer keeps running while this is {@code true}. */
    public volatile static Boolean isConsum = true;

    /** Highest item number the producer will enqueue before stopping. */
    private static final int MAX_ITEMS = 10;

    /** Pause between two produced items, in milliseconds. */
    private static final long PRODUCE_INTERVAL_MILLIS = 1000;

    /** How long the consumer waits for an item before giving up, in seconds. */
    private static final long POLL_TIMEOUT_SECONDS = 2;

    /**
     * Starts one producer task and one consumer task on separate pools and
     * requests an orderly shutdown of both pools once the tasks are submitted.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared for compatibility; nothing checked is thrown here
     */
    public static void main(String[] args) throws Exception {
        final BlockingQueue<String> que = new LinkedBlockingQueue<>();

        // Pool that runs the producer.
        ExecutorService producerPool = Executors.newFixedThreadPool(5);
        // Pool that runs the consumer.
        ExecutorService consumerPool = Executors.newFixedThreadPool(5);

        producerPool.execute(() -> produce(que));
        consumerPool.execute(() -> consume(que));

        // shutdown() lets the already-submitted tasks finish; it does not interrupt them.
        producerPool.shutdown();
        consumerPool.shutdown();
    }

    /**
     * Producer loop: numbers an item, waits, then enqueues it, until the item
     * number exceeds {@link #MAX_ITEMS}, {@link #isRun} becomes false, or the
     * thread is interrupted.
     *
     * @param que queue that receives the produced items
     */
    private static void produce(BlockingQueue<String> que) {
        System.err.println("开始启动生产者线程!");
        while (isRun) {
            System.out.println("正在生产数据...");
            try {
                int sequence = atomicInteger.incrementAndGet();
                Thread.sleep(PRODUCE_INTERVAL_MILLIS);
                if (sequence > MAX_ITEMS) {
                    break;
                }
                System.out.println("将数据" + sequence + "放入到队列中");
                que.put("数据" + sequence);
            } catch (InterruptedException e) {
                // Restore the flag for the caller and stop. Looping again would make
                // sleep()/put() throw immediately every time, spinning forever.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Consumer loop: prints every item taken from the queue until a poll times
     * out with no item, {@link #isConsum} becomes false, or the thread is
     * interrupted.
     *
     * @param que queue to take items from
     */
    private static void consume(BlockingQueue<String> que) {
        System.err.println("开始启动消费者线程");
        while (isConsum) {
            try {
                String item = que.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                if (item == null) {
                    // Timed out with nothing to consume: treat the producer as finished.
                    break;
                }
                System.err.println("开始消费" + item);
            } catch (InterruptedException e) {
                // Restore the flag and stop; a retry would throw again immediately
                // because poll() checks the interrupt status on entry.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
实现一个简单的生产消费程序
猜你喜欢
转载自www.cnblogs.com/hetutu-5238/p/9084357.html
今日推荐
周排行