「这是我参与11月更文挑战的第18天,活动详情查看:2021最后一次更文挑战」
Java访问kafka准备
修改本机的Host文件
192.168.200.20 kafka1
192.168.200.20 kafka2
192.168.200.20 kafka3
复制代码
- 创建maven的工程, 导入kafka相关的依赖
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
复制代码
生产者代码
public class ProducerDemo {
public static String topic = "test";//定义主题
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.20:9092,192.168.200.20:9093,192.168.200.20:9094");
//网络传输,对key和value进行序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//创建消息生产对象,需要从properties对象或者从properties文件中加载信息
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
try {
while (true) {
//设置消息内容
String msg = "Hello," + new Random().nextInt(100);
//将消息内容封装到ProducerRecord中
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
kafkaProducer.send(record);
System.out.println("消息发送成功:" + msg);
Thread.sleep(500);
}
} finally {
kafkaProducer.close();
}
}
}
复制代码
消费者代码
public class ConsumerDemo {
public static void main(String[] args){
Properties p = new Properties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.20:9092,192.168.200.20:9093,192.168.200.20:9094");
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//指定组名
p.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);
kafkaConsumer.subscribe(Collections.singletonList(ProducerDemo.topic));// 订阅消息
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("topic:%s,offset:%d,消息:%s", record.topic(), record.offset(), record.value()));
}
}
}
}
复制代码
分区副本机制
kafka有三层结构:kafka有多个主题,每个主题有多个分区,每个分区又有多条消息。
- 分区机制:主要解决了单台服务器存储容量有限和单台服务器并发数限制的问题 一个分片的不同副本不能放到同一个broker上。当主题数据量非常大的时候,一个服务器存放不了,就将数据分成两个或者多个部分,存放在多台服务器上。每个服务器上的数据,叫做一个分片
分区对于Kafka集群的好处是:实现负载均衡,高存储能力、高伸缩性。分区对于消费者来说,可以提高并发度,提高效率。
- 副本:副本备份机制解决了数据存储的高可用问题。当数据只保存一份的时候,有丢失的风险。为了更好的容错和容灾,将数据拷贝几份,保存到不同的机器上。多个follower副本通常存放在和leader副本不同的broker中。通过这样的机制实现了高可用,当某台机器挂掉后,其他follower副本也能迅速”转正“,开始对外提供服务。
kafka的副本都有哪些作用?
- 在kafka中,实现副本的目的就是冗余备份,且仅仅是冗余备份,所有的读写请求都是由leader副本进行处理的。follower副本仅有一个功能,那就是从leader副本拉取消息,尽量让自己跟leader副本的内容一致。
说说follower副本为什么不对外提供服务?
- 这个问题本质上是对性能和一致性的取舍。试想一下,如果follower副本也对外提供服务那会怎么样呢?首先,性能是肯定会有所提升的。但同时,会出现一系列问题。类似数据库事务中的幻读,脏读。
- 比如你现在写入一条数据到kafka主题a,消费者b从主题a消费数据,却发现消费不到,因为消费者b去读取的那个分区副本中,最新消息还没写入。而这个时候,另一个消费者c却可以消费到最新那条数据,因为它消费了leader副本。
- 为了提高那么些性能而导致出现数据不一致问题,那显然是不值得的。
kafka保证数据不丢失机制
从Kafka的大体角度上可以分为数据生产者,Kafka集群,还有就是消费者,而要保证数据的不丢失也要从这三个角度去考虑。
消息生产者
消息生产者保证数据不丢失:消息确认机制(ACK机制),参考值有三个:0,1,-1
//producer无需等待来自broker的确认而继续发送下一批消息。
//这种情况下数据传输效率最高,但是数据可靠性确是最低的。
properties.put(ProducerConfig.ACKS_CONFIG,"0");
//producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。
//这里有一个地方需要注意,这个副本必须是leader副本。
//只有leader副本成功写入了,producer才会认为消息发送成功。
properties.put(ProducerConfig.ACKS_CONFIG,"1");
//ack=-1,简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。
properties.put(ProducerConfig.ACKS_CONFIG,"-1");
复制代码
消息消费者
kafka消费消息的模型:
即消费消息,设置好offffset,类比一下:
Kafka动作 | 看书动作 |
---|---|
消费消息 | 看书 |
offffset位移 | 书签 |
什么时候消费者丢失数据呢?
- 由于Kafka consumer默认是自动提交位移的(先更新位移,再消费消息),如果消费程序出现故障,没消费完毕,则丢失了消息,此时,broker并不知道。
解决方案:
- enable.auto.commit=false关闭自动提交位移
- 在消息被完整处理之后再手动提交位移
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
复制代码
消息存储及查询机制
kafka使用日志文件的方式来保存生产者消息,每条消息都有一个offffset值来表示它在分区中的偏移量。
Kafka中存储的一般都是海量的消息数据,为了避免日志文件过大,一个分片并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是<topic_name>_<partition_id>。
kafka容器数据目录:
- /kafka/kafka-logs-kafka1
消息存储机制
log分段
每个分片目录中,kafka 通过分段的方式将 数据分为多个 LogSegment。
一个 LogSegment 对应磁盘上的一个日志文件(00000000000000000000.log)和一个索引文件(如上:00000000000000000000.index)。
其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。
每个LogSegment 的大小可以在server.properties 中log.segment.bytes=107370 (设置分段大小,默认是1gb)选项进行设置。
当log文件等于1G时,新的会写入到下一个segment中。
timeindex文件,是kafka的具体时间日志
通过 offffset 查找 message
存储的结构:一个主题 --> 多个分区 ----> 多个日志段(多个文件)
- 第一步:查询segment fifile:
segment fifile命名规则跟offffset有关,根据segment fifile可以知道它的起始偏移量,因为Segment fifile的命名规则是上一个segment文件最后一条消息的offffset值。所以只要根据offffset 二分查找文件列表,就可以快速定位到具体文件。
比如
第一个segment fifile是00000000000000000000.index表示最开始的文件,起始偏移量(offffset)为0。
第二个是00000000000000091932.index:代表消息量起始偏移量为91933 = 91932 + 1。那么offffset=5000时应该定位00000000000000000000.index
- 第二步通过segment fifile查找message:
通过第一步定位到segment fifile,当offffset=5000时,依次定位到00000000000000000000.index的元数据物理位置和00000000000000000000.log的物理偏移地址,然后再通过00000000000000000000.log顺序查找直到offffset=5000为止。
生产者消息分发策略
kafka在数据生产的时候,有一个数据分发策略。默认的情况使用DefaultPartitioner.class类。
这个类中就定义数据分发的策略。
public interface Partitioner extends Configurable, Closeable {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
/**
* This is called when partitioner is closed.
*/
public void close();
}
复制代码
默认实现类:org.apache.kafka.clients.producer.internals.DefaultPartitioner
- 如果是用户指定了partition,生产就不会调用DefaultPartitioner.partition()方法
数据分发策略的时候,可以指定数据发往哪个partition。
当ProducerRecord 的构造参数中有partition的时候,就可以发送到对应partition上
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value, null);
}
复制代码
- DefaultPartitioner源码
如果指定key,是取决于key的hash值
如果不指定key,轮询分发
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//获取该topic的分区列表
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
//获得分区的个数
int numPartitions = partitions.size();
//如果key值为null
if (keyBytes == null) {//如果没有指定key,那么就是轮询
//维护一个key为topic的ConcurrentHashMap,并通过CAS操作的方式对value值执行递增+1操作
int nextValue = nextValue(topic);
//获取该topic的可用分区列表
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {//如果可用分区大于0
//执行求余操作,保证消息落在可用分区上
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// 没有可用分区的话,就给出一个不可用分区
return Utils.toPositive(nextValue) % numPartitions;
}
} else {//不过指定了key,key肯定就不为null
// 通过计算key的hash,确定消息分区
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
复制代码
消费者负载均衡机制
同一个分区中的数据,只能被一个消费者组中的一个消费者所消费。例如 P0分区中的数据不能被Consumer Group A中C1与C2同时消费。
消费组:一个消费组中可以包含多个消费者,properties.put(ConsumerConfifig.GROUP_ID_CONFIG,"groupName");如果该消费组有四个
消费者,主题有四个分区,那么每人一个。多个消费组可以重复消费消息。
- 如果有3个Partition, p0/p1/p2,同一个消费组有3个消费者,c0/c1/c2,则为一一对应关系
- 如果有3个Partition, p0/p1/p2,同一个消费组有2个消费者,c0/c1,则其中一个消费者消费2个分区的数据,另一个消费者消费一个分区的数据
- 如果有2个Partition, p0/p1,同一个消费组有3个消费者,c0/c1/c3,则其中有一个消费者空闲,另外2个消费者消费分别各自消费一个分区的数据