用户服务
package com.demo.local.zookeeper;
import lombok.Data;
import java.io.Serializable;
/**
 * Identity of one service instance that takes part in the master election.
 *
 * <p>Instances are handed to {@code ZkClient.createEphemeral} as the payload of the
 * master node and read back with {@code ZkClient.readData}, so the class has to be
 * {@link Serializable}. Getters, setters, {@code equals}, {@code hashCode} and
 * {@code toString} are generated by Lombok's {@code @Data}.
 */
@Data
public class UserCenter implements Serializable {
    /**
     * Explicit serial version so that the serialized form does not depend on a
     * compiler-computed default, which can differ between builds of this class and
     * make a node written by one process unreadable by another.
     */
    private static final long serialVersionUID = 1L;

    /** Identifier of the instance; compared by the selector to decide node ownership. */
    private Integer id;

    /** Human-readable name, used as the prefix of the selector's console output. */
    private String name;
}
选主器
package com.demo.local.zookeeper;
import com.sun.istack.internal.NotNull;
import lombok.Data;
import lombok.NonNull;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Data
public class MasterSelector {
@NonNull
private ZkClient zkClient;
@NonNull
private UserCenter userCenter;
private UserCenter master;
private Boolean isRunning = false;
private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
private static final String MASTER_PATH = "/master";
public void start() {
if (isRunning) {
return;
}
chooseMaster();
isRunning = true;
}
private void chooseMaster() {
try {
if (userCenter != master) {
System.out.println(userCenter.getName() + ":争抢master");
zkClient.createEphemeral(MASTER_PATH, userCenter);
master = userCenter;
System.out.println(userCenter.getName() + ":现在我是master");
executorService.schedule(() -> releaseMaster(), 1, TimeUnit.SECONDS);
}
} catch (ZkNodeExistsException e) {
zkClient.subscribeDataChanges(MASTER_PATH, new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
System.out.println(userCenter.getName() + ":master消失");
chooseMaster();
}
});
}
}
private void releaseMaster() {
UserCenter data = zkClient.readData(MASTER_PATH, true);
if (data != null && userCenter.getId().equals(data.getId())) {
zkClient.delete(MASTER_PATH, -1);
System.out.println(userCenter.getName() + ":现在没有master");
}
master = null;
}
}
运行代码
package com.demo.local.zookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import java.sql.Time;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Demo driver: starts ten {@link MasterSelector} participants against a ZooKeeper
 * server at {@code localhost:2181} and releases them at the same moment so that
 * they race for the master node.
 */
public class MasterTest {
    /**
     * Creates one ZkClient and one selector per participant and starts all selectors
     * once the last participant has been set up.
     *
     * @param args unused
     * @throws InterruptedException kept in the signature for compatibility
     */
    public static void main(String[] args) throws InterruptedException {
        String address = "localhost:2181";
        int participants = 10;
        CountDownLatch latch = new CountDownLatch(participants);
        // One shared pool for all participants instead of a new 10-thread pool per
        // loop iteration.
        ExecutorService executorService = Executors.newFixedThreadPool(participants);
        boolean allSubmitted = false;
        try {
            for (int i = 0; i < participants; i++) {
                UserCenter userCenter = new UserCenter();
                userCenter.setId(i);
                userCenter.setName("用户服务:" + i);
                ZkClient client = new ZkClient(address, 5000, 10000, new SerializableSerializer());
                MasterSelector selector = new MasterSelector(client, userCenter);
                executorService.submit(() -> {
                    try {
                        // Block until every participant has been created.
                        latch.await();
                    } catch (InterruptedException e) {
                        // Restore the flag and do not join the election after being
                        // told to stop.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    selector.start();
                });
                latch.countDown();
            }
            allSubmitted = true;
        } finally {
            if (allSubmitted) {
                // Let the submitted tasks finish, then allow the pool threads to exit.
                executorService.shutdown();
            } else {
                // Setup failed part-way: the latch will never reach zero, so
                // interrupt the tasks that are already waiting on it.
                executorService.shutdownNow();
            }
        }
    }
}