前言
Spring-kafka内部封装了可重试消费消息的语义,也就是可以设置为当消费数据出现异常时,重试这个消息。而且可以设置重试达到多少次后,让消息进入预定好的Topic。也就是死信队列里。
环境搭建
搭建单机版kafka
,docker-compose.yml
文件如下:
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper ## 镜像
ports:
- "2181:2181" ## 对外暴露的端口号
kafka:
image: wurstmeister/kafka ## 镜像
volumes:
- /etc/localtime:/etc/localtime ## 挂载位置(kafka镜像和宿主机器之间时间保持一直)
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.56.124 ## 修改:宿主机IP
KAFKA_ZOOKEEPER_CONNECT: 192.168.56.124:2181 ## 卡夫卡运行是基于zookeeper的
kafka-manager:
image: sheepkiller/kafka-manager ## 镜像:开源的web管理kafka集群的界面
environment:
ZK_HOSTS: 192.168.56.124 ## 修改:宿主机IP
ports:
- "9000:9000"
代码演示
代码已经上传到github
上面,https://github.com/fafeidou/fast-cloud-nacos/tree/master/fast-common-examples/fast-kafka-examples
- 引入
pom
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- 配置
@Bean
public ConcurrentKafkaListenerContainerFactory containerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
// 最大重试次数3次
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), 3));
return factory;
}
这里主要设定消息消费为手动模式,然后设置重试次数
- 消费者监听
@KafkaListener(topics = "kafka-topic2", containerFactory = "containerFactory", groupId = "testGroup")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws Exception {
Optional kafkaMessage = Optional.ofNullable(record.value());
if (!kafkaMessage.isPresent()) {
throw new Exception("监听到的消息为空值");
}
log.info("topicID: " + record.topic());
log.info("recordValue: " + record.value());
try {
/*业务逻辑*/
throw new RuntimeException("消息异常,进入死信队列...");
} catch (Exception e) {
acknowledgment.acknowledge();
throw new Exception(e);
}
}
@KafkaListener(id = "testGroup", topics = "kafka-topic2.DLT", groupId = "testGroup")
public void dltListen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exception,
@Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String stacktrace) {
log.info("Received from DLT: " + record.value());
acknowledgment.acknowledge();
}
一个监听原始队列,一个监听死信队列,死信队列的Topic的规则是,
业务Topic名字+“.DLT”
- 消息生产者
@RequestMapping("sendStr2")
public String sendStr2(String message) {
kafkaTemplate.send("kafka-topic2", message);
return message;
}
这里就是使用kafak模板类,发一个消息
- 接下来是配置文件
spring:
application:
name: base.kafka
kafka:
bootstrap-servers: 192.168.56.124:9092
producer:
# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
retries: 0
#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
acks: 1
consumer:
group-id: testGroup
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
auto-offset-reset: earliest
# 设置自动提交offset
enable-auto-commit: false
# 如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
auto-commit-interval: 100
max-poll-records: 5
listener:
ack-mode: manual
server:
port: 8060
测试
- 前面工作已经完成了,下面开始测一下
访问:http://localhost:8060/sendStr2?message=aaa
看到死信队列是可以收到消息,测试成功。