public class Producer implements Runnable { private static AtomicInteger count = new AtomicInteger(); private volatile boolean isRunning = true; private BlockingQueue<String> queue; private String name; public Producer(BlockingQueue<String> queue , String name) { this.queue = queue; this.name = name; } public void run() { String data = null; Random r = new Random(); System.out.println("启动"+name); do { try { Thread.sleep(r.nextInt(1000)); data = "data:" + count.incrementAndGet(); //这里是等待2秒后,返回true或者false if (queue.offer(data, 2, TimeUnit.SECONDS)) { System.out.println(name +"生产数据:" + data); } } catch (InterruptedException e) { e.printStackTrace(); break; } }while(isRunning); System.out.println(name+ "退出!"); } public void stop() { isRunning = false; } }
public class Consumer implements Runnable { private BlockingQueue<String> queue; private String name; public Consumer(BlockingQueue<String> queue , String name) { this.queue = queue; this.name = name; } public void run() { System.out.println("启动"+name); Random r = new Random(); boolean isRunning = true; do { try { String data = queue.poll(2, TimeUnit.SECONDS); if (null != data) { System.out.println(name+"消费数据:" + data); Thread.sleep(r.nextInt(1000)); } else { // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。 isRunning = false; } } catch (InterruptedException e) { e.printStackTrace(); break; } }while(isRunning); System.out.println(name+"退出!"); } }
public class Client { public static void main(String[] args) throws InterruptedException { // 声明一个容量为10的缓存队列 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); Producer producer1 = new Producer(queue , "【生产者一】"); Producer producer2 = new Producer(queue ,"【生产者二】"); Producer producer3 = new Producer(queue ,"【生产者三】"); Consumer consumer1 = new Consumer(queue ,"【消费者一】"); Consumer consumer2 = new Consumer(queue ,"【消费者二】"); // 借助Executors ExecutorService service = Executors.newCachedThreadPool(); // 启动线程 service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer1); service.execute(consumer2); // 执行10s Thread.sleep(10 * 1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(2000); service.shutdown(); } }