并发编程实战

一、线程管理

  • 并发是进程级并发,但在一个进程内页可以有多个同时进行的任务,这种进程内并发的任务称为线程。并发是在单核处理器中使用多线程执行应用,并行是在多核处理器中使用多线程执行应用

  • 对于一个实现了Runnable接口的类来说,创建Thread对象并不会创建一个新的执行线程;同样的,调用它的run方法,也不会创建一个新的执行线程。只有调用它的start()方法时,才会创建一个新的执行线程

  • 线程信息的获取和设置
  1. Thread类中保存信息的属性,这些属性可以用来标识线程,显示线程的状态或者控制线程的优先级
  2. ID——保存了线程的唯一标示符;Name——保存了线程名称;Priority——保存了线程对象的优先级。线程的优先级是从1到10,其中1是最低优先级,10是最高优先级;Status——保存了线程的状态,在java中线程有六种状态:new、runnable、blocked、waiting、time waiting或者terminated。
  3. 可以通过get方法获取到线程的ID、Name、Priority、Status,可以通过set方法设置Priority、Name,但是不能设置ID和Status。
  4. 如果一个线程是以Runnable对象为参数构建的,那么也可以使用Thread类的静态方法currentThread来访问这个线程对象

  • 线程的中断
  1. 如果一个Java程序有不止一个执行线程,当所有线程都运行结束的时候,这个Java程序才能运行结束;更确切地说应该是所有的非守护线程(非守护线程就是用户线程,守护线程是为其他运行的线程提供服务如GC线程,当用户线程全部撤离,守护线程也自动退出)运行结束时,或者其中一个线程调用了System.exit()方法时,这个Java程序才运行结束。
  2. Java提供了中断机制,我们可以使用它来结束一个线程,这种机制要求线程检查它是否被中断了,然后决定是不是响应这个中断请求,线程允许忽略中断请求并且继续执行
  3. 中断是通过调用Thread.interrupt()方法来做的。这个方法通过修改了被调用线程的中断状态来告知那个线程,说它被中断了对于非阻塞中的线程, 只是改变了中断状态,即Thread.isInterrupted()将返回true; 对于可取消的阻塞状态中的线程, 比如等待在这些函数上的线程,Thread.sleep()、Object.wait()、Thread.join(),这个线程收到中断信号后,会抛出InterruptedException,同时会把中断状态置回为false。
  4. 中断状态可以通过 Thread.isInterrupted()来读取,并且可以通过一个名为 Thread.interrupted()的静态方法读取和清除状态(即调用该方法结束之后, 中断状态会变成false)
  5. 线程提供了task.interrupt方法中断线程,在线程中可以使用isInterrupted方法判断线程是否被中断。
     
         
    1. //这种方式只能使用在线程很容易被中断的情况下,因为线程可以忽略中断,所以中断操作并不是每次都会执行
    2. public void run(){
    3. //线程的run方法
    4. ....
    5. if(isInterrupted()){
    6. ...
    7. return;
    8. }
    9. ...
    10. }
    11. 调用该线程的interrut方法
    12. task.interrupt();
  6. 为了更好地控制线程的中断,Java提供了InterruptedException异常。当检查到线程中断的时候,就抛出这个异常,然后在run中捕获并处理这个异常
     
         
    1. public void run(){
    2. try{
    3. }catch(InterruptedException e){
    4. //捕获中断异常,无论在什么情况下,只要线程被中断就抛出该异常
    5. }
    6. }
    7. 调用该线程的中断方法
    8. task.interrupt();
    与并发API相关的Java方法将会抛出InterruptedException异常,如sleep方法。
  7. 不实用stop和suppend方法的原因:                                                                         

  • 线程的休眠和恢复
  1. sleep方法接收整型数值作为参数,以表名线程挂起执行的毫秒数。当线程休眠的时间结束了,JVM会分给它CPU时钟,线程将继续执行它的指令。
  2. sleep可以使用通过TimeUnit枚举类元素进行调用。TimeUnit表示给定单元粒度的时间段:MICROSECONDS、MILLISECONDS、NANOSECONDS、SECONDS、MINUTES、HOURS、DAYS
     
         
    1. //休眠1秒
    2. TimeUnit.SECONDS.sleep(1);
  3. 调用sleep方法之后线程会释放CPU并且不再继续执行任务。在这段时间内,线程不占用CPU时钟,所以CPU可以执行其他的任务。
  4. Java还提供了yield方法来释放CPU,它将通知JVM这个线程对象可以释放CPU了。JVM并不保证遵循这个要求。通常来讲,yield方法只做调试使用                                                 sleep方法释放了CPU资源,但是没有释放对象锁。而wait方法释放了对象锁,必须通过notify方法才能唤醒。

  • 守护线程
  1. 守护线程的优先级很低,通常来说,当同一个应用程序里没有其他的线程运行的时候,守护线程才运行。守护线程通常被用来做为同一程序中普通线程的服务提供者。
  2. 通过setDaemon(true)方法设置守护线程,一旦线程开始运行,将不再修改守护状态。
  3. 通过isDaemon方法判断一个线程是不是守护线程

  • 线程中不可控异常的处理
  1. run方法不支持throws语句,所以当线程对象的run方法抛出非运行异常时,我们必须捕获并且处理他们。
  2. Java提供了UncaughtExceptionHandler接口,并且实现这个接口的uncaughtException方法。
  3. 接着创建一个线程对象,调用该线程的setUncaughtExceptionHandler方法,将之前实现的类作为参数传入上述方法即可。

  • 线程分组
  1. 我们可以把一个组的线程当作一个单一的单元,对组内线程对象进行访问并操作它们。对于一个线程组,是需要一个单一的操作,就可以对线程中的所有线程执行
  2. ThreadGroup类表示一组线程。线程组可以包含线程对象,也可以包含其他的线程组对象,它是一个树形结构。
     
         
    1. ThreadGroup tg=new ThreadGroup("name");
    2. Thread t=new Thread(tg,new Runnable(){...});
    3. //可以通过activeCount获取线程组包含的线程数目,通过enumerate获取线程组包含的线程列表
    4. Thread[] threads=new Threads[tg.activeCount()];
    5. tg.enumerate(threads);
  3. 如果想要捕获ThreadGroup中的异常,可以继承ThreadGroup类,重写其uncaughtException方法即可

  • 使用工厂类创建线程
  1. 使用工厂类创建线程可以功容易修改类,或者改变创建对象的方式;更容易为优先资源限制创建对象的数目;更容易为创建对象生成统计数据。
  2. Java提供了ThreadFactory接口实现线程对象工厂。
  3. 继承ThreadFactory接口,重写newThread(Runnable r)方法,在该方法中创建线程并返回。

