kafka简介
待补充…
Spring For Kafka 官方链接
使用Docker 搭建kafka&zookeeper
待补充…
创建一个主题
待补充…
引入kafka依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.4.RELEASE</version>
</dependency>
注意:这里有一个很大的坑,因为版本的问题,spring-kafka和kafka-clients的版本一定要按照下图对应。
kafka配置
spring:
kafka:
bootstrap-servers: 106.53.7.17:9092 #kafka的ip:port
producer: #生产者
retries: 0 #消息发送失败的重试机制,大于0的时候启用重试机制
batch-size: 16384 #默认批次大小。小批量将使分批变得不那么普遍,并且可能会降低吞吐量(零批量将完全禁用批处理)。
buffer-memory: 33554432 #生产者可以用来缓冲等待发送到服务器的记录的总内存大小。
acks: 1 #生产者要求leader在确认请求完成之前已收到的确认数。
key-serializer: org.apache.kafka.common.serialization.StringSerializer #key序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer #value序列化
consumer: #消费者
auto-offset-reset: earliest #当Kafka中没有初始偏移量或服务器上不再存在当前偏移量时该怎么办。
enable-auto-commit: true #消费者的补偿是否在后台定期提交。
auto-commit-interval: 100 # 如果将“ enable.auto.commit”设置为true,则将消费者偏移量自动提交给Kafka的频率。
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #key反序列化
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #value反序列化
group-id: edcGroup #消费者组,消费者唯一标识
使用@EnableKafka 启动kafka
@SpringBootApplication
@EnableKafka
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
}
使用KafkaTemplate向主题mytopic发送消息
@RestController
public class KafkaController {
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
@GetMapping("/send")
public boolean send(String message){
return kafkaTemplate.send("mytopic",message).isDone();
}
}
监听主题mytopic的消息
@Component
public class MessageListener {
@SneakyThrows
@KafkaListener(topics = "mytopic")
public void onMessage(String message){
System.out.println(message);
}
}