背景
憋了这么久终于可以学到并发了,其实并发很迷人又让人头疼的地方就是如何决解多个线程访问同一个资源而带来的
线程间互相干扰的问题。
1.并发的多面性
并发是用于多处理器编程的基本工具,用并发解决的问题可以分为“速度”和“设计可管理性”。
(1)速度的提高是以多核处理器的形式而不是更快的芯片形式出现的。但是,并发通常是提高运行在单处理器上的程序的
性能,原因如下所述:
如果程序中的某个任务因为该程序控制范围之外的某些条件(通常是I/O)而导致不能继续执行,那么我们就说这个任务
或线程阻塞了。如果没有并发,整个程序就将停止下来,直到外部条件发生变化。使用并发编程的话,当一个任务阻塞
了,程序中的其他任务还可以继续执行。从性能角度看,如果任务不会阻塞,那么单处理器机器上使用并发就没有任何
意义。可以使用并发来长生具有可响应的用户界面。
某些编程语言被设计为可以将并发任务彼此隔离,这些语言通常叫函数型语言,例如Erlang。
(2)并发提供了一个重要的组织结构上的好处:你的程序设计可以极大地简化,例如仿真。
(3)java的线程机制是抢占式的,这表示调度机制会周期性地中断线程,将上下文切换到另一个线程(上下文切换的意思
是从从一个任务切换到另一个任务),从而为每个线程都提供时间片,使得每个线程都会分配到数量合理的时间去驱动
它的任务。
2.基本的线程机制
并发编程使我们可以将程序划分为多个分离的、独立运行的任务。通过使用多线程机制,这些独立任务中的每一个都将
由执行线程来驱动。在使用线程时,cpu将轮流给每个人物分配其占用时间,每个任务都觉得自己在一直占用cpu,但事
实上cpu时间是划分成片段分配给了所有的任务(例外的情况是程序确实运行在多个cpu之上)。多任务和多线程是使用多
处理器系统的最合理方式。
(1)定义任务,实现Runnable接口并重写run()方法。
public class LiftOff implements Runnable {
public void run(){
}
}
Thread.yield()的调用是对线程调度器(java线程机制的一部分,可以将cpu从一个线程转移给另一个线程)的一种建议。
(2)Thread类:
public class BasicThreads {
public static void main(String[] args){
Thread t=new Thread(new LiftOff());
t.start();
}
}
如果在使用Thread类的时候没有捕获该对象的引用,那么垃圾回收器无法清除它,因为Thread“注册”了它自己,存在一个对
它的引用。
(3)Executor(执行器):可以管理Thread对象。ExecutorService(具有服务生命周期的Executor,例如关闭)知道如何构建恰当
的上下文来执行Runnable对象。
import java.util.concurrent.*;
import java.util.*;
public class CachedThreadPool {
public static void main(String[] args){
ExecutorService exec=Executors.newCachedThreadPool();
for(int i=0;i<5;i++){
exec.execute(new LiftOff());
}
exec.shutdown();
}
}
FixedThreadPool使用了有限的线程集来执行所提交的任务,可以一次性预先执行代价高昂的线程分配,可以节省时间,不用
为每个任务都固定地付出创建线程的开销。
注意在任何线程池中,现有线程在可能的情况下,都会被自动复用。CachedThreadPool在程序执行过程中通常会创建与所需
数量相同的线程,然后在它回收旧线程时停止创建新线程。
SingleThreadExecutor就像是数量为1的FixedThreadPool。如果向SingleThreadExecutor提交了多个任务,那么这些任务将排
队,每个任务都会在下一个任务开始之前运行结束,所有的任务将使用相同的线程。因此SingleThreadExecutor会序列化所有
提交给它的任务,并会维护自己(隐藏)的悬挂任务队列。
(4)从任务中产生返回值:实现Callable接口,重写call()方法,而且必须用ExecutorService.submit()方法调用它。
class TaskWithResult implements Callable<String>{
...
public String call(){
return "result of TaskWithResult "+id;
}
}
public class CallableDemo {
public static void main(String[] args){
ExecutorService exec=Executors.newCachedThreadPool();
Future<String> future=exec.submit(new TaskWithResult(10));
...
}
}
(5)休眠:调用sleep()方法。使任务中止执行给定的时间。
package concurrency;
import java.util.concurrent.*;
public class SleepingTask extends LiftOff{
public void run(){
try{
while(countDown-- >0){
System.out.print(status());
TimeUnit.MILLISECONDS.sleep(100);
}
}catch(InterruptedException e){
System.err.println("Interrupted");
}
}
public static void main(String[] args){
ExecutorService exec=Executors.newCachedThreadPool();
for(int i=0;i<5;i++)
exec.execute(new SleepingTask());
exec.shutdown();
}
}
(6)优先级:调度器将倾向于让优先权最高的线程先执行。使用getPriority()和setPriority()来执获取与设置优先级。
Thread t=new Thread(r);
t.setPriority(priority);
Thread.currentThread().setPriority(priority);
使用Thread.toString()打印线程的名称、线程的优先级以及线程所属的“线程组”。
惟一可移植的方法是当调整优先级的时候,只使用MAX_PRIORITY、NORM_PRIORITY、MIN_PRIORITY 。
(7)让步:使用yield()方法,建议具有相同优先级的其他线程可以运行,这只是一个暗示,没有任何机制保证它将会被采纳。
(8)后台线程:指在程序运行的时候在后台提供一种通用服务的线程。当所有的非后台线程结束时,程序也就终止了,同时
会杀死进程中的所有后台线程。执行main()的就是一个非后台线程。使用setDaemon()将线程设置为后台线程。
Thread daemon=new Thread(new SimpleDaemons());
daemon.setDaemon(true);// Must call before start();
daemon.start();
通过编写定制的ThreadFactory可以定制由Executor创建的线程的属性(后台,优先级,名称):
package tools;
import java.util.concurrent.*;
public class DaemonThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t=new Thread(r);
t.setDaemon(true);
return t;
}
}
package concurrency;
import java.util.concurrent.*;
import tools.*;
import static tools.Print.*;
public class DaemonFromFactory implements Runnable{
public void run(){
try{
while(true){
TimeUnit.MILLISECONDS.sleep(100);
print(Thread.currentThread()+" "+this);
}
}catch(InterruptedException e){
print("Interupted");
}
}
public static void main(String[] args)throws Exception{
ExecutorService exec=Executors.newCachedThreadPool(new DaemonThreadFactory());
for(int i=0;i<10;i++)
exec.execute(new DaemonFromFactory());
print("All daemons started");
TimeUnit.MILLISECONDS.sleep(500); //Run for a while
}
}
通过isDaemon()方法来确定是否是一个后台线程,如果是一个后台线程,那么它创建的任何线程将被自动设置成后台线程。
后台进程可能会在不执行finally子句的情况下就会终止其run()方法,因为当最后一个非后台线程终止时,后台线程会“突然”
中止,finally子句有可能就不会执行到。
(9)编码的变体:可以直接从Thread继承
public class SimpleThread extends Thread{
private int countDown=5;
private static int threadCount=0;
public SimpleThread(){
//Store the thread name:
super(Integer.toString(++threadCount));
start();
}
public String toString(){
return "#"+getName()+"("+countDown+"), ";
}
public void run(){
while(true){
System.out.print(this);
if(--countDown==0)
return;
}
}
public static void main(String[] args){
for(int i=0;i<5;i++)
new SimpleThread();
}
}
另一种是自管理的Runnable:实现接口的好处是可以继承另一个不同的类。
package concurrency;
public class SelfManaged implements Runnable {
private int countDown =5;
private Thread t=new Thread(this);
public SelfManaged(){ t.start();}
public String toString(){
return Thread.currentThread().getName()+"("+countDown+"), ";
}
public void run(){
while(true){
System.out.print(this);
if(--countDown==0)
return;
}
}
public static void main(String[] args){
for(int i=0;i<5;i++)
new SelfManaged();
}
}
有时通过使用内部类来将线程代码隐藏在类中也挺有用。
public InnerThread2(String name){
t=new Thread(name){
public void run(){
try{
while(true){
print(this);
if(--countDown==0)return ;
sleep(10);
}
}catch(InterruptedException e){
print("sleep() interrupted");
}
}
public String toString(){
return getName()+": "+countDown;
}
};
t.start();
}
public InnerRunnable2(String name){
t=new Thread(new Runnable(){
public void run(){
try{
while(true){
print(this);
if(--countDown==0) return;
TimeUnit.MILLISECONDS.sleep(10);
}
}catch(InterruptedException e){
print("Sleep() interrupted");
}
}
public String toString(){
return Thread.currentThread().getName()+": "+countDown;
}
},name);
t.start();
}
(10)加入一个线程:一个线程可以在其他线程之上调用join()方法,作用是等待一段时间直到第二个线程结束才继续执行。如果
某个线程在另一个线程t上调用t.join(),此线程将被挂起,直到目标线程t结束才恢复。
(11)捕获异常:Thread.UncaughtExceptionHandle接口,允许在每个Thread对象上附着一个异常处理器。Thread.UncaughtExceptionHandle.uncaughtException()会在线程因未捕获异常而临近死亡时被调用。
package concurrency;
import java.util.concurrent.*;
class ExceptionThread2 implements Runnable{
public void run(){
Thread t=Thread.currentThread();
System.out.println("run() by "+t);
System.out.println("eh= "+t.getUncaughtExceptionHandler());
throw new RuntimeException();
}
}
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler{
public void uncaughtException(Thread t,Throwable e){
System.out.println("caught "+e);
}
}
class HandlerThreadFactory implements ThreadFactory{
public Thread newThread(Runnable r){
System.out.println(this+" creating new Thread");
Thread t=new Thread(r);
System.out.println("create "+t);
t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
System.out.println("en = "+t.getUncaughtExceptionHandler());
return t;
}
}
public class CaptureUncaughtException {
public static void main(String[] args){
ExecutorService exec=Executors.newSingleThreadExecutor(new HandlerThreadFactory());
exec.execute(new ExceptionThread2());
}
}
另一种用法是:将这个处理器设置为默的未捕获异常处理器
package concurrency;
import java.util.concurrent.*;
public class SettingDefaultHandler {
public static void main(String[] args){
Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
ExecutorService exec=Executors.newCachedThreadPool();
exec.execute(new ExceptionThread());
}
}
3.共享受限资源
(1)解决共享资源竞争:采用序列化访问共享资源的方案,在给定时刻只允许一个任务访问共享资源。通常是通过在代码前面
加上一条锁语句来实现的。锁语句产生了一种互相排斥的效果,所以这种机制常常称为互斥量。
$1.使用关键字synchronized:当任务要执行被synchronized关键字保护的代码片段的时候,它将检查锁是否可用,然后获取锁,
执行代码,释放锁。
public synchronized int next(){
++currentEvenValue;
Thread.yield(); //Cause faulure faster
++currentEvenValue;
return currentEvenValue;
}
要控制都共享资源访问,得先把它包装进一个对象。然后把所有要访问这个资源的方法标记为synchronized。对某个特定对象
来说,其所有synchronized方法共享同一个锁。将域设置为private是非常重要的,防止其他任务直接访问域。
一个任务可以多次获得对象的锁,如果一个方法在同一个对象上调用了第二个方法,后者又调用了同一个对象上的另一个方法,
就会发生这种情况,JVM负责跟中对象被加锁的次数,当计数为0时才完全释放锁。只有首先获得了锁的任务才能继续获得更多
的锁。
$2.使用显式Lock对象
private Lock lock=new ReentrantLock();
public int next(){
lock.lock();
try{
++currentEvenValue;
Thread.yield(); //Cause failure faster
++currentEvenValue;
return currentEvenValue;
}finally{
lock.unlock();
}
}
使用lock.tryLock()方法可以尝试获取锁。
(2)Brian同步规则:如果你正在写一个变量,它可能接下来将被另一个线程读取,或者正在读取一个上一次已经被另一个线程写过的变量,那么你必须使用同步,并且,读写线程都必须用相同的监视器锁同步。
(3)原子性与易变性:原子性可以应用于除long和double之外的所有基本类型之上的”简单操作“,对于读取和写入除了long和double之外的基本类型变量这样的操作,可以保证它们会被当作不可分(原子)的操作来操作内存。当你定义long或double变量时,如果使用volatile关键字,就会获得原子性 。
(4)在多处理器系统上,相对于单处理器系统而言,可视性问题远比原子性问题多得多。一个任务做出的修改,即使在不中断的意义上讲是原子性的,对其他任务也可能是不可视的(例如:修改只是暂时性地存储在本地处理器的缓存中),因此不同的任务对应用的状态有不同的视图。另一方面,同步机制强制在处理系统中,一个任务做出的修改必须在应用中可视的。volatile关键字还确保了应用中的可视性,volatile域会立即被写入到主存中,而读取操作就发生在主存中。当一个域的值依赖于它之前的值时,volatile就无法工作了。如果某个域的值受到其他域的值的限制,volatile也无法工作。使用volatile而不是synchronized的唯一安全的情况是类中只有一个可变的域。
(5)原子类:例如AtomicInteger、AtomicLong、AtomicReference等。在涉及性能调优时,就比较有用。
(6)临界区:希望防止多个线程同时访问方法内部的部分代码而不是防止访问整个方法。通过这种方式分离出来的代码段被称为临界区。使用synchronized关键字建立。这种方式有助于提高性能。
synchronized(syncObject){
//This code can be accessed
//by only one task at a time
}
(6)在其他对象上同步:两个任务可以同时进入同一个对象,只要这个对象上的方法是在不同的锁上同步的即可:
package concurrency;
import static tools.Print.*;
class DualSynch{
private Object syncObject =new Object();
public synchronized void f(){
for(int i=0;i<5;i++){
print("f()");
Thread.yield();
}
}
public void g(){
synchronized(syncObject){
for(int i=0;i<5;i++){
print("g()");
Thread.yield();
}
}
}
}
public class SyncObject {
public static void main(String[] args){
final DualSynch ds=new DualSynch();
new Thread(){
public void run(){
ds.f();
}
}.start();
ds.g();
}
}
(7)线程本地存储:根除对变量的共享,可以为使用相同变量的每个不同的线程都创建不同的存储。使用ThreadLocal类来实现。
package concurrency;
import java.util.concurrent.*;
import java.util.*;
class Accessor implements Runnable{
private final int id;
public Accessor(int idn){ id=idn;}
public void run(){
while(!Thread.currentThread().isInterrupted()){
ThreadLocalVariableHolder.increment();
System.out.println(this);
Thread.yield();
}
}
public String toString(){
return "#"+id+": "+ThreadLocalVariableHolder.get();
}
}
public class ThreadLocalVariableHolder {
private static ThreadLocal<Integer> value=new ThreadLocal<Integer>(){
private Random rand=new Random(47);
protected synchronized Integer initialValue(){
return rand.nextInt(10000);
}
};
public static void increment(){
value.set(value.get()+1);
}
public static int get(){ return value.get();}
public static void main(String[] args)throws Exception{
ExecutorService exec=Executors.newCachedThreadPool();
for(int i=0;i<5;i++)
exec.execute(new Accessor(i));
TimeUnit.SECONDS.sleep(3); //Run for a while
exec.shutdownNow(); //All Accessors will quit
}
}
4.终结任务
(1)ExecutorService.awaitTermination()方法等待每个任务结束,如果所有任务在超时时间达到之前全部结束,返回true;否则返回false。
exec.shutdownNow();
if(!exec.awaitTermination(1000, TimeUnit.MILLISECONDS))
print("Some tasks were not terminated!");
(2)在阻塞时终结:有时候你必须终止被阻塞的任务。
一个线程可以处于以下四种状态之一:
¥新建(new):当线程被创建时,它只会短暂地处于这种状态。此时它已经分配了必需的系统资源,并执行了初始化。此刻线程已经有资格获得cup时间了,之后调度器将把这个线程转变为可运行状态或阻塞状态。
¥就绪(Runnable):在这种情况下,只要调度器把时间片分配给线程,线程就可以运行。也就是说,在任意时刻,线程可以运行也可以不运行。只要调度器能分配时间片给线程,它就可以运行,这不用于死亡和阻塞状态。
¥阻塞(Blocked):线程能够运行,但有某个条件阻止它的运行。当线程处于阻塞状态时,调度器将忽略线程,不会分配给线程任何cpu时间。直到线程重新进入了就绪状态,它才有可能执行操作。
¥死亡(Dead):处于死亡或终止状态的线程将不再是可调度的,并且再也不会得到cpu时间,它的任务已结束,或不再是可运行的。任务死亡的通常方式是从run()方法返回,但是任务的线程还是可以被中断。
一个任务进入阻塞状态,可能有如下原因:
¥通过调用sleep()使任务进入休眠状态,在这种情况下,任务在指定时间内不会运行。
¥通过wait()使线程挂起,直到线程得到了notify()或notifyAll()消息(signal()或signalAll()消息),线程才会进入就绪状态。
¥任务在等待某个输入/输出完成
¥任务视图在某个对象上调用其同步控制方法,但是对象锁不可用,因为另一个任务已经获得了这个锁。
(3)中断:使用interrupt()方法,可以终止被阻塞的任务,这个方法将设置线程的中断状态。如果一个线程已经被阻塞,或者试图执行一个阻塞操作,那么设置这个线程的中断状态将抛出InterruptedException异常。当抛出异常或调用Thread.interrupted()时,中断状态被复位。在Executor上调用shutdownNow(),那么它将发送一个interrupt()调用给它启动的所有线程。cancel()是一种中断由Executor启动的单个线程的方式。
Future<?> f=exec.submit(r);
f.cancel(true);
I/O和在synchronized块上的等待是不可以中断的。对于这类问题,有效的办法是关闭任务在其上发生阻塞的底层资源。
同一个互斥可以被同一个任务多次获得:
package concurrency;
import static tools.Print.*;
public class MultiLock {
public synchronized void f1(int count){
if(count-- >0){
print("f1() calling f2() with count"+count);
f2(count);
}
}
public synchronized void f2(int count){
if(count-->0){
print("f2() calling f1() with count"+count);
f1(count);
}
}
public static void main(String[] args) throws Exception{
final MultiLock multiLock=new MultiLock();
new Thread(){
public void run(){
multiLock.f1(10);
}
}.start();
}
}
在ReentrantLock上阻塞的任务具备可以被中断的能力:
private Lock lock=new ReentrantLock();
lock.lockInterruptibly(); //除非当前线程被中断,否则获取锁定 。
(4)检查中断:调用interrupted()来检查中断状态,还可以清除中断状态。下面是经典用法:
try{
while(!Thread.interrupted()){...}
}catch(InterruptedException e){...}
5.线程之间的协作
(1)wait()与notifyAll():wait()会在等待外部世界产生变化的时候将任务挂起,并且只有在notify()或notifyAll()发生时,即表示发生了某些感兴趣的事物,这个任务才会被唤醒并去检查所发生的变化。调用sleep()的时候锁并没有被释放。调用yield()也是这样。调用wait()的时候,线程的执行被挂起,对象上的锁被释放。只能在同步控制方法或同步控制块中使用wait()、notify()和notifyAll()
有两种形式的wait():
¥接受毫秒数作为参数,用来指此期间暂停。与sleep()不同的是,对wait()而言:在wait()期间对象锁是释放的,可以通过notify()、notifyAll(),或者令时间到期,从wait()中恢复执行。
¥wait()不接受任何参数,表示无限等待,直到线程接收到notify()或者notifyAll()消息。
注意:检查所感兴趣的特定条件,并在条件不满足的情况下返回到wait()中,常用的方法是使用while来编写这种代码。
(2)notify()与notifyAll():使用notify()而不是notifyAll()是一种优化。使用notify()时,在众多等待同一个锁的任务中只有一个会被唤醒。为了使用notify(),所有任务必须等待相同的条件。如果使用notify(),当条件发生变化时,必须只有一个任务能够从中受益。
notifyAll()因某个特定锁而被调用时,只有等待这个锁的任务才会被唤醒。
Timer类:线程的工具,用于在后台线程中安排将来执行的任务。可以将任务安排为一次性执行,或者以固定间隔重复执行。
(3)生产者和消费者:使用显式的Lock和Condition对象,通过Condition.await()挂起一个任务,signal()来唤醒任务或者signalAll()
唤醒所有在这个Condition上被其自身挂起的任务。
private Lock lock=new ReentrantLock();
private Lock lock=new ReentrantLock();
private Condition condition=lock.newCondition();
private boolean waxOn=false;
public void waxed(){
lock.lock();
try{
waxOn=true;
condition.signalAll();
}finally{
lock.unlock();
}
}
...
public void waitForWaxing() throws InterruptedException{
lock.lock();
try{
while(waxOn==false)
condition.await();
}finally{
lock.unlock();
}
}
(4)生产者与消费者队列:同步队列可以用来解决任务协作问题,同步队列在任何时刻都只允许一个任务插入或移除元素。java.util.concurrent.BlockingQueue接口提供了这个队列。LinkedBlockingQueue,它是一个无界队列;ArrayBlockingQueue,它有固定的尺寸,因此你可以在它被阻塞之前,向其中放置有限数量的元素。
如果消费者任务试图从队列中获取对象,而该队列此时为空,那么这些队列还可以挂起消费者任务,并且当有更多的元素可用时恢复消费者任务。
(5)任务间使用管道进行输入/输出:PipedWriter类(允许任务向管道写)和PipedReader类(允许不同任务从同一个管道中读取)。这个模型可以看成是“生产者与消费者”问题的变体,这里的管道是一个封装好的解决方案。管道基本上是一个阻塞队列。
package concurrency;
import java.util.concurrent.*;
import java.io.*;
import java.util.*;
import static tools.Print.*;
class Sender implements Runnable{
private Random rand=new Random(47);
private PipedWriter out=new PipedWriter();
public PipedWriter getPipedWriter(){ return out;}
public void run(){
try{
while(true)
for(char c='A';c<='z';c++){
out.write(c);
TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
}
}catch(IOException e){
print(e+" Sender write exception");
}catch(InterruptedException e){
print(e+ " Sender sleep interrupted");
}
}
}
class Receiver implements Runnable{
private PipedReader in;
public Receiver(Sender sender )throws IOException{
in=new PipedReader(sender.getPipedWriter());
}
public void run(){
try{
while(true){
printnb("Read: "+(char)in.read()+", ");
}
}catch(IOException e){
print(e+" Receiver read exception");
}
}
}
public class PipedIO {
public static void main(String[] args)throws Exception{
Sender sender=new Sender();
Receiver receiver=new Receiver(sender);
ExecutorService exec=Executors.newCachedThreadPool();
exec.execute(sender);
exec.execute(receiver);
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
6.死锁
某个任务在等待另一个任务,而后者又等待别的任务,这样一直下去,直到这个链条上的任务又在等待第一个任务释放锁。这得到了一个任务之间相互等待的连续循环,没有哪个线程能继续,这被称为死锁。
哲学家就餐问题就是一个经典的死锁例证。
当以下四个条件同时满足时,就会发生死锁:
¥互斥条件。任务使用的资源中至少有一个是不能共享的;
¥至少有一个任务它必须持有一个资源且在等待获取一个当前被别的任务持有的资源;
¥资源不能被任务抢占,任务必须把资源释放当作普通事件;
¥必须有循环等待,这时,一个任务等待其他任务所持有的资源,后者又在等待另一个任务所持有的资源,这样一直下去,直到有一个任务在等待第一个任务所持有的资源,使得大家都被锁住。
所以要防止死锁的话,只要破环其中一个条件即可。最容易的办法是破坏第四个条件
7.新类库中的构件
(1)CountDownLatch:它被用来同步一个或多个任务,强制他们等待由其他任务执行的一组操作。可以向CountDownLactch对象设置一个初始计数值,任何在这个对象上调用await()的方法都将阻塞,直至这个计数值达到0 。其他任务在结束其工作时,可以在该对象上调用countDown()来减小这个计数值。CountDownLatch()只能触发一次,计数值不能被重置。如果需要能够重置计数值得版本,可以使用CyclicBarrier。典型的用法是将一个程序分为n个互相独立的可解决任务,并创建值为0的CountDownLatch。
(2)CyclicBarrier:适用于这样的情况:你希望创建一组任务,它们并行地执行工作,然后在进行下一个步骤之前等待,直至所有任务都完成。下面展示一个仿真的例子,赛马游戏:
package concurrency;
import java.util.concurrent.*;
import java.util.*;
import static tools.Print.*;
class Horse implements Runnable{
private static int counter=0;
private final int id=counter++;
private int strides=0;
private static Random rand=new Random(47);
private static CyclicBarrier barrier;
public Horse(CyclicBarrier b){barrier=b;}
public synchronized int getStrides(){ return strides;}
public void run(){
try{
while(!Thread.interrupted()){
synchronized(this){
strides+=rand.nextInt(3); //Produces 0,1or2
}
barrier.await();
}
}catch(InterruptedException e){
}catch(BrokenBarrierException e){
throw new RuntimeException(e);
}
}
public String toString(){ return "Horse "+id+" ";}
public String tracks(){
StringBuilder s=new StringBuilder();
for(int i=0;i<getStrides();i++)
s.append("*");
s.append(id);
return s.toString();
}
}
public class HorseRace {
static final int FINISH_LINE=75;
private List<Horse> horses=new ArrayList<Horse>();
private ExecutorService exec=Executors.newCachedThreadPool();
private CyclicBarrier barrier;
public HorseRace(int nHorses,final int pause){
barrier=new CyclicBarrier(nHorses,new Runnable(){
public void run(){
StringBuilder s=new StringBuilder();
for(int i=0;i<FINISH_LINE;i++)
s.append("="); //The fence on the racetrack
print(s);
for(Horse horse:horses)
print(horse.tracks());
for(Horse horse:horses)
if(horse.getStrides()>=FINISH_LINE){
print(horse+"won!");
exec.shutdownNow();
return;
}
try{
TimeUnit.MILLISECONDS.sleep(pause);
}catch(InterruptedException e){
print("barrier-action sleep interrupted");
}
}
});
for(int i=0;i<nHorses;i++){
Horse horse=new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}
public static void main(String[] args){
int nHorses=7;
int pause=200;
if(args.length>0){
int n=new Integer(args[0]);
nHorses=n>0?n:nHorses;
}
if(args.length>1){
int p=new Integer(args[1]);
pause=p>-1?p:pause;
}
new HorseRace(nHorses,pause);
}
}
可以向CyclicBarrier提供一个“栅栏动作”,它是一个Runnable,当计数值到达0时自动执行。上面的栅栏动作是作为匿名内部类创建的,它被提交给了CyclicBarrier的构造器。
(3)DelayQueue:这是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期的时间最长。不能将null放置到这种队列中。这是一种优先级队列的变体。
package concurrency;
import java.util.concurrent.*;
import java.util.*;
import static tools.Print.*;
import static java.util.concurrent.TimeUnit.*;
class DelayedTask implements Runnable,Delayed{
private static int counter=0;
private final int id=counter++;
private final int delta;
private final long trigger;
protected static List<DelayedTask> sequence=new ArrayList<DelayedTask>();
public DelayedTask(int delayInMilliseconds){
delta=delayInMilliseconds;
trigger=System.nanoTime()+NANOSECONDS.convert(delta,MILLISECONDS);
sequence.add(this);
}
public long getDelay(TimeUnit unit){
return unit.convert(trigger-System.nanoTime(),NANOSECONDS);
}
public int compareTo(Delayed arg){
DelayedTask that=(DelayedTask)arg;
if(trigger<that.trigger) return -1;
if(trigger>that.trigger) return 1;
return 0;
}
public void run(){ printnb(this+" ");}
public String toString(){
return String.format("[%1$-4d]", delta)+" Task "+id;
}
public String summary(){
return "("+id+": "+delta+")";
}
public static class EndSentinel extends DelayedTask{
private ExecutorService exec;
public EndSentinel(int delay,ExecutorService e){
super(delay);
exec=e;
}
public void run(){
for(DelayedTask pt: sequence){
printnb(pt.summary()+" ");
}
print();
print(this+" Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class DelayedTaskConsumer implements Runnable{
private DelayQueue<DelayedTask> q;
public DelayedTaskConsumer(DelayQueue<DelayedTask> q){
this.q=q;
}
public void run(){
try{
while(!Thread.interrupted())
q.take().run(); //run task with the current thread
}catch(InterruptedException e){
//Acceptable way to exit
}
print("Finished DelayedTaskConsumer");
}
}
public class DelayQueueDemo {
public static void main(String[] args){
Random rand=new Random(47);
ExecutorService exec=Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue=new DelayQueue<DelayedTask>();
//Fill with tasks that have random delays:
for(int i=0;i<20;i++)
queue.put(new DelayedTask(rand.nextInt(5000)));
//set the stopping point
queue.add(new DelayedTask.EndSentinel(5000, exec));
exec.execute(new DelayedTaskConsumer(queue));
}
}
(4)PriorityBlockingQueue:这是一个优先级队列,它具有可阻塞的读取操作。
(5)ScheduledExecutor:ScheduledThreadPoolExecutor,通过schedule()运行一次任务或者scheduleAtFixedRate()每隔规则的时间重复执行任务,你可以将Runnable对象设置为在将来的某个时刻执行。
(6)Semaphore:计数信号量允许n个任务同时访问这个资源。如果没有任何信号量可以用,available将阻塞调用过程。
(7)Exchanger:在两个任务之间交换对象的栅栏。典型应用场景是:一个任务在创建对象,这些对象的生产代价很高昂,而另一个任务在消费这些对象。通过这种方式,可以有更多的对象在被创建的同时被消费。
8.仿真
(1)银行出纳员仿真:对象随机地出现,并且要求由数量有限的服务器提供随机数量的服务时间。通过构建仿真可以确定理想的服务器数量。
(2)饭店仿真:使用队列在任务间通信可以极大地简化并发编程的过程。
(3)分发工作
9.性能调优
通常,只互斥那些你绝对必须互斥的部分,能提高性能。在实际中,被互斥部分可能会比较大,因此在这些方法中花费的时间的百分比可能会明显大于进入和退出互斥的开销,这也就湮没也提高互斥速度带来的好处。在对性能调优时,应该立即--尝试各种不同的方法并观察它们造成的影响。在并发程序中使用Atomic类时,也可以提高性能,但是Atomic对象只有在非常简单的情况下才有用,这种情况通常包括你只有一个要被修改的Atomic对象,并且这个对象独立于其他所有的对象。更安全的做法是,以更加传统的互斥方式入手,只有在性能方面的需求能够明确指示时,再替换为Atomic。
(1)免锁容器:Collections类提供了各种static的同步的装饰方法,来同步不同类型的容器。免锁容器背后的通用策略是:对容器的修改可以与读取操作同时发生,只要读取者只能看到完成修改的结果即可。修改是在容器数据结构的某个部分的一个单独副本(有时是整个数据结构的副本)上执行的,并且这个副本在修改过程中是不可视的。只有当修改完成时,被修改的结构才会自动地与主数据结构进行交换,之后读取者就可以看到这个修改了。
CopyOnWriteArrayList中,写入将导致创建整个底层数组的副本,而源数组将保留在原地,使得复制的数组在被修改时,读取的操作可以安全地执行。当修改完成时,一个原子性的操作将把新的数组换入,使得新的读取操作剋看到这个新的修改。CopyOnWriteArrayList的好处之一是当多个迭代器同时遍历和修改这个列表时,不会抛出ConcurrentModificationException。
CopyOnWriteArraySet将使用CopyOnWriteArrayList来实现其免锁行为,ConcurrentHashMap和ConcurrentLinkedQueue采用了类似的技术。
(2)乐观锁:只要你主要是从免锁容器中读取,那么他就会比synchronized快许多,因为获取和释放锁的开销被省掉了。如果需要向免锁容器中执行少量写入,情况仍旧如此。
(3)乐观加锁:当你准备更新Atomic对象时,可用compareAndSet()的方法。将旧值和新值一起提交给这个方法,如果旧值与它在Atomic对象中的不一致,那就表明其他任务已经在此操作执行期间修改了这个对象。
(4)ReadWriteLock:对向数据结构相对不频繁地写入,但是有多个任务要经常读取这个数据结构的情况进行了优化。ReadWriteLock可以同时有多个读取者,只要他们都不试图写入就可以。如果写锁被其他任务持有,那么任何读取者都不能访问,直到这个写锁被释放为止。
10活动对象
这是另一种不同的并发模型。每个对象都维护着它自己的工作器线程和消息队列,并且所有对这种对象的请求都将进入队列排队,任何时刻都只能运行其中一个。因此,我们可以串行化消息而不是方法,意味着不再需要防备一个任务在其循环的中间被中断这种问题了。不会发生死锁,也不存在任何资源竞争。
package concurrency;
import java.util.concurrent.*;
import java.util.*;
import static tools.Print.*;
public class ActiveObjectDemo {
private ExecutorService ex=Executors.newSingleThreadExecutor();
private Random rand=new Random(47);
//Insert a random delay to produce the effect
//of a calulation time:
private void pause(int factor){
try{
TimeUnit.MILLISECONDS.sleep(100+rand.nextInt(factor));
}catch(InterruptedException e){
print("sleep() interrupted");
}
}
public Future<Integer> calculateInt(final int x,final int y){
return ex.submit(new Callable<Integer>(){
public Integer call(){
print("Starting "+x+" + "+y);
pause(500);
return x+y;
}
});
}
public Future<Float> calculateFloat(final float x,final float y){
return ex.submit(new Callable<Float>(){
public Float call(){
print("starting "+x+" + "+y);
pause(2000);
return x+y;
}
});
}
public void calculateString(final String x,final String y){
ex.execute(new Runnable(){
public void run(){
print("Runnable");
}
});
}
public void shutdown(){ ex.shutdown();}
public static void main(String[] args){
ActiveObjectDemo d1=new ActiveObjectDemo();
//Prevents ConcurrentModificationException:
List<Future<?>> results=new CopyOnWriteArrayList<Future<?>>();
for(float f=0.0f;f<1.0f;f+=0.2f)
results.add(d1.calculateFloat(f,f));
for(int i=0;i<5;i++)
results.add(d1.calculateInt(i,i));
for(int i=0;i<5;i++)
d1.calculateString("a", "b");
print("All asynch calls made");
while(results.size()>0){
for(Future<?> f:results)
if(f.isDone()){
try{
print(f.get());
}catch(Exception e){
throw new RuntimeException(e);
}
results.remove(f);
}
}
d1.shutdown();
}
}
总结
终于打下了一定的基础,这本书带给我的思想和思考的方式都将使我受益,非常谢谢写这本书的作者。