1、实现多线程的两种方法
继承Thread:
以卖票为例:
public class MyThread extends Thread { private static int COUNT = 5; private int ticket = COUNT; private String name; public MyThread(String s){ name = s; } @Override public void run() { for(int i = 0; i < COUNT; i++){ if(ticket > 0){ System.out.println(name + "-->" + ticket--); } } }
测试使用:
MyThread thread1 = new MyThread("thread1"); MyThread thread2 = new MyThread("thread2"); thread1.start(); thread2.start();
输出:
thread1-->5
thread2-->5
thread1-->4
thread2-->4
thread1-->3
thread2-->3
thread1-->2
thread2-->2
thread1-->1
thread2-->1
可以看到,这种方式每个线程自己拥有了一份票的数量,没有实现票的数量共享。下面看实现Runnable的方式:
实现Runnable接口:
public class MyRunnable implements Runnable { private static int COUNT = 5; private int ticket = COUNT; @Override public void run() { for(int i = 0; i < COUNT; i++){ if(ticket > 0){ System.out.println("ticket-->" + ticket--); } } } }
测试使用:
MyRunnable runnable = new MyRunnable(); new Thread(runnable).start(); new Thread(runnable).start();
输出:
ticket-->5
ticket-->3
ticket-->2
ticket-->1
ticket-->4
可以看到,实现Runnable的方式可以实现同一资源的共享。
实际工作中,一般使用实现Runnable接口的方式,原因:
- 支持多个线程去处理同一资源,同时,线程代码和数据有效分离,体现了面向对象的思想。
- 避免了Java的单继承性,如果使用继承Thread的方式,那这个扩展类就不能再去继承其他类。
拓展:
Thread的start()和run()方法区别:
start()方法用于启动一个线程,使其处于就绪状态,得到了CPU就会执行,而直接调用run()方法,就相当于是普通的方法调用,会在主线程中直接运行,此时没有开启一个线程。
(可以看第一篇总结中,关于线程启动)
下列方法中哪个是执行线程的方法? ()
A、run() B、start() C、sleep() D、suspend()
正确答案:A
run()方法用来执行线程体中具体的内容
start()方法用来启动线程对象,使其进入就绪状态
sleep()方法用来使线程进入睡眠状态
suspend()方法用来使线程挂起,要通过resume()方法使其重新启动
2、线程同步的方法
当我们有多个线程要访问同一个变量或对象时,而这些线程中既有对改变量的读也有写操作时,就会导致变量值出现不可预知的情况。如下一个取钱和存钱的场景:
没有加入同步控制的情形:
public class BankCount { private int count = 0;//余额 public void addMoney(int money){//存钱 count += money; System.out.println(System.currentTimeMillis() + "存入:" + money); System.out.println("账户余额:" + count); } public void getMoney(int money){//取钱 if(count - money < 0){ System.out.println("余额不足"); System.out.println("账户余额:" + count); return; } count -= money; System.out.println(System.currentTimeMillis() + "取出:" + money); System.out.println("账户余额:" + count); } }
测试类:
public class BankTest { public static void main(String[] args) { final BankCount bankCount = new BankCount(); new Thread(new Runnable() {//取钱线程 @Override public void run() { while(true){ bankCount.getMoney(200); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable() {//存钱线程 @Override public void run() { while(true){ bankCount.addMoney(200); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
部分打印结果如下:
余额不足
账户余额:0
1462265808958存入:200
账户余额:200
1462265809959存入:200
账户余额:200
1462265809959取出:200
账户余额:200
1462265810959取出:200
账户余额:200
1462265810959存入:200
账户余额:200
1462265811959存入:200
账户余额:200
可以看到,此时有两个线程共同使用操作了bankCount对象中的count变量,使得count变量结果不符合预期。因此需要进行同步控制,同步控制的方法有以下几种:
(1)使用synchronized关键字同步方法
每一个Java对象都有一个内置锁,使用synchronized关键字修饰的方法,会使用Java的内置锁作为锁对象,来保护该方法。每个线程在调用该方法前,都需要获得内置锁,如果该锁已被别的线程持有,当前线程就进入阻塞状态。
修改BankCount 类中的两个方法,如下:
public synchronized void addMoney(int money){//存钱 count += money; System.out.println(System.currentTimeMillis() + "存入:" + money); System.out.println("账户余额:" + count); } public synchronized void getMoney(int money){//取钱 if(count - money < 0){ System.out.println("余额不足"); System.out.println("账户余额:" + count); return; } count -= money; System.out.println(System.currentTimeMillis() + "取出:" + money); System.out.println("账户余额:" + count); }
运行测试打印如下结果:
余额不足
账户余额:0
1462266451171存入:200
账户余额:200
1462266452171取出:200
账户余额:0
1462266452171存入:200
账户余额:200
1462266453171存入:200
账户余额:400
1462266453171取出:200
账户余额:200
1462266454171存入:200
账户余额:400
1462266454171取出:200
账户余额:200
1462266455171取出:200
账户余额:0
可以看到,打印结果符合我们的预期。
另外,如果我们使用synchronized关键字来修饰static方法,此时调用该方法将会锁住整个类。(关于类锁、对象锁下面有介绍)
(2)使用synchronzied关键字同步代码块
使用synchronized关键字修饰的代码块,会使用对象的内置锁作为锁对象,实现代码块的同步。
改造BankCount 类的两个方法:
public void addMoney(int money){//存钱 synchronized(this){ count += money; System.out.println(System.currentTimeMillis() + "存入:" + money); System.out.println("账户余额:" + count); } } public void getMoney(int money){//取钱 synchronized(this){ if(count - money < 0){ System.out.println("余额不足"); System.out.println("账户余额:" + count); return; } count -= money; System.out.println(System.currentTimeMillis() + "取出:" + money); System.out.println("账户余额:" + count); } }
(注:这里改造后的两个方法中因为synchronized包含了方法体的整个代码语句,效率上与在方法名前加synchronized的第一种同步方法差不多,因为里面涉及到了打印money还是需要同步的字段,所以全部包含起来,仅仅是为了说明synchronized作用...)
打印结果:
余额不足
账户余额:0
1462277436178存入:200
账户余额:200
1462277437192存入:200
账户余额:400
1462277437192取出:200
账户余额:200
1462277438207取出:200
账户余额:0
1462277438207存入:200
账户余额:200
1462277439222存入:200
账户余额:400
1462277439222取出:200
账户余额:200
可以看到,执行结果也符合我们的预期。
synchronized同步方法和同步代码块的选择:
同步是一种比较消耗性能的操作,应该尽量减少同步的内容,因此尽量使用同步代码块的方式来进行同步操作,同步那些需要同步的语句(这些语句一般都访问了一些共享变量)。但是像我们上面举得这个例子,就不得不同步方法的整个代码块,因为方法中的代码每条语句都涉及了共享变量,因此此时就可以直接使用synchronized同步方法的方式。
(3)使用重入锁(ReentrantLock)实现线程同步
重入性:是指同一个线程多次试图获取它占有的锁,请求会成功,当释放锁的时候,直到重入次数为0,锁才释放完毕。
ReentrantLock是接口Lock的一个具体实现类,和synchronized关键字具有相同的功能,并具有更高级的一些功能。如下使用:
public class BankCount {
private Lock lock = new ReentrantLock();//获取可重入锁
private int count = 0;//余额
public void addMoney(int money){//存钱
lock.lock();
try {
count += money;
System.out.println(System.currentTimeMillis() + "存入:" + money);
System.out.println("账户余额:" + count);
}finally{
lock.unlock();
}
}
public void getMoney(int money){//取钱
lock.lock();
try {
if(count - money < 0){
System.out.println("余额不足");
System.out.println("账户余额:" + count);
return;
}
count -= money;
System.out.println(System.currentTimeMillis() + "取出:" + money);
System.out.println("账户余额:" + count);
} finally{
lock.unlock();
}
}
}
部分打印结果:
1462282419217存入:200
账户余额:200
1462282420217取出:200
账户余额:0
1462282420217存入:200
账户余额:200
1462282421217存入:200
账户余额:400
1462282421217取出:200
账户余额:200
1462282422217存入:200
账户余额:400
1462282422217取出:200
账户余额:200
1462282423217取出:200
账户余额:0
同样结果符合预期,说明使用ReentrantLock也是可以实现同步效果的。使用ReentrantLock时,lock()和unlock()需要成对出现,否则会出现死锁,一般unlock都是放在finally中执行。
synchronized和ReentrantLock的区别和使用选择:
1、使用synchronized获得的锁存在一定缺陷:
- 不能中断一个正在试图获得锁的线程。
- 试图获得锁时不能像ReentrantLock中的trylock那样设定超时时间 ,当一个线程获得了对象锁后,其他线程访问这个同步方法时,必须等待或阻塞,如果那个线程发生了死循环,对象锁就永远不会释放。
- 每个锁只有单一的条件,不像condition那样可以设置多个。
2、尽管synchronized存在上述的一些缺陷,在选择上还是以synchronized优先:
- 如果synchronized关键字适合程序,尽量使用它,可以减少代码出错的几率和代码数量 ;(减少出错几率是因为在执行完synchronized包含完的最后一句语句后,锁会自动释放,不需要像ReentrantLock一样手动写unlock方法;)
- 如果特别需要Lock/Condition结构提供的独有特性时,才使用他们 ;(比如设定一个线程长时间不能获取锁时设定超时时间或自我中断等功能。)
- 许多情况下可以使用java.util.concurrent包中的一种机制,它会为你处理所有的加锁情况;(比如当我们在多线程环境下使用HashMap时,可以使用ConcurrentHashMap来处理多线程并发)。
下面两种同步方式都是直接针对共享变量来设置的:
(4)对共享变量使用volatile实现线程同步
- volatile关键字为变量的访问提供了一种免锁机制
- 使用volatile修饰域相当于告诉虚拟机该域可能会被其他线程更新
- 因此每次使用该变量就要重新计算,直接从内存中获取,而不是使用寄存器中的值
- volatile不会提供任何原子操作,它也不能用来修饰final类型的变量。
修改BankCount类如下:
public class BankCount { private volatile int count = 0;//余额 public void addMoney(int money){//存钱 count += money; System.out.println(System.currentTimeMillis() + "存入:" + money); System.out.println("账户余额:" + count); } public void getMoney(int money){//取钱 if(count - money < 0){ System.out.println("余额不足"); System.out.println("账户余额:" + count); return; } count -= money; System.out.println(System.currentTimeMillis() + "取出:" + money); System.out.println("账户余额:" + count); } }
部分打印结果:
余额不足
账户余额:200
1462286786371存入:200
账户余额:200
1462286787371存入:200
账户余额:200
1462286787371取出:200
账户余额:200
1462286788371取出:200
1462286788371存入:200
账户余额:200
账户余额:200
1462286789371存入:200
账户余额:200
可以看到,使用volitale修饰变量,并不能保证线程的同步。volitale相当于一种“轻量级的synchronized”,但是它不能代替synchronized,volitale的使用有较强的限制,它要求该变量状态真正独立于程序内其他内容时才能使用 volatile。volitle的原理是每次线程要访问volatile修饰的变量时都是从内存中读取,而不是从缓存当中读取,以此来保证同步(这种原理方式正如上面例子看到的一样,多线程的条件下很多情况下还是会存在很大问题的)。因此,我们尽量不会去使用volitale。
(5)ThreadLocal实现同步局部变量
使用ThreadLocal管理变量,则每一个使用该变量的线程都获得该变量的副本,副本之间相互独立,这样每一个线程都可以随意修改自己的变量副本,而不会对其他线程产生影响。
ThreadLocal的主要方法有:
- initialValue():返回当前线程赋予当前线程拷贝的局部线程变量的初始值。一般在定义ThreadLocal类的时候会重写该方法,返回初始值;
- get():返回当前线程拷贝的局部线程变量的值;
- set(T value):为当前线程拷贝的局部线程变量设置一个特定的值;
- remove():移除当前线程赋予局部线程变量的值
如下使用:
public class BankCount { private static ThreadLocal<Integer> count = new ThreadLocal<Integer>(){ protected Integer initialValue() { return 0; }; };//余额 public void addMoney(int money){//存钱 count.set(count.get() + money); System.out.println(System.currentTimeMillis() + "存入:" + money); System.out.println("账户余额:" + count.get()); } public void getMoney(int money){//取钱 if(count.get() - money < 0){ System.out.println("余额不足"); System.out.println("账户余额:" + count.get()); return; } count.set(count.get() - money); System.out.println(System.currentTimeMillis() + "取出:" + money); System.out.println("账户余额:" + count.get()); } }
部分打印结果:
余额不足
1462289139008存入:200
账户余额:0
账户余额:200
余额不足
账户余额:0
1462289140008存入:200
账户余额:400
余额不足
账户余额:0
1462289141008存入:200
账户余额:600
余额不足
账户余额:0
从打印结果可以看到,测试类中的两个线程分别拥有了一份count拷贝,即取钱线程和存钱线程都有一个count初始值为0的变量,因此可以一直存钱但是不能取钱。
ThreadLocal使用时机:
由于ThreadLocal管理的局部变量对于每个线程都会产生一份单独的拷贝,因此ThreadLocal适合用来管理与线程相关的关联状态,典型的管理局部变量是private static类型的,比如用户ID、事物ID,我们的服务器应用框架对于每一个请求都是用一个单独的线程中处理,所以事物ID对每一个线程是唯一的,此时用ThreadLocal来管理这个事物ID,就可以从每个线程中获取事物ID了。
ThreadLocal和前面几种同步机制的比较:
1、hreadLocal和其它所有的同步机制都是为了解决多线程中的对同一变量的访问冲突,在普通的同步机制中,是通过对象加锁来实现多个线程对同一变量的安全访问的。这时该变量是多个线程共享的,使用这种同步机制需要很细致地分析在什么时候对变量进行读写,什么时候需要锁定某个对象,什么时候释放该对象的锁等等很多。所有这些都是因为多个线程共享了资源造成的。
2、ThreadLocal就从另一个角度来解决多线程的并发访问,ThreadLocal会为每一个线程维护一个和该线程绑定的变量的副本,从而隔离了多个线程的数据,每一个线程都拥有自己的变量副本,从而也就没有必要对该变量进行同步了。ThreadLocal提供了线程安全的共享对象,在编写多线程代码时,可以把不安全的整个变量封装进ThreadLocal,或者把该对象的特定于线程的状态封装进ThreadLocal。
3、ThreadLocal并不能替代同步机制,两者面向的问题领域不同。同步机制是为了同步多个线程对相同资源的并发访问,是为了多个线程之间进行通信的有效方式;而ThreadLocal是隔离多个线程的数据共享,从根本上就不在多个线程之间共享资源(变量),这样当然不需要对多个线程进行同步了。所以,如果你需要进行多个线程之间进行通信,则使用同步机制;如果需要隔离多个线程之间的共享冲突,可以使用ThreadLocal,这将极大地简化你的程序,使程序更加易读、简洁。
4、锁的等级:方法锁、对象锁、类锁
Java中每个对象实例都可以作为一个实现同步的锁,也即对象锁(或内置锁),当使用synchronized修饰普通方法时,也叫方法锁(对于方法锁这个概念我觉得只是一种叫法,因为此时用来锁住方法的可能是对象锁也可能是类锁),当我们用synchronized修饰static方法时,此时的锁是类锁。
对象锁的实现方法:
- 用synchronized修饰普通方法(非static);
- 用synchronized(this){...}的形式包括代码块;
上面两种方式获得的锁是同一个锁对象,即当前的实例对象锁。(当然,也可以使用其他传过来的实例对象作为锁对象),如下实例:
public class BankCount { public synchronized void addMoney(int money){//存钱 synchronized(this){ //同步代码块 int i = 5; while(i-- > 0){ System.out.println(Thread.currentThread().getName() + ">存入:" + money); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } } public synchronized void getMoney(int money){//取钱 int i = 5; while(i-- > 0){ System.out.println(Thread.currentThread().getName() + ">取钱:" + money); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } }
测试类:
public class BankTest { public static void main(String[] args) { final BankCount bankCount = new BankCount(); new Thread(new Runnable() {//取钱线程 @Override public void run() { bankCount.getMoney(200); } },"取钱线程").start(); new Thread(new Runnable() {//存钱线程 @Override public void run() { bankCount.addMoney(200); } },"存钱线程").start(); } }
打印结果如下:
取钱线程>取钱:200
取钱线程>取钱:200
取钱线程>取钱:200
取钱线程>取钱:200
取钱线程>取钱:200
存钱线程>存入:200
存钱线程>存入:200
存钱线程>存入:200
存钱线程>存入:200
存钱线程>存入:200
打印结果表明,synchronized修饰的普通方法和代码块获得的是同一把锁,才会使得一个线程执行一个线程等待的执行结果。
类锁的实现方法:
- 使用synchronized修饰static方法
- 使用synchronized(类名.class){...}的形式包含代码块
因为static的方法是属于类的,因此synchronized修饰的static方法获取到的肯定是类锁,一个类可以有很多对象,但是这个类只会有一个.class的二进制文件,因此这两种方式获得的也是同一种类锁。
如下修改一下上面代码的两个方法:
public void addMoney(int money){//存钱 synchronized(BankCount.class){ int i = 5; while(i-- > 0){ System.out.println(Thread.currentThread().getName() + ">存入:" + money); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static synchronized void getMoney(int money){//取钱 int i = 5; while(i-- > 0){ System.out.println(Thread.currentThread().getName() + ">取钱:" + money); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }
打印结果和上面一样。说明这两种方式获得的锁是同一种类锁。
类锁和对象锁是两种不同的锁对象,如果将addMoney方法改为普通的对象锁方式,继续测试,可以看到打印结果是交替进行的。
注:(1)一个线程获得了对象锁或者类锁,其他线程还是可以访问其他非同步方法,获得了锁只是阻止了其他线程访问使用相同锁的方法、代码块;
(2)一个获得了对象锁的线程,可以在该同步方法中继续去访问其他相同锁对象的同步方法,而不需要重新申请锁。
3、线程池ThreadPool相关
在java.util.concurrent包下,提供了一系列与线程池相关的类。合理的使用线程池,可以带来多个好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行;
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
线程池可以应对突然大爆发量的访问,通过有限个固定线程为大量的操作服务,减少创建和销毁线程所需的时间。
与线程执行、线程池相关类的关系如图:
我们一般通过工具类Executors的静态方法(如newFixedThreadPool())来获取ThreadPoolExecutor线程池或静态方法(如newScheduledThreadPool())来获取ScheduleThreadPoolExecutor线程池。如下使用:
ExecutorService threadpool= Executors.newFixedThreadPool(10);
我们指定了获取10个数量的固定线程池,Executors中有很多重载的获取线程池的方法,比如可以通过自定义的ThreadFactory来为每个创建出来的Thread设置更为有意义的名称。Executors创建线程池的方法内部也就是new出新的ThreadPoolExecutor或ScheduleThreadPoolExecutor,给我们配置了很多默认的设置。如下:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
上面通过ThreadPoolExecutor的构造方法,为我们创建了一个线程池,很多参数Executors工具类自动为我们配置好了。创建一个ThreadPoolExecutor线程池一般需要以下几个参数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
(1)corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
(2)maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
(3)keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
(4)TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS)等。
(5)workQueue(任务队列):用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue
(6)threadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
(7)handler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。
我们尽量优先使用Executors提供的静态方法来创建线程池,如果Executors提供的方法无法满足要求,再自己通过ThreadPoolExecutor类来创建线程池。
提交任务的两种方式:
(1)通过execute()方法,如:
ExecutorService threadpool= Executors.newFixedThreadPool(10); threadpool.execute(new Runnable(){...});
这种方式提交没有返回值,也就不能判断任务是否被线程池执行成功。
(2)通过submit()方法,如:
Future<?> future = threadpool.submit(new Runnable(){...}); try { Object res = future.get(); } catch (InterruptedException e) { // 处理中断异常 e.printStackTrace(); } catch (ExecutionException e) { // 处理无法执行任务异常 e.printStackTrace(); }finally{ // 关闭线程池 executor.shutdown(); }
使用submit 方法来提交任务,它会返回一个Future对象,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。
线程池工作流程分析
从上图我们可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:
1、首先线程池判断基本线程池是否已满(< corePoolSize ?)?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
2、其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
3、最后线程池判断整个线程池是否已满(< maximumPoolSize ?)?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。
也就是说,线程池优先要创建出基本线程池大小(corePoolSize)的线程数量,没有达到这个数量时,每次提交新任务都会直接创建一个新线程,当达到了基本线程数量后,又有新任务到达,优先放入等待队列,如果队列满了,才去创建新的线程(不能超过线程池的最大数maxmumPoolSize)。
关于线程池的配置原则可阅读参考文章。
ThreadPoolExecutor简单实例:
public class BankCount { public synchronized void addMoney(int money){//存钱 System.out.println(Thread.currentThread().getName() + ">存入:" + money); } public synchronized void getMoney(int money){//取钱 System.out.println(Thread.currentThread().getName() + ">取钱:" + money); } }
测试类:
public class BankTest { public static void main(String[] args) { final BankCount bankCount = new BankCount(); ExecutorService executor = Executors.newFixedThreadPool(10); executor.execute(new Runnable() {//存钱线程 @Override public void run() { int i = 5; while(i-- > 0){ bankCount.addMoney(200); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } }); Future<?> future = executor.submit(new Runnable() {//取钱线程 @Override public void run() { int i = 5; while(i-- > 0){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } bankCount.getMoney(200); } } }); try { Object res = future.get(); System.out.println(res); } catch (InterruptedException e) { // 处理中断异常 e.printStackTrace(); } catch (ExecutionException e) { // 处理无法执行任务异常 e.printStackTrace(); }finally{ // 关闭线程池 executor.shutdown(); } } }
打印结果如下:
pool-1-thread-1>存入:200
pool-1-thread-1>存入:200
pool-1-thread-2>取钱:200
pool-1-thread-1>存入:200
pool-1-thread-2>取钱:200
pool-1-thread-1>存入:200
pool-1-thread-2>取钱:200
pool-1-thread-1>存入:200
pool-1-thread-2>取钱:200
pool-1-thread-2>取钱:200
null
可以看到,打印出来的future.get()获取的结果为null,这是因为Runnable是没有返回值的,需要返回值要使用Callable,这里就不再细说了,具体可参考如下文章:
http://blog.csdn.net/xiaojin21cen/article/details/41820983
http://icgemu.iteye.com/blog/467848
4、生产者和消费者模型
生产者、消费者模型,描述是:有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者可以从仓库中取走产品。解决消费者和生产者问题的核心在于保证同一资源被多个线程并发访问时的完整性。一般采用信号量或加锁机制解决。下面介绍Java中解决生产者和消费者问题主要三种仿:
(1)wait() / notify()、notifyAll()
wait和notify方法是Object的两个方法,因此每个类都会拥有这两个方法。
wait()方法:使当前线程处于等待状态,放弃锁,让其他线程执行。
notify()方法:唤醒其他等待同一个锁的线程,放弃锁,自己处于等待状态。
如下例子:
/** * 仓库 */ public class Storage { private static final int MAX_SIZE = 100;//仓库的最大容量 private List<Object> data = new ArrayList<Object>();//存储载体 /** * 生产操作 */ public synchronized void produce(int num){ if(data.size() + num > MAX_SIZE){//如果生产这些产品将超出仓库的最大容量,则生产操作阻塞 System.out.println("生产操作-->数量:" + num + ",超出仓库容量,生产阻塞!------库存:" + data.size()); try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //到这里,表示可以正常生产产品 for(int i = 0; i < num; i++){//生产num个产品 data.add(new Object()); } System.out.println("生产操作-->数量:" + num + ",成功入库~------库存:" + data.size()); //生产完产品后,唤醒其他等待消费的线程 notify(); } /** * 消费操作 */ public synchronized void consume(int num){ if(data.size() - num < 0){//如果产品数量不足 System.out.println("消费操作-->数量:" + num + ",库存不足,消费阻塞!------库存:" + data.size()); try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //到这里,表示可以正常消费 for(int i = 0; i < num; i++){//消费num个产品 data.remove(0); } System.out.println("消费操作-->数量:" + num + ",消费成功~------库存:" + data.size()); //消费完产品后,唤醒其他等待生产的线程 notify(); } } 生产者: public class Producer implements Runnable{ private Storage storage; private int num;//每次生产多少个 public Producer(Storage sto,int num){ storage = sto; this.num = num; } @Override public void run() { storage.produce(num); } } 消费者: public class Consumer implements Runnable{ private Storage storage; private int num;//每次消费多少个 public Consumer(Storage sto,int num){ storage = sto; this.num = num; } @Override public void run() { storage.consume(num); } }
测试类:
public class StorageTest { public static void main(String[] args) { Storage storage = new Storage(); ExecutorService taskSubmit = Executors.newFixedThreadPool(10); //来使用使用上一节我们总结的线程池知识 //给定4个消费者 taskSubmit.submit(new Consumer(storage, 30)); taskSubmit.submit(new Consumer(storage, 10)); taskSubmit.submit(new Consumer(storage, 20)); //给定6个生产者 taskSubmit.submit(new Producer(storage, 70)); taskSubmit.submit(new Producer(storage, 10)); taskSubmit.submit(new Producer(storage, 20)); taskSubmit.submit(new Producer(storage, 10)); taskSubmit.submit(new Producer(storage, 10)); taskSubmit.submit(new Producer(storage, 10)); taskSubmit.shutdown(); } }
打印结果:
消费操作-->数量:30,库存不足,消费阻塞!------库存:0
生产操作-->数量:10,成功入库~------库存:10
生产操作-->数量:70,成功入库~------库存:80
生产操作-->数量:10,成功入库~------库存:90
生产操作-->数量:10,成功入库~------库存:100
生产操作-->数量:20,超出仓库容量,生产阻塞!------库存:100
消费操作-->数量:10,消费成功~------库存:90
生产操作-->数量:20,成功入库~------库存:110
生产操作-->数量:10,超出仓库容量,生产阻塞!------库存:110
消费操作-->数量:20,消费成功~------库存:90
消费操作-->数量:30,消费成功~------库存:60
生产操作-->数量:10,成功入库~------库存:70
在仓库中,唤醒我们使用的是notify()而没有使用notifyAll(),是因为在这里,如果测试数据设置不当很容易造成死锁(比如一下唤醒了所有的生产进程),因为使用wait和notify有一个缺陷:
逻辑本应该要这样设计的,在produce()操作后,只要唤醒等待同一把锁的消费者进程,在consume()后,唤醒等待同一把锁的生产者进程,而notify()或notifyAll()将生产者和消费者线程都唤醒了。下面的第二种方法可以解决这个问题。
wait和notify在“类消费者和生产者”问题上也很有用,比如,在A类的某个方法中调用了传进来的B对象的一个方法,A类方法的后面代码依赖于刚刚调用的B的返回值,但是B对象的这个方法是一个异步的操作,此时就可以在A方法中调用完B对象的方法后自我阻塞,即调用wait()方法,而在B对象的那个方法中,待异步操作完成后,调用notify(),唤醒处于等待同一锁对象的线程。如下:
A类的某个方法中:
XmppManager xmppManager = notificationService.getXmppManager(); if(xmppManager != null){ if(!xmppManager.isAuthenticated()){ try { synchronized (xmppManager) {//等待客户端连接认证成功 Log.d(LOGTAG, "wait for authenticated..."); xmppManager.wait(); } } catch (InterruptedException e) { e.printStackTrace(); } } //运行到此处,说明是认证成功的,有两种可能,一是运行速度很快调用notificationService.getXmppManager()后直接返回了结果,二是B中处理完了调用notify方法 Log.d(LOGTAG, "authenticated already. send SetTagsIQ now..."); B中处理完后: //客户端连接认证成功后,唤醒拥有xmppManager锁的对象 synchronized (xmppManager) { xmppManager.notifyAll(); }
(2)await() / signal()
在JDK1.5之引入concurrent包之后,新引入了await()和signal()方法来做同步,功能和wait()和notify()方法相同,可以完全取代,但await()和signal()需要和Lock机制(关于Lock机制前面已总结)结合使用,更加灵活。正如第一种所说,可以通过调用Lock的newCondition()方法依次获取两个条件变量,一个针对仓库空的,一个针对仓库满的条件变量,通过添加变量进行同步控制。
修改仓库类Storage:
/** * 仓库 */ public class Storage { private static final int MAX_SIZE = 100;//仓库的最大容量 private List<Object> data = new ArrayList<Object>();//存储载体 private Lock lock = new ReentrantLock();//可重入锁 private Condition full = lock.newCondition();//仓库满的条件变量 private Condition empty = lock.newCondition();//仓库空时的条件变量 /** * 生产操作 */ public void produce(int num){ lock.lock(); //加锁 if(data.size() + num > MAX_SIZE){//如果生产这些产品将超出仓库的最大容量,则生产操作阻塞 System.out.println("生产操作-->数量:" + num + ",超出仓库容量,生产阻塞!------库存:" + data.size()); try { full.await(); //阻塞 } catch (InterruptedException e) { e.printStackTrace(); } } //到这里,表示可以正常生产产品 for(int i = 0; i < num; i++){//生产num个产品 data.add(new Object()); } System.out.println("生产操作-->数量:" + num + ",成功入库~------库存:" + data.size()); //生产完产品后,唤醒其他等待消费的线程 empty.signalAll(); lock.unlock(); //释放锁 } /** * 消费操作 */ public void consume(int num){ lock.lock(); //加锁 if(data.size() - num < 0){//如果产品数量不足 System.out.println("消费操作-->数量:" + num + ",库存不足,消费阻塞!------库存:" + data.size()); try { empty.await(); //阻塞 } catch (InterruptedException e) { e.printStackTrace(); } } //到这里,表示可以正常消费 for(int i = 0; i < num; i++){//消费num个产品 data.remove(0); } System.out.println("消费操作-->数量:" + num + ",消费成功~------库存:" + data.size()); //消费完产品后,唤醒其他等待生产的线程 full.signalAll(); lock.unlock(); //释放锁 } }
打印结果:
消费操作-->数量:30,库存不足,消费阻塞!------库存:0
消费操作-->数量:10,库存不足,消费阻塞!------库存:0
消费操作-->数量:20,库存不足,消费阻塞!------库存:0
生产操作-->数量:70,成功入库~------库存:70
生产操作-->数量:10,成功入库~------库存:80
生产操作-->数量:10,成功入库~------库存:90
生产操作-->数量:10,成功入库~------库存:100
生产操作-->数量:10,超出仓库容量,生产阻塞!------库存:100
消费操作-->数量:30,消费成功~------库存:70
消费操作-->数量:10,消费成功~------库存:60
消费操作-->数量:20,消费成功~------库存:40
生产操作-->数量:10,成功入库~------库存:50
生产操作-->数量:20,成功入库~------库存:70
使用await和signal后,加锁解锁操作就交给了Lock,不用再使用synchronized同步(具体可看前面总结的同步的实现方法),在produce中满仓后阻塞,生产完后唤醒等待的消费线程,consume中库存不足后阻塞,消费完后唤醒等待的生产者线程,表示可以消费了。
(3)BlockingQueue阻塞队列方式
在上一节关于线程池的总结中,我们看到了要创建一个线程池如ThreadPoolExecutor,需要传入一个任务队列即BlockingQueue,BlockingQueue(接口)用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue。
>ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
>LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
>SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
>PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
BlockingQueue的所有实现类内部都是已经实现了同步的队列,实现的方式采用的是上面介绍的第二种await()/signal() + Lock同步的机制。在生成阻塞队列时,可以指定队列大小。用于阻塞操作的方法主要为:
put()方法:插入一个元素,如果超过容量则自我阻塞,等待唤醒;
take()方法:取走一个元素,如果容量不足了,自我阻塞,等待唤醒;
put和take内部自己实现了await和signal、lock的机制处理,不再需要我们做相应操作。修改Storage代码如下:
public class Storage { private static final int MAX_SIZE = 100;//仓库的最大容量 private BlockingQueue<Object> data = new LinkedBlockingQueue<Object>(MAX_SIZE); //使用阻塞队列作为存储载体 /** * 生产操作 */ public void produce(int num){ if(data.size() == MAX_SIZE){//如果仓库已达最大容量 System.out.println("生产操作-->仓库已达最大容量!"); } //到这里,表示可以正常生产产品 for(int i = 0; i < num; i++){//生产num个产品 try { data.put(new Object()); //put内部自动实现了判断,超过最大容量自动阻塞 } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("生产操作-->数量:" + num + ",成功入库~------库存:" + data.size()); } /** * 消费操作 */ public void consume(int num){ if(data.size() == 0){//如果产品数量不足 System.out.println("消费操作--库存不足!"); } //到这里,表示可以正常消费 for(int i = 0; i < num; i++){//消费num个产品 try { data.take(); //take内部自动判断,消耗后库存是否充足,不足自我阻塞 } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("消费操作-->数量:" + num + ",消费成功~------库存:" + data.size()); } }
打印结果:
消费操作--库存不足!
消费操作--库存不足!
消费操作--库存不足!
生产操作-->数量:70,成功入库~------库存:45
消费操作-->数量:30,消费成功~------库存:45
生产操作-->数量:10,成功入库~------库存:56
生产操作-->数量:20,成功入库~------库存:75
生产操作-->数量:10,成功入库~------库存:85
生产操作-->数量:10,成功入库~------库存:89
消费操作-->数量:10,消费成功~------库存:60
生产操作-->数量:10,成功入库~------库存:70
消费操作-->数量:20,消费成功~------库存:70
可以看到,Storage中produce和consume方法中我们直接通过put和take方法往容器中添加或移除产品即可,没有进行逻辑控制(其实上面两个方法中if都可以去掉,只是为了打印效果才加上的),这是因为BlockingQueue内部已经实现了,不需要我们再次控制。
同时,我们看到打印的库存信息出现了不匹配,这个主要是因为我们的打印语句Systm.out.println()没有被同步导致的,因为同步语句只是在put和take方法内部,而我们打印语句中使用了data这个共享变量。这里因为我们需要看效果,所以才加的打印语句,并不影响我们对BlockingQueue的使用。
因此,在Java中,使用BlockingQueue阻塞队列的方式可以很方便的为我们处理生产者消费则问题,推荐使用。
在我们的编程生涯中,我们自己要去写生产者和消费者问题,多是前面第一种介绍的“类似消费者生产者问题”上。
解决生产者和消费者问题还有管道的方式,即在生产者和消费者之间建立一个管道缓冲区,Java中用PipedInputStream / PipedOutputStream实现,由于这种方式对于传输对象不易封装,因此实用性不高,就不具体介绍了。
5、sleep和wait的区别
sleep是Thread的静态方法,wait是Object的方法。两个方法都会暂停当前线程
(1)sleep使当前线程阻塞,让出CPU,给其他线程执行的机会;如果当前线程拥有锁,不会释放锁,也即“睡着我也要拥有锁”。睡眠时间一到,进入就绪状态,如果当前CPU空闲,才会继续执行。
(2)wait方法调用后,当前线程进入阻塞状态,进入到和该对象(即谁调用了wait()方法,如list.wait())相关的等待池中。,让出CPU,给其他线程执行的机会;当超时间过了或者别的线程调用了notify()或notifyAll()方法时才会唤醒当前等待同一把锁的线程。
(3)wait方法必须要放在同步块中,如syncbronized或Lock同步中。
所以sleep和wait的主要区别是:
sleep:保持锁,睡眠时间到进入就绪状态;
wait:释放锁,等待其他线程的notify操作或超时唤醒。
【参考文章:
http://www.codeceo.com/article/threadlocal-usage.html
http://www.codeceo.com/article/java-multi-thread-sync.html
http://outofmemory.cn/java/java.util.concurrent/synchronized-locks-Lock-ReentrantLock
http://outofmemory.cn/java/java.util.concurrent/thread-sync-with-object-wait-notify-notifyAll
http://outofmemory.cn/java/java.util.concurrent/lock-reentrantlock-condition
http://uule.iteye.com/blog/1488356
http://langgufu.iteye.com/blog/2152608
http://itfish.net/article/22032.html
https://blog.csdn.net/shakespeare001/article/details/51321498
】