目录
1. SynchronousQueue介绍
synchronousQueue是一个没有数据缓冲的阻塞队列,应用在newCachedThreadPool中,它的大小为0,每次取数据需要阻塞线程,存数据也需要阻塞线程。
默认非公平锁,底层使用桟结构实现,公平锁使用队列结构实现。
底层加锁使用CAS+自旋(默认512次)+(park/unpark)的形式保证性能。
1.1 synchronousQueue实战
两个生产者两个消费者:出现后进先出现象。符合非公平锁特性,底层使用桟结构实现。
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueTest {
//定义一个synchronousQueue
private final static SynchronousQueue synchronousQueue = new SynchronousQueue();
public static void main(String[] args) throws Exception{
//模拟消费者取数据
new Thread(()->{
try {
synchronousQueue.take();
System.out.println(Thread.currentThread().getName()+"取数据");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"consumer1").start();
new Thread(()->{
try {
synchronousQueue.take();
System.out.println(Thread.currentThread().getName()+"取数据");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"consumer2").start();
//模拟生产者放数据
new Thread(()->{
try {
synchronousQueue.put(1);
System.out.println(Thread.currentThread().getName()+"生产数据");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"producer1").start();
new Thread(()->{
try {
synchronousQueue.put(1);
System.out.println(Thread.currentThread().getName()+"生产数据");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"producer2").start();
}
}
1.2 源码分析(TODO)
关注点:
1.消费者入队阻塞和CAS自旋
2.生产者出队唤醒
3. put和take方法
2. PriorityBlockingQueue介绍
PriorityBlockingQueue是一个无界的基于数组的优先级阻塞队列。默认数组长度11,但可以无线扩充直到把资源消耗完。
数据在存入以后会以递增排序。
队列默认优先级最高的先出。
应用场景:VIP优先抢购
如何设计一个优先级队列?
PriorityBlockingQueue使用大顶堆和小顶堆的思想,大顶堆:父元素的值永远大于子元素。小顶堆:父元素的值永远小于子元素。
2.1 PriorityBlockingQueue实战
PriorityBlockingQueue获取数据会将存入数据进行排序
import java.util.Random;
import java.util.concurrent.PriorityBlockingQueue;
public class PriorityBlockingQueueTest {
//定义一个初始容量为3的队列,队列会自动扩容
private static PriorityBlockingQueue priorityBlockingQueue = new PriorityBlockingQueue(3);
public static void main(String[] args) throws InterruptedException {
//存5个数据
System.out.println("存数据");
for(int i=0;i<5;i++){
Random random = new Random();
int j = random.nextInt(100);
System.out.print(j+" ");
priorityBlockingQueue.put(j);
}
//取5个数据
System.out.println("");
System.out.println("取数据");
for(int i=0;i<5;i++){
System.out.print(priorityBlockingQueue.take()+" ");
}
}
}
2.2 源码分析(TODO)
关注点:
1. 修饰的常量:最大数量,加锁(reentrantlock),条件队列condition
2. put和take方法(小顶堆的代码)
3.LinkedTransferQueue介绍
LinkedTransferQueue一个由链表实现的无界阻塞队列。采用CAS+自旋的方式加锁。它是synchronousQueue和linkedBlockingQueue的合体,两者都可以实现。
put方法不会阻塞。
transfer会阻塞,判断阻塞方法,看方法是否抛出中断异常。
4.DelayQueue介绍
DelayQueue是一个优先级队列实现的无界阻塞队列,能够实现延迟关闭的效果。
应用场景:订单超时关闭,异步短信通知,关闭空闲连接,缓存,任务超时等
锁:reentrantLock保证
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueueTest {
public static void main(String[] args) throws InterruptedException {
//定义一个delayQueue
BlockingQueue<DelayedObject> delayeds = new DelayQueue<>();
delayeds.put(new DelayedObject("A",1000 * 10));
delayeds.put(new DelayedObject("B",4000 * 10));
delayeds.put(new DelayedObject("C",3000 * 10));
delayeds.put(new DelayedObject("D",2000 * 10));
//取出元素
System.out.println(delayeds.take());
System.out.println(delayeds.take());
System.out.println(delayeds.take());
System.out.println(delayeds.take());
}
}
class DelayedObject implements Delayed{
private String name;
private long delayTime;
public DelayedObject(String name,long delayTime) {
this.name = name;
this.delayTime = System.currentTimeMillis() + delayTime;
}
@Override
public long getDelay(TimeUnit unit) {
return 0;
}
@Override
public int compareTo(Delayed o) {
return 0;
}
@Override
public String toString() {
Date date = new Date(delayTime);
SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return "\nDelayObject:{"
+ "name=" + name
+ ", time=" + sd.format(date)
+ "}";
}
}
4.1 源码分析
1. put和take方法
5.如何选择合适的阻塞队列
五个指标:
1.功能
2.容量:容量固定(ArrayBlockingQueue) 容量无限(LinkedBlockingQueue) 没有容量(SynchronousQueue) DelayQueue容量(Integer.MAX_VALUE)
3.能否扩容
不需要扩容(ArrayBlockingQueue) 需要扩容(PriorityBlockingQueue)
4.内存结构
数组(ArrayBlockingQueue) 链表(LinkedBlockingQueue)
5.性能
LinkedBlockingQueue(两把锁)
ArrayBlockingQueue(一把锁)
SynchronousQueue(直接传递)