原创转载请注明出处:http://agilestyle.iteye.com/blog/2343903
CountDownLatch
CountDownLatch所提供的功能是:当count计数不为0时,调用await()的线程会处于等待状态,直到计数减为0才继续运行。
await()的作用是实现等待,判断计数是否为0,如果不为0则呈等待状态。
countDown()的作用是将计数减1,其他线程可以调用此方法;当计数减到0时,所有处于等待状态的线程继续运行。
getCount()的作用是获得当前的计数个数。
用CountDownLatch模拟一个田径短跑的例子,10个线程代表10名选手
MyThread.java
package org.fool.java.concurrent.countdownlatch; import java.util.concurrent.CountDownLatch; public class MyThread implements Runnable { private CountDownLatch comingTag; private CountDownLatch waitStartTag; private CountDownLatch waitRunTag; private CountDownLatch beginTag; private CountDownLatch endTag; public MyThread(CountDownLatch comingTag, CountDownLatch waitStartTag, CountDownLatch waitRunTag, CountDownLatch beginTag, CountDownLatch endTag) { this.comingTag = comingTag; this.waitStartTag = waitStartTag; this.waitRunTag = waitRunTag; this.beginTag = beginTag; this.endTag = endTag; } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " 走向起跑点..."); Thread.sleep((int) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + " 到达起跑点..."); comingTag.countDown(); waitStartTag.await(); Thread.sleep((int) (Math.random() * 10000)); waitRunTag.countDown(); beginTag.await(); System.out.println(Thread.currentThread().getName() + " 加速起跑..."); Thread.sleep((int) (Math.random() * 10000)); endTag.countDown(); System.out.println(Thread.currentThread().getName() + " 到达终点..."); } catch (InterruptedException e) { e.printStackTrace(); } } }
CountDownLatchTest4.java
package org.fool.java.concurrent.countdownlatch;

import java.util.concurrent.CountDownLatch;

/**
 * Referee side of the sprint-race demo: starts the runner threads and drives
 * them through the race phases using {@link CountDownLatch} instances.
 */
public class CountDownLatchTest4 {
    /** Number of runners; every per-runner latch is sized from this value. */
    private static final int RUNNER_COUNT = 10;

