一、使用idea创建SpringBoot项目
1.1 使用Spring Initializr创建一个SpringBoot程序
点击Next
。
1.2 添加依赖
依赖说明:
-
Lombok
简化实体类开发。 -
Spring Web
让项目集成web
开发所有依赖,包括Spring MVC
,内置tomcat
等。 -
Spring for Apache Kafka
就是Spring
和Kafka
的集成依赖。
配置完成之后点击Finish
。
1.3 查看pom文件
因为是第一次集成Kafka
,这时候只要等待Maven
下载好Kafka
的相关依赖,下载好后红色会消失。
这样就创建好了一个集成了Kafka
的SpringBoot Web
项目
二、创建生产者
2.1 配置生产者application.yml文件
# 连接Kafka
spring:
kafka:
bootstrap-servers: localhost:9092
# 生产者 key value的序列化方式
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
2.2 创建生产者接口
在controller包下创建生产者,把接口传递进来的数据发送给Kafka
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Resource
private KafkaTemplate<String, String> kafka;
@PostMapping
public String data(@RequestBody String msg) {
// 通过Kafka发出数据
kafka.send("test", msg);
return "ok";
}
}
三、创建消费者
3.1 配置消费者application.yml文件
# 连接Kafka
spring:
kafka:
bootstrap-servers: localhost:9092
# 消费者 key value的反序列化方式
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#指定消费者组的 group_id
group-id: kafka-test
3.2 创建消费者
import org.springframework.kafka.annotation.KafkaListener;
@Configuration
public class KafkaConsumer {
// 指定要监听的 topic
@KafkaListener(topics = "test")
public void consumeTopic(String msg) {
// 参数: 从topic中收到的 value值
System.out.println("收到的信息: " + msg);
}
}
3.3 说明
由于生产者和消费者都写在这个Demo
当中。
所以整体application.yml
文件如下(上面这么写只是为了区分和理解):
# 连接Kafka
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
# 生产者 key value的序列化方式
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消费者 key value的反序列化方式
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#指定消费者组的 group_id
group-id: kafka-test
四、测试生产者消费者
4.1 使用ApiPost发送一个Post请求调用生产者接口
发现点击发送之后,控制台迅速返回了一个ok
。
说明生产者生产数据成功,已经向test
这个topic
中发送了hello kafka
这条数据。
4.2 观察消费者控制台
发现消费者已经接收了来自生产者的数据,并且把数据打印在了控制台上。
以上就是SpringBoot
集成Kafka
的基本方式。即使是今后复杂的使用,也是从简单转换而来。