二、线程同步基础

  • Synchronized实现同步方法
  1. 如果一个对象已用Synchronized关键字声明,那么只有一个执行线程被允许访问它。如果其他某个线程试图访问这个对象的其他方法,它将被挂起,直到第一个线程执行完正在运行的方法。
  2. 每一个用Synchronized声明的方法都是临界区。在Java中,同一个对象的临界区,在同一时间只能有一个允许被访问
  3. 用Synchronized关键字声明的静态方法,同时只能够被一个执行线程访问,但是其他线程可以访问这个对象的非静态方法
  4. 如果两个线程访问的是同一个类的不同对象,那么线程不会被阻塞。
  5. 可重入锁:一个线程可以递归调用Synchronized声明的方法。当线程访问一个对象的同步方法时,它还可以调用这个对象的其他同步方法,也包含正在执行的方法,而不必再次去获取这个方法的访问权

  • 使用非依赖属性实现同步
  1. 当使用Synchronized关键字来保护代码块时,必须把对象引用传入参数。通常情况下使用this关键字来引用执行方法所属对象
     
         
    1. synchronized(this){
    2. .....
    3. }
  2. 用Synchronized关键字保护代码块时,我们使用对象作为它的传入参数。JVM保证同一时间只有一个线程能够访问这个对象的代码保护块。
     
         
    1. Object o1=new Object();
    2. Object o2=new Object();
    3. synchronized(o1){...}
    4. synchronized(o2){...}
  3.                                                                                                一般来说类锁都是用于静态方法之中,而对象锁用于非静态方法之中

  • 在同步代码中使用条件
  1. Java在Object类中提供了wait、notify、notifyAll方法。线程可以在同步代码块中调用wait方法,如果在同步代码块之外调用wait方法,则会抛出异常
  2. 当一个线程调用wait方法时,JVM将这个线程置入休眠,并且释放控制这个同步代码块对象,同时允许其他线程执行这个对象控制的其他同步代码块。为了唤醒这个线程,必须在这个对象控制的同步代码块中调用notify或者notifyAll方法。

  • 使用锁实现同步
  1. 锁机制是一种比Synchronized更加强大更加灵活的机制。锁机制基于Lock接口及其实现类(例如ReentrantLock),提供了更多的好处。                                                                                                                
  2.  
  3.  
         
    1. //创建一个锁对象
    2. Lock lock=new ReentrantLock();
    3. //获取锁对象的控制
    4. lock.lock();
    5. //释放锁对象控制
    6. lock.unlock();
    在线程离开临界区的时候,必须使用unlock方法释放它持有的锁,以让其他线程来访问临界区 。如果没有调用则会导致死锁。如果在临界区使用了try-catch块,将unlock方法放入finally中
  4. ReentrantLock类也允许使用递归调用。如果一个线程获取了锁并且进行了递归调用,它将继续持有这个锁(可重入)。

  • 使用读写锁实现同步数据访问
  1. 锁机制的最大改进之一就是ReadWriteLock接口和它的唯一实现类ReentrantReadWriteLock。这个类有两个锁,一个是读操作锁,另一个是写操作锁
  2. 使用读操作锁可以允许多个线程同时访问,但是使用写操作锁时只允许一个线程访问。一个线程在执行写操作时,其他线程不能进行读操作。
  3.  
         
    1. ReadWriteLock lock=new ReentrantReadWriteLock();
    2. //获取读锁,释放读锁
    3. lock.readLock().lock();
    4. lock.readLock().unlock();
    5. //获取写锁,释放写锁
    6. lock.writeLock().lock();
    7. lock.writeLock().unlock();

  • 修改锁的公平性
  1. ReentrantLock和ReentrantReadWriteLock类的构造器都含有一个不二参数fair,它允许你控制这两个类的行为。默认fair值是false,它成为非公平模式
  2. 非公平模式下,当有多个线程正在等待该锁,锁将随意选择其中一个访问临界区,没有任何限制。而公平模式下,则会选择等待时间最长的线程进行访问

  • 在锁中使用多条件
  1. 一个锁可能关联一个或多个条件,这些条件通过Condition接口声明。目的是允许线程获取所并且查看等待的某一个条件是否满足,如果不满足就挂起直到某个线程唤醒它们。Condition接口提供了挂起线程和唤醒线程的机制
     
         
    1. //只有ReentrantLock可以使用conditon,WriteLock不能使用
    2. ReentrantLock lock=new ReentrantLock();
    3. Condition c=lock.newCondition();
    4. c.await();
    5. c.signalAll();
  2. 与锁绑定的所有条件对象都是通过Lock接口声明的newCondition()方法创建的。在使用条件时,必须获取这个条件绑定的锁,所以带条件的代码必须在调用Lock对象的lock方法和unlock方法之间
  3.  
         
    1. await(long time,TimeUnit unit)
    2. awaitUninterruptibly():不可中断
    3. awaitUntil(Date date):当指定的最后期限到了,线程才恢复

