项目中使用kafka 接受消息并处理
springboot 版本1.5.19 使用时请注意kafka版本
1.maven
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.配置文件配置
spring:
#kafka
kafka:
bootstrap-servers: kafkaIp:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 3
buffer-memory: 40960
batch-size: 4096
consumer:
group-id: gps_mileage
enable-auto-commit: true
auto-commit-interval: 1000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.消费者
@Component
public class KafkaConsumer {
private static final Logger logger= LoggerFactory.getLogger(KafkaConsumer.class);
@Autowired
private GpsRealDataService gpsRealDataService;
@KafkaListener(topics = {"serverRealData"})
public void reveice(ConsumerRecord<?, ?> record){
try{
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
logger.info("消费者开始消费message:" + message);
//kafka消息发送时间
Date messageSendTime= CommonUtil.getMessageSendTime(message.toString());
//得到message对象
JSONObject jsonObject = CommonUtil.parseObject(message.toString());
GpsInfoEntity gpsInfoEntity=convertGpsInfo(jsonObject);
boolean result=gpsRealDataService.addRealDataToRedis(gpsInfoEntity,messageSendTime);
}
}catch (Exception e){
e.printStackTrace();
logger.error("kafka消费异常"+e);
}
}
2.生产者
package com.cjwl.mileage.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
/**
* kafka 消息生成者;将处理好的位置消息传给hbase
*/
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
protected final Logger logger = Logger.getLogger(this.getClass());
public void sendMessage(String topic,String message){
kafkaTemplate.send(topic,message);
}
}