一 客户端
1.打开eclipse,新建maven项目(new-->other-->Maven Project-->Artifact Id设为mykafka)。
2.配置Build Path。
右击项目名mykafka-->Build Path-->Configure Buiid Path-->
把原来的JRE干掉(点击JRE System Library [J2SE-1.5]-->remove)-->
添加新的JRE(点击Add Library-->JRE System Library-->选择Execution environment:JavaSE-1.7(jre1.8.0_171)>)
3.添加如下2个依赖。
第一个:kafka-clients
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.0</version> </dependency>
也可以到maven仓库( http://mvnrepository.com/)搜索kafka-clients找到此依赖。
将依赖复制到pom.xml中,保存。此时eclipse会自动从maven仓库下载相应jar包。
第二个:slf4j-simple
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> </dependency>
下载完成后如下所示
4.将APP.java重命名为SimpleProducer.java。从官网拷贝示例代码,修改如下
1 package cn.test.mykafka; 2 3 import java.util.Properties; 4 5 import org.apache.kafka.clients.producer.KafkaProducer; 6 import org.apache.kafka.clients.producer.Producer; 7 import org.apache.kafka.clients.producer.ProducerRecord; 8 9 /** 10 * 简单生产者 11 * 12 */ 13 14 public class SimpleProducer { 15 16 public static void main(String[] args) { 17 18 //创建配置信息 19 Properties props = new Properties(); 20 props.put("bootstrap.servers", "192.168.42.133:9092"); //指定broker的节点和端口 21 props.put("acks", "all"); 22 props.put("retries", 0); 23 props.put("batch.size", 16384); 24 props.put("linger.ms", 1); 25 props.put("buffer.memory", 33554432); 26 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 27 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 28 29 //创建一个生产者 30 Producer<String, String> producer = new KafkaProducer<>(props); 31 32 //发送消息 33 ProducerRecord<String, String> msg = new ProducerRecord<String, String>("test-topic","hello world from win7"); 34 producer.send(msg); 35 //for (int i = 0; i < 10; i++) 36 // producer.send(new ProducerRecord<String, String>("test-topic", Integer.toString(i), Integer.toString(i))); //topic,key(决定分区,非必填),value 37 38 System.out.println("over"); 39 producer.close(); 40 } 41 }
二 服务器端
1.搭建单节点单broker的kafka。具体步骤看这里。
2.启动服务器
启动zookeeper
[root@hadoop kafka]# zookeeper-server-start.sh config/zookeeper.properties [root@hadoop kafka]# jps #打开另一个终端查看是否启动成功 3892 Jps 3566 QuorumPeerMain
启动kafka
[root@hadoop kafka]# kafka-server-start.sh config/server.properties
3.创建topic
#创建一个分区,一个副本的主题 #副本数无法修改,只能在创建主题时指定 [root@hadoop kafka]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic Created topic "test-topic". [root@hadoop kafka]# kafka-topics.sh --list --zookeeper localhost:2181 #列出主题 test-topic
4.启动消费者
[root@hadoop kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
三 测试发送消息
1.在eclipse运行代码,发送消息。
2.查看消费者是否接收到消息。
如上消费者接收到消息,说明消息发送成功。
四 遇到的问题
报错1:slf4j类加载失败。
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
解决方法:在pom文件中添加slf4j-simple依赖,如上文所示。
由于我们虚拟机安装的是kafka_2.11-2.0.0.tgz版本,所以到maven仓库找到其依赖之后,复制粘贴到pom.xml中
报错2:java.io.IOException: Can't resolve address: hadoop:9092
原因:kafka 连接原理:首先连接 192.168.42.133:9092,再连接返回的host.name = hadoop,最后继续连接advertised.host.name=hadoop。
解决方法:添加window解析。C:\Windows\System32\drivers\etc\hosts文件添加92.168.42.133 hadoop。用cmd命令行ping hadoop试试如果可以ping通即可。