生产者分区写入规则
概述
生产者写入消息到topic,Kafka将依据不同的规则将数据分配到不同的分区中,如果指定了分区数据就会写到指定分区,如果没有指定分区,会按照下列若干规则进行指定分区:
分类
Hash分区(指定Key默认开启)
优点:相同的key会进去同一分区
缺点:由于相同key的Hash取余结果是相同的,会导致数据倾斜问题
轮询分区(没有指定Key时开启)
- 默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区
- 如果在生产消息时,key为null,则使用轮询算法均衡地分配分区
优点:数据分配更加均匀
缺点:相同key的数据进去不同的分区
在2.x之前:轮询分区,2.x之后:黏性分区
随机分区(不用)
随机策略,每次都随机地将消息分配到每个分区。在较早的版本,默认的分区策略就是随机策略,也是为了将消息均衡地写入到每个分区。但后续轮询策略表现更佳,所以基本上很少会使用随机策略。
自定义分区
实现代码:
1.创建自定义分区器
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.Random;
/**
* @ClassName UserPartition
* @Description TODO 自定义分区器,实现随机分区
* @Date 2021/3/31 9:21
* @Create By Frank
*/
public class UserPartition implements Partitioner {
/**
* 返回这条数据对应的分区编号
* @param topic:Topic的名
* @param key:key的值
* @param keyBytes:key的字节
* @param value:value的值
* @param valueBytes:value的字节
* @param cluster:集群对象
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//获取Topic的分区数
Integer count = cluster.partitionCountForTopic(topic);
//构建一个随机对象
Random random = new Random();
//随机得到一个分区编号
int part = random.nextInt(count);
return part;
}
@Override
public void close() {
//释放资源
}
@Override
public void configure(Map<String, ?> configs) {
//获取配置
}
}
2.在Kafka生产者配置中,自定使用自定义分区器的类名
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 类所在包路径.UserPartition.class);
副本机制
概述
副本就是给数据做备份,当Kafka集群中的某个broker的数据(分区)丢失或者宕机,在其他broker中的副本是可用的
producer的ACKS参数
ACKS参数表示生产者生产消息,对写入副本的要求严格程度,不同的ACKS参数性能不同,安全性也不同
参数分类
acks配置为0:
acks=0:生产者不管kafka集群有没有收到,直接发送下一条消息
优缺点:
优点:性能好,就是快
缺点:容易导致数据丢失,概率比较高
acks配置为1:
acks=1:生产者将数据发送给Kafka,Kafka等待这个分区leader副本写入成功,返回ack确认,生产者发送下一条
优缺点:
优点:性能和安全上做了平衡
缺点:依旧存在数据丢失的概率,但概率比较小
acks配置为-1或者all
acks=all/-1:生产者将数据发送给Kafka,Kafka等待这个分区所有副本全部写入,返回ack确认,生产者发送下一条
优缺点:
优点:数据安全
缺点:慢
补充:如果Kafka没有返回acks怎么办?
- 生产者会等待Kafka集群返回ACKS,所有会有一个等待时间,如果Kafka在规定的时候内没有返回ACKS,代表数据丢失
- 生产者有重试机制,重新发送这条数据给Kafka
存在的问题:acks每次在Kafka集群数据写入成功时返回,如果在返回的过程中Kafka宕机,就会导致数据重复,如何解决?