netty的一个小Demo-有ack和重连的功能

起因

以前对接过一个tcp协议的接口,实现对类似于手机的pdt设备发送文本文字的功能,对接协议其实是文本形式的,很简单的一种协议。当初一路坎坷的对接完成,那时候实现方式也比较复杂,没有支持断连重连功能,本想着能优化一下,但是直到我从那家公司离职,也没有优化:)

现在回想起来,当初实现功能的过程比较曲折,这其中的原因,当然是不熟悉多线程编程,对netty的实现方式也不熟悉。后续继续看netty的时候,就写下了这个demo程序,希望以后能够用到。这个demo搭建起来后,研究netty就比较方便了;如果以后有人也用netty对接什么协议接口,也可以给他们一些参考。

log4j配置文件

log4j.rootCategory=DEBUG,stdout 
 
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%p %t %d{yyyy-MM-dd HH:mm:ss}  %C %m%n

项目公有依赖

        <dependency> 
            <groupId>org.slf4j</groupId> 
            <artifactId>slf4j-api</artifactId> 
            <version>1.7.7</version> 
        </dependency> 
        <dependency> 
            <groupId>org.slf4j</groupId> 
            <artifactId>slf4j-log4j12</artifactId> 
            <version>1.7.7</version> 
        </dependency> 
        <dependency> 
            <groupId>log4j</groupId> 
            <artifactId>log4j</artifactId> 
            <version>1.2.17</version> 
        </dependency> 
        <dependency> 
            <groupId>io.netty</groupId> 
            <artifactId>netty-all</artifactId> 
            <version>4.1.29.Final</version> 
        </dependency> 
        <dependency> 
            <groupId>com.alibaba</groupId> 
            <artifactId>fastjson</artifactId> 
            <version>1.2.49</version> 
        </dependency> 

客户端

public class Client {
    
    private final Random random = new Random();

    private final static Logger log = LoggerFactory.getLogger(Client.class);

    private final AtomicInteger requestSequence = new AtomicInteger(0);
    private final Timer timer = new Timer("NettyClient_scanneResponseTable", true);

