DistributedBarrier
package com.aop8.curator.barrier;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import com.aop8.CommonsUtils;
public class CuratorBarrier2 {
/** session超时时间 */
static final int SESSION_OUTTIME = 5000;//ms
static DistributedBarrier barrier = null;
public static void main(String[] args) throws Exception {
for(int i = 0; i < 5; i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
.build();
cf.start();
barrier = new DistributedBarrier(cf, "/super");
System.out.println(Thread.currentThread().getName() + "设置barrier!");
barrier.setBarrier(); //设置
barrier.waitOnBarrier(); //等待
System.out.println("---------开始执行程序----------");
} catch (Exception e) {
e.printStackTrace();
}
}
},"t" + i).start();
}
//Thread.sleep(1000); // 如果时间过短时,会报空指针异常的
Thread.sleep(1000*10); // 等待时间需要估算,确保所有的线程都处于 waitOnBarrier() ,否则会报空指针。
barrier.removeBarrier(); //释放
}
}
运行结果:
t0设置barrier!
t3设置barrier!
t2设置barrier!
t1设置barrier!
t4设置barrier!
---------开始执行程序----------
---------开始执行程序----------
---------开始执行程序----------
---------开始执行程序----------
---------开始执行程序----------
在上面这个实例程序中,我们模拟了 5个线程,通过调用 DistributedBarrier. setBarrier() 方法来完成 Barrier 的设置,并通过调用DistributedBarrier.waitOnBarrier() 方法来等待 Barrier 的释放。然后在主线程中,通过调用 DistributedBarrier.removeBarrier() 方法来释放 Barrier,同时触发所有等待 该Barrier的5个线程同时进行各自的业务逻辑。
需要由主线程来控制 Barrier 的释放 。
DistributedDoubleBarrier
package com.aop8.curator.barrier;
import java.util.Random;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
import com.aop8.CommonsUtils;
public class CuratorBarrier1 {
/** session超时时间 */
static final int SESSION_OUTTIME = 5000;//ms
public static void main(String[] args) throws Exception {
for(int i = 0; i < 5; i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 2);
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(retryPolicy)
.build();
cf.start();
DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/super", 5);
Thread.sleep(1000 * (new Random()).nextInt(5));
System.out.println(Thread.currentThread().getName() + "已经准备");
barrier.enter();
System.out.println("同时开始运行...");
Thread.sleep(1000 * (new Random()).nextInt(5));
System.out.println(Thread.currentThread().getName() + "运行完毕");
barrier.leave();
System.out.println("同时退出运行...");
} catch (Exception e) {
e.printStackTrace();
}
}
},"t" + i).start();
}
}
}
上面这个示例程序就是一个和 JDK自带的 CyclicBarrier 非常类似的实现了,它们都指定了进入Barrier的成员数阈值,例如上面示例程序中的“5”。每个Barrier的参与 者都会在调用 DistributedDoubleBarrier.enter() 方法之后进行等待,此时处 于准备进入状态。一旦准备进入Barrier 的成员数达到5个后,所有的成员会被同时触 发进人。之后调用 DistributedDoubleBarrier.leave() 方法则会再次等待,此时处于准备退出状态。一旦准备退出Barrier的成员数达到5个后,所有的成员同样会 被同时触发退出。因此,使用Curator的 DistributedDoubleBarrier能够很好地实现一个分布式Barrier,并控制其同时进入和退出。
运行结果:
t2已经准备
t4已经准备
t0已经准备
t3已经准备
t1已经准备
同时开始运行...
同时开始运行...
同时开始运行...
同时开始运行...
同时开始运行...
t3运行完毕
t2运行完毕
t0运行完毕
t4运行完毕
t1运行完毕
同时退出运行...
同时退出运行...
同时退出运行...
同时退出运行...
同时退出运行...