多个生产者向容量为10的队列中放数据,多个消费者从队列中取数据进行消费

public class Producer implements Runnable {

    private static AtomicInteger count = new AtomicInteger();
    private volatile boolean isRunning = true;
    private BlockingQueue<String> queue;
    private String name;
    
    public Producer(BlockingQueue<String> queue , String name) {
        this.queue = queue;
        this.name =  name;
    }

    public void run() {
        String data = null;
        Random r = new Random();
        System.out.println("启动"+name);
        do {
            try {
                Thread.sleep(r.nextInt(1000));
                data = "data:" + count.incrementAndGet();
                //这里是等待2秒后,返回true或者false
                if (queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println(name +"生产数据:" + data);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                break;
            } 
        }while(isRunning);
        System.out.println(name+ "退出!");
    }

    public void stop() {
        isRunning = false;
    }

}
public class Consumer implements Runnable {

    private BlockingQueue<String> queue;
    private String name;
    public Consumer(BlockingQueue<String> queue , String name) {
        this.queue = queue;
        this.name = name;
    }

    public void run() {
        System.out.println("启动"+name);
        Random r = new Random();
        boolean isRunning = true;
        do {
            try {
                String data = queue.poll(2, TimeUnit.SECONDS);
                if (null != data) {
                    System.out.println(name+"消费数据:" + data);
                    Thread.sleep(r.nextInt(1000));
                } else {
                    // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
                    isRunning = false;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                break;
            }
        }while(isRunning);
        System.out.println(name+"退出!");
    }

}
public class Client {
 
    public static void main(String[] args) throws InterruptedException {
        // 声明一个容量为10的缓存队列
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
 
        Producer producer1 = new Producer(queue , "【生产者一】");
        Producer producer2 = new Producer(queue ,"【生产者二】");
        Producer producer3 = new Producer(queue ,"【生产者三】");
        Consumer consumer1 = new Consumer(queue ,"【消费者一】");
        Consumer consumer2 = new Consumer(queue ,"【消费者二】");
        // 借助Executors
        ExecutorService service = Executors.newCachedThreadPool();
        // 启动线程
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer1);
        service.execute(consumer2);
        // 执行10s
        Thread.sleep(10 * 1000);
        producer1.stop();
        producer2.stop();
        producer3.stop();
 
        Thread.sleep(2000);
        service.shutdown();
    }
}

猜你喜欢

转载自www.cnblogs.com/moris5013/p/10882112.html