java操作kafka非常的简单,然后kafka也提供了很多缺省值,一般情况下我们不需要修改太多的参数就能使用。下面我贴出代码。
创建消息队列
[root@hw1 ~]# kafka-topics.sh --create --zookeeper 192.168.56.122:2181 --topic test --replication-factor 1 --partitions 1
Created topic "test".
pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
生产者
package com.njbdqn.services;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* @Author: Stephen
* @Date: 2020/2/11 9:51
* @Content: 生产者:往test消息队列写入消息
*/
public class MyProduce {
public static void main(String[] args) {
// 定义配置信息
Properties prop = new Properties();
// kafka地址,多个地址用逗号分割 "192.168.23.76:9092,192.168.23.77:9092"
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.122:9092");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
KafkaProducer<String,String> prod = new KafkaProducer<String, String>(prop);
// 发送消息
try {
for (int i =0;i<10;i++){
// 生产者记录消息
ProducerRecord<String,String> pr = new ProducerRecord<String, String>("test","hello world"+i);
prod.send(pr);
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
prod.close();
}
}
}
消费者
package com.njbdqn.services;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* @Author: Stephen
* @Date: 2020/2/11 11:15
* @Content: 消费者:读取kafka数据
*/
public class MyConsumer {
public static void main(String[] args) {
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.122:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put("session.timeout.ms","30000");
//消费者是否自动提交偏移量,默认是true 避免出现重复数据 设为false
prop.put("enable.auto.commit","false");
prop.put("auto.commit.interval.ms","1000");
//auto.offset.reset 消费者在读取一个没有偏移量的分区或者偏移量无效的情况下的处理
//earliest 在偏移量无效的情况下 消费者将从起始位置读取分区的记录
//latest 在偏移量无效的情况下 消费者将从最新位置读取分区的记录
prop.put("auto.offset.reset","earliest");
// 设置组名
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"hwzhenshuai2");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subscribe(Collections.singletonList("test"));
while (true){
// 每隔一秒读取Kafka数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String,String> red:records){
System.out.printf("offset:%d,key:%s,value:%s",red.offset(),red.key(),red.value());
}
}
}
}