J.U.C-三剑客[semaphore\CyclicBarrier\CountDownLatch]

一.semaphore信号量,底层也是基于AQS
/**
* 可以理解为控制某个资源最多有多少个线程同时执行,(比如洗手间,并行与排队)
* 如果满了只能等待直到其它资源释放(可以理解为并发量控制)
* @author Binglong
* @date 2018-11-12
*/
public class SemaphoreUtils {
    public static void main(String[] args) {
        final int SH_SIZE = 10;
        Semaphore semaphore = new Semaphore(SH_SIZE);
        final int TH_NUM = 20;
        for (int i = 0; i < TH_NUM; i++) {
            ThreadPoolUtils.getSingle().threadPoolDo(new TaskSemaphore(semaphore));
        }
    }
}
 
class TaskSemaphore implements Runnable {
    private Semaphore semaphore;
 
    TaskSemaphore(Semaphore semaphore) {
        this.semaphore = semaphore;
    }
 
    public void run() {
        String threadName = Thread.currentThread().getName();
        try {
            this.semaphore.acquire();
            System.out.println(threadName + ":occupy...");
            Thread.sleep(new Random().nextInt(10000));
            System.out.println(threadName + ":over...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
        }
    }
}
 
二、CyclicBarrier
1.构造方法
//等待parties个线程后,先完成barrierAction的run方法,其它线程继续执行
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
 
//只是做等待parties个线程
public CyclicBarrier(int parties) {
    this(parties, null);
}
 
2.重要方法
a.wait()方法,当调用wait()方法的线程数量,达到CyclicBarrier构造方法的N时,(CyclicBarrier在构造方法的Runnable barrierAction,方法完成后,当前线程继续执行)
在CyclicBarrier上等待的线程数量达到parties,则所有线程被释放,继续执行。
当前线程被中断,则抛出InterruptedException异常,并停止等待,继续执行。
当前线程等待超时,则抛出TimeoutException异常,并停止等待,继续执行。
其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行
 
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException,TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}
 
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
 
public static void main(String[] args) {
    final int N = 5;
    final CyclicBarrier cyclic = new CyclicBarrier(N, new Runnable() {
        public void run() {
            try {
                System.out.println("汇总计算开始");
                Thread.sleep(Math.abs(10));
                System.out.println("汇总计算完成");
            } catch (Exception e) {
                e.printStackTrace();
            }
 
        }
    });
 
    for (int i = 0; i < N; i++) {
        final int t = i;
        new Thread(new Runnable() {
            public void run() {
                try {
                    System.out.println(t + "中心数据已计算开始");
                    Thread.sleep(Math.abs(new Random().nextInt() % 10000));
                    System.out.println(t + "中心数据已计算结束");
                    cyclic.await();
                    System.out.println(t + "中心数据退出");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
 
 
0中心数据已计算开始
3中心数据已计算开始
4中心数据已计算开始
2中心数据已计算开始
4中心数据已计算结束
1中心数据已计算开始
1中心数据已计算结束
3中心数据已计算结束
2中心数据已计算结束
0中心数据已计算结束
汇总计算开始
汇总计算完成
0中心数据退出
1中心数据退出
4中心数据退出
2中心数据退出
3中心数据退出
b.getParties()获取CyclicBarrier等待的线程数,也就是CyclicBarrier构造方法参数parties的值
 
c.getNumberWaiting() how many thread wait now
 
d.rest()
如果有正在等待的线程,则会抛出BrokenBarrierException异常,且这些线程停止等待,继续执行。
将是否破损标志位broken置为false。
 
三、CountDownLatch
1.使用
三个方法
CountDownLatch(int count):构造器中的计数值(count)。
 
void await() :会一直阻塞当前线程,直到计时器的值为0
 
void countDown():计数减一
 
/**
* countDownLatch.countDown()调用一次减一,到0时,其它await方法继续往下执行
* 可以做并发开关(把SIZE设置为1,通过主线程来countDown(),其它线程都调用await()方法)
* @author Binglong
* @date 2018-11-12
*/
public class CountDownLatchUtils {
    public static void main(String[] args) throws InterruptedException {
        final int SIZE = 20;
        CountDownLatch countDownLatch = new CountDownLatch(SIZE);
        for (int i = 0; i < SIZE; i++) {
            ThreadPoolUtils.getSingle().threadPoolDo(new TaskCountDownLatch(countDownLatch));
        }
        System.out.println("waiting.....");
//        Thread.sleep(10000);
//        countDownLatch.countDown();
    }
}
 
class TaskCountDownLatch implements Runnable {
 
    private CountDownLatch countDownLatch;
 
    TaskCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
 
    public void run() {
        String name = Thread.currentThread().getName();
        try {
            System.out.println(name + ":waiting.."+countDownLatch.getCount());
            //等待一定数量任务继续执行
            Thread.sleep(new Random().nextInt(10000));
            countDownLatch.countDown();
            System.out.println(name + ":over...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
 
 
2.原理
CountDownLatch源代码是有内部类Sync实现,而Sync是继承AQS(抽象队列同步器)
 
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
 
    Sync(int count) {
        setState(count);
    }
 
    int getCount() {
        return getState();
    }
 
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
 
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}
 
//构造器
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
 
//countDown方法
public void countDown() {
    //releaseShared方法是抽象队列同步器的方法
    sync.releaseShared(1);
}
 
//await方法
public void await() throws InterruptedException {
    //acquireSharedInterruptibly方法是抽象队列同步器的方法
    sync.acquireSharedInterruptibly(1);
}

猜你喜欢

转载自www.cnblogs.com/nedhome/p/10315759.html