Kafka应答ack 模式
• 0:生产者发送过来的数据,不需要等数据落盘应答。
• 1:生产者发送过来的数据,Leader收到数据后应答。
可能出现的情况:应答后,leader挂了,从新选举,此时之前应答的数据不会在发送了
• -1(all):生产者发送过来的数据,Leader+和isr队列 里面的所有节点收齐数据后应答。-1和all等价。
可能出现情况:当有一个follower挂掉,那么迟迟应答,所以内部维护了一个动态的isr,当发送请求或同 步数据时,默认30s没响应就会被提出isr
保证数据可靠
如果分区副本设置为1个,或 者ISR里应答的最小副本数量 ( min.insync.replicas 默认为1)设置为1,和ack=1的效果是一 样的,仍然有丢数的风险(leader:0,isr:0)。
• 数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
代码实现
public class CustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//属性配置
Properties properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.6.101:9092");
//指定k、v序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 设置 acks
properties.put(ProducerConfig.ACKS_CONFIG, "1");
// 重试次数 retries,默认是 int 最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
//创建生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//像first主题发送数据
kafkaProducer.send(new ProducerRecord<>("first", 1,"","lzq"),new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e==null){
System.out.println("发送成功,主题"+recordMetadata.topic()+"分区"+recordMetadata.partition());
}
}
}).get();
//关闭资源
kafkaProducer.close();
}
}
数据重复问题
发送数据时,leader接收到,像follower同步,在应答时挂掉了,此时会重新选主,消费者再次发送数据,这时候数据就重复了
幂等性
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。 精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2)
重复数据的判断标准:具有相同主键的消息提交时,Broker只会持久化一条。其 中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。 所以幂等性只能保证的是在单分区单会话内不重复。
开启参数 enable.idempotence 默认为 true,false 关闭
运行过程
代码实现
public class CustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//属性配置
Properties properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.6.101:9092");
//指定k、v序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//关联自定义分区
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.lzq.producer.MyPartitioner");
// 设置 acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数 retries,默认是 int 最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
//指定事务id
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"t1");
//创建生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//初始化、开启时候
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
try {
//像first主题发送数据
kafkaProducer.send(new ProducerRecord<>("first", 1,"","lzq"),new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e==null){
System.out.println("发送成功,主题"+recordMetadata.topic()+"分区"+recordMetadata.partition());
}
}
});
//提交事务
kafkaProducer.commitTransaction();
}catch (Exception e){
//回滚事务
kafkaProducer.abortTransaction();
}finally {
//关闭资源
kafkaProducer.close();
}
}
}
数据乱序
kafka在1.x及以后版本保证数据单分区有序,条件如下:
(1)未开启幂等性 max.in.flight.requests.per.connection需要设置为1。
(2)开启幂等性 max.in.flight.requests.per.connection需要设置小于等于5。
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据, 故无论如何,都可以保证最近5个request的数据都是有序的,就是先将请求缓存在重新排序
负载均衡
创建一个josn文件vim topics-to-move.json
{
"topics": [
{"topic": "first"}
],
"version": 1
}
生成一个负载均衡的计划
bin/kafka-reassign-partitions.sh -- bootstrap-server 192.168.6.100:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --gener
会自动生成负载均衡计划
在创建一个josn文件,复制对应计划
vim increase-replication-factor.json
执行计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.6.100:9092 --reassignment-json-file increase-replication-factor.json --execute
验证计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.6.100:9092 --reassignment-json-file increase-replication-factor.json --verify
退役旧节点
重新生成执行计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.6.100:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
启动脚本
#!/bin/bash
case $1 in
"start")
for i in ip地址
do
ssh $i "绝对路径"
;;
"stop")
;;
esac