1. Kafka 简介
Kafka 是一个高吞吐量、低延迟分布式的消息队列系统。kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。Kafka 也是一个高度可扩展的消息系统,它在LinkedIn 的中央数据管道总扮演着十分重要的角色。
1.1 Kafka 的主要设计目标
Kafka 作为一种分布式的、基于发布/订阅的消息系统,其主要设计目标如下:
- 以时间复杂度为O(1)的方式提供消息持久能力,即使对TB级别以上的数据也能保证常数时间的访问的性能;
- 高吞吐率,即使在非常廉价的商用机上也能做到单机支持每秒100K条消息的传输;
- 支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个分区内的消息顺序传输;
- 支持离线数据处理和实时数据处理;
- 支持在线水平处理。
1.2 消息队列的特点
解耦、冗余、扩展性、灵活性和峰值处理能力、可恢复性、顺序保证、缓冲、异步通信。
2. Kafka 的架构
2.1 Kafka 的基本组成
在 Kafka 集群中生产者将消息发送给以Topic命名的消息队列Queue中,消费者订阅发往以某个Topic命名的消息队列Queue中的消息。其中Kafka集群由若干个Broker组成,Topic由若干个Partition组成,每个Partition里面的消息通过Offset来获取。
- Broker:一台Kafka服务器就是一个Broker,一个集群有多个Broker组成,一个Broker可以容纳多个Topic,Broker 于Broker之间没有Master和Standby的概念,它们之间的地位基本是平等的;
- Topic:每条发送到Kafka集群的消息都属于某个主题,这个主题就称为Topic。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存在一个或多个Broker上,但是用户只需制定消息的主题Topic即可生产或消费数据而不需要关心数据存放在何处;
- Partition:为了实现可扩展性,一个非常大的Topic可以被分为多个Partition,从而分布在多台Broker上。Partition中的每条消息都会被分配一个自增Id(offset)。Kafka只保证一个Partition中的顺序将消息发送给消费者,但不保证单个Topic中的多个Partition之间的顺序;
- Offset:消息在Topic的Partition中的位置,同一个Partition中的消息随着消息的写入,其对应的Offset也自增;
- Replica:副本;Topic的Partition含有N个Replica,N为副本因子。其中一个Replica为Leader,其他都为Follower,Leader处理Partition可以向一个Topic发布一些消息;
- Consumer:消息消费者,即向指定的Topic获取消息,根据指定Topic的分区索引及其对应分区上的消息偏移量来过去消息;
- ConsumerGroup:消费者组;每个消费者属于一个消费者组,如果所有的消费者都具有相同的消费者组,那么消息将会在该消费者之间进行负载均衡。也就是说一个Partition中的消息只会被相同ConsumerGroup中的某个Consumer消费,每个ConsumerGroup消息消费是互相独立的;
- Zookeeper:存放Kafka集群相关元数据的组件。在Zookeeper集群中会保存Topic的状态信息、Broker的状态信息、消费者的消费信息。
3. Kafka 的使用场景
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过 kafka 以统一接口服务的方式开放给各种 consumer,例如hadoop、Hbase、Solr 等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如 spark streaming 和 storm。
4. Kafka 集群部署
这里准备了Zookeeper和Kafka集群的三台服务器节点:node01、node02、node03;
1)、kafka 是一个分布式消息队列,需要依赖 ZooKeeper,请先安装好 ZK集群;
2)、下载压缩包(官网地址:http://kafka.apache.org/downloads.html),上传至服务器并解压:
tar zxvf kafka_2.10-0.9.0.1.tgz -C /home/
3)、修改配置文件,根目录config/server.properties:
broker.id=0;
zookeeper.connect=node01,node02,node03
核心配置参数说明:
-
broker.id: broker 集群中唯一标识 id,0、1、2、3 依次增长(broker即 Kafka 集群中的一台服务器)
注:当前 Kafka 集群共三台节点,分别为:node1、node2、node3。对应的 broker.id 分别为 0、1、2。
-
zookeeper.connect: ZK集群地址列表。
4)、将当前 node01 服务器上的 Kafka 目录同步到其他 node02、node03 服务器上。
5)、启动 Kafka 集群
-
事先启动ZK集群;
-
分别在三台服务器上执行以下命令启动(在kafka安装根目录下):
bin/kafka-server-start.sh config/server.properties
6)、测试
- 创建 topic:
bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --create --replication-factor 2 --partitions 3 --topic mytest
参数说明:
--replication-factor :指定每个分区的复制因子个数,默认 1 个;
--partitions :指定当前创建的 kafka 分区数量,默认为 1 个;
--topic :指定新建 topic 的名称;
- 查看 topic 列表:
bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --list
- 查看某个topic 描述:
bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --describe --topic mytest
如:
[root@node01 kafka_2.10-0.9.0.1]# bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --describe --topic mytest
Topic:mytest PartitionCount:3 ReplicationFactor:2 Configs:
Topic: mytest Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: mytest Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1
Topic: mytest Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
- 创建生产者:
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic mytest
- 创建消费者:
bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --from-beginning --topic mytest
生产者终端命令行键入字符,消费者终端将打印收到的消息。
5. Kafka 的 Java API
- 创建消息生产者:
/**
* 向kafka中生产数据
*
* @author root
*/
public class MyProducer extends Thread {
private String topic; //发送给Kafka的数据,topic
private Producer<Integer, String> producerForKafka;
public MyProducer(String topic) {
this.topic = topic;
Properties conf = new Properties();
conf.put("metadata.broker.list", "node01:9092,node02:9092,node03:9092");
conf.put("serializer.class", StringEncoder.class.getName());
conf.put("acks",1);
producerForKafka = new Producer<Integer, String>(new ProducerConfig(conf));
}
@Override
public void run() {
int counter = 0;
while (true) {
counter++;
String value = "shsxt" + counter;
KeyedMessage<Integer, String> message = new KeyedMessage<>(topic, value);
producerForKafka.send(message);
System.out.println(value + " - -- -- --- -- - -- - -");
//hash partitioner 当有key时,则默认通过key 取hash后 ,对partition_number 取余数
// producerForKafka.send(new KeyedMessage<Integer, String>(topic,22,userLog));
// 每2条数据暂停2秒
if (0 == counter % 2) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
new MyProducer("testkafka").start();
}
}
- 创建消息消费者
public class MyConsumer extends Thread {
private final ConsumerConnector consumer;
private final String topic;
public MyConsumer(String topic) {
consumer = Consumer
.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", "node01:2181,node02:node03:2181");
props.put("group.id", "group01");
props.put("zookeeper.session.timeout.ms", "400");
props.put("auto.commit.interval.ms", "100");
props.put("auto.offset.reset","smallest");
// props.put("auto.commit.enable","false"); // 关闭自动提交,开启手动提交
return new ConsumerConfig(props);
}
// push消费方式,服务端推送过来。主动方式是pull
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
//mytopic2
topicCountMap.put(topic, 1); // 描述读取哪个topic,需要几个线程读
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> list = consumerMap.get(topic); // 每个线程对应于一个KafkaStream
KafkaStream stream = list.get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
System.out.println("start................");
while (it.hasNext()){
String data = new String(it.next().message());
System.out.println("开始处理数据 ...:"+ data);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
// consumer.commitOffsets();//手动提交
}
}
public static void main(String[] args) {
MyConsumer consumerThread = new MyConsumer("testkafka");
consumerThread.start();
}
}
代码优化
上述代码存在些问题:
- 若设置自动提交的延时过大如:
props.put("auto.commit.interval.ms", "6000");
会出现提交不及时而重复消费消息的后果; - 若处理消息的时间过长,会出现提交数据后为完成数据处理的消息丢失现象;
处理上述现象可通过设置手动提交解决:
props.put("auto.commit.enable","false"); // 关闭自动提交,开启手动提交
consumer.commitOffsets(); //手动提交
6. Flume & Kafka
6.1 Flume 与 Kafka整合
- Flume 配置文件 fk.conf 内容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node01
a1.sources.r1.port = 41414
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = testflume
a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
-
启动 Kafka 集群
bin/kafka-server-start.sh config/server.properties
-
启动 Flume 集群(注意fk.conf文件存放位置)
bin/flume-ng agent -n a1 -c conf -f conf/fk.conf -Dflume.root.logger=DEBUG,console
- 创建 topic(也可以不去创建,系统默认创建副本数为1、分区数为1的topic)
bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --create --replication-factor 2 --partitions 3 --topic testflume
- 启动消费者
bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --from-beginning --topic testflume
- Flume 中 source 类型为 AVRO 类型,此时通过 Java 发送 rpc 请求,测试数据是否传入 Kafka。其中,Java 发送 Rpc 请求 Flume 代码示例如下:
public class RpcClientDemo {
public static void main(String[] args) throws InterruptedException {
MyRpcClientFacade client = new MyRpcClientFacade();
// Initialize client with the remote Flume agent's host and port
client.init("node01", 41414);
// Send 10 events to the remote Flume agent. That agent should be
// configured to listen with an AvroSource.
for (int i =0; i < 300; i++) {
int number = new Random().nextInt(3);
String sampleData ;
if(number == 0){
sampleData = "Hello Flume! ERROR " + i;
}else if(number==1){
sampleData = "Hello Flume! INFO " + i;
}else {
sampleData = "Hello Flume! WARNING " + i;
}
client.sendDataToFlume(sampleData);
System.out.println("发送数据:" + sampleData);
Thread.sleep(500);
}
client.cleanUp();
}
}
class MyRpcClientFacade {
private RpcClient client;
private String hostname;
private int port;
public void init(String hostname, int port) {
// Setup the RPC connection
this.hostname = hostname;
this.port = port;
this.client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of the
// above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
public void sendDataToFlume(String data) {
// Create a Flume Event object that encapsulates the sample data
Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
// Send the event
try {
client.append(event);
} catch (EventDeliveryException e) {
// clean up and recreate the client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of
// the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
}
public void cleanUp() {
// Close the RPC connection
client.close();
}
}
- 运行上述代码,即可完成Flume于Kafka整合的简单流程。