kafka5 编写简单生产者

一 客户端

1.打开eclipse,新建maven项目(new-->other-->Maven Project-->Artifact Id设为mykafka)。

2.配置Build Path。

右击项目名mykafka-->Build Path-->Configure Buiid Path-->
把原来的JRE干掉(点击JRE System Library [J2SE-1.5]-->remove)-->
添加新的JRE(点击Add Library-->JRE System Library-->选择Execution environment:JavaSE-1.7(jre1.8.0_171)>)

3.添加如下2个依赖。

第一个:kafka-clients

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>

也可以到maven仓库( http://mvnrepository.com/)搜索kafka-clients找到此依赖。

将依赖复制到pom.xml中,保存。此时eclipse会自动从maven仓库下载相应jar包。

第二个:slf4j-simple

扫描二维码关注公众号,回复: 4041861 查看本文章
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
</dependency>

下载完成后如下所示

4.将APP.java重命名为SimpleProducer.java。从官网拷贝示例代码,修改如下

 1 package cn.test.mykafka;
 2 
 3 import java.util.Properties;
 4 
 5 import org.apache.kafka.clients.producer.KafkaProducer;
 6 import org.apache.kafka.clients.producer.Producer;
 7 import org.apache.kafka.clients.producer.ProducerRecord;
 8 
 9 /**
10  * 简单生产者
11  *
12  */
13 
14 public class SimpleProducer {
15 
16     public static void main(String[] args) {
17         
18          //创建配置信息
19          Properties props = new Properties();
20          props.put("bootstrap.servers", "192.168.42.133:9092"); //指定broker的节点和端口
21          props.put("acks", "all");
22          props.put("retries", 0);
23          props.put("batch.size", 16384);
24          props.put("linger.ms", 1);
25          props.put("buffer.memory", 33554432);
26          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
27          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
28 
29          //创建一个生产者
30          Producer<String, String> producer = new KafkaProducer<>(props);
31          
32          //发送消息
33          ProducerRecord<String, String> msg = new ProducerRecord<String, String>("test-topic","hello world from win7");
34          producer.send(msg);
35          //for (int i = 0; i < 10; i++)
36          //   producer.send(new ProducerRecord<String, String>("test-topic", Integer.toString(i), Integer.toString(i))); //topic,key(决定分区,非必填),value 
37             
38          System.out.println("over");
39          producer.close();
40     }
41 }
SimpleProducer.java

二 服务器端

1.搭建单节点单broker的kafka。具体步骤看这里

2.启动服务器

启动zookeeper

[root@hadoop kafka]# zookeeper-server-start.sh config/zookeeper.properties
[root@hadoop kafka]# jps #打开另一个终端查看是否启动成功
3892 Jps
3566 QuorumPeerMain

启动kafka

[root@hadoop kafka]# kafka-server-start.sh config/server.properties 

3.创建topic

#创建一个分区,一个副本的主题
#副本数无法修改,只能在创建主题时指定
[root@hadoop kafka]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
Created topic "test-topic".  

[root@hadoop kafka]# kafka-topics.sh --list --zookeeper localhost:2181 #列出主题
test-topic

4.启动消费者

[root@hadoop kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning 

三 测试发送消息

1.在eclipse运行代码,发送消息。

2.查看消费者是否接收到消息。

如上消费者接收到消息,说明消息发送成功。

四 遇到的问题

报错1:slf4j类加载失败。

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

解决方法:在pom文件中添加slf4j-simple依赖,如上文所示。 

由于我们虚拟机安装的是kafka_2.11-2.0.0.tgz版本,所以到maven仓库找到其依赖之后,复制粘贴到pom.xml中

报错2:java.io.IOException: Can't resolve address: hadoop:9092

原因:kafka 连接原理:首先连接 192.168.42.133:9092,再连接返回的host.name = hadoop,最后继续连接advertised.host.name=hadoop。

解决方法:添加window解析。C:\Windows\System32\drivers\etc\hosts文件添加92.168.42.133 hadoop。用cmd命令行ping hadoop试试如果可以ping通即可。

猜你喜欢

转载自www.cnblogs.com/zhengna/p/9453991.html
今日推荐