java并发编程三剑客

java并发编程三剑客

思维导图

CountDownLatch用法

​ 位于java.util.concurrent包下,利用它可以实现类似计数器的功能,比如有一个任务A,它要等待其他几个任务执行完毕后才能执行,这时可以使用CountDownLatch来实现这个功能

构造器以及方法

构造器

只提供了一种构造器,其中参数count为计数器

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
主要方法
// 调用await()方法的线程会被挂起,它会等待直到count的值为0才继续执行 
public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
// 和await()方法类似,只不过添加了一个等待时间,时间到后也会执行
public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
// 将count的值减一
 public void countDown() {
        sync.releaseShared(1);
    }

使用方法

import java.util.concurrent.CountDownLatch;

public class Test{
    public static void main(String[] args) {
        final CountDownLatch latch = new CountDownLatch(2);

        new Thread(){
            @Override
            public void run(){
                try{
                    System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
                    Thread.sleep(3000);
                    System.out.println("子线程"+Thread.currentThread().getName()+"执行完成");
                    latch.countDown();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            };
        }.start();

        new Thread(){
            @Override
            public void run(){
                try {
                    System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
                    Thread.sleep(3000);
                    System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
                    latch.countDown();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            };
        }.start();
        try {
            System.out.println("等待两个子线程执行完毕...");
            latch.await();
            System.out.println("两个子线程执行完毕");
            System.out.println("继续执行主线程");
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}
  • 运行结果:

CyclicBarrier用法

​ 位于java.util.concurrent包下,字面意思回环栅栏,通过它可以实现让一组线程等待直到某个状态后再全部同时执行,叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以重新使用。我们把这个状态称为barrier。当调用await()方法以后,线程就处于barrier状态

构造器以及主要方法

构造器

​ 提供了两种构造器

// 参数parties是指让多少个线程或者任务等待至barrier状态
// 参数barrierAction为当前这些线程都达到barrier状态时,会执行的内容
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
 public CyclicBarrier(int parties) {
        this(parties, null);
    }
主要方法

​ await方法

// 比较常用,用来挂起当前线程,直至所有线程都达到barrier状态再同时执行后续任务
public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
// 让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让达到barrier状态的线程执行后续任务
public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

其中主要调用了dowait

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        // 获取锁,显然每次只有一个线程能获取到对象的锁,
        lock.lock();
        try {
            // 判断是否处于下一代,默认g.broken=false;
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();
			// 如果线程被中断调用breadBarrier退出屏障并抛出异常
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
			// 减少线程达到屏障线程数
            int index = --count;
            // 如果所有线程达到屏障,唤醒其他线程继续执行
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    // 获取需要指向的Runnable对象,如果不为null则执行run方法
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    // 设置执行方法完成
                    ranAction = true;
                    // 通知其他线程继续执行并重置下一代
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 如果还有其他线程没有达到屏障将执行下面的循环
            for (;;) {
                try {
                    // 是否是超时等待,不是超时等待立马调用trip.await(),trip是Condition,调用await将会是线程阻塞,否则调用带有超时时间的awaitnanos(nanos)(超时时间大于0的情况下)
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                       
                        Thread.currentThread().interrupt();
                    }
                }
				// 如果设置了超时且过了超时时间,查看当前代是否被破坏,破坏抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
				// 不是当前代返回
                if (g != generation)
                    return index;
				// 设置了超时且超时时间小于0,设置当前代被破坏同时唤醒其他线程并抛出超时异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

使用方法

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N,new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程"+Thread.currentThread().getName());
            }
        });
        for(int i=0;i<N;i++) {
            new Writer(barrier).start();
        }
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务...");
        }
    }
}
  • 运行结果

Semaphore用法

​ 信号量,可以控制同时访问的线程个数,通过acquire()获取一个许可,如果没有就等待,而release释放一个许可

​ 位于java.util.concurrent包下,

构造器和主要方法

构造器
// 参数permits表示许可数目,即同时可以允许多少线程进行访问
public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
// fair表示是否是公平的,即等待时间越久的越先获得许可
 public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
主要方法
  • acquire 若无许可可以获取,则一直等待直到获得许可
  • release 在释放许可以前,必须先获得许可
// 尝试获取一个许可,
public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
// 获取peimits个许可
 public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }
// 释放一个许可
 public void release() {
        sync.releaseShared(1);
    }
// 释放permits个许可
 public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }
  • 以上四个方法都会被阻塞,如果想立即得到执行结果
//尝试获取一个许可,若获取成功,则立即返回true,若获取失败就立即返回false 
public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }
// 尝试获取一个许可,若在指定的时间内获取成功则立即返回true,否则立即返回false
 public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
// 尝试获取permits个许可,若获取成功则立即返回true,或获取失败则立即返回false
 public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }
// 尝试获取permits个许可,若在指定时间内成功....
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

使用方法

public static void main(String[] args) {
        int N = 8;            //工人数
        Semaphore semaphore = new Semaphore(5); //机器数目
        for(int i=0;i<N;i++)
            new Worker(i,semaphore).start();
    }

    static class Worker extends Thread{
        private int num;
        private Semaphore semaphore;
        public Worker(int num,Semaphore semaphore){
            this.num = num;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println("工人"+this.num+"占用一个机器在生产...");
                Thread.sleep(2000);
                System.out.println("工人"+this.num+"释放出机器");
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
  • 结果

三种辅助类总结

  • CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:

    • CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;
    • CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
    • CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。
  • Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。

猜你喜欢

转载自blog.csdn.net/issunmingzhi/article/details/105550161