我们先创建一个topic,然后启动生产者和消费者,进行消息通信,然后在使用Kafka API编程的方式实现,笔者使用的ZK和Kafka都是单节点,你也可以使用集群方式。
启动Zookeeper
zkServer.sh start
启动Kafka
kafka-server-start.sh $KAFKA_HOME/config/server.properties
创建topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_api
查看topic详细信息
[hadoop@Master ~]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka_api
Topic:kafka_api PartitionCount:1 ReplicationFactor:1 Configs:
Topic: kafka_api Partition: 0 Leader: 0 Replicas: 0 Isr: 0
启动生产者和消费者,测试消息通信
# 生产者
kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_api
# 消费者
kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_api
Java API 编程实现
1.创建maven项目,pom.xml中引入kafka依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.0</version>
</dependency>
</dependencies>
2.创建KafkaProperties类,配置Kafka相关属性
package com.bigdata.kafka;
/**
* Kafka 相关属性配置类
*/
public interface KafkaProperties {
// zookeeper连接,与server.properties中的zookeeper.connect属性一致,多个用逗号隔开,例如:zk01:2181,zk02:2181
public static final String ZK = "Master:2181";
// 如果是多个blocker,用逗号分隔即可,例如:kafka01::9092,kafka02:9093
public static final String BLOCK_LIST = "Master:9092";
// 主题
public static final String TOPIC = "kafka_api";
}
3.Kafka Producer API 开发
生产者API中常用的类如下
Producer:生产者
ProducerConfig:生产者对应的配置
KeyedMessage:封装的消息对象
创建KafkaProducer类,代码如下
package com.bigdata.kafka;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
/**
* Kafka 生产者
*/
public class KafkaProducer extends Thread {
private String topic;
private Producer<Integer, String> producer;
public KafkaProducer(String topic) {
this.topic = topic;
Properties properties = new Properties();
properties.put("metadata.broker.list", KafkaProperties.BLOCK_LIST);
properties.put("serializer.class", "kafka.serializer.StringEncoder");
producer = new Producer<Integer, String>(new ProducerConfig(properties));
}
@Override
public void run() {
int messageNo = 1;
while(true) {
String message = "message_" + messageNo;
System.out.println("Send:" + message);
producer.send(new KeyedMessage<Integer, String>(topic, message));
messageNo ++;
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new KafkaProducer(KafkaProperties.TOPIC).start();
}
}
运行上述代码,在控制台中使用命令启动一个消费者,观察控制台是否能接收到消息
4.Kafka Consumer API 开发
消费者API中常用的类如下
Consumer:消费者
ConsumerConnector:消费者连接器
ConsumerConfig:消费者对应的配置
KafkaStream:数据流
创建KafkaConsumer类,代码如下
package com.bigdata.kafka;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* Kafuka 消费者
*/
public class KafkaConsumer extends Thread{
private String topic;
public KafkaConsumer(String topic) {
this.topic = topic;
}
private ConsumerConnector createConsumer() {
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", KafkaProperties.ZK);
properties.setProperty("group.id", "testGroup");
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}
@Override
public void run() {
// 创建Consumer
ConsumerConnector consumer = createConsumer();
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
// 获取每次接受到的数据
KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
// 不停地从stream中读取最新接收到的数据
while(iterator.hasNext()){
String message = String.valueOf(iterator.next().message());
System.out.println("message:" + message);
}
}
public static void main(String[] args) {
new KafkaConsumer(KafkaProperties.TOPIC).start();
}
}
运行生产者及消费者代码,观察控制台
生产者控制台(部分结果):
Send:message_5
Send:message_6
Send:message_7
Send:message_8
Send:message_9
Send:message_10
消费者控制台(部分结果):只接收最新的数据
message:message_7
message:message_8
message:message_9
message:message_10