Kafka生产者和消费者(Java代码案例)

       在上一篇我写了关于Kafka的下载安装以及一些报错记录,由于我是想在Jmeter上对Kafka进行测试,那么难以避免的就是可能要编写脚本,甚至需要自定义Kafka的相关类。所以也是对代码进行了一些研究。个人认为Kafka最重要的三个元素应该是主题Topic、生产者Producer和消费者Consumer。我也是找到一个关于Kafka学习的网站,感觉这个学习网站还是挺不错的,链接在此:https://www.tutorialspoint.com/apache_kafka/index.htm

话不多说,直接上代码,在运行之前请配置好本地生产环境,并启动ZooKeeper和Kafka,新建好一个主题,因为我们代码中会要用到。具体可以参照我上一篇博客,链接在此:https://blog.csdn.net/shan286/article/details/105242586

这里贴的都是主要代码,要注意的是参数必须与你们自己配置的一致,后面会注明

(一)生产者代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
    producer.send(new ProducerRecord<String, String>("TestTopic", "这是第" + i + "次输出", Integer.toString(i)));
}
producer.close();

(二)消费者代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("TestTopic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s" + "\n", record.offset(), record.key(), record.value());
    }
}

运行结果:

要注意的是:

1、Topic名称必须一致,我这里都是“TestTopic”;

2、bootstrap.servers字段的值要正确,默认是localhost:9092;

3、必须要有“key.serializer”,否则会报错,报错内容如下:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.
	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:477)
	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
	at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:409)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
	at com.jmeter.test.MyProducer.main(MyProducer.java:24)

4、如果有log的jar包没添加,可能会报错,报错如下:

、Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory
	at org.apache.kafka.clients.CommonClientConfigs.<clinit>(CommonClientConfigs.java:32)
	at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:341)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
	at MyProducer.main(MyProducer.java:23)
Caused by: java.lang.ClassNotFoundException: org.slf4j.LoggerFactory
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	... 5 more

5、要添加的jar包如下:

最后,附上Demo地址:https://download.csdn.net/download/shan286/12310516

发布了63 篇原创文章 · 获赞 9 · 访问量 6万+

猜你喜欢

转载自blog.csdn.net/shan286/article/details/105358566