/*
* LinkedBlockingDeque:无界队列
* 阻塞式容器
* 实现生产者消费者
* 用的特别多
*/
put take 阻塞式
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
/*
* LinkedBlockingDeque:无界队列
* 阻塞式容器
* 实现生产者消费者
* 用的特别多
*/
public class LinkedBlockingDeque_value {
static BlockingQueue<String> strs=new LinkedBlockingDeque<>(); //阻塞式容器
static Random r=new Random();
public static void main(String[] args) {
/*
* 启动一个生产者
*/
new Thread(new Runnable() {
public void run() {
for(int i=0;i<100;i++){
try {
strs.put("a"+i); //如果满了就会等待
TimeUnit.MILLISECONDS.sleep(r.nextInt(1000)); //睡眠不超过一秒钟
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},"p1").start();
/*
* 5个消费者线程
*/
for(int i=0;i<5;i++){
new Thread(new Runnable() {
public void run() {
for(;;){
try {
System.out.println(Thread.currentThread().getName()+"take-"+strs.take()); //如果拿空了,就会等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},"c"+i).start();
}
}
}
/*
*ArrayBlockingQueue
*有界队列,在线程池里装的就是一个一个的任务
*/
put 阻塞
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
public class ArrayBlockingQueue_value {
//有界队列,在线程池里装的就是一个一个的任务
static BlockingQueue<String> strs=new ArrayBlockingQueue<>(10);
static Random r=new Random();
public static void main(String[] args) throws InterruptedException {
for(int i=0;i<10;i++){
strs.put("a"+i); //装10个
}
strs.put("aaa"); //满了会阻塞
strs.add("aaa"); //满了报异常
strs.offer("aaa"); //满了不报异常,但加不进去,因为已经满了
strs.offer("aaa",1,TimeUnit.SECONDS); //1秒钟加不进去,就不往里面加了
}
}
/*
* DelayQueue
* 无界队列,塞进去的数据有规定的时间在什么时候才可以取,默认是排好顺序的,等待时间最长的排在前面,先往外拿
* DelayQueue做定时执行任务,谁时间快到了要执行了,就先取出谁
*/
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueue_value {
/*
* 无界队列,塞进去的数据有规定的时间在什么时候才可以取,默认是排好顺序的,等待时间最长的排在前面,先往外拿
* DelayQueue做定时执行任务,谁时间快到了要执行了,就先取出谁
*/
static BlockingQueue<MyTask> tasks=new DelayQueue<>();
static Random r=new Random();
/*
* MyTask往DelayQueue里面装的时候必须实现Delayed接口
* 内部类MyTask用来模拟一个接口
*/
static class MyTask implements Delayed{
long runningTime;
MyTask(long rt){
this.runningTime=rt;
}
/*
* 实现Comparable接口的方法
*/
@Override
public int compareTo(Delayed o) {
if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
return -1; //轮到该执行的时间已经过去了
else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
return 1; //还剩一会儿时间才执行
else
return 0;
}
/*
* 还有多长时间我就可以往外拿了
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime-System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
public String toString(){
return ""+runningTime;
}
}
public static void main(String[] args) throws InterruptedException {
long now=System.currentTimeMillis();
/*
* 五个任务
*/
MyTask t1=new MyTask(now+1000); //现在开始,1秒钟之后进行
MyTask t2=new MyTask(now+2000);
MyTask t3=new MyTask(now+1500);
MyTask t4=new MyTask(now+2500);
MyTask t5=new MyTask(now+500);
/*
* 把任务扔进队列里面去
*/
tasks.put(t1);
tasks.put(t2);
tasks.put(t3);
tasks.put(t4);
tasks.put(t5);
System.out.println(tasks);
for(int i=0;i<5;i++){
System.out.println(tasks.take());
}
}
}
/*
* LinkedTransferQueue
* 消费者先启动,生产者生产完产品不是先往队列里面扔
* 首先去找有没有消费者,如果有则直接扔给消费者
* —这样服务器支持并发的量会大一点,所以LinkedTransferQueue是用在更高得并发的情况下
*/
/*
* 仅限于该方法transfer在没有消费者线程的时候,生产者会在这里阻塞
* 后面的代码执行不了…
* 一般实时消息处理用这个方法的比较多
* 还有netty里面用LinkedTransferQueue也比较多
*/
transfer
import java.util.concurrent.LinkedTransferQueue;
public class TransferQueue_value {
public static void main(String[] args) throws InterruptedException{
/*
* LinkedTransferQueue
* 消费者先启动,生产者生产完产品不是先往队列里面扔
* 首先去找有没有消费者,如果有则直接扔给消费者
*---这样服务器支持并发的量会大一点,所以LinkedTransferQueue是用在更高得并发的情况下
*/
final LinkedTransferQueue<String> strs=new LinkedTransferQueue<>();
/*
new Thread(new Runnable() {
public void run() {
try {
System.out.println(strs.take()); //消费者先起来
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
*/
/*
* 仅限于该方法transfer在没有消费者线程的时候,生产者会在这里阻塞
* 后面的代码执行不了...
* 一般实时消息处理用这个方法的比较多
* 还有netty里面用LinkedTransferQueue也比较多
*/
strs.transfer("nun");
/*
* 该方法put没有问题,因为它不会阻塞
* strs.put("edu");
*/
new Thread(new Runnable() {
public void run() {
try {
System.out.println(strs.take()); //消费者后起来
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
/*
* 同步队列是一种特殊的TransferQueue
* SynchronousQueue容量为0的队列
* 生产者送来的产品,消费者必须马上消费掉,如果不消费会出问题
*
*/
/*
* 阻塞,等待消费者消费
* 这个put里面用的transfer
* --是一种特殊的transferQueue,里面装的任何东西都必须立刻交给消费者去消费
*/
put
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueue_value {
public static void main(String[] args) throws InterruptedException {
/*
* 同步队列是一种特殊的TransferQueue
* SynchronousQueue容量为0的队列
* 生产者送来的产品,消费者必须马上消费掉,如果不消费会出问题
*
*/
final BlockingQueue<String> strs=new SynchronousQueue<>();
new Thread(new Runnable() {
public void run() {
try {
System.out.println(strs.take()); //消费者先起来
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
/*
* 阻塞,等待消费者消费
* 这个put里面用的transfer
*--是一种特殊的transferQueue,里面装的任何东西都必须立刻交给消费者去消费
*/
strs.put("chnet");
//strs.add("cmcc"); //队列已满(SynchronousQueue本来就是空的容器),无法加入,报错
System.out.println(strs.size()); //永远为0
}
}