windows下kafka简单使用附java代码

1.kafka下载

下载地址:http://kafka.apache.org/downloads 选择二进制版本

2.启动zookeeper

下载后解压到本地,不用修改配置

在kafka目录下打开第一个cmd命令(按住shift 在文件夹空白处右键 直接打开命令行)

(先输入 title zookeeper 可以把cmd窗口名称改为zookeeper)

输入 bin\windows\zookeeper-server-start.bat config\zookeeper.properties

zk启动成功

3.启动kafka服务

在kafka目录下新打开第二个cmd窗口

输入 bin\windows\kafka-server-start.bat config\server.properties

kafka 服务器启动成功

4.创建topic

打开第三个cmd窗口(执行完毕可以关闭)

输入  bin\windows\kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test

在本地 zk 下新建一个名为 test 的 topic

(cmd下输入 bin\windows\kafka-topics.bat -list -zookeeper 127.0.0.1:2181  可列出topic)

5.启动生产者

打开第四个cmd窗口

输入 bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

生产者启动成功

6.启动消费者

打开第五个cmd窗口

输入  bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test

消费者启动成功

 7.测试

在生产者 窗口中输入任意字符串,可以在消费者窗口中看到打印的信息

 8.JAVA 代码创建topic

依赖的JAR包

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>1.1.0</version>
        </dependency>    
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>

代码

private static String TOPIC = "demo";

    @Test
    public void createTopic() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        AdminClient client = AdminClient.create(properties);
        List<NewTopic> topics = new ArrayList<NewTopic>();
        NewTopic newTopic = new NewTopic(TOPIC, 1, (short) 1);
        topics.add(newTopic);
        CreateTopicsResult result = client.createTopics(topics);
        try {
            KafkaFuture<Void> all = result.all();
            all.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

运行结果

9.创建消费者

    @Test
    public void consumerMsg() throws InterruptedException {
        Properties props = new Properties();
        //定义kakfa 服务的地址
        props.put("bootstrap.servers", "127.0.0.1:9092");
        //制定consumer group
        props.put("group.id", "test");
        //是否自动确认offset
        props.put("enable.auto.commit", "true");
        //自动确认offset的时间间隔
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        //key的序列化类
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //value的序列化类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //定义consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //消费者订阅的topic, 可同时订阅多个
        consumer.subscribe(Arrays.asList(TOPIC));
//        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
//            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//            }
//
//            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
//                //将偏移设置到最开始
//                consumer.seekToBeginning(collection);
//            }
//        });
//        final int max = 100;
//        List<ConsumerRecord<String, String>> list = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                Thread.sleep(100);
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
//                list.add(record);
            }
//            if (list.size() >= max) {
//                consumer.commitSync();
//                list.clear();
//            }
        }
    }
}

10.创建生产者

  @Test
    public void sendMsg() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), Integer.toString(i));
            producer.send(record);
        }
        producer.close();
    }

11.运行

先运行 consumerMsg 方法,再运行 sendMsg 方法

可以看到 consumerMsg  的console 打印结果

排坑:

开始用了第二台机器 在二号机运行kafka,用一号机执行代码一直不能成功

kafka服务会崩(像是死循环然后内存溢出),再启动kafka 也不能启动

之后用了一个工具 ZooInspector,解压后运行build 中的 jar (推荐这个工具)

连接本地zk,把数据都删掉,再启动kafka 就可以正常启动

最后发现一号机不成功原因是端口不通但在一号机启动kafka 代码是成功的

猜你喜欢

转载自www.cnblogs.com/yangk5636/p/9253968.html