1.多线程访问共享资源的问题
多线程是可以共享资源的,但会引发一个问题,共享资源被多个线程同时访问时会造成数据异常或程序异常。还是用之前的货船来阐述这个问题,如下图所示
每个货船都需要经过一处集装箱检查的地方,但该检查处一次只能检查一艘船,当多艘货船同时到达,那必定是有货船在排队等待进入。如果多艘货船一同挤进该检查处,那势必会造成检查处的混乱。
共享资源的实例如下所示
import java.util.concurrent.Executors
abstract class IntGenerator{
@Volatile private var canceled = false
abstract fun next():Int
fun cancel(){
canceled = true
}
fun isCanceled(): Boolean{
return canceled
}
}
class EvenChecker(var generator:IntGenerator,val id:Int): Runnable{
override fun run() {
while (!generator.isCanceled()){
var i = generator.next()
if(i % 2 != 0){ //如果是奇数,则会去设置IntGenerator的属性值,从而退出循环
println("$i not even!")
generator.cancel()
}
}
}
companion object {
fun test( gp:IntGenerator,count: Int){
println("Press Control-C to exit")
val exec = Executors.newCachedThreadPool()
for(i in 0 until count){
exec.execute(EvenChecker(gp,i))
}
exec.shutdown()
}
}
}
class EvenGenerator: IntGenerator() {
private var currentEvenValue = 0
override fun next(): Int {
//产生一个偶数值,使得上述循环不会退出
++currentEvenValue //自增操作并非是原子操作
Thread.yield() //让错误更快地发生
++currentEvenValue
return currentEvenValue
}
}
fun main(args: Array<String>) {
//执行5个任务,都可以同时去访问同一个EvenGenerator对象
EvenChecker.test(EvenGenerator(),10)
}
按正常逻辑来说,线程是处于死循环状态的,因为每个线程都在加2,所以无法达到奇数的条件。但是输出结果如下,这是其中一种可能
Press Control-C to exit
17 not even!
19 not even!
21 not even!
23 not even!
当在两个自增操作间调用Thread.yield()方法时,退出循环的概率将大幅度增加,yield方法是让出线程的CPU使用权给其它线程。
因为每个任务都可以随时访问与修改EvenGenerator.currentEvenValue这个值,当有一个任务在执行第一个自增加操作后,又有另一个任务执行完了next()方法,这时取到的currentEvenValue就是奇数。所以多线程下的可能会造成资源的不正确访问。
2.解决共享资源竞争
2.1序列化访问共享资源
如何保证一次只能一艘货船进入检查处呢,那就是当有货船进入时就把入口锁起来,那其它的船就是想进无法进入,对应的就是多线程中的互斥量(mutex),这种机制是通过在代码前面加上一条锁语句来实现的,使得一段时间内只有一个任务可以运行这段代码。
2.1.1 synchronized关键字实现对象锁
Java用synchronized实现了为防止资源冲突的内置支持,当任务要执行被synchronized关键字保护的代码片段的时候,它将检查锁是否可用,然后获取锁,执行代码,释放锁。
所有对象都自动含有单一的锁(也称为监视器),当在对象上调用其任意synchronized方法的时候,此对象都将被加锁,这时该对象上的其它synchronized方法只有等到前一个方法调用完毕并释放了锁之后才能被调用。
所有synchronized方法共享同一个锁,当某个任务调用对象的synchronized方法时,其它任务都不能再访问对象中的任何一个synchronized方法
JVM有一个对锁计数的机制,当调用一次synchronized方法时,计数加1,当离开一次synchronized方法时计数减一,当计数为0时即表示该对象已经完全释放了锁。
示范如下
synchronized void f(){}
或是用以实现临界区(防止多个线程同时访问方法内部的部分代码而非整个方法,分离出的代码称之为临界区),也称之为同步控制块.通过使用同步控制块,而不是对整个方法进行同步控制,可以使多个任务访问对象的时间性能得到显著提高。
synchronized void f(){
int a = 1;
//以下部分称之为临界区或是同步控制块
synchronized(syncObject){
}
}
使用synchronized解决上述资源访问的问题,示例如下
class SynchronizedGenerator: IntGenerator(){
private var currentEvenValue = 0
@Synchronized override fun next(): Int {
++currentEvenValue //自增操作并非是原子操作
Thread.yield() //让错误更快地发生
++currentEvenValue
return currentEvenValue
}
}
fun main(args: Array<String>) {
//执行5个任务,都可以同时去访问同一个EvenGenerator对象
EvenChecker.test(SynchronizedGenerator(),10)
}
/*output
Press Control-C to exit
*/
程序一直处于死循环的状态,并不会退出循环,因为运用了锁的机制,将自增操作的next()进行同步,当有一个任务在访问next()方法时,其它任务都无法访问同一个SynchronizedGenerator对象的next()方法,从而实现共享资源并发访问的正确性。
2.1.2 使用显式的Lock对象
除了用synchronized实现互斥,还可用java.util.concurrent.locks
包中的类来实现.
Lock对象必须被显式地创建、锁定和释放,它与内建的锁形式相比(synchronized),代码缺乏优雅性,但对于解决某些特定类型的问题它理加灵活。
使用Lock的实例如下
class MutexEvenGenerator: IntGenerator(){
private var currentEvenValue = 0
private val lock = ReentrantLock()
override fun next(): Int {
lock.lock()
try {
++currentEvenValue
Thread.yield()
++currentEvenValue
return currentEvenValue
}finally {
lock.unlock()
}
}
}
fun main(args: Array<String>) {
EvenChecker.test(MutexEvenGenerator(),10)
}
运行结果同样是处于死循环的状态,当前数值永远都是奇数,作用同synchronized是一致的
ReentrantLock的作用:允许尝试着获取但最终未获取到锁,可用来判断是否其它线程获取了锁,来避免一直处于等待锁的状态,从而提供了灵活性。
2.2 使用原子类来避免同步操作
- 原子性:原子操作是不能被线程调度机制中断的操作,一旦操作开始则必须在可能发生的线程切换前执行完毕。可应用于除long和double之外的所有基本类型之上的"简单操作"
Java中引入了诸AtomicInteger、AtomicLong、AtomicReference等特殊的原子性变量类,这是机器级别上的原子性。
用原子类解决上述奇数问题退出的实例如下
class AtomicEvenGenerator: IntGenerator(){
//使用原子类
private var currentEvenValue = AtomicInteger(0)
//消除了synchronized关键字,因为变量的原子操作可避免产生同步问题
override fun next(): Int {
return currentEvenValue.addAndGet(2)
}
}
fun main(args: Array<String>) {
EvenChecker.test(AtomicEvenGenerator(),10)
}
2.3 使用volatile关键字实现共享资源可视化
- 可视性: 这是一个有关缓存的概念,线程自身持有一个寄存器,CPU也有缓存机制,当对一个变量进行写操作后,修改可能只是暂时性地存储在本地处理器的缓存中,并未刷新到主存中。其它线程可能无法及时地读取到该变量的最新值,这即是对其它线程不具有可视性。
当将一个域声明为volatile时,只要对这个域产生了写操作,那么所有的读操作就都可以看到这个修改,即意味着
volatile域会立即被写入到主存中,而读取操作就发生在主存中。将域定义为volatile时,它就会告诉编译器不要执行任何移除读取和写入操作的优化,这些操作的目的是用线程的局部变量维护对这个域的精确同步。
如果多个任务在同时访问某个域,那么这个域就应该是volatile的,否则这个域就应该只能经由同步来访问。同步也会导致向主存中刷新,因此如果一个域完全由synchronized方法或语句块来防护,那就不必将其设置为是volatile的。
使用volatile而不是synchronized的唯一安全的情况是类中只有一个可变的域,应优先选择最安全的方式synchronized
简单示例如下
class SerialNumberGenerator{
companion object {
@Volatile private var serialNumber = 0
fun nextSerialNumber():Int{
return serialNumber++
}
}
}
可参考文章关于volatile的详细介绍:Java中Volatile关键字详解
2.4 线程本地存储
线程本地存储是一种自动化机制,可为使用相同变量的每个不同线程都创建不同的存储,根除了对变量的共享。
创建和管理本地存储可由java.lang.ThreadLocal
类来实现,实例如下
class Accessor(val id:Int): Runnable{
override fun run() {
while (!Thread.currentThread().isInterrupted){
ThreadLocalVariableHolder.increment()
println(this)
Thread.yield()
}
}
override fun toString(): String {
return "#$id:${ThreadLocalVariableHolder.get()}"
}
}
class ThreadLocalVariableHolder{
companion object {
var value = object:ThreadLocal<Int>() {
private val rand = Random()
@Synchronized override fun initialValue():Int{
return rand.nextInt(10000)
}
}
fun increment(){
value.set(value.get() + 1)
}
fun get():Int{
return value.get()
}
}
}
fun main(args: Array<String>) {
val exec = Executors.newCachedThreadPool()
for(i in 0 until 5){
exec.execute(Accessor(i))
}
TimeUnit.SECONDS.sleep(3)
exec.shutdown()
}
/**output
#1:18883
#3:16378
#3:16379
#3:16380
#0:20039
#4:23289
#3:16381
#1:18884
#2:20347
#1:18885
#3:16382
#3:16383
#4:23290
#0:20040
#4:23291
*/
ThreadLocal对象通常当作静态域存储,只能通过get()和set()方法来访问该对象的内容,get()方法将返回与其线程相关联的对象的副本,而set()会将参数插入到为其线程存储的对象中,并返回存储中原有的对象。 在输出结果中,可以看到每个任程的变量都是按照自己线程的计数独立增加的,不会受到其它线程的影响。
2.5总结
综上所述,关于解决资源共享所引发问题的解决方案共有四种
- 序列化访问共享资源(synchronized关键字, 使用显式的Lock对象)
- 使用原子类来避免同步操作
- 使用volatile关键字实现共享资源可视化
- 线程本地存储
3.多线程协作的两个经典问题
3.1 生产者-消费者问题
假设有一个饭店,它有一个厨师和服务员,服务员必须等待厨师准备好膳食,当厨师准备好时,他会通知服务员,之后服务员上菜,然后返回继续等待。
这里面厨师代表生产者,服务员代表消费者,两个任务必须在膳食被生产和消费时进行握手,而系统必须以有序的方式关闭。
下面是关于这个问题的代码实现
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 厨师生产的食物类
*/
class Meal{
/**
* 食物的订单号
*/
private final int orderNum;
public Meal(int orderNum){
this.orderNum = orderNum;
}
@Override
public String toString() {
return "Meal " + orderNum;
}
}
/**
* 服务员
* 职责同将厨师生产的食物给端给客人,即消费了食物
*/
class WaitPerson implements Runnable{
private Restaurant restaurant;
public WaitPerson(Restaurant r){restaurant = r;}
@Override
public void run() {
try{
while (!Thread.interrupted()){
synchronized (this){
//当厨师没有生产好的食物时,服务员进入等待状态
while (restaurant.meal == null){
wait(); //wait()方法必须在同步块中调用,并会释放锁
}
}
System.out.println("Waitperson got "+ restaurant.meal);
//方法执行到这里时说明食物已经准备好了,即restaurant.meal不为null
synchronized (restaurant.chef){
//服务员将食物端给客人,即是消耗了厨师生产的食物
restaurant.meal = null;
//然后通知厨师开始制作下一份食物
restaurant.chef.notifyAll();
}
}
}catch (InterruptedException e){
System.out.println("WaitPerson interrupted");
}
}
}
/**
* 厨师
* 食物的生产者
*/
class Chef implements Runnable{
private Restaurant restaurant;
private int count = 0;
public Chef(Restaurant r){
restaurant = r;
}
@Override
public void run() {
try{
while (!Thread.interrupted()){
//厨师如果制作出了一份食物,还没被服务员端走,那就会进入等待状态
synchronized (this){
while (restaurant.meal != null){
wait();
}
}
//一个退出条件,当制作的食物等于10份时就会结束线程
if(++count == 10){
System.out.println("Out of food,closing ");
restaurant.exec.shutdownNow();
}
System.out.println("Order Up!");
synchronized (restaurant.waitPerson){
//没有食物或食物被消费了,开始制作食物
restaurant.meal = new Meal(count);
//食物制作完成了,通知服务员端菜
restaurant.waitPerson.notifyAll();
}
TimeUnit.MILLISECONDS.sleep(100);
}
}catch (InterruptedException e){
System.out.println("Chef interrupted");
}
}
}
/**餐厅
* 服务员与厨师交互的中心
* 相当于两者的共享资源
*/
public class Restaurant {
/**
* 这是服务员与厨师都共享的资源,用以判断同步的条件
*/
Meal meal;
ExecutorService exec = Executors.newCachedThreadPool();
WaitPerson waitPerson = new WaitPerson(this);
Chef chef = new Chef(this);
public Restaurant(){
exec.execute(chef);
exec.execute(waitPerson);
}
public static void main(String[] args){
new Restaurant();
}
}
/**output
Order Up!
Waitperson got Meal 1
Order Up!
Waitperson got Meal 2
Order Up!
Waitperson got Meal 3
Order Up!
Waitperson got Meal 4
Order Up!
Waitperson got Meal 5
Order Up!
Waitperson got Meal 6
Order Up!
Waitperson got Meal 7
Order Up!
Waitperson got Meal 8
Order Up!
Waitperson got Meal 9
Out of food,closing
Order Up!
WaitPerson interrupted
Chef interrupted
*/
上述生产者-消费者的模式很明显一次只能生产或者消费一份食物,同一时刻只有一份食物在生产或在消费,这在现实生活中明显是不合实际的。最理想的模式应该是厨师接到订单就制作食物,然后把制作好的食物放在一个专门的盘子中,盘子中可以放置多份食物,而服务员只需要关心这个盘子中是否有食物,有就端给客人,没有就继续观察盘子中有新制作好的食物。这种模式可以省去厨师和服务员交互的过程,从而提高了工作效率。而这个专门放置食物的区域就是Java中的同步队列,同步队列在任何时刻都只允许一个任务插入或移除元素,这就是java.util.concurrent.BlockingQueue
接口。
3.2 哲学家就餐问题(Dijkstra算法)
假定有五个哲学家,它们很穷只能买五根筷子,五个人围成一桌,每人之间放一根筷子,当一个哲学家要就餐的时候,这个哲学家必须同时得到左边和右边的筷子。如果一个哲学家左边或右边已对有人在使用筷子了,那么这个哲学家就必须等待,直至可得到必需的筷子。
代码表示如下
/**
* 筷子类
*/
public class Chopstick {
private boolean taken = false;
public synchronized void take() throws InterruptedException{
//任何两个Philosopher都不能成功take()同一根筷子
while (taken){
//如一根Chopstick已被某个Philosopher获得,那另一个Philosopher可以wait,
//直至这根Chopstick的当前持有者调用drop()使其可用为止
wait();
taken = true;
}
}
//用来修改标志的状态,并通知其它哲学家可去获得这根筷子
public synchronized void drop(){
taken = false;
notifyAll();
}
}
/**
* 哲学家类
*/
public class Philosopher implements Runnable{
private Chopstick left;
private Chopstick right;
private final int id;
/**
* 表示哲学家思考的时间参数
*/
private final int ponderFactor;
private Random rand = new Random(47);
private void pause() throws InterruptedException{
if(ponderFactor == 0 ) return;
TimeUnit.MILLISECONDS.sleep(rand.nextInt(ponderFactor*250));
}
public Philosopher(Chopstick left,Chopstick right,int ident,int ponder){
this.left = left;
this.right = right;
id = ident;
ponderFactor = ponder;
}
@Override
public void run() {
try {
//哲学家不断地思考和吃饭
while (!Thread.interrupted()){
System.out.println(this+ " " + "thinking");
pause();
System.out.println(this+ " " + "grabbing right");
right.take();
System.out.println(this+ " " + "grabbing left");
left.take();
System.out.println(this+ " " + "eating");
pause();
right.drop();
left.drop();
}
}catch (InterruptedException e){
System.out.println(this + " " + "exiting via interrupt");
}
}
@Override
public String toString() {
return "Philosopher " + id;
}
}
这个过程很容易产生死锁问题,具体实现如下
public class DeadlockingDiningPhilosophers {
public static void main(String[] args) throws Exception {
System.out.println("args len = "+args.length+";"+args);
int ponder = 0;
int size = 5;
ExecutorService exec = Executors.newCachedThreadPool();
Chopstick[] sticks=new Chopstick[size];
for(int i =0;i<size;i++){
sticks[i] = new Chopstick();
}
for(int i =0;i<size;i++){
exec.execute(new Philosopher(sticks[i],sticks[(i+1)%size],i,ponder));
}
System.out.println("press 'enter' to quit");
System.in.read();
exec.shutdownNow();
}
}
/**output
Philosopher 0 grabbing right
Philosopher 2 grabbing right
Philosopher 1 grabbing left
Philosopher 3 eating
Philosopher 4 grabbing right
Philosopher 2 grabbing left
Philosopher 3 thinking
Philosopher 0 grabbing left
Philosopher 3 grabbing right
Philosopher 4 grabbing left
Philosopher 3 grabbing left
程序不会再输出任何值,正常情况应是不断地输出
*/
将哲学家的思考时间置为0将会很快达到死锁的条件,即是所有哲学家都拿起了右边的筷子,在等待左边的筷子时,这会形成一个循环等待的条件,让线程都处于无限等待的状态。
从上述案例中可以总结死锁产生同时满足的四个条件:
- 互斥条件。任务使用的资源中至少有一个是不能共享的。这里一根筷子一次就只能被一个哲学家使用
- 至少有一个任务它必须持有一个资源且正在等待获取一个当前被别的任务持有的资源。即是哲学家必须拿着一根筷子且在等待另一根
- 资源不能被任务抢占,任务必须把资源释放当作普通事件。哲学家不会从其它哲学家手里抢筷子
- 必须有循环等待,这里一个任务等待其他任务所持有的资源,后者又在等待另一个任务所持有的资源,使得大家都被锁住。在就餐问题中,哲学家都拿到了右边的筷子在等待左边的筷子从而产生了死锁。
所以如要防止死锁的话,只需要破坏上述任何一个条件即可。在程序中,最容易破坏的就是第4个条件,因为每个哲学家都按照特定的顺序去拿筷子(先右后左),这样就会产生循环等待,所以只需要保证最后一个哲学家先拿左边的筷子再拿右边的筷子就可以破坏死锁产生的条件。
代码实现如下,只需要调整下执行任务中的代码即可达到目的
for(int i =0;i<size;i++){
if(i < size - 1)
exec.execute(new Philosopher(sticks[i],sticks[i+1],i,ponder));
else
//保证最后一个哲学家先拿左边的筷子(即是4),再拿右边的筷子(即是0)
exec.execute(new Philosopher(sticks[0],sticks[i],i,ponder));
}
4.java.util.concurrent中的用以并发的实用工具类
4.1.CountDownLatch
用来同步一个或多个任务,强制它们等待由其他任务执行的一组操作完成。
使用方法如下:
- 向CountDownLatch对象设置一个初始计数值
CountDownLatch countDownLatch = new CountDownLatch(10);
- 设置一个任务让CountDownLatch对象进行等待,调用await()方法会让线程进入阻塞状态
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
- 在其它任务完成工作是调用同一个CountDownLatch对象的countDown方法让计数减一
countDownLatch.countDown();
- 当countDownLatch对象计数减为0时,所有调用await()线程的阻塞状态都会结束,任务往下执行。
4.2.CyclicBarrier
适用于希望创建一组任务,它们并行地执行工作,然后在进行下一个步骤之前等待,直至所有任务都完成。它使得所有的并行任务都将在栅栏处列队,因此可以一致地向前移动。与CountDownLatch类似,只是CountDownLatch是只触发一次的事件,而CyclicBarrier可以多次重用。
4.3.DelayQueue
这是一个无界的BlockingQUeue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列的有序的,即队头对象的延迟到期的时间最长。如果没有任何延迟到期,那么就不会有任何头元素,并且poll()将返回null(不能将null放置到这种队列中)
4.4.PriorityBlockingQueue
这是一个很基础的优先级队列,它具有可阻塞的读取操作。
4.5 Semaphore-计数信号量
正常的锁在任何时刻都只允许一个任务访问一项资源,而计数信号量允许n个任务同时访问这个资源。还可将信号量看作是在向外分发使用资源的“许可证”。
4.6 Exchanger
是在两个任务之间交换对象的栅栏。当这些任务进入栅栏时,它们各自拥有一个对象,当它们离开时,它们都拥有之前由对象持有的对象。典型应用场景是一个任务在创建对象,这些对象的生产代价很高昂,而另一个任务在消费这些对象,通过Exchanger可以有更多的对象在被创建的同时被消费。