1.在pom.xml文件上添加依赖
<!-- Inherit build configuration from the Spring Boot starter parent, version 2.0.3.RELEASE. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<!-- Spring Boot web starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot test starter; limited to the test scope. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring for Apache Kafka. No version is given here; it is presumably supplied by the parent's dependency management. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Gson JSON library, pinned explicitly to 2.8.5. -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
</dependencies>
2.配置文件application.properties
# Kafka connection, consumer and producer settings
# Broker address the client connects to.
# NOTE(review): Kafka's conventional listener port is 9092; confirm that 8092 matches the broker configuration.
spring.kafka.bootstrap-servers=192.168.3.146:8092
# Consumer group id used by the listener.
spring.kafka.consumer.group-id=springboot-group1
# Consumer side: keys and values are deserialized as Strings.
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Producer side: keys and values are serialized as Strings.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Root logger level.
logging.level.root=info
3.添加消费类和发送消息类
/**
 * Kafka consumer component that listens on the "wftest" topic and prints
 * every record that carries a non-null value to standard output.
 */
@Component
public class KafkaReceiver {

    /**
     * Handles one record delivered from the "wftest" topic.
     *
     * <p>Records whose value is {@code null} are ignored; otherwise the whole
     * record and then its value are printed.
     *
     * @param record the consumed record
     */
    @KafkaListener(topics = {"wftest"})
    public void listen(ConsumerRecord<?, ?> record) {
        Object payload = record.value();
        if (payload == null) {
            // Nothing to print for a record without a value.
            return;
        }
        System.out.println("record =" + record);
        System.out.println("message =" + payload);
    }
}
/**
 * Kafka producer component that publishes a newly generated {@link Message},
 * encoded as a JSON string, to the "wftest" topic.
 */
@Component
public class KafkaSender {

    /** Topic every message is published to. */
    private static final String TOPIC = "wftest";

    /** Converts a {@link Message} into its JSON string form. */
    private final Gson gson = new GsonBuilder().create();

    /** Template used to publish String payloads; injected by Spring. */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Builds a new message, serializes it to JSON and hands it to the
     * Kafka template for the "wftest" topic.
     */
    public void send() {
        String json = gson.toJson(newMessage());
        kafkaTemplate.send(TOPIC, json);
    }

    /**
     * Creates a message whose id is the current time in milliseconds, whose
     * text is a random UUID and whose send time is the current date.
     */
    private Message newMessage() {
        Message payload = new Message();
        payload.setId(System.currentTimeMillis());
        payload.setMsg(UUID.randomUUID().toString());
        payload.setSendTime(new Date());
        return payload;
    }
}
/**
 * Simple mutable bean carrying an id, a text body and a send timestamp.
 *
 * <p>All properties start out as {@code null} until set.
 */
public class Message {

    /** Identifier of the message. */
    private Long id;

    /** Text content of the message. */
    private String msg;

    /** Moment the message was sent. */
    private Date sendTime;

    /**
     * @return the message id, or {@code null} if not set
     */
    public Long getId() {
        return id;
    }

    /**
     * @param id the message id
     */
    public void setId(Long id) {
        this.id = id;
    }

    /**
     * @return the message text, or {@code null} if not set
     */
    public String getMsg() {
        return msg;
    }

    /**
     * @param msg the message text
     */
    public void setMsg(String msg) {
        this.msg = msg;
    }

    /**
     * @return the send time, or {@code null} if not set
     */
    public Date getSendTime() {
        return sendTime;
    }

    /**
     * @param sendTime the send time
     */
    public void setSendTime(Date sendTime) {
        this.sendTime = sendTime;
    }
}
4.测试
/**
 * Application entry point used to try out the Kafka setup: it starts the
 * Spring context and sends three test messages through {@link KafkaSender},
 * pausing three seconds after each one.
 */
@SpringBootApplication
public class Application {

    /**
     * Starts the application and sends the test messages.
     *
     * <p>If the thread is interrupted while pausing between sends, the
     * interrupt flag is restored and no further messages are sent.
     *
     * @param args command-line arguments passed on to Spring Boot
     */
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
        KafkaSender sender = context.getBean(KafkaSender.class);
        for (int i = 0; i < 3; i++) {
            sender.send();
            try {
                Thread.sleep(3_000);
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the interrupt flag; restore it
                // and stop sending instead of ignoring the interruption request.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
运行时报错:org.apache.kafka.common.errors.TimeoutException。
排查问题:
1.kafka中间件是否正常启动
2.配置文件的配置是否有问题
3.hosts配置文件是否配置kafka服务器的hostname
我们可以打开debug日志,查看详细日志信息。
logging.level.root=debug
如图所示:通过日志可以分析出,客户端是使用hostname连接kafka服务器的,所以需要在hosts文件中配置kafka服务器的hostname。