    private Bootstrap bootstrap = new Bootstrap();
    private EventLoopGroup eventLoopGroupWorker = new NioEventLoopGroup(2, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(-1);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
        }
    });
    private final int connectTimeout = 5000;
    private final String hostname;
    private final int port;
    private final String charsetName = "UTF-8";
    private Channel channel;
    private volatile boolean inited = false;
    private final int sendTimeout = 5;
    private final int waitResponseTimeout = 10;

    protected final ConcurrentMap<Integer, ReponseWrapper> responseTable = new ConcurrentHashMap<Integer, ReponseWrapper>(
            256);

    public Client(String hostname, int port) {
        super();
        this.hostname = hostname;
        this.port = port;
    }
    
    /**
     * 初始化
     */
    private void init() {
        
        log.info("初始化");
        
        final Charset charset = Charset.forName(this.charsetName);
        
        bootstrap.group(eventLoopGroupWorker)//
        .channel(NioSocketChannel.class)//
        .option(ChannelOption.TCP_NODELAY, true)//
        .option(ChannelOption.SO_KEEPALIVE, false)//
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.connectTimeout)//
        .handler(new ChannelInitializer<SocketChannel>() {//
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                ByteBuf delimiter = Unpooled.copiedBuffer("\n".getBytes());
                pipeline.addLast(//
                        new StringDecoder(charset), //
                        new DelimiterBasedFrameDecoder(1024, delimiter), //
                        new StringEncoder(charset), //
                        new NettyClinetHandler()//
                        );
            }
        });
    }

    /**
     * 连接服务端
     */
    public void connect() {
        
        if(!inited) {
            this.init();
        }
        
        log.info("开始连接");
        
        final ChannelFuture cf = bootstrap.connect(this.hostname, this.port);
        try {
            cf.await(this.connectTimeout, TimeUnit.SECONDS);
            if (cf.isSuccess()) {
                log.info("连接[{}]成功", cf.channel());
                this.channel = cf.channel();
                this.inited = true;
                this.timer.scheduleAtFixedRate(new TimerTask() {
                    @Override
                    public void run() {
                        try {
                            Client.this.scanResponseTable();
                        } catch (Throwable e) {
                            log.error("scanResponseTable exception", e);
                        }
                    }
                }, 1000 * 3, 1000);
            } else {
                if(!inited) {
                    //是首次连接
                    this.eventLoopGroupWorker.shutdownGracefully();                    
                }else {
                    log.info("继续重连");
                    this.eventLoopGroupWorker.schedule(new ReconnectTask(), nextReconnectDelayTime(), TimeUnit.SECONDS);
                }
            }
        } catch (InterruptedException e) {
            log.error("connect[{}] cause exception", cf.channel(), e);
        }
    }
    
    /**
     * 重连随机时间
     * @return
     */
    protected int nextReconnectDelayTime() {
        return this.random.nextInt(5);
    }
    
    /**
     * 断开连接 
     */
    public void disconnect() {
        this.timer.cancel();
        Future<?> future = this.eventLoopGroupWorker.shutdownGracefully();
        try {
            future.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error("断开连接异常",e);
        }
        this.channel.close();
    }

    /**
     * 发送消息,true成功,false失败
     * 
     * @param msg
     * @return
     */
    public boolean send(String msg) {

        final Integer seq = requestSequence.incrementAndGet();

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("seq", seq);
        jsonObject.put("msg", msg);

        final ChannelFuture channelFuture = this.channel.writeAndFlush(jsonObject.toJSONString() + "\n");

        final int timeoutMillis = (this.sendTimeout + this.waitResponseTimeout) * 1000;
        final ReponseWrapper rep = new ReponseWrapper(channelFuture, timeoutMillis);

        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                responseTable.put(seq, rep);
                rep.setSendSuccess(true);
                rep.releaseSendMessage();
            }
        });
        try {
            rep.awaitSendMessage(this.sendTimeout,TimeUnit.SECONDS);
            rep.setSendSuccess(true);
        } catch (InterruptedException e) {
            log.error("awaitSendMessage[{}] cause exception", channelFuture.channel(), e);
            return false;
        }

        if (responseTable.containsKey(seq)) {
            try {
                rep.awaitResponse(this.waitResponseTimeout,TimeUnit.SECONDS);
                return rep.isResponseSuccess();
                
            } catch (InterruptedException e) {
                log.error("awaitResponse[{}] cause exception", channelFuture.channel(), e);
                return false;
            }
        } else {
            return false;
        }
    }

    /**
     * 检测请求和响应
     */
    protected void scanResponseTable() {

        Iterator<Entry<Integer, ReponseWrapper>> it = this.responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Integer, ReponseWrapper> next = it.next();
            ReponseWrapper rep = next.getValue();

            long time = rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000;
            if (time <= System.currentTimeMillis()) {
                rep.releaseAll();
                it.remove();
                log.warn("remove timeout request, " + rep);
            }
        }
    }

    /**
     * 业务处理
     * @param ctx
     * @param msg
     */
    protected void processMessageReceived(ChannelHandlerContext ctx, String msg) {
        log.trace("接收消息[{}]",msg);
        JSONObject jsonObject = JSONObject.parseObject(msg);
        Integer seq = jsonObject.getInteger("seq");
        final ReponseWrapper responseChannelFutureWrapper = this.responseTable.get(seq);
        if (responseChannelFutureWrapper != null) {
            responseChannelFutureWrapper.setResponseSuccess(true);
            responseChannelFutureWrapper.releaseResponse();
        } else {
            log.warn("不存的请求号[{}]的消息[{}]",seq,msg);
        }
    }

    /**
     * 业务处理
     */
    class NettyClinetHandler extends SimpleChannelInboundHandler<String> {
        
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            processMessageReceived(ctx, msg);
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            log.info("失去连接,开始准备重连[{}]",ctx.channel());
            ctx.executor().schedule(new ReconnectTask(), nextReconnectDelayTime(), TimeUnit.SECONDS);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("发生异常[{}]",ctx.channel(),cause);
        }
    }

    /**
     * 重连任务
     */
    class ReconnectTask implements Runnable{
        @Override
        public void run() {
            Client.this.connect();
        }
    }

    /**
     * 响应
     */
    class ReponseWrapper {

        private final long beginTimestamp = System.currentTimeMillis();
        private final long timeoutMillis;
        private final ChannelFuture channelFuture;

        private final CountDownLatch sendCountDownLatch = new CountDownLatch(1);
        private final CountDownLatch waitResponseCountDownLatch = new CountDownLatch(1);
        
        private volatile boolean sendSuccess = false;
        private volatile boolean responseSuccess = false;

        public ReponseWrapper(ChannelFuture channelFuture, long timeoutMillis) {
            super();
            this.timeoutMillis = timeoutMillis;
            this.channelFuture = channelFuture;
        }

        public void awaitSendMessage(int sendTimeout,TimeUnit unit) throws InterruptedException {
            this.sendCountDownLatch.await(sendTimeout, unit);
        }

        public void releaseSendMessage() {
            this.sendCountDownLatch.countDown();
        }

        public void awaitResponse(int waitResponseTimeout,TimeUnit unit) throws InterruptedException {
            this.waitResponseCountDownLatch.await(waitResponseTimeout, unit);
        }

        public void releaseResponse() {
            this.waitResponseCountDownLatch.countDown();
        }
        
        public void releaseAll() {
            this.sendCountDownLatch.countDown();
            this.waitResponseCountDownLatch.countDown();
        }

        public long getBeginTimestamp() {
            return beginTimestamp;
        }

        public long getTimeoutMillis() {
            return timeoutMillis;
        }

        public boolean isSendSuccess() {
            return sendSuccess;
        }

        public void setSendSuccess(boolean sendSuccess) {
            this.sendSuccess = sendSuccess;
        }

        public boolean isResponseSuccess() {
            return responseSuccess;
        }

        public void setResponseSuccess(boolean responseSuccess) {
            this.responseSuccess = responseSuccess;
        }

        @Override
        public String toString() {
            return "ReponseWrapper [beginTimestamp=" + beginTimestamp + ", timeoutMillis=" + timeoutMillis
                    + ", channelFuture=" + channelFuture + ", sendCountDownLatch=" + sendCountDownLatch
                    + ", waitResponseCountDownLatch=" + waitResponseCountDownLatch + ", sendSuccess=" + sendSuccess
                    + ", responseSuccess=" + responseSuccess + "]";
        }
    }
}

