1、Java中的线程安全问题
(1)异步化技术
MQ
(消息队列):ActiveMQ
、Kafka
、RocketMQ
、ZeroMQ
、RabbitMQ
。MQ
的控制范围是跨进程的。
线程:控制范围是进程级别,单一进程之内。
(2)多线程的使用场景
1)数据导入
导入20万数据:
循环读取数据 {
(每读取5000条数据 --> 保存到数据库(2s))---> 丢给线程处理
}
2)并行调用微服务
method() {
aservice.method(); -RPC(默认就提供了异步调用的支持)
bservice.method(); -RPC
}
运用多线程的好处:合理利用CPU资源。
(3)多线程的实现
1)继承Runable
接口
2)继承Thread
类
3)继承Callable/Future
,是Thread
带场景化的封装(带返回值的线程)
4)继承ThreadPoolExecutor
,也是Thread
带场景化的封装(线程池)
Thread
中的run
方法执行结束,表示线程执行结束,JVM
会自动触发线程资源的回收。
(4)多线程带来的安全性问题
多线程在使用过程中会带来安全性问题,是因为多个线程对一个成员变量做修改是不安全的,多个线程对于共享资源的操作是不安全的。
HashMap
不是线程安全的,无法保证数据的预期安全性。
而局部变量是线程安全的,因为局部变量是线程私有的,是在线程栈帧里的。
(5)安全性源头:可见性、原子性和有序性问题及其解决方法
1)可见性:A线程对于共享变量的修改,对B线程不可见,volatile
关键字是解决可见性问题的一种手段。硬件层面的原因:CPU的高速缓存;软件层面的原因:编译器的深度优化。
2)原子性:ACID
特性,操作不可分割,要么成功,要么失败。解决方式:Atomic
包、Synchronized
和Lock
机制。
3)有序性:指令的执行顺序和编写的顺序不同,指令会重排序,目的是优化执行效率,提升CPU利用率。在多核心多线程环境下,重排序会带来可见性问题,解决方法:volatile
。
volatile解决可见性和有序性问题。
解决线程安全问题的常用方法:synchronized
,volatile
,lock
,Atomic
,final
。
2、并发中的Lock机制
(1)并发编程的三大构件
1)条件互斥(共享资源)
volatile int state=0(无锁)/ 1(有锁)多个线程对于共享变量的修改
// 无法满足互斥的条件了
synchronized(this){
if (state == 0) { // 读取state变量,t1,t2都判断这个值为true
state = 1; // 写入
}
}
// 这种情况是可以保证原子性的
if(cas(0,1)){
// 只会有一个线程成功
state=1;
}
// cas方法:如果object中的成员变量state的预期值为0,则修改成1,返回true,否则返回false
native compareAndSet(object, stateOffset, 0, 1); // 基于缓存锁来实现的
native:C++写的本地方法,直接调用内存
2)等待队列
等待队列是一种数据结构,因为获得锁的线程在释放锁的时候要唤醒下一个线程,所以要将等待的线程放入等待队列中排队。这就在单向或双向链表中存在对象指针的引用。
3)阻塞和唤醒
唤醒是获得锁的线程释放资源之后,唤醒处于等待队列中的任一线程。
LockSupport.park:阻塞
LockSupport.unpark(Thread):主动唤醒
这两个方法是J.U.C
这个包中提供的后门,它是针对JVM
层面的park
和unpark
方法的封装(Synchronized
底层的线程阻塞也是通过park
和unpark
实现的)。
wait/notify
wait:阻塞指定的线程
notify:主动唤醒,却并不能唤醒指定的线程
sleep:阻塞和被动唤醒
Thread.join:阻塞和被动唤醒
(2)获得锁失败的线程,接下来要做什么?
加入队列(看代码去证实想法)
addWaiter(Node.EXCLUSIVE, arg)
并且阻塞
acquireQueued() // 在队列中不断的去循环抢占锁以及阻塞
3、线程池
(1)池化技术的思想
线程池的好处:
- 避免线程的重复创建和销毁所带来的性能开销
- 合理设置线程大小避免出现资源瓶颈,限流保护机制,线程不断创建会占用大量的CPU资源
(2)线程如何实现复用
Thread t1 = new Thread();
t1.start();
当t1线程中的run
方法执行结束之后,t1线程就会被回收。所以要想t1线程可以被循环利用,唯一的方式就是线程不销毁(保证run
方法不执行完)。
(线程的控制有:通过start
启动,通过join/wait/sleep/...
去把线程阻塞起来。)
那么我们在run
方法中加while(true)
死循环,run
方法就不结束了。
实现线程复用的方式:
while(true)
死循环,避免run
方法执行结束- 不能一直空转,需要阻塞
- 保证有任务需要我处理的时候,我去执行,没有任务需要我处理的时候,我就阻塞(阻塞队列可以实现)
线程池实现线程复用的本质:生产者消费者模式。
在java中如何实现生产者消费者模式?
wait/notify
方式。J.U.C Condition
等价于wait/notify
。
(3)不同的线程池创建方式
// 构建5个线程的线程池
// 方式一:固定线程数量
ExecutorService executorService = Executors.newFixedThreadPool(5);
ThreadPoolExecutor executorService2 = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
// 方式二:可以灵活调整的线程数
Executors.newCachedThreadPool();
// 方式三:定时任务
Executors.newScheduledThreadPool();
// 方式四:只有一个线程的线程池
Executors.newSingleThreadExecutor();
// 方式五:任务窃取,分配给一个线程的任务,这个线程没有执行完,其他线程可以窃取任务来执行
Executors.newWorkStealingPool(); // mapreduce fork/join
这5种线程池创建方式本质上都是对以下ThreadPoolExecutor
方法不同参数的调用。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize: // 核心线程数,代表了线程池中最大能够创建的线程数量中包含的核心线程数
maximumPoolSize: // 最大线程数,代表了线程池中最大能够创建的线程数量
keepAliveTime: // 空闲线程的存活时间
TimeUnit unit: // 存活时间的单位
BlockingQueue<Runnable> workQueue: // 阻塞队列,用来存储需要执行的任务
ThreadFactory threadFactory: // 用来创建线程的工厂,它就是线程池中所有线程的构建工厂
RejectedExecutionHandler handler: // 拒绝执行的策略
(4)一些注意点
1)一般来说,核心线程是不会被回收的,其他线程当线程处于空闲状态时,空闲时间达到keepAliveTime
之后,会被自动回收(销毁)。
2)什么情况下需要去创建大于最大核心线程数的其他线程?
说明队列满了,就需要请帮手来解决,最多能请maximumPoolSize-corePoolSize
个帮手。其他线程是用来处理突发流量的。
3)线程什么情况下处于空闲状态?
说明阻塞队列中没有数据了。
4)线程池代表一系列线程,怎么管理?
调用addWorker()
方法,对线程池的数据结构进行管理:
- 先判断线程的状态
- 再增加线程个数:
compareAndIncrementWorkerCount()
- 然后构建线程:
new Worker();
- 最后把线程添加到一个数据结构中:
HashSet