版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u014252478/article/details/83658857
消费者:
(1)配置:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /** Comma-separated Kafka broker addresses, e.g. "host1:9092,host2:9092". */
    @Value("${spring.kafka.bootstrap-servers}")
    private String brokers;

    /** Consumer group id shared by listeners built from this factory. */
    @Value("${spring.kafka.consumer.group-id}")
    private String group;

    /** Fully-qualified class name of the key deserializer. */
    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyType;

    /** Fully-qualified class name of the value deserializer. */
    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueType;

    /**
     * Container factory backing {@code @KafkaListener} methods: 4 consumer
     * threads per listener, 4s poll timeout.
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.getContainerProperties().setPollTimeout(4000);
        return factory;
    }

    /**
     * Exposes an (empty) {@code KafkaListeners} instance as a bean.
     *
     * <p>The previous version returned {@code null} from both
     * {@code annotationType()} and {@code toString()} and {@code false} from
     * every {@code equals} call, which violates the
     * {@link java.lang.annotation.Annotation} contract: {@code annotationType()}
     * must return the represented annotation type, {@code equals} must be
     * reflexive, and {@code hashCode} must follow the documented member-hash
     * formula. The implementation below honors that contract.
     */
    @Bean
    public KafkaListeners kafkaListeners() {
        return new KafkaListeners() {
            @Override
            public KafkaListener[] value() {
                return new KafkaListener[0];
            }

            @Override
            public Class<? extends Annotation> annotationType() {
                // Annotation contract: never null.
                return KafkaListeners.class;
            }

            @Override
            public boolean equals(Object obj) {
                // Equal to any KafkaListeners instance whose value() is also empty.
                return obj instanceof KafkaListeners
                        && ((KafkaListeners) obj).value().length == 0;
            }

            @Override
            public int hashCode() {
                // Annotation spec: (127 * memberName.hashCode()) ^ valueHash,
                // where the hash of an empty array is 1.
                return (127 * "value".hashCode()) ^ 1;
            }

            @Override
            public String toString() {
                // Never return null from toString: callers (logs, debuggers) may
                // concatenate it.
                return "@" + KafkaListeners.class.getName() + "(value=[])";
            }
        };
    }

    /**
     * Builds the consumer factory from the injected Spring properties.
     *
     * <p>Offsets are committed by the listener container (auto-commit is
     * disabled), so {@code auto.commit.interval.ms} — present in the previous
     * version — was dead configuration and has been removed.
     */
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        // Container-managed commits; the listener container commits offsets.
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyType);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueType);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        // Start from the newest records when no committed offset exists.
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return new DefaultKafkaConsumerFactory<>(properties);
    }
}
(2)消费:
@Component
public class KafkaConsumer {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
@Autowired
private ObjectMapper objectMapper;
@Autowired
private RestTemplate restTemplate;
@Autowired
private KafkaTemplate kafkaTemplate;
@KafkaListener(topics = {KafkaMessageConfig.BUSINESS_SERVICE_TOPIC})
public void processMessage(ConsumerRecord<String, String> consumer) {
try {
String topic = "";
String key = "";
String message = "";
if (consumer.topic() != null) {
topic = consumer.topic();
}
if (consumer.key() != null) {
key = consumer.key();
}
if (consumer.value() != null) {
message = consumer.value();
}
if (topic == null || "".equals(topic.trim()) || " ".equals(topic)) {
log.warn("[processMessage] invalid topic {}", topic);
return;
}
if (key == null || "".equals(key.trim()) || " ".equals(key)) {
log.warn("[processMessage] invalid key {}", key);
return;
}
if (!new JsonValidator().validate(message)) {
log.warn("[processMessage] invalid message string {}", message);
return;
}
Map<String, Object> body = JsonUtil.json2Bean(message, Map.class);
Map<String, Object> params = new HashMap<>();
if (body == null || body.isEmpty() || !body.containsKey("OperationType")) {
log.warn("[processMessage] invalid message string {}", message);
return;
}
}