1.Kafka是什么?
它是一个分布式的消息队列,消息队列:生产者、消费者的功能。它提供了类似于JMS 的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。
2. Kafka的作用:
这里我们简单来说,它的主要作用就是解耦(降低系统之间的耦合度)、异构(异步架构),与并行。如果不同子系统之间的依赖太高的情况下,如果发生一些变化就需要更改整个系统,甚至系统的整体架构也可能要发生变化,Kafka就是来解决这问题的,用它来作为消息中间键(子系统之间的消息处理交换者)
普通情况:子系统(消息生产者)-------子系统(消息消费者)
加入Kafka的情况:子系统(消息生产者)-------kafka集群----------子系统(消息消费者)
3.Kafka的核心组件:
Topic :消息根据 Topic 进行归类
Producer:发送消息者
Consumer:消息接受者
broker:每个 kafka 实例(server)
Zookeeper:依赖集群保存 meta 信息。
4.Kafka的单机搭建(kafka很依赖zookeeper,所以在搭建kafka之前一定要有zk的环境):
4.1下载地址:http://kafka.apache.org/downloads.html 当前版本是1.0。因为kafka是用Scala语言编写的,会根据Scala语言的版本变更,目前有两个主流版本kafka2.11-1.0与kafka2.12-1.0,推荐使用2.11的。
4.2 解压安装。
4.3 server.properties配置文件修改需要修改三个配置
4.3.1 必须要只要一个brokerid,并且它必须是唯一的。
broker.id=0(可以自己设定值)
4.3.2 日志数据文件存储的路径(如不存在,需要手动创建该目录, mkdir -p /export/data/kafka/)
log.dirs=/export/data/kafka
4.3.3 # ZooKeeper的配置,本地模式下指向到本地的ZooKeeper服务即可
zookeeper.connect= (zk leader所在服务器地址):2181
5.Kafka启动
启动命令:kafka-server-start.sh -daemon +(server.properties所在路径)
6.kafka-manager:
Kafka Manager 由 yahoo 公司开发,该工具可以方便查看集群主题分布情况,同时支持对多个集群的管理、分区平衡以及创建主题等操作。使我们操作观察更方便(UI显示)这里不做多的说明。
7.Kafka集群搭建:
kafka集群的搭建是非常简单的,只需要将上面的单机版的kafka分发的其他机器,并且将ZooKeeper信息修改成集群的配置以及设置不同的broker值即可。
8.kafka的操作
Kafka的操作有两种方式,一种shell命令行和java操作api的方式(常用这一种)
8.1命令行操作:
8.1.1创建topic-------kafka-topics.sh --create --zookeeper (服务器地址):2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic
参数说明:
- zookeeper:参数是必传参数,用于配置 Kafka 集群与 ZooKeeper 连接地址。至少写一个。
- partitions:参数用于设置主题分区数,该配置为必传参数。
- replication-factor:参数用来设置主题副本数 ,该配置也是必传参数。
- topic:指定topic的名称。
8.1.2 查看topic列表------kafka-topics.sh --list --zookeeper (服务器地址):2181
8.1.3 删除topic---kafka-topics.sh --delete --zookeeper (服务器地址):2181 --topic my-kafka-topic
通过kafka-topics.sh执行删除动作,需要在server.properties文件中配置 delete.topic.enable=true,该配置默认为 false。
否则执行该脚本并未真正删除主题,而是在 ZooKeeper 的/admin/delete_topics 目录下创建一个 与待删除主题同名的节点 ,将该主题标记为删除状态 。
8.1.4生产者发送信息-------kafka-console-producer.sh --broker-list (服务器地址):9092 --topic my-kafka-topic
8.1.5消费者接受的操作----kafka-console-consumer.sh --bootstrap-server (服务器地址):9092 --topic my-kafka-topic
8.2通过java Api操作
8.2.1创建java maven工程,导入依赖,依赖如下:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
8.2.2 创建Topic(举例)
public void testCreateTopic() {
ZkUtils zkUtils = null;
try {
//参数:zookeeper的地址,session超时时间,连接超时时间,是否启用zookeeper安全机制
zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());
String topicName = "my-kafka-topic-test1";
if (!AdminUtils.topicExists(zkUtils, topicName)) {
//参数:zkUtils,topic名称,partition数量,副本数量,参数,机架感知模式
AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), AdminUtils.createTopic$default$6());
System.out.println(topicName + " 创建成功!");
} else {
System.out.println(topicName + " 已存在!");
}
} finally {
if (null != zkUtils) {
zkUtils.close();
}
}
8.2.3 删除topic(举例)
public void testDeleteTopic() {
ZkUtils zkUtils = null;
try {
//参数:zookeeper的地址,session超时时间,连接超时时间,是否启用zookeeper安全机制
zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());
String topicName = "my-kafka-topic-test1";
if (AdminUtils.topicExists(zkUtils, topicName)) {
//参数:zkUtils,topic名称,partition数量,副本数量,参数,机架感知模式
AdminUtils.deleteTopic(zkUtils, topicName);
System.out.println(topicName + " 删除成功!");
} else {
System.out.println(topicName + " 不已存在!");
}
} finally {
if (null != zkUtils) {
zkUtils.close();
}
}
8.2.4 生产者的操作:
public void testProducer() throws InterruptedException {
Properties config = new Properties();
// 设置kafka服务列表,多个用逗号分隔
config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
// 设置序列化消息 Key 的类
config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置序列化消息 value 的类
config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 初始化
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(config);
for (int i = 0; i < 100 ; i++) {
ProducerRecord record = new ProducerRecord("my-kafka-topic","data-" + i);
// 发送消息
kafkaProducer.send(record);
System.out.println("发送消息 --> " + i);
Thread.sleep(100);
}
kafkaProducer.close();
}
8.2.5 消费者的操作(举例)
public void testConsumer() {
Properties config = new Properties();
// 设置kafka服务列表,多个用逗号分隔
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
// 设置消费者分组id
config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
// 设置客户端id
config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-test-client");
// 设置移量自动提交
config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 设置偏移量提交时间间隔
config.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 设置序反列化消息 Key 的类
config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置序反列化消息 value 的类
config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
// 订阅topic
kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic"));
while (true) { // 使用死循环不断的拉取数据(这里用spring kafka可以优化,我们以后再说这个技术点)
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
long offset = record.offset();
System.out.println("value = " + value + ", offset = " + offset);
}
}
}