1、同步队列
同步队列,顾名思义就是当队列中的元素满了的时候去做事情。
例如:一个公司组织旅游,只有当报名了的员工全部到齐后司机才能出发。
我们把需求拆分开:
1、员工签到,我们可以用zookeeper的znode节点来模拟,在/queue路径下创建子节点,签到一个创建一个znode节点
2、员工到齐,只有当/queue节点下的子节点满了以后,才会通知司机发车,比如公司有100个人,那么就是/queue节点下有100个节点是,才会通知司机出发
3、通知司机,如何通知司机呢?我们可以用zookeeper的事件通知机制,司机客户端去监控某个节点,比如/start节点,当/queue队列满了的时候去创建一个/start节点,如此,监控了/start节点的客户端(这里就是司机客户端)就会收到事件触发。
同步队列如图:
OK~~~~~讲了这么多,直接上代码吧!!
这个是同步队列的核心代码:
public class ZKQueue {
private static String path = "/queue";
private static String start = "/start";
private static int limit;
private ZooKeeper client = ZookeeperUtil.getClient();
public ZKQueue(int limit) {
ZKQueue.limit = limit;
init();
}
private void init() {
try {
if(client.exists(path,false) == null) {
client.create(path,"xx".getBytes(),ZooDefs.Ids
.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void push(String data) {
try {
//000000089,000000090,0000000091 /queue/0000000091
String currentPath = client.create(path + "/", data.getBytes(), ZooDefs.Ids
.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//队列已经满了
if(limit <= size()) {
List<String> childrens = client.getChildren(path, false);
Collections.sort(childrens);
//二分法排序,拿到当前节点在childrens这个list中的位置? 只有当位置是最后一个的时候,我才
//创建start节点
int i = Collections.binarySearch(childrens,currentPath.substring
(path
.length() + 1));
//只有最后一个位置的时候我才创建start节点
if(i == (limit -1)) {
System.out.println("队列满了,创建start节点~~~~");
client.create(start,data.getBytes(), ZooDefs.Ids
.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//获取到path节点下面的子节点个数
private int size() {
try {
return client.getChildren(path,false).size();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return 0;
}
}
ZookeeperUtil工具类代码:
public class ZookeeperUtil {
private static String connectStr = "192.168.60.104:2181";
private static CountDownLatch cdl = new CountDownLatch(1);
public static ZooKeeper getClient() {
//CONNECTING 正在连接 CONNECTED 连接状态
// watcher这个匿名类就是一个事件模板定义
try {
ZooKeeper client = new ZooKeeper(connectStr, 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//如果连接状态变成了connected就会触发这个事件
if (Event.KeeperState.SyncConnected == watchedEvent
.getState()) {
//第一次连接
if (Event.EventType.None == watchedEvent.getType() &&
null == watchedEvent.getPath()) {
System.out.println("触发了连接状态事件!!" + watchedEvent
.getType());
cdl.countDown();
} else if (Event.EventType.NodeCreated == watchedEvent
.getType()) {
System.out.println("触发了节点创建事件" + watchedEvent.getType());
System.out.println(Thread.currentThread().getName
() + "触发了事件,并且得到了通知!!");
} else if (Event.EventType.NodeDataChanged ==
watchedEvent.getType()) {
System.out.println("触发了节点修改事件" + watchedEvent
.getType());
} else if (Event.EventType.NodeDeleted == watchedEvent
.getType()) {
System.out.println("触发了节点删除事件" + watchedEvent
.getType());
}
}
}
});
cdl.await();
client.exists("/start",true);
return client;
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
return null;
}
}
测试类代码:
public class MyTest {
public static void main(String[] args) {
int limit = 10;
for (int i = 0; i < limit; i++) {
new Thread(new Runnable() {
@Override
public void run() {
ZKQueue queue = new ZKQueue(10);
queue.push("xx");
}
}).start();
}
}
}
OK,大功告成,这个就是同步队列基于zookeeper的实现~~