kafka背景
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。
kafka的使用场景
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如spark streaming和storm
- 事件源
kafka的特性
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
kafka的相关名词
- Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群
- Topic:一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发
- massage: Kafka中最基本的传递对象。
- Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
- Segment:partition物理上由多个segment组成,每个Segment存着message信息
- Producer : 生产者,生产message发送到topic
- Consumer : 消费者,订阅topic并消费message, consumer作为一个线程来消费
- Consumer Group:消费者组,一个Consumer Group包含多个consumer
- Offset:偏移量,理解为消息partition中的索引即可
kafka原理
-
首先看到kafka消息传输过程
Producer为生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。
Topic为发布订阅模式,通过对消息进行分类,因此消费者可以只关注自己需要的Topic中的消息
Consumer为消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。
从上图中就可以看出同一个Topic下的消费者和生产者的数量可以不是对应的 -
kafka服务器集群消息存储策略
谈起kafka的存储,那么就不得不说分区的概念,即partitions,创建一个topic时,同时可以指定分区的数目,很大程度的提高了程序的吞吐量,kafka在接收到生产者的消息后,会将消息根据均衡策略进行均摊存储将消息存储到不同的分区中
在每个分区中,消息以顺序存储,最晚接收的的消息会最后被消费。 -
kafka与生产者交互的几种方式
生产者在向kafka集群发送消息的时候,可以通过指定分区来发送到指定的分区中
也可以通过指定均衡策略来将消息发送到不同的分区中
如果不指定,就会采用默认的随机均衡策略,将消息随机的存储到不同的分区中 -
kafka与消费者进行交互
- 在消费者消费消息时,kafka使用offset来记录当前消费的位置
- 在kafka的设计中,可以有多个不同的group来同时消费同一个topic下的消息,如图,我们有两个不同的group同时消费,他们的的消费的记录位置offset各不项目,不互相干扰。
- 对于一个group而言,消费者的数量不应该多余分区的数量,因为在一个group中,每个分区至多只能绑定到一个消费者上,即 一个消费者可以消费多个分区,一个分区只能给一个消费者消费
- 因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息。
高可用kafka集群搭建与使用
1.下载
你可以在kafka官网 http://kafka.apache.org/downloads 下载到最新的kafka安装包,选择下载二进制版本的tgz文件,根据网络状态可能需要fq
2.安装
如果有不熟悉linux系统的同学建议先看下其他博客熟悉一下linux 在来搭建kafka
Kafka是使用scala编写的运行与jvm虚拟机上的程序,虽然也可以在windows上使用,但是kafka基本上是运行在linux服务器上,因此我搭建的也是在linux系统上。
-
首先确保你的集群服务器已经安装了jdk环境,因为kafka需要java运行环境和zookeeper jdk如何安装我就不多说了
-
每台服务器节点上安装zookeeper
- 进入到/user/load目录 下载并且安装zookeeper安装包
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz - 解压安装包
tar -zxvf zookeeper-3.4.10.tar.gz - 重命名
重命名: mv zookeeper-3.4.10 zookeeper
- 进入到/user/load目录 下载并且安装zookeeper安装包
-
搭建Zookeeper集群环境
- cd /usr/local/zookeeper/conf
- mv zoo_sample.cfg zoo.cfg
- 修改conf: vi zoo.cfg 修改两处
(1) dataDir=/usr/local/zookeeper/data(注意同时在zookeeper创建data目录)
(2)最后面添加( 更改为自己的集群服务器的真实IP地址 )
server.0=192.168.137.3:2888:3888
server.1=192.168.137.4:2888:3888
server.2=192.168.137.5:2888:3888
-
创建服务器标识
- 进入到zookeper目录创建data文件夹
mkdir data - 进入data目录创建myid文件存放服务器标识为0
这里的标识 跟 第三步 server.0是同一台服务器 如果是server.1服务器 标识就为1)
vim myid (如果没有vim命令 可以使用vi命令 我使用vim是因为编辑的是彩色的更容易看)
安装vim命令: yum vim就ok了
- 进入到zookeper目录创建data文件夹
-
设置zookeeper环境变量
vim /etc/profile
// 使修改完的环境变量生效
source /etc/profile
到此我们的zookeeper集群环境就搭建完了 -
启动zookeeper
启动之前一定要关闭防火墙:systemctl stop firewalld.service- 启动zookeeper:
路径: /usr/local/zookeeper/bin - 执行: zkServer.sh start
(注意这里3台机器都要进行启动) - 状态: zkServer.sh status(在三个节点上检验zk的mode,一个leader和俩个follower)
- 启动zookeeper:
-
安装kafka集群环境
集群服务器都是按照以下进行操作- 通过命令下载kafka
cd /usr/local
wget http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz - 解压并重命名
tar -zxvf kafka_2.11-1.0.0.tgz
mv kafka_2.12-0.11.0.0 kafka - 修改配置文件
vim kafka/config/server.properties
需要修改如下内容:
broker.id=0 (这里的标识0 跟 同服务器的zookeeper 的myid 标识是一样的)
listeners=PLAINTEXT://192.168.131.130:9092
zookeeper.connect=192.168.137.3:2181,192.168.137.4:2181,192.168.137.5:2181 (更改为自己的真实集群IP地址) - 在系统环境中配置kafka的路径
vim /etc/profile
// 使修改完的环境变量生效
source /etc/profile
到此kafka集群环境已经全部搭建完成接下来就开始测试一番吧!!!
- 通过命令下载kafka
3.测试kafka集群环境
- 启动3台虚拟机的zookeeper程序
/usr/local/zookeeper/bin/zkServer.sh start - 开启成功后查看zookeeper集群的状态
/usr/local/zookeeper/bin/zkServer.sh status
出现Mode:follower或是Mode:leader则代表成功
- 然后启动3台虚拟机的kafka程序(cd /usr/local/kafka)
./bin/kafka-server-start.sh -daemon config/server.properties - 查看kafka是否启动成功
ps -ef | grep kafka 如下图这样就代表成功了
- 在其中一台虚拟机(192.168.137.3) 让我们创建一个名为“test”的topic,它有一个分区和一个副本:
–partitions 代表在topic中创建的分区数量
bin/kafka-topics.sh --create --zookeeper 192.168.137.3:2181 --replication-factor 1 --partitions 1 --topic test - 现在我们可以运行list(列表)命令来查看这个topic:
bin/kafka-topics.sh --list --zookeeper 192.168.137.3:2181 - 创建完topic后 现在发送一条消息给kafka
bin/kafka-console-producer.sh --broker-list 192.168.137.3:9092 --topic test
This is a message
This is another message - 启动一个consumer
Kafka 还有一个命令行consumer(消费者),将消息转储到标准输出。
bin/kafka-console-consumer.sh --bootstrap-server 192.168.137.3:9092 --topic test --from-beginning
This is a message
This is another message
如果您将上述命令在不同的终端中运行,那么现在就可以将消息输入到生产者终端中,并将它们在消费终端中显示出来。
所有的命令行工具都有其他选项;运行不带任何参数的命令将显示更加详细的使用信息。