    /**
     * Starts {@code RUNNER_COUNT} runners and acts as the referee until all of
     * them have crossed the finish line.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // Runners -> referee: one count per runner.
            CountDownLatch comingTag = new CountDownLatch(RUNNER_COUNT);
            CountDownLatch waitRunTag = new CountDownLatch(RUNNER_COUNT);
            CountDownLatch endTag = new CountDownLatch(RUNNER_COUNT);
            // Referee -> runners: a single count released by the referee.
            CountDownLatch waitStartTag = new CountDownLatch(1);
            CountDownLatch beginTag = new CountDownLatch(1);

            for (int i = 0; i < RUNNER_COUNT; i++) {
                Thread thread = new Thread(new MyThread(comingTag, waitStartTag, waitRunTag, beginTag, endTag));
                thread.setName("Thread " + (i + 1));
                thread.start();
            }

            System.out.println("裁判在等待选手的到来...");
            comingTag.await();

            System.out.println("裁判确认所有选手就位....");
            Thread.sleep(5000);
            waitStartTag.countDown();

            System.out.println("各就各位!预备!");
            waitRunTag.await();
            Thread.sleep(2000);

            System.out.println("发令枪响起!");
            beginTag.countDown();

            endTag.await();
            System.out.println("所有选手到达终点,统计名次!");
        } catch (InterruptedException e) {
            // Restore the flag so the interruption is not silently lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
Run
await(long timeout, TimeUnit unit)
await(long timeout, TimeUnit unit)的作用是使线程在指定的最长等待时间内进入TIMED_WAITING状态;如果超过这个时间计数仍未减到0,线程会自动唤醒,方法返回false,程序继续向下运行。
CountDownLatchTest5.java
package org.fool.java.concurrent.countdownlatch; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class CountDownLatchTest5 { public static void main(String[] args) { Service service = new Service(); for (int i = 0; i < 3; i++) { Thread thread = new Thread(new MyThread(service)); thread.start(); } } public static class Service { private CountDownLatch latch = new CountDownLatch(1); public void testMethod() { try { System.out.println(Thread.currentThread().getName() + " start..."); latch.await(3, TimeUnit.SECONDS); System.out.println(Thread.currentThread().getName() + " end..."); } catch (InterruptedException e) { e.printStackTrace(); } } } public static class MyThread implements Runnable { private Service service; public MyThread(Service service) { this.service = service; } @Override public void run() { service.testMethod(); } } }
Run
getCount()
getCount()作用是获取当前计数的值
CountDownLatchTest6.java
package org.fool.java.concurrent.countdownlatch; import java.util.concurrent.CountDownLatch; public class CountDownLatchTest6 { public static void main(String[] args) { CountDownLatch latch = new CountDownLatch(3); System.out.println(latch.getCount()); latch.countDown(); System.out.println(latch.getCount()); latch.countDown(); System.out.println(latch.getCount()); latch.countDown(); System.out.println(latch.getCount()); latch.countDown(); System.out.println(latch.getCount()); latch.countDown(); System.out.println(latch.getCount()); } }
Run
CountDownLatch使用例子
在这个例子中,模拟一个应用程序启动类,它开始时启动了n个线程类,这些线程将检查外部系统并通知闭锁,并且启动类一直在闭锁上等待着。一旦验证和检查了所有外部服务,那么启动类恢复执行。
BaseHealthChecker.java
package org.fool.test.countdownlatch; import java.util.concurrent.CountDownLatch; public abstract class BaseHealthChecker implements Runnable { private CountDownLatch latch; private String serviceName; private boolean serviceUp; public BaseHealthChecker(String serviceName, CountDownLatch latch) { this.serviceName = serviceName; this.latch = latch; this.serviceUp = false; Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { System.out.println(getServiceName() + " down..."); } })); } @Override public void run() { try { verifyService(); serviceUp = true; } catch (Exception e) { e.printStackTrace(); serviceUp = false; } finally { if (latch != null) { latch.countDown(); } } } public String getServiceName() { return serviceName; } public boolean isServiceUp() { return serviceUp; } public abstract void verifyService(); }
NetworkHealthChecker.java
package org.fool.test.countdownlatch;

import java.util.concurrent.CountDownLatch;

/**
 * Simulated health check for the network service; the check is modelled as a
 * fixed delay.
 */
public class NetworkHealthChecker extends BaseHealthChecker {
    /** Simulated duration of the check, in milliseconds. */
    private static final long CHECK_DELAY_MILLIS = 5000L;

    /**
     * @param latch latch handed to the base class, which counts it down when the check ends
     */
    public NetworkHealthChecker(CountDownLatch latch) {
        super("Network Service", latch);
    }

    /**
     * Simulates the check by sleeping, then reports the service as up.
     *
     * @throws IllegalStateException if the thread is interrupted before the
     *         simulated check completes, so the service is not reported as up
     */
    @Override
    public void verifyService() {
        System.out.println("Checking " + this.getServiceName());

        try {
            Thread.sleep(CHECK_DELAY_MILLIS);
        } catch (InterruptedException e) {
            // An interrupted check did not finish: restore the flag and fail
            // instead of falling through to the "is UP" message.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(this.getServiceName() + " check was interrupted", e);
        }

        System.out.println(this.getServiceName() + " is UP");
    }
}
DatabaseHealthChecker.java
package org.fool.test.countdownlatch;

import java.util.concurrent.CountDownLatch;

/**
 * Simulated health check for the database service; the check is modelled as a
 * fixed delay.
 */
public class DatabaseHealthChecker extends BaseHealthChecker {
    /** Simulated duration of the check, in milliseconds. */
    private static final long CHECK_DELAY_MILLIS = 15000L;

    /**
     * @param latch latch handed to the base class, which counts it down when the check ends
     */
    public DatabaseHealthChecker(CountDownLatch latch) {
        super("Database Service", latch);
    }

