Sending collected data to Kafka (senddata2kafka)

package com.octv.collect.netty.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Component;
import com.oct.bigdata.sink.kafka.KafkaSink;
import com.octv.collect.service.SensorService;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;

/**
 * Netty inbound handler for datagrams received on the collect channel.
 *
 * <p>Each received {@link DatagramPacket} is decoded as UTF-8 text, passed to
 * {@code sensorService.save(content)} and, only when that call completes without an exception,
 * published through the {@link KafkaSink} on the {@code liuzhouPLC} topic.
 *
 * <p>NOTE(review): the sink is started in {@link #channelActive} but, within this class, it is
 * only stopped from {@link #exceptionCaught}; confirm whether a normal channel shutdown should
 * stop it as well.
 */
@Component("collectServerHandler")
public class CollectServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    private static final Logger log = LoggerFactory.getLogger(CollectServerHandler.class);

    /** Kafka topic every collected message is published to. */
    private static final String KAFKA_TOPIC = "liuzhouPLC";

    @Autowired
    private SensorService sensorService;

    @Autowired
    private KafkaSink kafkaSink;

    /**
     * Logs that the channel is active and starts the Kafka sink.
     *
     * @param ctx context of the channel that became active
     */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        log.info("Collect channel has actived,Waiting for message.");
        kafkaSink.start();
    }

    /**
     * Logs the failure, closes the channel and then stops the Kafka sink.
     *
     * @param ctx   context of the channel on which the exception surfaced
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("Collect channel has Caught exception ", cause);
        ctx.close();
        kafkaSink.stop();
    }

    /**
     * Decodes the datagram payload as UTF-8, logs it together with the sender address, stores it
     * and forwards it to Kafka. Storage and publishing failures are logged, not rethrown.
     *
     * @param ctx    context of the receiving channel (unused)
     * @param packet the received datagram
     * @throws Exception declared by the overridden method; storage and Kafka errors are handled
     *                   inside this class
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
        String content = packet.content().toString(CharsetUtil.UTF_8);
        log.info("Collect channel received message from {}@{} : {}",
                packet.sender().getAddress().getHostAddress(), packet.sender().getPort(), content);

        // Only forward messages that were stored; duplicates and failed saves are not published.
        if (saveToDatabase(content)) {
            publishToKafka(content);
        }
    }

    /**
     * Stores one message through the sensor service.
     *
     * @param content decoded message text
     * @return {@code true} when the save completed, {@code false} when it was a duplicate or failed
     */
    private boolean saveToDatabase(String content) {
        try {
            sensorService.save(content);
            return true;
        } catch (DuplicateKeyException e) {
            // Duplicate data is deliberately ignored.
            log.info("忽略重复数据:{}", content);
        } catch (Exception e) {
            log.error("保存到数据库异常", e);
        }
        return false;
    }

    /**
     * Publishes one message to Kafka, logging (not propagating) any failure so that a Kafka
     * problem is reported as such instead of being mistaken for a database error.
     *
     * @param content decoded message text
     */
    private void publishToKafka(String content) {
        try {
            kafkaSink.setTopic(KAFKA_TOPIC);
            kafkaSink.process(content);
        } catch (Exception e) {
            log.error("发送到Kafka异常", e);
        }
    }
}

猜你喜欢

转载自blog.csdn.net/yblbbblwsle/article/details/80880206