kafka-高级应用-SpringBoot整合kafka-小白笔记(八)

1 SpringBoot整合kafka

SpringBoot整合kafka简单方便。后文介绍了生产者、消费者、事务代码实现。

1.1 添加依赖

       <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.1.7.RELEASE</version>
        </dependency>

1.2 添加application.proerties配置

 kafka的配置参数都可以在此配置文件添加,根据需要。

#生产者
spring.kafka.producer.bootstrap-servers=11.211.80.151:9092
#消费者
spring.kafka.consumer.bootstrap-servers=11.211.80.151:9092

1.3 简单代码实现

实现了生产者、消费者、事务。

package xb.study.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping
public class KafkaLearnDemo {
    @Autowired
    private KafkaTemplate template;
    private static final String topic="test";

    /**
     * 消息生产者
     * @param input
     * @return
     */
    @GetMapping("/send/{input}")
    public String Send(@PathVariable String input){
        template.send(topic,input);
        return "send sucess "+input;
    }
    /**
     * 消息生产者
     * 支持事务 方式1
     * @param input
     * @return
     */
    @GetMapping("/send2/{input}")
    public String Send2(@PathVariable String input){
        template.executeInTransaction(t->{
            template.send(topic,input);
            return true;
        });

        return "send sucess "+input;
    }
    /**
     * 消息生产者
     * 支持事务  注解方式
     * @param input
     * @return
     */
    @GetMapping("/send3/{input}")
    @Transactional(rollbackFor = RuntimeException.class)
    public String Send3(@PathVariable String input){
        template.send(topic,input);

        return "send sucess "+input;
    }

    /**
     * 消息消费者
     * @param input
     */
    @KafkaListener(id="test1",topics=topic,groupId = "group1")
    public void revice(String input){
        System.out.println("收到"+input);
    }



}

猜你喜欢

转载自blog.csdn.net/h4241778/article/details/108479922