Curator 分布式 Barrier

DistributedBarrier

package com.aop8.curator.barrier;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;

import com.aop8.CommonsUtils;

public class CuratorBarrier2 {

	/** session超时时间 */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	static DistributedBarrier barrier = null;
	
	public static void main(String[] args) throws Exception {
		
		
		for(int i = 0; i < 5; i++){
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
						CuratorFramework cf = CuratorFrameworkFactory.builder()
									.connectString("127.0.0.1:2181")
									.sessionTimeoutMs(SESSION_OUTTIME)
									.retryPolicy(retryPolicy)
									.build();
						cf.start();
						
						
						barrier = new DistributedBarrier(cf, "/super");
						System.out.println(Thread.currentThread().getName() + "设置barrier!");			
						
						barrier.setBarrier();	//设置
						
						barrier.waitOnBarrier();	//等待
						System.out.println("---------开始执行程序----------");
						
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			},"t" + i).start();
		}
		
		//Thread.sleep(1000); // 如果时间过短时,会报空指针异常的
		Thread.sleep(1000*10);  // 等待时间需要估算,确保所有的线程都处于 waitOnBarrier() ,否则会报空指针。
		barrier.removeBarrier();	//释放		
		
	}
}

运行结果:

t0设置barrier!
t3设置barrier!
t2设置barrier!
t1设置barrier!
t4设置barrier!
---------开始执行程序----------
---------开始执行程序----------
---------开始执行程序----------
---------开始执行程序----------
---------开始执行程序----------

在上面这个实例程序中,我们模拟了 5个线程,通过调用 DistributedBarrier. setBarrier() 方法来完成 Barrier 的设置,并通过调用DistributedBarrier.waitOnBarrier() 方法来等待 Barrier 的释放。然后在主线程中,通过调用 DistributedBarrier.removeBarrier() 方法来释放 Barrier,同时触发所有等待 该Barrier的5个线程同时进行各自的业务逻辑。

需要由主线程来控制 Barrier 的释放 。

DistributedDoubleBarrier

package com.aop8.curator.barrier;

import java.util.Random;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;

import com.aop8.CommonsUtils;

public class CuratorBarrier1 {

	/** session超时时间 */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	public static void main(String[] args) throws Exception {		
		
		for(int i = 0; i < 5; i++){
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 2);
						CuratorFramework cf = CuratorFrameworkFactory.builder()
									.connectString("127.0.0.1:2181")
									.retryPolicy(retryPolicy)
									.build();
						cf.start();
						
						DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/super", 5);
						Thread.sleep(1000 * (new Random()).nextInt(5)); 
						System.out.println(Thread.currentThread().getName() + "已经准备");
						
						barrier.enter();
						System.out.println("同时开始运行...");
						
						
						Thread.sleep(1000 * (new Random()).nextInt(5));
						System.out.println(Thread.currentThread().getName() + "运行完毕");
						
						barrier.leave();
						System.out.println("同时退出运行...");						

					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			},"t" + i).start();
		}		
		
	}
}

上面这个示例程序就是一个和 JDK自带的 CyclicBarrier 非常类似的实现了,它们都指定了进入Barrier的成员数阈值,例如上面示例程序中的“5”。每个Barrier的参与 者都会在调用 DistributedDoubleBarrier.enter() 方法之后进行等待,此时处 于准备进入状态。一旦准备进入Barrier 的成员数达到5个后,所有的成员会被同时触 发进人。之后调用 DistributedDoubleBarrier.leave() 方法则会再次等待,此时处于准备退出状态。一旦准备退出Barrier的成员数达到5个后,所有的成员同样会 被同时触发退出。因此,使用Curator的 DistributedDoubleBarrier能够很好地实现一个分布式Barrier,并控制其同时进入和退出。

运行结果:

t2已经准备
t4已经准备
t0已经准备
t3已经准备
t1已经准备
同时开始运行...
同时开始运行...
同时开始运行...
同时开始运行...
同时开始运行...
t3运行完毕
t2运行完毕
t0运行完毕
t4运行完毕
t1运行完毕
同时退出运行...
同时退出运行...
同时退出运行...
同时退出运行...
同时退出运行...

猜你喜欢

转载自blog.csdn.net/xiaojin21cen/article/details/90137427