    /**
     * Simulates the check by sleeping, then reports the service as up.
     *
     * @throws IllegalStateException if the thread is interrupted before the
     *         simulated check completes, so the service is not reported as up
     */
    @Override
    public void verifyService() {
        System.out.println("Checking " + this.getServiceName());

        try {
            Thread.sleep(CHECK_DELAY_MILLIS);
        } catch (InterruptedException e) {
            // An interrupted check did not finish: restore the flag and fail
            // instead of falling through to the "is UP" message.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(this.getServiceName() + " check was interrupted", e);
        }

        System.out.println(this.getServiceName() + " is UP");
    }
}
CacheHealthChecker.java
package org.fool.test.countdownlatch;

import java.util.concurrent.CountDownLatch;

/**
 * Simulated health check for the cache service; the check is modelled as a
 * fixed delay.
 */
public class CacheHealthChecker extends BaseHealthChecker {
    /** Simulated duration of the check, in milliseconds. */
    private static final long CHECK_DELAY_MILLIS = 10000L;

    /**
     * @param latch latch handed to the base class, which counts it down when the check ends
     */
    public CacheHealthChecker(CountDownLatch latch) {
        super("Cache Service", latch);
    }

    /**
     * Simulates the check by sleeping, then reports the service as up.
     *
     * @throws IllegalStateException if the thread is interrupted before the
     *         simulated check completes, so the service is not reported as up
     */
    @Override
    public void verifyService() {
        System.out.println("Checking " + this.getServiceName());

        try {
            Thread.sleep(CHECK_DELAY_MILLIS);
        } catch (InterruptedException e) {
            // An interrupted check did not finish: restore the flag and fail
            // instead of falling through to the "is UP" message.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(this.getServiceName() + " check was interrupted", e);
        }

        System.out.println(this.getServiceName() + " is UP");
    }
}
ApplicationStartupUtil.java
package org.fool.test.countdownlatch;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Singleton that runs all external-service health checks in parallel and
 * blocks until every one of them has finished.
 */
public class ApplicationStartupUtil {
    /** Number of checkers created below; the latch must be sized to match. */
    private static final int SERVICE_COUNT = 3;

    private static final ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil();

    private ApplicationStartupUtil() {
    }

    /** @return the single shared instance */
    public static ApplicationStartupUtil getInstance() {
        return INSTANCE;
    }

    /**
     * Runs the network, cache and database checks concurrently and waits for
     * all of them to complete.
     *
     * @return {@code true} only if every checker reports its service as up
     * @throws Exception if the calling thread is interrupted while waiting for
     *         the checks (an {@link InterruptedException} from the latch)
     */
    public boolean checkExternalServices() throws Exception {
        // Per-call state is kept local so that repeated or concurrent calls
        // cannot overwrite each other's latch or checker list.
        CountDownLatch latch = new CountDownLatch(SERVICE_COUNT);

        List<BaseHealthChecker> services = new ArrayList<>();
        services.add(new NetworkHealthChecker(latch));
        services.add(new CacheHealthChecker(latch));
        services.add(new DatabaseHealthChecker(latch));

        ExecutorService executor = Executors.newFixedThreadPool(services.size());
        try {
            for (BaseHealthChecker service : services) {
                executor.execute(service);
            }
            latch.await();
        } finally {
            // Without this the pool's non-daemon worker threads stay alive and
            // prevent the JVM from exiting after main() returns.
            executor.shutdown();
        }

        for (BaseHealthChecker service : services) {
            if (!service.isServiceUp()) {
                return false;
            }
        }
        return true;
    }
}
MainTest.java
package org.fool.test.countdownlatch;

/**
 * Entry point of the startup demo: runs the external-service checks and prints
 * the overall outcome.
 */
public class MainTest {

    /**
     * Runs the health checks through {@link ApplicationStartupUtil} and prints
     * the combined result.
     *
     * @param args unused
     * @throws Exception propagated from {@code checkExternalServices()}
     */
    public static void main(String[] args) throws Exception {
        ApplicationStartupUtil startup = ApplicationStartupUtil.getInstance();
        boolean result = startup.checkExternalServices();

        System.out.println("External services validation completed !! Result was :: " + result);
    }
}
Console Output
Reference
Java并发编程核心方法与框架
http://www.importnew.com/15731.html