我眼中的多线程:程序有多条回路,同时向下运行,就是多线程
文章目录
- 多线程实现方式1:继承Thread类,重写run()方法
- 多线程实现方式2:创建类实现Runnable接口,实现run()方法
- sleep(long millis): 线程休眠是将线程从运行态转入阻塞态以暂停线程,时间过后进入就绪状态,也可在主函数中调用,单位毫秒
- 线程合并,join()方法,和并到当前线程中
- 代码集合
- 线程礼让 yield(): 线程暂停抢占资源,从运行态或就绪态到阻塞态.
- 线程中断标志interrupt(): 通知线程该中断了
- 线程同步
- 1. synchronized锁
- 2. lock锁
- 3. lock与synchronized区别
- 4. 生产者消费者
- 5. Condition监视器,精确唤醒
- 6. Synchronized 修饰的特点
- 7. volatile(修饰变量) final
- 8. 原子性atomic
- 9. Callable 与 Future
- 10. CountDownLatch 计数器
- 11. CyclicBarrier 循环栅栏
- 12. Semaphore 信号
- 11. ReadWriteLock 读写锁
- 13. 线程安全的集合
- 14. BlockingQueue阻塞队列
- 15. 线程池
- 16. fork-join 框架(缺代码)
- 17. 异步计算CompletableFuture
- 锁总结
多线程实现方式1:继承Thread类,重写run()方法
class MyThread extends Thread{
@Override
public void run() {
for (int i = 0; i < 8; i++) {
System.out.println(i+"-------run继承Thread类-------");
}
}
}
创建派生对象,调用派生对象的start()方法
启动线程用start方法,不是run,run方法还是主线程在运行
MyThread myThread=new MyThread();//创建线程对象
myThread.start();
多线程实现方式2:创建类实现Runnable接口,实现run()方法
class MyRunnable implements Runnable{
@Override
public void run() {
for (int i = 0; i < 8; i++) {
System.out.println(i+"-------run实现Runnable接口-------");
}
}
}
先创建对象myRunnable,再创建线程对象thread,调用线程对象的start()方法
MyRunnable myRunnable=new MyRunnable();
Thread thread=new Thread(myRunnable);
thread.start();
sleep(long millis): 线程休眠是将线程从运行态转入阻塞态以暂停线程,时间过后进入就绪状态,也可在主函数中调用,单位毫秒
与wait的区别:
- wait只能用于同步块中,sleep可以在任何地方睡
- wait释放锁,sleep不释放
- wait来自Object,sleep来自Thread
这里示范将i等于3时当前线程休眠
for (int i = 0; i < 8; i++) {
System.out.println(i+"-----mian run-----");
if (i==3) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
修改刚刚的线程类,i等于3时,线程休眠1秒
class MyThread extends Thread{
@Override
public void run() {
for (int i = 0; i < 8; i++) {
if (i==3) {
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(i+"-------run继承-------");
}
}
}
线程合并,join()方法,和并到当前线程中
注意: 当前线程不能被指定,线程1在执行的过程中线程2调用join方法,此时先执行线程2,1进入阻塞,2执行完后线程1进入就绪
try {
//join(long millis)重载,在线程2执行mills后,线程1变成就绪态,join()表示join(0),millis=0的时候是线程1无限等待,直到该线程2消亡
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
join(long millis)重载 : 在线程2执行mills后,线程1变成就绪态,join()表示join(0),millis=0的时候是线程1无限等待,直到该线程2消亡 ; 简单的说就是先让某个线程跑一会,然后再回到争资源的状态
代码集合
public class Main {
public static void main(String[] args) {
//第一种,创建派生对象,调用派生对象的start方法
MyThread myThread=new MyThread();//创建线程对象
myThread.start();//启动线程用start方法,不是run,run方法还是主线程在运行
//第二种,先创建对象myRunnable,再创建线程对象thread,调用线程对象的start方法
MyRunnable myRunnable=new MyRunnable();
Thread thread=new Thread(myRunnable);
thread.start();
//这里示范将i等于3时主线程暂停
//主线程不是手动实例化的对象,不能用直接用sleep(),要通过Threa类的静态方法currentThread获取主线程对象,在调用sleep()
//sleep在定义时声明了可能抛出的异常InterruptedException,建议try-catch
for (int i = 0; i < 8; i++) {
System.out.println(i+"-----mian run-----");
if (i==3) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//线程合并,join()方法,和并到当前线程中注意 当前 不能指定,线程1在执行中调用线程2的join方法,此时先执行线程2,1进入阻塞,2执行完后线程1进入就绪
try {
//join(long millis)重载,在线程2执行mills后,线程1变成就绪态,join()表示join(0),millis=0的时候是线程1无限等待,直到该线程2消亡
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//多线程方式1:继承Thread,重写run方法
class MyThread extends Thread{
@Override//重写run方法
public void run() {
for (int i = 0; i < 8; i++) {
if (i==3) {
//线程休眠是将线程从运行态转入阻塞态以暂停线程,时间过后进入就绪状态,也可在主函数中调用
try {
sleep(1000);//线程休眠传入的是long型,且必须用try异常处理
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(i+"-------run继承-------");
}
}
}
//多线程方式2:创建类实现Runnable接口,实现run方法
class MyRunnable implements Runnable{
@Override
public void run() {
for (int i = 0; i < 8; i++) {
System.out.println(i+"-------run接口-------");
}
}
}
线程礼让 yield(): 线程暂停抢占资源,从运行态或就绪态到阻塞态.
注意: 礼让只是礼让一次,例如饭堂打饭,你让后面一个肚子饿的不行的老哥先打.然后继续开始排队
public class Yield {
public static Yield1 yield1=new Yield1();
public static Yield2 yield2=new Yield2();
public static class Yield1 extends Thread{
@Override
public void run() {
for (int i = 0; i < 10; i++) {
if (i==5) {
//线程Yield1礼让,让Yield2瞬间享有cpu资源,之后继续竞争
yield();
}
System.out.println(i+"-------Yield1-------");
}
}
}
public static class Yield2 extends Thread{
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println(i+"-------Yield2-------");
}
}
}
public static void main(String[] args) {
//礼让实现
yield1.start();
yield2.start();
}
}
线程中断标志interrupt(): 通知线程该中断了
调用线程对象的interrupt()方法,设置中断标志位,通知线程该中断了,此时线程并没中断,但是可以获取中断标志
getState():获取当前线程的状态
boolean isInterrupted(): 看线程是否中断
interrupted(): 判断是否被中断,并清除当前中断状态
public class Main {
public static void main(String[] args) {
Thread myThread=new Thread(()->{
for (int i = 0; i < 10; i++) {
if (i==5) {
Thread.currentThread().interrupt();
System.out.println(Thread.currentThread().getState());
System.out.println(Thread.currentThread().isInterrupted());
if (Thread.currentThread().isInterrupted()){
break;
}
}
System.out.println(i+"-----run------");
}
});
System.out.println(thread.getState());//获取该线程当前状态
thread.interrupt();//没启动直接中断
System.out.println(thread.isInterrupted());//false线程未启动,不存在中断
myThread.start();
}
}
线程同步
1. synchronized锁
先模拟一下简单的买票场景
public class Synchronized {
public static Integer num=30;
public static void sale(){
try {
sleep(1);//模拟卖票延迟
System.out.println(Thread.currentThread().getName()+" 卖出了第"+num--+"张票");
}catch (InterruptedException e){
e.printStackTrace();
}
}
public static void main(String[] args){
new Thread(()->{
for (int i=0;i<15;i++){
sale();
}
},"线程1").start();
new Thread(()->{
for (int i=0;i<15;i++){
sale();
}
},"线程2").start();
}
}
在这里可以明显的发现,两个线程卖出了同一张票,这是我们所不期望的,我们可以通过用synchronized
关键字修饰这个方法达到,加锁的效果,用来保护这个方法,一次只能有一个线程执行被保护的方法
public class Synchronized {
public static Integer num=30;
public synchronized static void sale(){
try {
Thread.sleep(1);
System.out.println(Thread.currentThread().getName()+" 卖出了第"+num--+"张票");
}catch (InterruptedException e){
e.printStackTrace();
}
}
public static void main(String[] args){
new Thread(()->{
for (int i=0;i<15;i++){
sale();
}
},"线程1").start();
new Thread(()->{
for (int i=0;i<15;i++){
sale();
}
},"线程2").start();
}
}
2. lock锁
上面提到了可以用synchronized
解决卖票问题,现在用JUC的lock锁同样也可以解决这个问题
public class Synchronized {
public static Integer num=30;
private static Lock lock=new ReentrantLock();
public static void sale(){
lock.lock();
try {
Thread.sleep(1);
System.out.println(Thread.currentThread().getName()+" 卖出了第"+num--+"张票");
}catch (InterruptedException e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public static void main(String[] args){
new Thread(()->{
for (int i=0;i<15;i++){
sale();
}
},"线程1").start();
new Thread(()->{
for (int i=0;i<15;i++){
sale();
}
},"线程2").start();
}
}
ReentrantLock
重入锁(线程可以反复获得已拥有的锁),锁有一个持有计数器(hold count) 来跟踪多lock方法的嵌套调用,每一次的加锁之后都需要释放锁
lock()
加锁
unlock()
解锁
tryLock(long time, TimeUint unit)
一段时间内没有获取锁,不是进入阻塞状态,而是返回一个错误;
tryLock()
立即返回,获得锁返回true,没获得锁返回false;
tryInterruptibly()
在锁上等待,直到获取锁,但是会响应中断,这个方法优先考虑响应中断,而不是响应锁的普通获取或重入获取。
lockInterruptibly()
假如进程在等待取锁,主动终止
注意:
- 使用lock锁时不要使用try-with-resources语句
- finally子句中必须有
unlock()
如果临界区代码抛出异常,锁必须释放,否则将变成死锁
非公平锁: 可以插队
公平锁: 不可以插队,必须顺序进行
公平锁要慢很多,而且即使使用公平锁,也无法保证线程调度器是公平的,如果线程调度器选择忽略一个已经为锁等待很久的线程,就不公平了
3. lock与synchronized区别
- Synchronized 是Java关键字, Lock 是一个Java类
- Synchronized 无法判断锁的状态,Lock 可以判断是否获取到了锁
- Synchronized 会自动释放锁,lock 必须要手动释放锁
finally
- Synchronized 线程 1(获得锁,阻塞)、线程2(等待);Lock锁就不一定会等待下
trylock()
去; - Synchronized 可重入锁,不可以中断的,非公平;Lock ,可重入锁,可以 判断锁,非公平(可以
自己设置); - Synchronized 适合锁少量的代码同步问题,Lock 适合锁大量的同步代码
- lock只能锁代码块,Synchronized 能锁代码块,也能锁方法
- 使用lock锁,jvm花费更少的时间来调度线程,性能高,扩展性好
4. 生产者消费者
Synchronized
实现
声明一个装汉堡的容器类
//装汉堡的容器类
public class Container {
public Hamburger[] array=new Hamburger[6];
public int index=0;
//向容器中添加汉堡
public synchronized void push(Hamburger hamburger) {
//当容器满,线程进入等待状态
while(index==array.length) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.notify();
array[index]=hamburger;
index++;
System.out.println("生产了一个汉堡:"+hamburger);
}
//从容器中取出汉堡
public synchronized Hamburger pop() {
while(index==0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.notifyAll();
index--;
System.out.println("消费了一个汉堡:"+array[index]);
return array[index];
}
}
生产消费线程
public class Test {
public static void main(String[] args) {
Container container=new Container();
//生产者
new Thread(()->{
for (int i = 0; i < 30; i++) {
Hamburger hamburger=new Hamburger(i);
container.push(hamburger);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
//消费者
new Thread(()->{
for (int i = 0; i < 30; i++) {
container.pop();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
锁池: 假设线程A已经拥有对象锁,线程B、C想要获取锁就会被阻塞,进入一个地方去等待锁的等待,这个地方就是该对象的锁池;
等待池: 假设线程A调用某个对象的wait方法,线程A就会释放该对象锁,同时线程A进入该对象的等待池中,进入等待池中的线程不会去竞争该对象的锁。
wait(): 进入等待池
notifyAll(): 唤醒所有等待池中的线程,进入锁池
notify只会随机选取一个处于等待池中的线程进入锁池去竞争获取锁的机会
lock
实现
容器类:
public class Container2 {
public Hamburger[] array=new Hamburger[6];
private Lock lock=new ReentrantLock();
public int index=0;
//向容器中添加汉堡
public void push(Hamburger hamburger) {
lock.lock();
try {
while(index==array.length) {
//this.wait(); //当容器满,线程进入等待状态
lock.newCondition().await();
}
lock.newCondition().signalAll();
array[index]=hamburger;
index++;
System.out.println("生产了一个汉堡:"+hamburger);
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
//从容器中取出汉堡
public Hamburger pop() {
lock.lock();
try {
while(index==0) {
lock.newCondition().await();
}
lock.newCondition().signalAll();
index--;
System.out.println("消费了一个汉堡:"+array[index]);
return array[index];
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
return array[index];
}
}
上面两个实现不用if用while的原因是为了防止虚假唤醒
5. Condition监视器,精确唤醒
public class ConditionTest {
private Lock lock=new ReentrantLock();
private Condition c1=lock.newCondition();
private Condition c2=lock.newCondition();
private Condition c3=lock.newCondition();
private int num=1;
public void one(){
lock.lock();
try {
while (num!=1){
c1.await();
}
System.out.println(Thread.currentThread().getName()+"11111111111111");
num=2;
c2.signal();
}catch (Exception e){
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void two() {
lock.lock();
try {
while (num!=2){
c2.await();
}
System.out.println(Thread.currentThread().getName()+"22222222222222222222");
num=3;
c3.signal();
}catch (Exception e){
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void three(){
lock.lock();
try {
while (num!=3){
c3.await();
}
System.out.println(Thread.currentThread().getName()+"3333333333333333");
num =1;
c1.signal();
}catch (Exception e){
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ConditionTest conditionTest = new ConditionTest();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
conditionTest.one();
}
},"A").start();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
conditionTest.two();
}
},"B").start();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
conditionTest.three();
}
},"C").start();
}
}
在上面的程序中,one中c1等待后会指定的唤醒c2,这是lock锁的一个特点,精确唤醒
6. Synchronized 修饰的特点
上面介绍的锁都是对象锁
,锁的是对象,不同的对象调用获取资源的方法是不会上锁的
但如果锁的是被static修饰的方法,这就不是对象锁了,是Class锁
,会锁住Class实例(字节码对象)
而对于Synchronized 修饰的代码块,锁的是括号中的对象
synchronized (SynchronizedDecorationExamples.class) {
try {
TimeUnit.SECONDS.sleep(1000);//juc的Thread.sleep
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Class锁
与对象锁
是两个锁,是不用互相等待的,要分开考虑
7. volatile(修饰变量) final
volatile关键字为实例字段的同步访问提供了一种免锁机制,编译器和虚拟机会知道该字段可能被另一个线程并发更新
注意:
volatile变量不能提供原子性,不能保证读取,翻转,写入不被中断
vilatile三大特性(JMM模型)
- 可见性(更新操作执行完会告诉其他的线程)
- 防止指令重排,利用内存屏障(防止内存重排和编译器的优化重排,重排会将互不依赖的指令重新排序)
- 不保证原子性(要么同时成功要么同时失败)
单例懒汉式
public class LazyMan {
private LazyMan(){
System.out.println("构造了新的LazyMan");
}
private static LazyMan lazyMan;
public static LazyMan getInstance() {
if (lazyMan==null){
lazyMan=new LazyMan();
}
return LazyMan.lazyMan;
}
public static void main(String[] args) {
ThreadPoolExecutor threadPool=new ThreadPoolExecutor(5, 13,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory());
for (int i = 1; i <=10 ; i++) {
threadPool.submit(()->{
LazyMan lazyMan=LazyMan.getInstance();
System.out.println(lazyMan.toString());
});
}
threadPool.shutdown();
}
}
可以看到,当多线程实现单例模式时,构造函数被调用了多次,如何解决这个问题呢?
- 加锁 用
synchronized
修饰方法getInstance (太重了,不推荐) - 采用双端检索机制 (有很低很低的出错概率,因为存在创建对象的过程存在指令重排 1.申请内存 2.创建对象 3.将指针指向该对象的内存)
- 采用双端检索机制 并用
volatile
修饰对象(防止指令重排 )
private static volatile LazyMan lazyMan;
public static LazyMan getInstance() {
if (lazyMan==null){
synchronized (LazyMan.class){
if (lazyMan==null){
lazyMan=new LazyMan();
}
}
}
return LazyMan.lazyMan;
}
上面修改过的单例模式可以被反射破坏,要防止破坏 可以使用枚举(枚举的默认构造是(String,int))
当字段声明为final
时,也可以安全的访问共享字段
8. 原子性atomic
当对共享变量除了赋值外不进行其他操作时,可用volatile修饰
在juc.atomic包中有许多类使用了高效的机器指令(不是锁,Unsafe
类很多方法用c语言实现的),来保证其他操作的原子性
CAS (Compare and Swap, 比较并交换) 比较并交换 如果旧值和你期望的一致,那么就更新
AtomicInteger atomicInteger = new AtomicInteger(3);
int i = atomicInteger.addAndGet(3);
System.out.println(i);
//CAS 比较并更新 如果旧值和你期望的一致,那么就更新
atomicInteger.compareAndSet(6,998);
System.out.println(atomicInteger);
如果有大量线程要访问相同的原子值,性能会大幅度下降,因为乐观更新需要太多次重试
ABA问题 将值从A改为B 再从B改为A 其实修改了 但是cas认为没有被修改(乐观锁)
//ABA问题 将值从A改为B 再从B改为A 其实修改了 但是cas认为没有被修改(乐观锁问题)
atomicInteger.compareAndSet(998,-1);
atomicInteger.compareAndSet(-1,998);
atomicInteger.compareAndSet(998,666);
System.out.println(atomicInteger);
解决办法 原子引用 带版本号的原子操作
public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp)
AtomicStampedReference<Integer> atomic=new AtomicStampedReference<>(333,1);
new Thread(()->{
System.out.println(Thread.currentThread().getName()+atomic.getStamp()+"修改了值"+atomic.compareAndSet(atomic.getReference(), 666, atomic.getStamp(), atomic.getStamp()+1));
System.out.println(Thread.currentThread().getName()+atomic.getStamp()+"修改了值"+atomic.compareAndSet(atomic.getReference(), 333, atomic.getStamp(), atomic.getStamp()+1));
},"A").start();
new Thread(()->{
System.out.println(Thread.currentThread().getName()+atomic.getStamp()+"修改了值"+atomic.compareAndSet(atomic.getReference(), 999, atomic.getStamp(), atomic.getStamp()+1));
},"B").start();
9. Callable 与 Future
Runnable封装一个没有返回值的异步任务,而Callable封装一个有返回值
的异步任务
class Thread2 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("this is Callable=>call");
return 1111;
}
}
有返回值的线程调用方法 Callable
表示一个返回值为T的异步计算
- 有泛型
- call()
- 抛异常
FutureTask
实现了Runnable
,FutureTask\<T>
构造函数传入Callable
FutureTask<Integer> integerFutureTask = new FutureTask<>(()->{
System.out.println("this is Callable=>call");
return 1111;
});
new Thread(integerFutureTask,"AAA").start();//runnable
System.out.println(integerFutureTask.get());//future
future
保存异步计算的结果,将future交给某个线程然后忘掉他,他的所有者可以在计算好结果之后获得结果- integerFutureTask是一个对象,无论被放到几个线程中,计算只产生一次
方法调用
T get()
调用会阻塞,直至计算完成T get(long timeout,TimeUnit unit)
也会阻塞,在timeout计时结束后还未完成,会抛异常 TimeOutException
get一般在最后调用,在get时计算被中断 InterruptedException
Boolean isDone()
计算是否完成cancel(boolean)
取消计算,如果传入true, 会中断正在进行的计算 取消之后无法再次回复boolean isCancelled()
任务是否已取消,如果在正常完成前已取消返回true。
取消失败:
1. 不知道任务在哪个线程中执行
2. 任务没有监视执行该任务的线程的中断状态
10. CountDownLatch 计数器
CountDownLatch 就像一个倒数计数器,必须要等计数完毕后阻塞的位置才能继续运行
//计数6次,必须释放6次,被阻塞的线程才能继续运行
CountDownLatch count = new CountDownLatch(6);
for (int i=0;i<6;i++) {
new Thread(()->{
System.out.println("线程"+Thread.currentThread().getName()+"完事了!");
count.countDown();//计数器减一
},String.valueOf(i)).start();
}
count.await();//需要等待的位置,必须要等计数到0才就绪
System.out.println("全部完事");
new CountDownLatch(6)
构造一个计数器,构造函数填计数次数
countDown()
计数器计数一次(减一)
await()
线程阻塞的位置
11. CyclicBarrier 循环栅栏
CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{
System.out.println(Thread.currentThread().getName()+"开始行动!");
});
for (int i=1;i<=7;i++){
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"到了栅栏");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
// cyclicBarrier.await();使得线程到这里之后,会等parties个线程到了再突破(await7次)
System.out.println(Thread.currentThread().getName()+"突破栅栏");
}, String.valueOf(i)).start();
}
CyclicBarrier(int parties, Runnable barrierAction)
等parties个线程到达栅栏,才启动barrierAction中runnable的动作(等p个人到齐了再开会)
runnable中的动作会由随机一个线程启动
cyclicBarrier.await()
使得线程到这里之后,会等parties个线程到了再突破(await7次)
reset()
先打破当前栅栏,再重建栅栏
CountDownLatch
,CyclicBarrier
区别:
- CountDownLatch:一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;(每个线程都有自己的事)
- CyclicBarrier(可复用):多个线程互相等待,直到到达同一个同步点,再继续一起执行。
12. Semaphore 信号
//多个线程抢占资源 构造方法中填资源的数量
Semaphore semaphore=new Semaphore(3);
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
try {
semaphore.acquire();//占位置,资源-1
System.out.println(Thread.currentThread().getName()+"抢到了资源!");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"用完了了资源!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();//最后要release资源,使得资源+1
}
},String.valueOf(i)).start();
}
//例如,让多个线程抢一个资源,并持有一定时间,可以将构造方法参数设为1(synchronized)
new Semaphore()
多个线程抢占资源 构造方法中填资源的数量,例如,让多个线程抢一个资源,并持有一定时间,可以将构造方法参数设为1(synchronized)
acquire()
抢占资源,资源-1
finally{semaphore.release();}
释放资源.资源+1 放到finally里
11. ReadWriteLock 读写锁
写锁(只能让一个线程写,保证写的完整)
读锁(可以让多个线程同时读)
只要遇到了写操作,就是不能共存的
下面的例子用读写锁保证了map的多线程操作的一致性,写时加写锁,读时加读锁
public class readWriteLockTest {
Map<Integer,Integer> map= new HashMap<>();
ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
/**
* 写操作 单
* @param k
* @param v
*/
public void put(Integer k,Integer v){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"写开始"+k);
TimeUnit.MILLISECONDS.sleep(300);
map.put(k,v);
System.out.println(Thread.currentThread().getName()+"写完成"+k);
}catch (Exception e){
}finally {
readWriteLock.writeLock().unlock();
}
}
/**
* 读操作 多
* @param k
*/
public void read(Integer k){
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "读开始" + k);
TimeUnit.MILLISECONDS.sleep(300);
int v = map.get(k);
System.out.println(Thread.currentThread().getName() + "读完成" + v);
}catch (Exception e){}finally {
readWriteLock.readLock().unlock();
}
}
public static void main(String[] args) {
readWriteLockTest resources=new readWriteLockTest();
for (int i = 1; i <5 ; i++) {
final int a =i;
new Thread(()->{
resources.put(a,a);
},String.valueOf(i)).start();
}
for (int i = 1; i <5 ; i++) {
final int a =i;
new Thread(()->{
resources.read(a);
},String.valueOf(i)).start();
}
}
}
ReadWriteLock readWriteLock=new ReentrantReadWriteLock()
ReentrantReadWriteLock是读写锁接口的实现
readWriteLock.writeLock().lock();
写锁
readWriteLock.readLock().lock();
读锁
finally{readWriteLock.readLock().unlock();}
最后别忘了相应的解锁
在写的状态下,所有操作都不能进行,读的状态下允许其他线程读,不允许其他线程写
13. 线程安全的集合
ArrayList
Vector
jdk1.0 加锁collections.sync
加锁CopyOnWriteArrayList
写时复制,读写分离;在线程读数据时不会改变集合,在写入前先复制数据,写入完后这个副本为新的集合
Set
collections.sync
加锁CopyOnWriteSet
写时复制,读写分离
Map
HashTable
加锁collections.sync
加锁ConcurrentHashMap
线程安全的hashmap 不允许有null值
ConcurrentHashMap<K,V>
一定安全吗?
对于ConcurrentHashMap<String,Long>
来说,改变其中的Long值是不安全的,可能会有另一线程同时更新
解决办法
- replace操作 do{ } while(!map.replace(word,old,new))
- 将Long替换为AtomicLong 并使用对应的原子操作
compute(键,(k,v)->算新值的函数(函数式接口))
会阻塞对映射的其他更新操作,返回null会将对应的键删除
map.compute("666",(k,v)->666);
merge(键,新参数值,算新值的函数(函数式接口))
会阻塞对映射的其他更新操作,返回null会将对应的键删除
map.merge("aa", 3, Integer::sum)
14. BlockingQueue阻塞队列
有的场景,需要将所有被阻塞的线程放到一个队列中,生产者向队列插入元素,消费者从队列中获取元素
例如,饭店的等位区,就是一个阻塞队列,饭店(消费者)吸收队列中的顾客(线程),不断有外来的顾客(生产者生产线程)进入等候区(阻塞队列)
当饭店有位置空时,可以让顾客吃饭(出队),不断涌入等候区的顾客(入队)
当等位区满时,不能再有顾客进入等位(阻塞),当等位区空时,饭店无法吸收顾客(阻塞)
在队满时与队空时都阻塞
实现类:
ArrayBlockingQueue:
由数组结构组成的有界阻塞队列。LinkedBlockingQueue:
由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。PriorityBlockingQueue:
支持优先级排序的无界阻塞队列。DelayQueue:
使用优先级队列实现的延迟无界阻塞队列。SynchronousQueue:
不存储元素的阻塞队列,也即单个元素的队列。LinkedTransferQueue:
由链表组成的无界阻塞队列。LinkedBlockingDeque:
由链表组成的双向阻塞队列。
ArrayBlockingQueue
是有界队列,构造时必须传入容量
BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<>(5);
三组方法
1. 抛异常
//add 入队
blockingQueue.addAll(Arrays.asList(1,2,3,4,5));
//blockingQueue.add(9); 当阻塞队列满后无法再入队
//remove出队 队空抛异常
System.out.println(blockingQueue.remove());
//element返回队首元素(不删除) 队空抛异常NoSuchElementException
System.out.println(blockingQueue.element());
add
入队 当阻塞队列满后无法再入队
remove
出队 队空抛异常
element
返回队首元素(不删除) 队空抛异常NoSuchElementException
2. 不抛异常
//offer 入队(不抛异常) 成功返回true 失败(队满)返回false
System.out.println(blockingQueue.offer(11));
//offer 入队3秒,成功返回true 3秒后还阻塞,就不入队了,返回false
System.out.println(blockingQueue.offer(12,3L, TimeUnit.SECONDS));
//poll 出队(不抛异常) 成功返回对象 失败返回null
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//poll 出队3秒,成功返回对象 3秒后还阻塞,返回null
System.out.println(blockingQueue.poll());
//peek返回队首元素(不删除) 队空返回null
System.out.println(blockingQueue.peek());
offer
入队(不抛异常) 成功返回true 失败(队满)返回false
offer(E var1, long var2, TimeUnit var4)
入队x秒,成功返回true x秒后还阻塞,就不入队了,返回false
poll
出队(不抛异常) 成功返回对象 失败返回null
poll (E var1, long var2, TimeUnit var4)
出队x秒,成功返回对象 3秒后还阻塞,返回null
peek
返回队首元素(不删除) 队空返回null
3. 阻塞
//put 入队
blockingQueue.put(1);
blockingQueue.put(2);
blockingQueue.put(3);
blockingQueue.put(4);
blockingQueue.put(5);
//blockingQueue.put(6); 队满阻塞
//take 出队 队空阻塞
System.out.println(blockingQueue.take());
put
入队 队满阻塞
take
出队 队空阻塞
SynchronousQueue:
队列容量为1(不储存元素) 进了一个就队满
15. 线程池
线程池做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
特点
- 线程复用;
- 控制最大并发数;
- 管理线程。
Executors.newFixedThreadPool(int);
1池指定个数受理线程 空闲的线程一直保留
Executors.newSingleThreadExecutor();
池容量为1
Executors.newCachedThreadPool();
必要时创建新线程,空闲线程会保留60
//创建线程池
//1池5个受理线程 池容量5 空闲的线程一直保留
ExecutorService threadPool1=Executors.newFixedThreadPool(5);
//创建容量为1的池 单例
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
//必要时创建新线程,空闲线程会保留60s
ExecutorService threadPool3 = Executors.newCachedThreadPool();
try {
//这里有10个线程
for (int i = 1; i <=100 ; i++) {
threadPool3.execute(()->{
System.out.println(Thread.currentThread().getName()+"正在运行");
});
try {
TimeUnit.SECONDS.sleep(1);//对于池3 加了休眠 上个线程完成后这个线程会复用
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool1.shutdown();
threadPool2.shutdown();
threadPool3.shutdown();
}
上面3个创建线程池的方法都是, ThreadPoolExecutor类的一个构造方法 ,实现都是靠阻塞队列
有关ThreadPoolExecutor构造方法
ThreadPoolExecutor的7个参数
- corePoolSize即使在空闲状态下也要保留在池中的核心线程数,除非设置了{@code allowCoreThreadTimeOut}
- maximumPoolSize 池中允许的最大线程数,一般是cpu线程数加1(java显示线程数:
Runtime.getRuntime().availableProcessors()
) - keepAliveTime 当线程数大于核心线程数时的生存时间,这是多余的空闲线程将在终止之前等待新任务的最长时间。
- unit {@code keepAliveTime}参数的时间单位
- workQueue 在执行任务之前用于保留任务的队列。该队列将仅保存由{@code execute}方法提交的{@code Runnable}任务。
- threadFactory 执行程序创建新线程时要使用的工厂 ,默认Executors.defaultThreadFactory()
- handler 因为达到了线程界限和队列容量而在执行被阻止时使用的处理程序,默认defaultHandler
一般使用ThreadPoolExecutor创建线程池
(防止由于队列长度过长的OOM)
自定义线程池
//自定义线程池
ExecutorService threadPool=new ThreadPoolExecutor(2,
Runtime.getRuntime().availableProcessors()+1,//最大线程数一般是cpu线程数加1
2,TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),//容量不写默认Integer.MAXVALUE
Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
try {
for (int i = 1; i <=20 ; i++) {//线程数超过maximumPoolSize+workQueue长度 会触发reject
threadPool.execute(()-> System.out.println(Thread.currentThread().getName()+"正在运行"));
/*try {
TimeUnit.MILLISECONDS.sleep(3);//对于池3 加了休眠 上个线程完成后这个线程会复用
} catch (InterruptedException e) {
e.printStackTrace();
}*/
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
自定义线程池的拒绝策略:
new ThreadPoolExecutor.AbortPolicy()
中断策略 线程池爆了直接抛异常RejectedExecutionException
new ThreadPoolExecutor.CallerRunsPolicy()
“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。(把任务丢回去,表示这里运行不了,让Caller(调用者)运行这个任务,如果caller线程关了,任务会被抛弃)new ThreadPoolExecutor.DiscardPolicy()
丢弃策略 线程池满了把多余的线程丢弃new ThreadPoolExecutor.DiscardOldestPolicy()
丢弃队列中等待最久(最先入队还未被处理的任务)的任务,然后把当前任务加人队列中,尝试再次提交当前任务。
执行线程的方法execute
与 submit
:
- 在
execute(Runnable)
中可以传入一个runnable接口的实现对象 出现异常会抛出 submit()
可以传入Runnable
(传Runnable时get会返回null)也可以传Callable
,在需要获取结果时要用get接收(get时会抛异常,不get永远不会抛异常)
//submit
ExecutorService threadPool=new ThreadPoolExecutor(2,
Runtime.getRuntime().availableProcessors()+1,//最大线程数一般是cpu线程数加1
2,TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),//容量不写默认Integer.MAXVALUE
Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
Future<Integer> submit1 = threadPool.submit(() -> {System.out.println(Thread.currentThread().getName());return 444;});
Future<?> submit2 = threadPool.submit(() -> System.out.println(Thread.currentThread().getName()));
try {
System.out.println(submit1.get());
System.out.println(submit2.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}finally {
threadPool.shutdown();
}
16. fork-join 框架(缺代码)
将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行 join 汇总。
“工作窃取”模式(work-stealing):当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。
17. 异步计算CompletableFuture
当有一个Future对象时,需要调用get来获取其计算的结果,这个方法会阻塞,直到其值可用
CompletableFuture
类实现了Future接口,实现了一种类似于AJAX的异步机制,你要注册一个回调,一旦结果可用,就会在某个线程中利用该结果调用这个回调
public static CompletableFuture<Void> runAsync(Runnable runnable)
传入Runnable接口的实现,无返回
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "完成了异步计算");
});
completableFuture.get();//获得结果
public static <U> CompletableFuture<U> supplyAsync(supplier<U> supplier)
供给型接口,有返回值
supplier接口不会抛出检查型异常
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("正在异步计算!");
return 6688;
});
Integer complete = completableFuture.whenComplete((t, u) -> {
System.out.println("t->" + t);//正常的返回结果
System.out.println("u->" + u);//错误信息
}).exceptionally(u -> {
System.out.println(u.getMessage());//打印错误信息
return -1;
}).get();
System.out.println(complete);
whenComplete
得到结果 和 未捕获的异常
exceptionally
从错误中得出一个计算结果
锁总结
1. 公平锁与非公平锁
公平锁
不可以插队
非公平锁
可以插队,默认一般是非公平
2. 可重入锁(递归锁)
同一线程外层函数获得锁之后,会同时获得内层函数的锁
ReentrantLock/synchronized就是一个典型的可重入锁
可重入锁最大的作用就是避免死锁
3. 自旋锁 spinlock CAS
当一个线程在获取锁的时候,如果锁已经被其它线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,直到获取到锁才会退出循环。while()
4. 死锁
两个线程互相争抢资源 都不放手
public class DeadLockRunnable implements Runnable {
private int num;
private String chopsticks1;
private String chopsticks2;
public DeadLockRunnable(int num, String chopsticks1, String chopsticks2) {
this.num = num;
this.chopsticks1 = chopsticks1;
this.chopsticks2 = chopsticks2;
}
@Override
public void run() {
if (num==1) {
synchronized (chopsticks1) {
System.out.println(Thread.currentThread().getName()+"获取到chopsticks1等待获取chopsticks2");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (chopsticks2) {
System.out.println(Thread.currentThread().getName()+"用餐完毕");
}
}
}
if (num==2) {
synchronized (chopsticks2) {
System.out.println(Thread.currentThread().getName()+"获取到chopsticks2等待获取chopsticks1");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (chopsticks1) {
System.out.println(Thread.currentThread().getName()+"用餐完毕");
}
}
}
}
public static void main(String[] args) {
//张三李四都需要资源1和2,才能解决问题,可都只占有一个资源且不放手,形成死锁
new Thread(new DeadLockRunnable(1,"A","B")).start();
new Thread(new DeadLockRunnable(2,"A","B")).start();
}
}
解决步骤
- 查看现在所有进程的进程号
jps -l
定位死锁进程号 jstack 进程号
查看堆栈信息 里面会提示 发现死锁