文章目录
一、线程通信(了解)
- 概念
线程通信就是线程间相互发送数据,线程间共享一个资源即可实现线程通信
- 常见方式
通过共享一个数据的方式实现
根据共享数据的情况决定自己该怎么做,以及通知其他线程怎么做
- 场景
生产者与消费者模型:生产者线程负责生产数据,消费者线程负责消费生产者产生的数据
- 要求
生产者线程生产完数据后唤醒消费者,然后等待自己,消费者消费完该数据后唤醒生产者,然后等待自己
方法名 | 解释 |
---|---|
void wait() |
当前线程等待,直到另一个线程调用notify() 或 notifyAll()唤醒自己 |
void notify() |
唤醒正在等待对象监视器(锁对象)的单个线程 |
void notifyAll() |
唤醒正在等待对象监视器(锁对象)的所有线程 |
二、线程池(重点)
- 概述
可以复用线程的技术
1、获得线程池对象
用ExecutorService实现类ThreadPoolExecutor自创建一个线程池对象
用Executors(线程池的工具类)调用方法返回不同特点的线程池对象
public ThreadPoolExecutor(
int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//线程空闲时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler//拒绝策略
)
参数名 | 解释 |
---|---|
corePoolSize |
指定线程池的线程数量(核心线程),不能小于0 |
maximumPoolSize |
指定线程池可支持的最大线程数,最大数量 >= 核心线程数量 |
keepAliveTime |
指定临时线程的最大存活时间 ,不能小于0 |
unit |
指定存活时间的单位(秒、分、时、天) |
workQueue |
指定任务队列 ,不能为null |
threadFactory |
指定用哪个线程工厂创建线程,不能为null |
handler |
指定线程忙,任务满的时候,新任务来了怎么办,不能为null |
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
2、线程池处理Runnable
ExecutorService方法 | 解释 |
---|---|
void execute(Runnable command) |
执行任务/命令,没有返回值,一般用来执行 Runnable 任务 |
Future<T> submit(Callable<T> task) |
执行任务,返回未来任务对象获取线程结果,一般拿来执行 Callable 任务 |
void shutdown() |
等任务执行完毕后关闭线程池 |
List<Runnable> shutdownNow() |
立刻关闭,停止正在执行的任务,并返回队列中未执行的任务 |
4种策略 | 解释 |
---|---|
ThreadPoolExecutor.AbortPolicy |
丢弃任务并抛出RejectedExecutionException异常。是默认的策略 |
ThreadPoolExecutor.DiscardPolicy |
丢弃任务,但是不抛出异常 这是不推荐的做法 |
ThreadPoolExecutor.DiscardOldestPolicy |
抛弃队列中等待最久的任务 然后把当前任务加入队列中 |
ThreadPoolExecutor.CallerRunsPolicy |
由主线程负责调用任务的run()方法从而绕过线程池直接执行 |
1)AbortPolicy实战
当运行任务数超过核心数时,会报RejectedExecutionException错误
import java.util.concurrent.*;
public class ThreadPoolExecutorTest implements Runnable {
private String name;
public ThreadPoolExecutorTest(String name) {
this.name = name;
}
@Override
public void run() {
try {
System.out.println("当前线程名: " + Thread.currentThread().getName() + ", 任务 " + name + " is running!");
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
//线程池
ExecutorService pools = new ThreadPoolExecutor(
1,
1,
1,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
ThreadPoolExecutorTest run = null;
// 循环创建线程
for (int i = 0; i < 5; i++) {
run = new ThreadPoolExecutorTest("" + i);
// 将任务添加到线程池中
pools.execute(run);
}
//关闭线程池
pools.shutdown();
}
}
2)DiscardPolicy实战
import java.util.concurrent.*;
public class ThreadPoolExecutorTest implements Runnable {
private String name;
public ThreadPoolExecutorTest(String name) {
this.name = name;
}
@Override
public void run() {
try {
System.out.println("当前线程名: " + Thread.currentThread().getName() + ", 任务 " + name + " is running!");
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
//线程池
ExecutorService pools = new ThreadPoolExecutor(
1,
1,
1,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy());
ThreadPoolExecutorTest run = null;
// 循环创建线程
for (int i = 0; i < 5; i++) {
run = new ThreadPoolExecutorTest("" + i);
// 将任务添加到线程池中
pools.execute(run);
}
//关闭线程池
pools.shutdown();
}
}
3)DiscardOldestPolicy实战
import java.util.concurrent.*;
public class ThreadPoolExecutorTest implements Runnable {
private String name;
public ThreadPoolExecutorTest(String name) {
this.name = name;
}
@Override
public void run() {
try {
System.out.println("当前线程名: " + Thread.currentThread().getName() + ", 任务 " + name + " is running!");
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
//线程池
ExecutorService pools = new ThreadPoolExecutor(
1,
1,
1,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
ThreadPoolExecutorTest run = null;
// 循环创建线程
for (int i = 0; i < 5; i++) {
run = new ThreadPoolExecutorTest("" + i);
// 将任务添加到线程池中
pools.execute(run);
}
//关闭线程池
pools.shutdown();
}
}
4)CallerRunsPolicy实战
import java.util.concurrent.*;
public class ThreadPoolExecutorTest implements Runnable {
private String name;
public ThreadPoolExecutorTest(String name) {
this.name = name;
}
@Override
public void run() {
try {
System.out.println("当前线程名: " + Thread.currentThread().getName() + ", 任务 " + name + " is running!");
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
//线程池
ExecutorService pools = new ThreadPoolExecutor(
1,
1,
1,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
ThreadPoolExecutorTest run = null;
// 循环创建线程
for (int i = 0; i < 5; i++) {
run = new ThreadPoolExecutorTest("" + i);
// 将任务添加到线程池中
pools.execute(run);
}
//关闭线程池
pools.shutdown();
}
}
3、线程池处理Callable
ExecutorService方法 | 解释 |
---|---|
void execute(Runnable command) |
执行任务/命令,没有返回值,一般用来执行 Runnable 任务 |
Future<T> submit(Callable<T> task) |
执行任务,返回未来任务对象获取线程结果,一般拿来执行 Callable 任务 |
void shutdown() |
等任务执行完毕后关闭线程池 |
List<Runnable> shutdownNow() |
立刻关闭,停止正在执行的任务,并返回队列中未执行的任务 |
import java.util.concurrent.Callable;
public class MyCallable implements Callable {
private String name;
public MyCallable(String name) {
this.name = name;
}
@Override
public Object call() throws Exception {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + name + " " + i);
}
return true;
}
}
import java.util.concurrent.*;
public class ThreadPoolExecutorTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建线程池
ExecutorService pools = Executors.newFixedThreadPool(3);
MyCallable myCallable1 = new MyCallable(" hadoop");
MyCallable myCallable2 = new MyCallable(" flink");
MyCallable myCallable3 = new MyCallable(" spark");
//提交执行
Future<Boolean> sb1 = pools.submit(myCallable1);
Future<Boolean> sb2 = pools.submit(myCallable2);
Future<Boolean> sb3 = pools.submit(myCallable3);
//获取结果
boolean b1 = sb1.get();
boolean b2 = sb2.get();
boolean b3 = sb3.get();
System.out.println(b1);
System.out.println(b2);
System.out.println(b3);
// 关闭线程池,如不关闭,线程池会一直运行
pools.shutdown();
}
}
4、Callable和Runnable接口的区别
Callable接口中的Call方法有返回值,Runnable接口中的Run方法没有返回值
Callable接口中的Call方法有声明异常,Runnable接口中的Run方法没有异常