1 查看api 搜索关键词: BlockingQueue
可以看到具体实现类:
2 概念理解: 一个队列(理解成数组),一个人不停的放面包,一个人不停的取面包,当放满时,放队列阻塞,放面包的人等待; 当取面包到没有时,取动作阻塞,取面包的人等待;
3 使用jdk提供实现类,模拟阻塞效果:
package cn.itcast.heima2; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.locks.ReentrantLock; /** * 其内部代码 和自定义写法一样 也是弄出两个队列来 * public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = (E[]) new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } * @author zm * * 测试结果: * Thread-0准备放数据! Thread-0已经放了数据,队列目前有1个数据 Thread-1准备放数据! Thread-1已经放了数据,队列目前有2个数据 Thread-2准备取数据! Thread-2已经取走数据,数据为: 1 队列目前有1个数据 Thread-1准备放数据! Thread-1已经放了数据,队列目前有2个数据 Thread-0准备放数据! Thread-0已经放了数据,队列目前有3个数据 Thread-1准备放数据! ------------------------> 此时篮子满了, 放队列进入阻塞 Thread-0准备放数据! Thread-2准备取数据! Thread-2已经取走数据,数据为: 1 队列目前有2个数据 -----------------------> 此时取走了一个面包 篮子不满 Thread-1已经放了数据,队列目前有3个数据 -----------------------> 此时才能继续放面包 * */ public class BlockingQueueTest { public static void main(String[] args) { final BlockingQueue queue = new ArrayBlockingQueue(3); // 定义篮子为3个大小 for(int i=0;i<2;i++){ // 两个人,不停向篮子 放面包 new Thread(){ public void run(){ while(true){ try { Thread.sleep((long)(Math.random()*1000)); System.out.println(Thread.currentThread().getName() + "准备放数据!"); queue.put(1); System.out.println(Thread.currentThread().getName() + "已经放了数据," + "队列目前有" + queue.size() + "个数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } new Thread(){ public void run(){ // 一个人 不停的从篮子中 取面包 while(true){ try { //将此处的睡眠时间分别改为100和1000,观察运行结果 Thread.sleep(1000); // System.out.println(Thread.currentThread().getName() + "准备取数据!"); Object object = queue.take(); System.out.println(Thread.currentThread().getName() + "已经取走数据,数据为: " + object + " 队列目前有" + queue.size() + "个数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } }4 使用自定义阻塞队列方式,实现 存取篮子面包效果:
package thread; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * * @author zm * 结果: 向第0个篮子生产了一个面包,面包重量为 25g 从第0个篮子拿走了一个面包,面包重量为 25g 向第1个篮子生产了一个面包,面包重量为 73g 向第2个篮子生产了一个面包,面包重量为 40g 从第1个篮子拿走了一个面包,面包重量为 73g 向第3个篮子生产了一个面包,面包重量为 50g 向第4个篮子生产了一个面包,面包重量为 90g 从第2个篮子拿走了一个面包,面包重量为 40g 从第3个篮子拿走了一个面包,面包重量为 50g 向第5个篮子生产了一个面包,面包重量为 50g 向第6个篮子生产了一个面包,面包重量为 77g 从第4个篮子拿走了一个面包,面包重量为 90g * * * 案例2: 两队人,一对不停像100个篮子生产面包,一对不停从篮子拿走面包(阻塞队列) * * 如果是我不停的生产面包放在篮子里,我不管消费者拿面包, notfull队列(篮子还没存放满面包队列 有自己独自的wait nodify) * 而消费者不停的去从篮子里拿面包,而不管生产者生产了多少, notempty队列(篮子面包还没空队列 有自己独自的wait nodify) * 那么这就是两个队列, * notfull队列, 只管像篮子里放面包,当100个篮子满的时,放面包动作处于等待 * notempty队列,只管从篮子里拿面包,当100个篮子空的时,存面包动作处于等待 * 此时需要使用同一个lock的两个condition来实现第二个案例 * * * 注意: count表示当前面包真实个数(生产一个 同时下个线程拿走一个 那么count = 0) * items: 存放面包篮子 * putptr, 生产面包后 存放在篮子的角标, * takeptr,取走面包时,面包所在篮子的角标 * */ public class CommunicateWithConditionThread { public static void main(String[] args) { final BoundedBuffer boundedBuffer = new BoundedBuffer(); new Thread(new Runnable() { @Override public void run() { while(true){ try { Thread.sleep((long)(Math.random()*100)); boundedBuffer.put((int)(Math.random()*100)); } catch (Exception e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable() { @Override public void run() { while(true){ try { Thread.sleep((long)(Math.random()*100)); boundedBuffer.take(); } catch (Exception e) { e.printStackTrace(); } } } }).start(); } } class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); // 生产队列 final Condition notEmpty = lock.newCondition(); // 消费队列 final Object[] items = new Object[20]; int putptr, takeptr, count; public void put(int x) throws InterruptedException { lock.lock(); // 生产面包上锁 try { while (count == items.length) // 当生产队列并发大到突然生产了100个面包时,生产队列等待 notFull.await(); // 否则执行生产面包操作, 不断向数组下一个单元格内放新面包 items[putptr] = x; System.out.println("向第" + putptr + "个篮子生产了一个面包,面包重量为 " + x + "g" ); if (++putptr == items.length) putptr = 0;// 当存放的面包到达数组最后位置时,篮子存放面包位置又从0开始 ++count; // 记录面包个数 notEmpty.signal();// 生产了面包,就立即通知消费队列去取走面包 } finally { lock.unlock();// 生产面包完成 解锁 让下个生产执行 } } public Object take() throws InterruptedException { lock.lock();// 取面包上锁 try { while (count == 0) // 当消费队列消费并发过大,或者刚开始没生产出面包时,消费队列等待 notEmpty.await(); Object x = items[takeptr]; System.out.println("从第" + takeptr + "个篮子拿走了一个面包,面包重量为 " + x + "g" ); if (++takeptr == items.length) takeptr = 0;// 当取走面包到篮子最后一个位置时,重置,再从篮子最开始位置取面包 --count;// 记录面包个数 取走一次面包 个数减一 notFull.signal(); // 取走面包, 立即通知生产队列生产面包 return x; } finally { lock.unlock(); // 取面包完成 解锁 让下个取面包动作执行 }5 使用阻塞队列方式,实现 打印日志提高效率:
// 原来写法,耗时16S public class Thread16 { /** * 打印16个日志 耗时16S */ public static void main(String[] args) { System.out.println("begin: " + (System.currentTimeMillis()/1000)); for(int i=0; i<16; i++){ // 不能改 String log = "" + (i + 1);// 不能改 parseLog(log); } } public static void parseLog(String log){ System.out.println("log: " + (System.currentTimeMillis()/1000)); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } 修改程序代码,开四个线程让这16个对象在4秒钟打完 思路: 0创建容量16的队列 1 将16个任务增加到 阻塞队列中 2开启4个线程,每次从队列中获取数据 这样主线程不停的放, 并发来的4个线程不停的取, 你可以理解为并发一次来了4个线程,每个线程取到后内部打印1S操作仍旧不变, 执行4次,一共耗时4S完成原来16秒不用并发下的操作 主线程放log 和 子线程取log 之间用condtion notEmpty notFull 来实现阻塞 public class Test { public static void main(String[] args){ // 0 创建容量只为1的队列 final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(16); // 2 开启4个线程,每次从队列中获取数据 for(int i=0;i<4;i++){ new Thread(new Runnable(){ @Override public void run() { while(true){ try { String log = queue.take(); parseLog(log); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } // 1 将16个任务在主线程中增加到阻塞队列中 System.out.println("begin:"+(System.currentTimeMillis()/1000)); for(int i=0;i<16;i++){ //这行代码不能改动 final String log = ""+(i+1);//这行代码不能改动 { try { queue.put(log); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } //Test.parseLog(log); } } } //parseLog方法内部的代码不能改动 public static void parseLog(String log){ System.out.println(log+":"+(System.currentTimeMillis()/1000)); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }脑图: