一 阻塞队列
栈与队列的区别
阻塞队列 BlockingQueue
- 当队列是空的,从队列中获取元素的操作将会被阻塞
- 当队列是满的,从队列中添加元素的操作将会被阻塞
阻塞队列分类
阻塞队列核心方法
package new_course.chp4.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @author Created by Lin Weihong
* @date on 2022/6/17 9:37
*/
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
//创建阻塞队列
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
//第一组
System.out.println(blockingQueue.add("a")); //true
System.out.println(blockingQueue.add("b")); //true
System.out.println(blockingQueue.add("c")); //true
System.out.println(blockingQueue.element());//a
// System.out.println(blockingQueue.add("d")); //Exception in thread "main" java.lang.IllegalStateException: Queue full
System.out.println(blockingQueue.remove());//a
System.out.println(blockingQueue.remove());//b
System.out.println(blockingQueue.remove());//c
// System.out.println(blockingQueue.remove());//Exception in thread "main" java.util.NoSuchElementException
//第二组
System.out.println(blockingQueue.offer("a")); //true
System.out.println(blockingQueue.offer("b")); //true
System.out.println(blockingQueue.offer("c")); //true
System.out.println(blockingQueue.offer("ww")); //false,当队列满,offer不会报错,而add会报错
System.out.println(blockingQueue.poll());//a
System.out.println(blockingQueue.poll());//b
System.out.println(blockingQueue.poll());//c
System.out.println(blockingQueue.poll());//null 当队列空,poll不会报错,而remove会报错
//第三组
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
// blockingQueue.put("w");//阻塞,直到队列有位置
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
// blockingQueue.take();//阻塞,直到队列有元素
//第四组
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("w",3L, TimeUnit.SECONDS)); //超过时间没放进去自动退出
}
}
二 线程池
为什么使用线程池
三种线程池的声明
package new_course.chp4.threadpool;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Created by Lin Weihong
* @date on 2022/6/17 11:06
*/
public class MyThreadPoolDemo {
public static void main(String[] args) {
//一个线程池,五个工作线程,类似一个银行有五个窗口受理点
//ExecutorService threadPool = Executors.newFixedThreadPool(5);
//一个线程池,一个工作线程,类似一个银行有一个窗口受理点
//ExecutorService threadPool = Executors.newSingleThreadExecutor();
//以上两种都不好,因为总有不满足的情况,最好的情况是要有扩容
//一个线程池,N个工作线程,类似一个银行有N个窗口受理点
ExecutorService threadPool = Executors.newCachedThreadPool();
try {
//模拟有10个顾客过来银行办理业务,目前池子里面有五个工作人员提供服务
for (int i = 1; i <= 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 办理了业务");
});
try {
TimeUnit.MILLISECONDS.sleep(8);} catch (InterruptedException e) {
e.printStackTrace(); }
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭线程池
threadPool.shutdown();
}
}
}
ThreadPool底层原理
线程池7大参数
源码解读:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize:常驻核心线程数
比如周一至周五银行有五个窗口,corePoolSize = 5,然而周日业务没有那么繁忙,所以只开了一个窗口,这时候corePoolSize = 1。
maximumPoolSize:线程池中,所能容纳的最大线程数
maximumPoolSize 值必须>=1
keepAliveTime:等待的时间数
unit:时间单位
TimeUnit.SECONDS
workQueue:阻塞队列
相当于去银行办理业务,窗口都满了,需要把人员带到后客厅,侯客厅相当于是阻塞队列
threadFactory:线程工厂
一般用默认的,相当于一个牌子
handler:拒绝策略
拒绝策略,当工作线程 > maximumPoolSize 的时候,如何来拒绝请求执行的Runnable策略
JDK内置的拒绝策略:
线程池底层工作原理
阿里巴巴Java开发手册-并发控制注意事项
三 分支合并
package new_course.chp4.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* @author Created by Lin Weihong
* @date on 2022/6/17 20:38
* <p>
* 分支合并框架
* 分组计算1+...+100
* ForkJoinPool
* ForkJoinTask
* RecursiveTask
*/
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask myTask = new MyTask(0, 100);
ForkJoinPool threadPool = new ForkJoinPool();
//递交任务
ForkJoinTask<Integer> forkJoinTask = threadPool.submit(myTask);
System.out.println(forkJoinTask.get());
threadPool.shutdown();
}
}
class MyTask extends RecursiveTask<Integer> {
//10以内不要forkjoin
private static final Integer ADJUST_VALUE = 10;
private int begin;
private int end;
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
if ((end - begin) <= ADJUST_VALUE) {
for (int i = begin; i <= end; i++) {
result += i;
}
} else {
int middle = (end + begin) / 2;
//一个任务算一部分
MyTask task01 = new MyTask(begin, middle);
MyTask task02 = new MyTask(middle + 1, end);
task01.fork();//重复递归调用compute
task02.fork();
result = task01.join() + task02.join();
}
return result;
}
}
四 异步回调
package new_course.chp4.completablefuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @author Created by Lin Weihong
* @date on 2022/6/17 21:18
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "没有返回 update mysql ok");
});
completableFuture.get();
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "有返回, insert mysql ok");
return 1024;
});
integerCompletableFuture.whenComplete((t,u) -> {
System.out.println("*********t: " + t);
System.out.println("*********u: " + u);
}).exceptionally((throwable -> {
System.out.println("**************"+throwable);
return 4444;
}));
}
}