版权声明:本文为博主原创文章,允许转载,请标明出处。 https://blog.csdn.net/qwdafedv/article/details/84238921
在并发队列上JDK提供了两套实现,
一个是以ConcurrentLinkedQueue为代表的高性能队列,
一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue。
ConcurrentLinkedQueue
ConcurrentLinkedQueue : 是一个适用于高并发场景下的队列,
通过无锁的方式,实现了高并发状态下的高性能,
通常ConcurrentLinkedQueue性能好于BlockingQueue。
它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。
头是最先加入的,尾是最近加入的,该队列不允许null元素。
-
add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中这俩个方法没有任何区别)
public boolean add(E e) { return offer(e); }
-
poll() 和peek() 都是取头元素节点,区别在于前者会删除元素,后者不会。
package cn.qbz.thread; import java.util.concurrent.ConcurrentLinkedQueue; public class Test111904 { public static void main(String[] args) { ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); for (int i = 0; i < 3; i++) { new ThreadTest111904(queue).start(); } while (true) { if (queue.size() > 0) { System.out.println(queue.poll()); } } } } class ThreadTest111904 extends Thread { private ConcurrentLinkedQueue queue; public ThreadTest111904(ConcurrentLinkedQueue queue) { this.queue = queue; } @Override public void run() { for (int i = 0; i < 3; i++) { queue.offer(getName() + "..." + i); } } }
BlockingQueue
在队列为空时,获取元素的线程会等待队列变为非空。
当队列满时,存储元素的线程会等待队列可用。
阻塞队列常用于生产者和消费者的场景
ArrayBlockingQueue
ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组。
有边界的意思是它的容量是有限的,
我们必须在其初始化的时候指定它的容量大小,
容量大小一旦指定就不可改变。
package cn.qbz.thread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class Test111905 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue(2);
queue.add("test1");
queue.offer("test2");
Boolean isOffer = queue.offer("test3");
System.out.println("添加test3:"+isOffer);
isOffer = queue.offer("test4", 1, TimeUnit.SECONDS);
System.out.println("添加test4:"+isOffer);
for (int i = 0; i < 4; i++) {
System.out.println(queue.poll());
}
}
}
其中,add和offer的区别是:
当超出队列界限时,add会抛出异常,offer只是返回false。
LinkedBlockingQueue
LinkedBlockingQueue阻塞队列大小的配置是可选的,
如果我们初始化时指定一个大小,它就是有边界的,
如果不指定,它就是无边界的。说是无边界,
其实是采用了默认大小为Integer.MAX_VALUE的容量 。
它的内部实现是一个链表。
code of demo:
package cn.qbz.thread;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Test111906 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new LinkedBlockingDeque(3);
Produce111906 p1 = new Produce111906(queue);
Consumer111906 c1 = new Consumer111906(queue);
Thread produce1 = new Thread(p1);
Thread produce2 = new Thread(p1);
Thread consumer1 = new Thread(c1);
produce1.start();
produce2.start();
consumer1.start();
Thread.sleep(1000 * 10);
p1.stop();
c1.stop();
}
}
class Produce111906 implements Runnable {
private BlockingQueue<String> queue;
private volatile Boolean flag = true;
private AtomicInteger count = new AtomicInteger();
public Produce111906(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
while (flag) {
try {
int data = count.incrementAndGet();
System.out.println("开始生产数据:" + data);
Boolean isOffer = queue.offer(data + "", 2, TimeUnit.SECONDS);
if (isOffer) {
System.out.println("写入数据:" + data + "成功");
} else {
System.out.println("写入数据:" + data + "失败");
}
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("退出本次生产");
}
}
}
public void stop() {
flag = false;
}
}
class Consumer111906 implements Runnable {
private BlockingQueue<String> queue;
private volatile Boolean flag = true;
public Consumer111906(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
while (flag) {
System.out.println("开始消费数据");
try {
String data = queue.poll(2, TimeUnit.SECONDS);
if (data != null) {
System.out.println("消费成功:" + data);
} else {
System.out.println("消费失败");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stop() {
flag = false;
}
}