1 SpringBoot整合kafka
SpringBoot整合kafka简单方便。后文介绍了生产者、消费者、事务代码实现。
1.1 添加依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
1.2 添加application.proerties配置
kafka的配置参数都可以在此配置文件添加,根据需要。
#生产者
spring.kafka.producer.bootstrap-servers=11.211.80.151:9092
#消费者
spring.kafka.consumer.bootstrap-servers=11.211.80.151:9092
1.3 简单代码实现
实现了生产者、消费者、事务。
package xb.study.kafka.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping
public class KafkaLearnDemo {
@Autowired
private KafkaTemplate template;
private static final String topic="test";
/**
* 消息生产者
* @param input
* @return
*/
@GetMapping("/send/{input}")
public String Send(@PathVariable String input){
template.send(topic,input);
return "send sucess "+input;
}
/**
* 消息生产者
* 支持事务 方式1
* @param input
* @return
*/
@GetMapping("/send2/{input}")
public String Send2(@PathVariable String input){
template.executeInTransaction(t->{
template.send(topic,input);
return true;
});
return "send sucess "+input;
}
/**
* 消息生产者
* 支持事务 注解方式
* @param input
* @return
*/
@GetMapping("/send3/{input}")
@Transactional(rollbackFor = RuntimeException.class)
public String Send3(@PathVariable String input){
template.send(topic,input);
return "send sucess "+input;
}
/**
* 消息消费者
* @param input
*/
@KafkaListener(id="test1",topics=topic,groupId = "group1")
public void revice(String input){
System.out.println("收到"+input);
}
}