ChannelPipeline
ChannelPipeline 是 ChannelHandler 的容器,它负责 ChannelHandler 的管理和事件拦截。下面以 addLast 方法为例,看它如何把一个 ChannelHandler 加入到链表的尾部:
/**
 * Appends {@code handler} to this pipeline, directly in front of the {@code tail} node.
 *
 * <p>A new context is created for the handler and linked into the list while holding the
 * pipeline's monitor. The handler-added callback ({@code callHandlerAdded0}) is then either
 * deferred until registration, submitted to the context's executor, or invoked directly
 * by the calling thread, depending on the state checked below.
 *
 * @param group   passed to {@code newContext(...)}; presumably selects the executor that
 *                runs the handler's callbacks (confirm against {@code newContext})
 * @param name    requested handler name; it is passed through {@code filterName(name, handler)}
 *                before being used
 * @param handler the handler to append
 * @return this pipeline, so calls can be chained
 */
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
// Validation, context creation and list linking all happen under the pipeline's monitor.
synchronized (this) {
// NOTE(review): presumably rejects a handler instance that may not be added more than once;
// the implementation is not visible here.
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
// Link the new context between the current last node and tail.
addLast0(newCtx);
// If registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
// The caller is not on the context's executor thread: mark the add as pending and hand
// the callback over to that executor instead of running it on the calling thread.
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
// Reached only when the caller is already on the context's executor thread; the callback
// is invoked directly, after the synchronized block has been left.
callHandlerAdded0(newCtx);
return this;
}
/**
 * Splices {@code newCtx} into the doubly linked context list so that it becomes the
 * node immediately preceding {@code tail}.
 *
 * @param newCtx the context to link in as the new last element before {@code tail}
 */
private void addLast0(AbstractChannelHandlerContext newCtx) {
    final AbstractChannelHandlerContext formerLast = tail.prev;

    // First give the new node its own links ...
    newCtx.next = tail;
    newCtx.prev = formerLast;

    // ... then make its neighbours point at it.
    formerLast.next = newCtx;
    tail.prev = newCtx;
}
ChannelHandler
下面看一个自定义 ChannelHandler 的例子:
/**
 * Server-side business-logic handler for {@code String} messages.
 *
 * <p>Title: NettyServerHandler<br>
 * Description: server-side business logic<br>
 * Version: 1.0.0
 *
 * @author guokaige
 * @date 2017-8-31
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Handles one received message: prints it, then either closes the connection (when the
     * message is {@code "quit"}) or replies with the current date followed by a newline.
     *
     * @param ctx the context of this handler
     * @param msg the received message
     * @throws Exception declared by the overridden method; nothing is thrown explicitly here
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // Print every received message to standard output.
        System.out.println("服务端接受的消息 : " + msg);

        // "quit" is the condition on which the server side disconnects.
        if ("quit".equals(msg)) {
            ctx.close();
            // Stop here: writing a reply to a channel that was just closed can only fail.
            return;
        }

        // Reply to the client with the current date.
        Date date = new Date();
        ctx.writeAndFlush(date + "\n");
    }

    /**
     * Invoked when the connection becomes active: prints the remote address and sends a
     * greeting to the peer, then passes the event on via {@code super.channelActive(ctx)}.
     *
     * <p>NOTE(review): the greeting embeds the host name returned by
     * {@link InetAddress#getLocalHost()}, i.e. the machine this handler runs on, although
     * the message text labels it as the client. Confirm that this is intended.
     *
     * @param ctx the context of this handler
     * @throws Exception if the local host name cannot be resolved, or if the superclass
     *                   implementation throws
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
        ctx.writeAndFlush("客户端" + InetAddress.getLocalHost().getHostName() + "成功与服务端建立连接! \n");
        super.channelActive(ctx);
    }
}