一、幂等与事务的引入
在本专栏前面的文章我们讲过kafka生产者发送数据失败后的重试机制,同时也介绍过一种可能产生的异常情况,我们来回顾一下:
- 生产者发送数据至broker,由于网络原因生产者可能会没有能够得到服务端的确认(确认消息发送的成功),实际上消息数据已经成功发送,kafka服务端broker已经成功写入。
- 由于生产者没有收到消息确认成功写入,它就认为消息发送失败了。所以重新发送了该消息,结果这个消息就有可能在kafka broker服务端被写入第二次。
通常情况下我们不能接受这种情况的发生,我们期望的效果是exactly once(一批数据发送成功一次,并且只成功一次)。在0.11.0.0版本之前这是做不到的,在0.11.0.0版本之后kafka引入了幂等和事务机制,从而可以支持exactly once语义。
- 幂等:简单地说就是对接口的多次调用所产生的结果和调用一次产生的结果是一致的。对于kafka而言就是消息发送一次与消息被发送多次产生的结果是一样的,消息不会被消费者重复处理。
- 事务:事务这个词对于开发者可能就比较熟悉了,通常是指一系列操作行为要么都成功,要么都失败(回滚)。对于kafka而言,事务是用来保证多条消息要么都发送成功(都写入kafka broker数据日志),要么就都不写入kafka数据日志。
二、kafka幂等性实现原理?
开启kafka幂等性功能的方法非常简单,只需要将生产者客户端参数enable.idempotence
设置为true即可(这个参数的默认值为false)。那么我们的下一个问题就是:Kafka是如何做到发送重复消息(重试),仍然可以保证幂等性的呢?
为此Kafka生产者引入了producer id(以下简称PID)和序列号(sequence number)这两个概念。
- 每个kafka生产者客户端在初始化的时候都会被分配一个PID
- PID + 序列号可以代表唯一的一条消息数据,生产者每一条消息对应唯一的序列号。即使同一个消息被多次发送,该消息对应的序列号也是不会变的。
同时kafka broker服务端会为每一个生产者(PID)保存已经成功发送的数据批次的起始序列号(start_seq)和终止序列号(end_seq)。所以当新的批次消息被发送到服务端的时候,会首先进行序列号区间比对,一旦发生重叠就意味着序号重叠的消息已经在服务端被成功写入过,就可以将重复的消息数据抛弃掉,从而避免消费端重复消费。
需要注意的是:我们上面所讲的幂等性都是基于某一个分区而言的,也就是说Kafka的幂等只能保证某个主题的单个分区的幂等性。所以发生重试的消息与第一次被发送的同一个消息如果被发往不同的分区,幂等性是不生效的。但是这种情况通常不会出现,因为即使消息发送失败后进行重试,但是消息的key、value、topic信息本身没变化、消息分区算法没有变化、分区数量没有变化。在这些前提下,同一个消息即使被重复发送,也会发往同一个分区。
三、kafka事务
事务的基本介绍
上文我们讨论的幂等性针对的场景是:同一个消息被发送多次,发送至同一个分区。那么如果多个不同的消息发送至不同的分区,我们该如何保证多条消息要么都发送成功(都写入kafka broker数据日志),要么就都不写入kafka数据日志?这就需要依赖kafka事务来实现。
- kafka生产者需要设置
transactional.id
参数,可以认为该参数就是事务管理器的id - kafka事务生产者开启幂等,即:
enable.idempotence
设置为true(如果未显式设置,则KafkaProducer默认会将它的值设置为true)。如果用户显式地将enable.idempotence设置为false,则会报出ConfigException的异常。
KafkaProducer提供了5个与事务相关的方法,详细如下:
void initTransactions();
void beginTransaction() throws ProducerFencedException;
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)throws ProducerFencedException;
void commitTransaction() throws ProducerFencedException;
void abortTransaction() throws ProducerFencedException;
- initTransactions()方法用来初始化事务;
- beginTransaction()方法用来开启事务;
- sendOffsetsToTransaction()方法为消费者提供在事务内的位移提交的操作;
- commitTransaction()方法用来提交事务;
- abortTransaction()方法用来中止事务,类似于事务回滚。
事务的隔离级别
在kafka消费客户端有一个参数isolation.level
,与事务有着莫大的关联.
- 这个参数的默认值为“read_uncommitted”,意思是说消费端应用可以看到(消费到)未提交的事务,当然对于已提交的事务也是可见的。
- 这个参数还可以设置为“read_committed”,表示消费端应用不可以看到尚未提交的事务内的消息。
举个例子,如果生产者开启事务并向某个分区值发送3条消息msg1、msg2和msg3,在执行commitTransaction()或abortTransaction()方法前,设置为“read_committed”的消费端应用是消费不到这些消息的,不过在KafkaConsumer内部会缓存这些消息,直到生产者执行commitTransaction()方法之后它才能将这些消息推送给消费端应用。反之,如果生产者执行了abortTransaction()方法,那么KafkaConsumer会将这些缓存的消息丢弃。
四、Kafka事务代码实现
示例如下所示,如果你学过数据库事务及JDBC,那么下面的代码就不难理解
//生产者参数配置
Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");
//生产者对象新建
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions(); //初始化事务管理器
try {
String msg = "zimug test";
producer.beginTransaction(); //开启一个事务
producer.send(new ProducerRecord(topic, "0", msg.toString()));
producer.send(new ProducerRecord(topic, "1", msg.toString()));
producer.send(new ProducerRecord(topic, "2", msg.toString()));
producer.commitTransaction(); //提交事务
} catch (ProducerFencedException e1) {
e1.printStackTrace();
producer.abortTransaction(); //事务回滚
} catch (KafkaException e2) {
e2.printStackTrace();
producer.abortTransaction(); //事务回滚
}finally{
producer.close();
}