客户端的测试代码

/**
 * Command-line demo for {@code Client}: connects to 127.0.0.1:9000, sends one message and
 * prints whether it was acknowledged, together with the start and end timestamps.
 */
public class BootstrapClientDemo {

    /**
     * Runs the demo. The client is not disconnected afterwards, so the connection stays
     * open once {@code main} returns.
     *
     * @param args ignored
     */
    public static void main(String[] args) {

        Client client = new Client("127.0.0.1", 9000);
        client.connect();
        try {
            System.out.println("开始发送:" + System.currentTimeMillis());
            // send() reports whether the acknowledgement arrived; show it instead of dropping it
            boolean acknowledged = client.send("hello,world");
            System.out.println("发送结果:" + acknowledged);
            System.out.println("结束发送:" + System.currentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

服务端

/**
 * Demo Netty TCP server for newline-delimited text messages.
 *
 * <p>Every received line is printed. Acknowledgements are off by default, so the server
 * only receives. After {@link #setAckEnabled(boolean) setAckEnabled(true)} each line is
 * echoed back to the sender (followed by {@code \n}) after
 * {@link #setAckDelaySeconds(int) the configured delay}.
 */
public class Server {

    /** Port used by the no-argument constructor. */
    private static final int DEFAULT_PORT = 9000;

    private final ServerBootstrap serverBootstrap;
    private final EventLoopGroup eventLoopGroupBoss = new NioEventLoopGroup();
    private final EventLoopGroup eventLoopGroupWorker = new NioEventLoopGroup();
    private final int port;

    private String charsetName = "UTF-8";

    /** Whether received lines are echoed back; off by default. */
    private volatile boolean ackEnabled = false;
    /** Delay before a line is echoed back, in seconds. */
    private volatile int ackDelaySeconds = 10;

    /** Creates a server that will listen on port 9000. */
    public Server() {
        this(DEFAULT_PORT);
    }

    /**
     * Creates a server that will listen on the given port.
     *
     * @param port TCP port to bind in {@link #startup()}
     */
    public Server(int port) {
        this.serverBootstrap = new ServerBootstrap();
        this.port = port;
    }

    /**
     * Enables or disables echoing received lines back to the sender.
     *
     * @param ackEnabled {@code true} to echo every received line
     */
    public void setAckEnabled(boolean ackEnabled) {
        this.ackEnabled = ackEnabled;
    }

    /**
     * Sets how long the server waits before echoing a line back.
     *
     * @param ackDelaySeconds delay in seconds; must not be negative
     * @throws IllegalArgumentException if {@code ackDelaySeconds} is negative
     */
    public void setAckDelaySeconds(int ackDelaySeconds) {
        if (ackDelaySeconds < 0) {
            throw new IllegalArgumentException("ackDelaySeconds must be >= 0: " + ackDelaySeconds);
        }
        this.ackDelaySeconds = ackDelaySeconds;
    }

    /**
     * Configures the pipeline and binds the listening port, blocking until the bind has
     * finished. If the bind fails, the exception is printed and the event-loop groups are
     * shut down.
     */
    public void startup() {

        final Charset charset = Charset.forName(this.charsetName);

        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
                .channel(NioServerSocketChannel.class)//
                .option(ChannelOption.SO_BACKLOG, 1024)//
                .option(ChannelOption.SO_REUSEADDR, true)//
                .childOption(ChannelOption.TCP_NODELAY, true)//
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        ByteBuf delimiter = Unpooled.copiedBuffer("\n".getBytes(charset));
                        // Inbound order matters: the frame decoder works on ByteBuf, so it
                        // has to split the stream into lines BEFORE StringDecoder runs.
                        pipeline.addLast(//
                                new DelimiterBasedFrameDecoder(1024,delimiter),//
                                new StringDecoder(charset), //
                                new StringEncoder(charset), //
                                new NettyServerHandler()//
                        );
                    }
                });
        try {
            // sync() waits for the bind and rethrows its failure, so no extra get() is needed
            ChannelFuture sync = this.serverBootstrap.bind(this.port).sync();
            System.out.println("绑定结果:"+sync.isSuccess());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
            // nothing is listening; do not leave the event-loop threads running
            shutdown();
        }
    }

    /** Inbound handler that forwards every received line to the server. */
    class NettyServerHandler extends SimpleChannelInboundHandler<String> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            processMessageReceived(ctx,msg);
        }

    }

    /**
     * Stops the server by shutting down both event-loop groups. Does not wait for the
     * shutdown to complete.
     */
    public void shutdown() {
        this.eventLoopGroupBoss.shutdownGracefully();
        this.eventLoopGroupWorker.shutdownGracefully();
    }

    /**
     * Handles one received line: prints it and, if acknowledgements are enabled, schedules
     * the echo on the channel's executor.
     *
     * @param ctx handler context of the channel the message arrived on
     * @param msg one received line, without the trailing delimiter
     */
    protected void processMessageReceived(final ChannelHandlerContext ctx, final String msg) {
        System.out.println("接收消息:"+msg);
        if (!this.ackEnabled) {
            return;
        }
        ctx.executor().schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("回显消息:"+msg);
                ctx.channel().writeAndFlush(msg+"\n");
            }
        }, this.ackDelaySeconds, TimeUnit.SECONDS);
    }
}

服务端测试代码

/**
 * Command-line demo that starts a {@code Server} and then keeps the main thread asleep.
 */
public class BootstrapServerDemo {

    /**
     * Starts the server and sleeps for 1000 seconds; an interrupt ends the sleep early and
     * is printed.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final Server server = new Server();
        server.startup();

        final long sleepMillis = TimeUnit.SECONDS.toMillis(1000);
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}

猜你喜欢

转载自www.cnblogs.com/weiguangyue/p/12298495.html