实际应用中,常见的有一个master机器和多个slave机器组成的集群,master常需要对slave进行心跳检测,接收来自slave的信息,这个例子中我们就使用netty来实现。
Server
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class Server {
public static void main(String[] args) throws Exception{
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//设置日志
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ServerHeartBeatHandler());
}
});
ChannelFuture cf = b.bind(8765).sync();
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
服务端代码不变,主要看下server
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.HashMap;
public class ServerHeartBeatHandler extends ChannelHandlerAdapter {
/** key:ip value:auth */
private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();
private static final String SUCCESS_KEY = "auth_success_key";
static {
AUTH_IP_MAP.put("169.254.90.205", "1234");
}
private boolean auth(ChannelHandlerContext ctx, Object msg){
//System.out.println(msg);
String [] ret = ((String) msg).split(",");
String auth = AUTH_IP_MAP.get(ret[0]);
if(auth != null && auth.equals(ret[1])){
ctx.writeAndFlush(SUCCESS_KEY);
return true;
} else {
ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
return false;
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof String){
auth(ctx, msg);
} else if (msg instanceof RequestInfo) {
RequestInfo info = (RequestInfo) msg;
System.out.println("--------------------------------------------");
System.out.println("当前主机ip为: " + info.getIp());
System.out.println("当前主机cpu情况: ");
HashMap<String, Object> cpu = info.getCpuPercMap();
System.out.println("总使用率: " + cpu.get("combined"));
System.out.println("用户使用率: " + cpu.get("user"));
System.out.println("系统使用率: " + cpu.get("sys"));
System.out.println("等待率: " + cpu.get("wait"));
System.out.println("空闲率: " + cpu.get("idle"));
System.out.println("当前主机memory情况: ");
HashMap<String, Object> memory = info.getMemoryMap();
System.out.println("内存总量: " + memory.get("total"));
System.out.println("当前内存使用量: " + memory.get("used"));
System.out.println("当前内存剩余量: " + memory.get("free"));
System.out.println("--------------------------------------------");
ctx.writeAndFlush("info received!");
} else {
ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
}
}
}
定义了一个静态代码块,来模拟数据库中的ip和key映射关系,当客户端发送认证信息auth时,我们将根据这个映射关系来比较:
private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();
private static final String SUCCESS_KEY = "auth_success_key";
static {
AUTH_IP_MAP.put("169.254.90.205", "1234");
}
Client代码也不变:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Client {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ClienHeartBeattHandler());
}
});
ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
cf.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
看下ClienHeartBeattHandler
import java.net.InetAddress;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.hyperic.sigar.CpuPerc;
import org.hyperic.sigar.Mem;
import org.hyperic.sigar.Sigar;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
public class ClienHeartBeattHandler extends ChannelHandlerAdapter {
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private ScheduledFuture<?> heartBeat;
//主动向服务器发送认证信息
private InetAddress addr ;
private static final String SUCCESS_KEY = "auth_success_key";
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
addr = InetAddress.getLocalHost();
String ip = addr.getHostAddress();
String key = "1234";
//证书
String auth = ip + "," + key;
ctx.writeAndFlush(auth);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
if(msg instanceof String){
String ret = (String)msg;
if(SUCCESS_KEY.equals(ret)){
// 握手成功,主动发送心跳消息
this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2, TimeUnit.SECONDS);
System.out.println(msg);
}
else {
System.out.println(msg);
}
}
} finally {
ReferenceCountUtil.release(msg);
}
}
private class HeartBeatTask implements Runnable {
private final ChannelHandlerContext ctx;
public HeartBeatTask(final ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
try {
RequestInfo info = new RequestInfo();
//ip
info.setIp(addr.getHostAddress());
Sigar sigar = new Sigar();
//cpu prec
CpuPerc cpuPerc = sigar.getCpuPerc();
HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
cpuPercMap.put("combined", cpuPerc.getCombined());
cpuPercMap.put("user", cpuPerc.getUser());
cpuPercMap.put("sys", cpuPerc.getSys());
cpuPercMap.put("wait", cpuPerc.getWait());
cpuPercMap.put("idle", cpuPerc.getIdle());
// memory
Mem mem = sigar.getMem();
HashMap<String, Object> memoryMap = new HashMap<String, Object>();
memoryMap.put("total", mem.getTotal() / 1024L);
memoryMap.put("used", mem.getUsed() / 1024L);
memoryMap.put("free", mem.getFree() / 1024L);
info.setCpuPercMap(cpuPercMap);
info.setMemoryMap(memoryMap);
ctx.writeAndFlush(info);
} catch (Exception e) {
e.printStackTrace();
}
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (heartBeat != null) {
heartBeat.cancel(true);
heartBeat = null;
}
ctx.fireExceptionCaught(cause);
}
}
}
当client连接上server后,触发channelActive方法,发送本机ip和key:1234,给服务端,服务端拿到后进行认证。
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
addr = InetAddress.getLocalHost();
String ip = addr.getHostAddress();
String key = "1234";
//证书
String auth = ip + "," + key;
ctx.writeAndFlush(auth);
}
看下服务端认证代码:
private boolean auth(ChannelHandlerContext ctx, Object msg){
//System.out.println(msg);
String [] ret = ((String) msg).split(",");
String auth = AUTH_IP_MAP.get(ret[0]);
if(auth != null && auth.equals(ret[1])){
ctx.writeAndFlush(SUCCESS_KEY);
return true;
} else {
ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
return false;
}
}
根据发客户端发送信息拆分第一个参数ip,获取得到对应key,跟服务端AUTH_IP_MAP中的key比较,如果相等则认证成功,返回给客户端一个:
private static final String SUCCESS_KEY = "auth_success_key";
客户端拿到后,握手成功,发心跳给服务端:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
if(msg instanceof String){
String ret = (String)msg;
if(SUCCESS_KEY.equals(ret)){
// 握手成功,主动发送心跳消息
this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2, TimeUnit.SECONDS);
System.out.println(msg);
}
else {
System.out.println(msg);
}
}
} finally {
ReferenceCountUtil.release(msg);
}
}
心跳检测的实现是有一个newScheduledThreadPool,延迟0s执行,每2s执行一次HeartBeatTask,发送RequestInfo当前客户端机器信息给服务端打印出来,服务端如果没收到来自客户端的消息则断开和客户端的连接。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof String){
auth(ctx, msg);
} else if (msg instanceof RequestInfo) {
RequestInfo info = (RequestInfo) msg;
System.out.println("--------------------------------------------");
System.out.println("当前主机ip为: " + info.getIp());
System.out.println("当前主机cpu情况: ");
HashMap<String, Object> cpu = info.getCpuPercMap();
System.out.println("总使用率: " + cpu.get("combined"));
System.out.println("用户使用率: " + cpu.get("user"));
System.out.println("系统使用率: " + cpu.get("sys"));
System.out.println("等待率: " + cpu.get("wait"));
System.out.println("空闲率: " + cpu.get("idle"));
System.out.println("当前主机memory情况: ");
HashMap<String, Object> memory = info.getMemoryMap();
System.out.println("内存总量: " + memory.get("total"));
System.out.println("当前内存使用量: " + memory.get("used"));
System.out.println("当前内存剩余量: " + memory.get("free"));
System.out.println("--------------------------------------------");
ctx.writeAndFlush("info received!");
} else {
ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
}
}
RequestInfo:
import java.io.Serializable;
import java.util.HashMap;
public class RequestInfo implements Serializable {
private String ip ;
private HashMap<String, Object> cpuPercMap ;
private HashMap<String, Object> memoryMap;
//.. other field
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public HashMap<String, Object> getCpuPercMap() {
return cpuPercMap;
}
public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
this.cpuPercMap = cpuPercMap;
}
public HashMap<String, Object> getMemoryMap() {
return memoryMap;
}
public void setMemoryMap(HashMap<String, Object> memoryMap) {
this.memoryMap = memoryMap;
}
}