三、线程同步辅助类

  • 辅助类
  1. 信号量Semaphore:是一种计数器,用来保护一个或者多个共享资源的访问。它是并发编程的一种基础工具,大多数编程语言都提供了这个机制。
  2. CountDownLatch:在完成一组正在其他线程中执行的操作之前,它允许线程一直等待。
  3. CyclicBarrier:它允许多个线程在某个集合点处进行相互等待。
  4. Phaser:它把并发任务分成多个阶段运行,在开始下一阶段之前,当前阶段中的所有线程都必须执行完成。
  5. Exchanger:它提供了两个线程之间的数据交换点。
  6. 在应用程序中任何时候都可以使用Semaphore来保护临界区,因为它是一个数据的同步机制。而其他的同步机制,则需要根据各自的上述特性来对其选择使用。

  • 资源的并发访问控制
  1. 信号量Semaphore是一种计数器,用来保护一个或者多个共享资源的访问。
  2. 如果线程要访问一个共享资源,它必须先获得信号量。如果信号量的内部计数器大于0,信号量减1,然后允许访问这个共享资源。计数器大于0意味着有可以使用的资源,因此线程将被允许使用其中一个资源。如果信号量小于等于0,线程将会被阻塞。
  3.  
         
    1. //创建信号量Semaphore,并初始化
    2. Semaphore s=new Semaphore(1);
    3. //获取信号量,会抛出异常需要捕获
    4. s.acquire();
    5. //释放信号量
    6. s.release();
  4. 其他获取信号量的方式:
     
         
    1. //线程在被阻塞时有可能会被中断而抛出异常,此方法会忽略线程中断并且不会抛出任何异常
    2. acquireUniterruptibly();
    3. //刚方法会试图获取信号量,如果能就返回true,如果不能就返回false,从而避开线程的阻塞和等待信号量的释放
    4. tryAcquire();
  5. acquire、acquireUninterruptibly、tryAcquire、release方法都有另一种实现方式,及提供一个int型的传入参数这个参数声明了线程试图获取或者释放的共享资源数目。如果计数器的数值少于参数对应的数值,则线程会被阻塞知道计数器重新累加到或者超过这个值。

  • 等待多个并发事件的完成
  1. CountDownLatch在完成一组正在其他线程中执行的操作之前,它允许线程一直等待。
  2. 这个类使用一个整数进行初始化,这个整数就是线程要等待完成的操作的数目。当一个线程要等待某些操作先执行完时,需要调用await方法,这个方法让线程进入休眠直到等待的所有操作都完成当某一个操作完成后,它将调用countDown方法将CountDownLatch类的内部计数器减1。当计数器变成0时,CountDownLatch类将唤醒所有调用await方法而进入休眠的线程。
  3.  
         
    1. //创建CountDownLatch
    2. CountDownLatch countDownLatch=new CountDownLatch(1);
    3. //等待事件完成
    4. countDownLatch.await();
    5. //计数器减1,唤醒所有调用await方法
    6. countDownLatch.countDown();
  4. CountDownLatch设定的初始值参数与调用的线程数量没有关系,而和需要完成操作的数量有关系。
  5. CountDownLatch机制不是用来保护共享资源或者临界区的,它是用来同步执行多个任务的一个或者多个线程。且只准许进入一次,也就是说只能被初始化一次。

  • 在集合点的同步
  1. CyclicBarrier允许两个或者多个线程在某个点上进行同步。
  2. CyclicBarrier类使用一个整型数进行初始化,这个数是需要在某个点上同步的线程数,当一个线程到达指定的点后,它将调用await方法等待其他的线程。当线程调用await方法后,CyclicBarrier类将阻塞这个线程并使之休眠直到所有其他线程到达。当最后一个线程调用CyclicBarrier的await方法时,CyclicBarrier对象将唤醒所有等待的线程,然后这些线程将继续执行
  3. CyclicBarrier还可以传入另一个Runnable对象作为初始化参数,当所有线程都到达集合点后,CyclicBarrier类将这个Runnable对象作为线程执行。
     
         
    1. //创建CyclicBarrier
    2. CyclicBarrier c=new CyclicBarrier(number,Runnable);
    3. //等待其他线程,当所有线程都到达时,这些线程继续执行
    4. c.await();
  4. CyclicBarrier还提供了两个方法:getNumberWaiting和getParties方法,前者将返回await上阻塞的线程的数目(目前已经被阻塞的线程数目),后者返回被CyclicBarrier对象同步的任务数(CyclicBarrier初始化数值)。
  5. 重置CyclicBarrier对象:通过reset方法可以将CyclicBarrier重置回初始化状态,在await方法中等待的线程将收到一个BrokenBarrierException异常。
  6. 损坏的CyclicBarrier对象:当很多线程在await方法上等待,如果其中一个线程被中断,这个线程将抛出中断异常其他的等待线程将抛出BrokenBarrierException异常,于是CyclicBarrier就处于损坏状态。

  • 并发阶段任务的运行
  1. Phaser允许执行并发多阶段任务。Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步
  2. 必须对Phaser类中参与同步操作的任务数进行初始化,不同的是我们可以动态地增加或者减少任务数
     
         
    1. //Phaser其实是允许多个线程在多个不同的阶段进行等待同步
    2. //在某个阶段,所有线程都必须执行完成,所有线程才可以往下继续执行
    3. //Phaser的初始化数就与线程数量有关系
    4. //初始化Phaser
    5. Phaser p=new Phaser(3);
    6. //通知Phaser对象当前线程已经完成了当前阶段
    7. p.arriveAndAwaitAdvance();
    8. //通知Phaser对象当前线程已经结束这个阶段,并且将不再参与接下来的阶段操作,取消Phaser注册
    9. p.arriveAndDeregister();
     
          
    1. public void run() {
    2. super.run();
    3. phaser.arriveAndAwaitAdvance(); //当所有线程都启动了,才开始运行
    4. System.out.println("1");
    5. try {
    6. TimeUnit.SECONDS.sleep(3);
    7. } catch (InterruptedException e) {
    8. e.printStackTrace();
    9. }
    10. phaser.arriveAndAwaitAdvance();
    11. System.out.println(2);
    12. }
  3. Phaser对象有两种状态:活跃态(当存在参与同步的线程时,Phaser是活跃的)和终止态(当所有参与同步的线程都取消注册的时候,Phaser就处于终止态,不会做任何同步操作)。
  4. Phaser类置于休眠的线程不会相应中断事件,也不会抛出中断异常。

  • 并发阶段任务中的阶段切换
  1. Phaser类提供了onAdvance方法,它在phaser阶段改变的时候会被自动执行。onAdvance方法需要两个int型的传入参数:当前阶段数以及注册的参与者数量。它返回boolean值,如果返回false表示phaser在继续执行,返回true表示phaser已经完成执行并且进入了终止态。
  2. 该方法默认实现是当注册的参与者数量是0就返回true,否则就返回false。但是我们可以通过继承Phaser覆盖这个方法。一般来说,当必须从一个阶段到另一个阶段过渡的时候执行一些操作,我们就可以这么做。
     
         
    1. //phase是当前的阶段数,从0开始,可以用switch来判断各个不同的阶段
    2. onAdvance(int phase,int registeredParties)
  3. phaser.register进行注册,这个注册并不是与某个对象产生某种联系,仅仅只是增加phaser中参与者的数目而已。

  • 并发任务间的数据交换
  1. Exchanger允许在并发任务之间交换数据。Exchanger允许在线程之间定义同步点当两个线程都到达同步点时,他们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,同时第二个线程的数据结构进入到第一个线程中。
  2. Exchanger只能同步两个线程。
  3.  
         
    1. //创建Exchanger对象
    2. Exchanger<List<String>> e=new Exchanger<>();
    3. List<String> list=new ArrayList<>();
    4. //交换数据,将本线程的list数据结构传给另一个线程,同时另一个线程也会传回list
    5. list=exchanger.exchange(list);

四、线程执行器

  • 执行器框架
  1. 执行器框架Executor Framework,围绕着Executor接口和它的子接口ExecutorService,以及实现这两个接口的ThreadPoolExecutor类展开。
  2. 这套机制分离了任务的创建和执行。通过使用执行器,仅需要实现Runnable接口的对象,然后将这个对象发送给执行器即可。执行器通过创建所需的线程,来负责这些Runnable对象的创建、实例化以及运行。同时执行器还是用了线程池来提高应用程序性能。避免不断地创建和销毁线程而导致系统性能下降。
  3. 执行器框架还提供了Callable接口,类似于Runnable接口,但是这个接口的主方法名称为call,可以返回结果。当发送一个Callable对象给执行器时,将获得一个实现了Future接口的对象。可以使用这个对象来控制Callable对象的状态和结果

  • 创建线程执行器
  1. 创建ThreadPoolExecutor对象,通过ThreadPoolExecutor类提供的四个构造器或者使用Executor工厂类来创建ThreadPoolExecutor对象。
  2.  
         
    1. //通过Executor类来初始化ThreadPoolExecutor对象,仅当线程的数量合理或者线程运行时间很短才适合如下的创建方式
    2. ThreadPoolExecutor executor=(ThreadPoolExecutorExecutor.newCachedThreadPool();
    3. //将任务发送给执行器
    4. executor.execute(Runnable);
    5. //结束执行器
    6. executor.shutdown();
  3.  
         
    1. getPoolSize:返回执行器线程池中实际的线程数
    2. getActiveCount:返回执行器中正在执行任务的线程数
    3. getCompletedTaskCount:返回执行器已经完成的任务数
  4. 当已经没有任务执行了,必须使用shutdown方法关闭执行器,否则执行器将一直等待不会结束。

  • 创建固定大小的线程执行器
  1. newCachedThreadPool创建的执行器,在线程池里没有空闲的线程可用是,将为接收到的每一个任务创建一个新县城,当发送大量的任务给执行器并且任务需要持续较长的时间时,系统将会超负荷。
  2. 因此可以创建一个固定大小的线程执行器,这个执行器有一个线程数的最大值,如果发送超过这个最大值的任务给执行器,执行器将不再创建额外的线程,剩下的任务将被阻塞知道执行器有空闲的线程可用
  3.  
         
    1. ThreadPoolExecutor executor=(ThreadPoolExecutor)Executor.newFixedThreadPool(5);

  • 在执行器中执行任务并返回结果
  1. Callable:这个接口声明了call方法,可以在这个方法里实现任务的具体逻辑操作。Callbable接口是一个泛型接口,这就意味着必须声明call方法返回数据类型
  2. Future:这个接口声明了一些方法来获取由Callable对象产生的结果,并且管理它们的状态
  3.  
        
    1. //继承Callable接口,实现call方法
    2. public class A implements Callable<Integer>{
    3. public Integer call(){
    4. int result;
    5. return result;
    6. }
    7. }
  4.  
         
    1. //创建执行器executor,将任务发送给执行器,返回一个Future对象
    2. //返回了Future并不代表该任务已经执行完成,而是可以查看执行的状态
    3. Future<Integer> result=executor.submit(Callable);
    4. //使用isDone方法判断任务是否完成
    5. result.isDone();
    6. //对于每一个Future对象来讲,可以通过get方法得到任务返回的Integer对象
    7. //如果调用 get方法时任务还没有完成,那么这个方法会阻塞等待该任务完成。
    8. Integer i=result.get();

  • 运行多个任务并处理第一个结果
  1. 当采用多个并发任务来解决一个问题时,往往只关心这些任务的第一个结果。
  2.  
         
    1. //创建一个任务列表,将多个任务添加到任务列表中
    2. List<Callable> list=new ArrayList<>();
    3. //使用执行器executor,并返回结果
    4. //传入参数列表,并返回第一个结果
    5. String s=executor.invokeAny(list);

  • 运行多个任务并处理所有结果
  1. 执行器框架允许执行并发任务而不需要去考虑线程创建和执行。它还提供了可以用来控制在执行器中执行任务的状态和获取任务运行结果的Future类。
  2. 等待任务执行结束:Future接口的isDone方法返回true;在调用shutdown方法后,ThreadPoolExecutor类的awaitTermination方法会将线程休眠,直到所有任务执行结束。
  3. 调用invokeAll执行多个任务并返回多个Future结果

  • 在执行器中延时执行任务
  1. 如果不想让任务马上被执行,而是想让任务在过一段时间后才被执行,或者任务能够被周期性地执行。为了达到这个目的,执行器框架提供了ScheduledThreadPoolExecutor类。
  2.  
         
    1. //创建ScheduledThreadPoolExecutor执行器,传递的参数为线程数量
    2. ScheduledThreadPoolExecutor executor=(ScheduledThreadPoolExecutor)Executor.newScheduledThreadPool(1);
    3. //调用schedule方法启动任务,后两个参数表示等待的时间
    4. executor.schedule(Runnable,Time,TimeUnit);
  3. ScheduledThreadPoolExecutor在执行器中周期性执行任务:
     
         
    1. //使用scheduledAtFixedRate方法发送任务,并接收4个参数
    2. //分别为将被周期性执行的任务,任务第一次执行后的延时时间
    3. //两次执行的时间周期(指任务在两次执行开始时的时间间隔),以及时间单位
    4. executor.scheduledAtFixedRate(Callable,1,2,TimeUnit);
    5. //还可以使用scheduledWithFixedRate
    6. //该方法表示的时间间隔与之前有所区别,表示任务上一次执行结束的时间与任务下一次开始执行的时间间隔
    7. executor.scheduledWithFixedRate(Callable,1,2,TimeUnit);

  • 执行器中取消任务
  1. 使用Future接口的cancel方法来执行取消操作。
     
         
    1. futurecancel(true);
  2. 如果任务已经完成,或者之前已经被取消,或者由于某种原因而不能被取消,那么方法将返回false并且任务也不能取消。
  3. 如果任务在执行器中等待分配Thread对象来执行它,那么任务被取消,并且不会开始执行。如果任务已经在运行,那么它依赖于调用cancel方法是所传递的参数,如果传递的参数为true并且任务正在运行,那么任务将被取消。如果传递的参数为false并且任务正在运行,那么任务不会被取消

  • 在执行器中控制任务的完成
  1. FutureTask类提供了一个名为done的方法,允许在执行器中的任务执行结束之后,还可以执行一些代码。默认情况下,done方法的实现为空。
     
         
    1. class ResultTask extends FutureTask<String>{
    2. //实现构造器,接收Callable对象作为参数
    3. //重写done方法
    4. }
    5. //直接将ResultTask调用submit方法传递给线程池执行

  • 在执行器中分离任务的启动与结果的处理
  1. 在一个对象里发送任务给执行器,然后在另一个对象里处理结果,Java提供了CompletionService类。CompletionService类有一个方法用来发送任务给执行器,还有一个方法为下一个已经执行结束的任务获取Future对象。
  2. 从内部实现机制来看,CompletionService类使用Executor对象来执行任务。这个行为的优势是可以共享CompletionService对象,并发送任务到执行器,然后其他的对象可以处理任务的结果。第二个方法有一个不足,它只能为已经执行结束的任务获取Future对象,因此Future对象只能用来获取任务的结果
     
         
    1. //创建CompletionService对象
    2. //将执行器作为传入参数
    3. Executor executor=Executor.newCacheThreadPool();
    4. CompletionService<String> service=new CompletionService<>(executor);
    5. //提交任务
    6. service.submit(Callable);
    7. //返回结果
    8. Future<String> result=service.poll();
  3. 返回结果又两种方法:poll和take方法,take方法将阻塞知道队列中有可用的元素。

  • 处理在执行器中被拒绝的任务
  1. 如果在shutdown方法与执行器结束之间发送一个任务给执行器,这个任务会被拒绝,因为这个时间段执行器已经不再接收任务了。ThreadPoolExecutor类提供了一套机制,当任务被拒绝时调用这套机制来处理它们。
  2. 实现RejectedExecutionHandler,重写rejectedExecution方法。最后executor.setRejectedExecutionHandler(RejectedExecutionHandler)即可。

五、Fork/Join框架

  • 概念
  1. Fork/Join框架,也被称为分解/合并框架。是用来解决能够通过分治技术将问题拆分成小任务的问题
  2. 在一个任务中,先检查将要解决的问题的大小,如果大于一个设定的大小,那将问题拆分成可以通过框架来执行的小任务。如果问题的大小比设定的大小要小,就可以直接在任务里解决这个问题,然后,根据需要返回任务的结果。
  3. Fork操作——当需要将一个任务拆分成更小的多个任务时,在框架中执行这些任务;Join操作——当一个主任务等待其创建的多个子任务的完成执行。
  4. Fork/Join框架与执行器框架主要的区别在于工作窃取算法(http://blog.csdn.net/xuqiaobo/article/details/51495698  )。与执行框架不同,使用Join操作让一个主任务等待它所创建的子任务的完成,执行这个任务的线程称为工作者线程。工作者线程寻找其他仍未被执行的任务,然后开始执行。通过这种方式,这些线程在运行时拥有所有的优点,进而提升应用程序的性能。
  5. Fork/Join框架有几个限制:任务只能使用fork和join操作当作同步机制。如果使用其他的同步机制,工作者线程就不能执行其他任务;任务不能执行I/O操作;任务不能抛出非运行时异常,必须在代码中处理掉这些异常。
  6. ForkJoinPool:这个类实现了ExecutorService接口和工作窃取算法。它管理工作者线程,并提供任务的状态信息,以及任务的执行信息
  7. ForkJoinTask:在ForkJoinPool中执行的任务基类。其子类包括RecursiveAction——用于任务没有返回结果的场景RecursiveTask——用于任务有返回结果的场景

  • 创建Fork/Join线程池
  1. 继承RecursiveAction,无返回结果,如果需要返回结果则继承RecursiveTask:
     
         
    1. public class Task extends RecursiveAction{
    2. //这个是必须的,因为其实现了Serializable接口
    3. private static final long serialVersionUID=1L;
    4. //重写compute方法,实现任务的逻辑
    5. //先判断当前任务的大小是否合理如果不合理进行拆分
    6. protected void compute(){
    7. if(合理){
    8. //执行这个任务
    9. }else{
    10. //将任务拆分
    11. Task a=new Task...
    12. Task b=new Task...
    13. //再调用invokeAll方法执行新的子任务,这个方法是同步调用,会等待子任务的完成
    14. invokeAll(a,b);
    15. }
    16. }
    17. }
  2. 创建ForkJoinPool对象:
     
         
    1. //线程数等于计算机CPU数目的线程池
    2. ForkJoinPool pool=new ForkJoinPool();
    3. pool.execute(task);
  3. 当一个Runnable或Callable传递给ForkJoinPool时,不会采用工作窃取算法,只有使用ForkJoinTask时才会使用工作窃取算法。
  4. ForkJoinTask中的invokeAll方法:
     
         
    1. invokeAll(ForkJoinTask<?>... tasks)
    2. invokeAll(Collection<T> tasks)
  5. 可以使用ForkJoinTask类的adapt方法接收一个Callable对象或者一个Runnable对象,然后将之转化为一个ForkJoinTask对象。

  • 合并任务的结果
  1. 使用RecursiveTask返回任务的结果,需要按照如下结构:
     
         
    1. if(problem size>size){
    2. Divide(task);
    3. invokeAll(task);
    4. groupResults();
    5. return reslut;
    6. }else{
    7. resolve result;
    8. return result;
    9. }
  2. 创建RecursiveTask类:
     
         
    1. public class Task extends RecursiveTask<Integer>{
    2. private static final long serialVersionUID=1L;
    3. //重写compute方法
    4. protected int compute(){
    5. if(合适){
    6. //执行任务
    7. }else{
    8. //分解任务
    9. Task a=new Task..
    10. Task b=new Task..
    11. invokeAll(a,b);
    12. //将任务返回的结果相加,得到计算结果
    13. int result=groupResults(a.get(),b.get());
    14. return result;
    15. }
    16. }
    17. }
  3. 调用get方法来获得Task返回的结果,这个方法声明在Future接口中,并由RecursiveTask类实现。
  4. ForkJoinTask提供了complete方法来结束任务的执行并返回结果,任务会在合并结果之后返回结果。

  • 异步运行任务
  1. 在ForkJoinPool中执行ForkJoinTask时,可以采用同步或异步方式。采用异步方式执行时,发送任务给执行器的方法将立即返回结果,但是任务仍能够继续执行
  2. 当采用同步方式方式执行时,任务会被挂起,直到任务被发送到线程池中执行完成。这种方式允许采用工作窃取算法来分配一个新任务给在执行休眠任务的工作者线程。而采用异步方法,任务将继续执行,无法采用工作窃取来提升应用程序性能 ,只有用join或get方法等待任务的结束,才可以使用工作窃取算法
  3. 重写任务的compute方法,调用fork方法采用异步方式来执行:
     
         
    1. public class Task extends RecursiveTask<Integer>{
    2. private static final long serialVersionUID=1L;
    3. //重写compute方法
    4. protected int compute(){
    5. if(合适){
    6. //执行任务
    7. }else{
    8. //异步执行任务
    9. Task a=new Task..
    10. Task b=new Task..
    11. a.fork();
    12. b.fork();
    13. //调用join方法等待任务执行结束,返回结果
    14. int result=a.join()+b.join();
    15. return result;
    16. }
    17. }
    18. }
  4. join方法在主任务中被调用,然后等待任务执行结束并返回结果。
  5. join方法和get方法都是等待任务返回结果,但是join方法不能被中断,否则抛出中断异常。如果任务抛出任何运行时异常,那么get方法将返回ExecutionException异常,但是join方法将返回RuntimeExecption异常

六、并发集合

  • 简介
  1. Java API提供了包含接口、类和算法的Java集合框架,它实现了可用在程序中的大量数据结构。当需要在并发程序中使用数据集合时,必须要谨慎地选择相应的实现方式。大多数集合类不能直接用于并发应用,因为它们没有对本身数据的并发访问进行控制
  2. Java提供了一些可以用于并发程序中的数据集合:阻塞式集合、非阻塞式集合。
  3. 阻塞式集合:这类集合包括添加和移除数据的方法。当集合已满或为空时,被调用的添加或者移除方法就不能立即被执行,那么调用这个方法的线程将被阻塞,一直到该方法可以被成功执行。
  4. 非阻塞式集合:这类集合包括添加和移除数据的方法。如果方法不能立即被执行,则返回null或者抛出异常,但是调用这个方法的线程不会被阻塞。

  • 使用非阻塞式线程安全列表
  1. ConcurrentLinkedDeque类来实现非阻塞式并发列表,如果操作不能被立即执行则会抛出异常或返回null。
  2.  
          
    1. ConcurrentLinkedDeque<> list=new ConcurrentLinkedDeque<Stirng>();
    2. list.add(...);
    3. list.pollFirst();
    4. list.pollLast();
  3.  
          
    1. //元素不会从列表中移除,如果列表为空则抛出异常
    2. getFirst()和getLast()
    3. //元素不会从列表中移除,如果列表为空则方法返回null
    4. peek()、peekFirst()和peekLast()
    5. //返回的元素会从列表中移除,如果列表为空则抛出异常
    6. remove()、removeFirst()和removeLast()

  • 使用阻塞式线程安全列表
  1. 阻塞式列表与非阻塞式列表的主要差别是:阻塞式列表在插入和删除操作时,如果列表已满或为空,操作不会被立即执行,而是将调用这个操作的线程阻塞队列直到操作可以执行成功。LinkedBlockingDeque类来实现阻塞式列表。
  2.  
         
    1. LinkedBlockingQueue<String> list=new LinkedBlockingQueue<>(3);
    2. //如果列表已满将被阻塞
    3. list.put(...);
    4. //如果列表为空将被阻塞
    5. list.take();

  • 使用按优先级排序的阻塞式线程安全列表
  1. PriorityBlockingQueue类可以实现一个有序列表,所有添加进PriorityBlockingQueue的元素必须实现Comparable。
  2. PriorityBlockingQueue使用compareTo方法来决定插入元素的位置,元素越大越靠后。
  3.  
         
    1. //实现Comparable接口
    2. public class Event implement Comparable<Event>{
    3. public int compareTo(Event e){
    4. return -1,0,1;
    5. }
    6. }
    7. PriorityBolckingQueue<Event> queue=new PriorityBlockingQueue<>();
    8. queue.add(...);
    9. queue.poll();

  • 使用带有延迟元素的线程安全列表
  1. DelayQueue可以存放带有激活日期的元素。当调用方法从队列中返回或提取元素时,未来的元素日期将被忽略。存放在DelayQueue类中的元素必须继承Delayed接口。Delayed接口时对象成为延迟对象,它使存放在DelayQueue类中的元素具有了激活日期。
  2. Delayed接口需要复写compareTo(Delayed o)getDelay(TimeUnit unit)方法。
     
         
    1. public int compareTo(Delayed o){
    2. long result=getDelay(TimeUnit)-getDelay(TimeUnit);
    3. //判断result..........
    4. }
    5. //返回开始时间和当前时间的间隔
    6. public long getDelay(TimeUnit unit){
    7. Date now=new Date();
    8. long diff=startDate.getTime()-now.getTime();
    9. return unit.convert(diff,TimeUnit.MILLISECONDS);
    10. }
  3.  
         
    1. DelayQueue<Event> queue=new DelayQueue<>();
    2. queue.add(...);
    3. queue.poll();

  • 生成并发随机数
  1. ThreadLocalRandom是线程本地变量,每个生成随机数的线程都有一个不同的生成器,但是都在同一个类中被管理,对程序员来说是透明的,用来生成伪随机数
  2.  
         
    1. ThreadLocalRandom.current().nextInt(10);

  • 使用原子变量
  1. 原子变量提供了单个变量上的原子操作。当对一个普通变量进行计算操作时,JVM会生成多条不同的指令完成一个操作,当多个线程共享同一个变量时,就会发生数据不一致的错误
  2. 原子变量不实用锁或其他同步机制来保护对其值得并发访问。所有操作都是基于CAS原子操作的,它保证了多线程在同一时间操作一个原子变量而不会产生数据不一致的错误,并且它的性能优于使用同步机制保护的普通变量。
  3. 原子类:AtomicBoolean、AtomicInteger、AtomicReference、AtomicLong等。
  4.  
         
    1. AtomicLong a=new Atomic();
    2. a.get();
    3. a.set(100);
    4. //增加a的属性的值
    5. a.getAndAdd(100);
    6. //减少a的属性的值
    7. a.getAndAdd(-100);

  • 使用原子数组
  1. Java在原子变量中实现了CAS机制,这些变量提供了实现比较和交换操作的compareAndSet方法,其他方法也基于它展开。
  2. Java也引入了原子数组提供对Integer或long数字数组的原子操作。例如:AtomicIntegerArray类的原子数组
     
         
    1. AtomicIntegerArray vector=new AtomicIntegerArray(1000);
    2. //增加数组中的某个元素的值
    3. vector.getAndIncrement(数组的位置);
    4. //减少数组中的某个元素的值
    5. vector.getAndDecrement(数组的位置);
  3.  
         
    1. get(int i):返回数组中由参数指定位置的值
    2. set(int i,int newValue):设置由参数执行位置的新值

七、定制并发类

  • 定制ThreadPoolExecutor类
  1. 通过继承ThreadPoolExecutor类,定制一个线程池:
     
         
    1. public class MyExecutor extends ThreadPoolExecutor{
    2. public MyExecutor(int corePoolSize,int maxinumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable>){
    3. super(....);
    4. }
    5. //覆盖shutdown方法
    6. public void shutdown(){
    7. ...
    8. }
    9. //覆盖shutdowNow
    10. public List<Runnable> shutdownNow(){
    11. ....
    12. return super.shutdownNow();
    13. }
    14. //覆盖BeforeExecutor方法
    15. //覆盖afterExecutor方法
    16. }
  2.  
         
    1. //创建该ThreadPoolExecutor
    2. MyExecutor myExecutor=new MyExecutor(2,4,1000,TimeUnit.MILLISECONDS,new LinkedBolckingDeque<Runnable>());

  • 实现基于优先级的Executor类
  1. 执行器内部使用一个阻塞式队列存放等待执行的任务,并按任务到达执行器时的顺序进行存放。另一个可行的替代方案是使用优先级队列存放新的任务
  2.  
         
    1. //基于优先级的阻塞队列
    2. ThreadPoolExecutor executor=new ThreadPoolExecutor(2,2,1,TimeUnit.SECONDS,new PriorityBlockingQueue<Runnable>());
  3. 把一个普通的执行器转换为基于优先级的执行器非常简单,只需要把PriorityBlockingQueue对象作为其中一个传入参数,并且要求它的泛型参数树Runnable接口即可。使用这种执行器的时候,存放在优先队列中的所有对象必须实现Comparable接口

  • 实现ThreadFactory接口生成定制线程
  1. Java提供了ThreadFactory接口来实现Thread对象工厂。Java并发API的一些高级辅助类,向Executor框架或Fork/Join框架,都使用了线程工厂来创建线程。
  2.  
         
    1. public class MyThreadFactory implements ThreadFactory{
    2. //重写newThread方法
    3. public Thread newThread(Runnable r){
    4. Thread t=new Thread(r);
    5. return t;
    6. }
    7. }
    8. Thread t=ThreadFactory.newThread(Runnable);

  • 在Executor对象中使用ThreadFactory
  1. Executor框架内部使用了ThreadFactory接口来生成新的线程
  2.  
         
    1. ThreadFactory factory=new ThreadFactory(...);
    2. Executor e=Executors.newCachedThreadPool(factory);

  • 定制运行在定时线程池的任务
  1. 定时线程池ScheduledThreadPool是Executor框架基本线程池的扩展,允许在一段时间后定时执行任务。该线程池还可以执行两类任务:延迟任务和周期性任务。
  2. 延迟任务能够执行实现Callable和Runnable接口的两类对象周期性任务尽能执行实现Runnable接口的对象。所有由定时线程池执行的任务都必须实现RunnableScheduledFuture接口

  • 实现ThreadFactory接口为Fork/Join框架生成定制线程
  1. 继承ForkJoinWorkerThread类:
     
         
    1. public class MyWorkerThread extends ForkJoinWorkerThread{
    2. //重写onStart、onTermination方法
    3. }
  2. 创建工厂类,实现ForkJoinWorkerThreadFactory接口:
     
         
    1. public class MyWorkerThreadFactory implements ForkJoinWorkerThreadFactory{
    2. //重写newThread方法
    3. public ForkJoinWorkerThread newThread(ForkJoinPool pool){
    4. return new MyWorkerThread(pool);
    5. }
    6. }
  3. 主类中调用:
     
         
    1. MyWorkerThreadFactory factory=new MyWorkerThreadFactory();
    2. ForkJoinPool pool=new ForkJoinPool(4,factory,null,false);

  • 实现基于优先级的传输队列:Java提供了几种用于并发应用程序的数据结构:LinkedTranferQueue(适用于拥有生产者-消费者结构的程序中。生产者将数据存放到数据结构中,消费者则从数据结构中取出数据。如果数据结构为空,消费者被阻塞直到数据结构中有可用的数据);PriorityBlockingQueue(在这个数据结构中,元素按顺序存储,这些元素必须实现Comparable接口,并实现接口中定义的compareTo方法)。




















猜你喜欢

转载自blog.csdn.net/lingengliang/article/details/80913349