一.创建生产者。
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
public class MyKafkaProducer implements Runnable {

    /** Shared producer instance; (re)assigned by every constructor call. */
    private static KafkaProducer<String, String> kafkaProducer;

    /**
     * Configures and creates the shared producer.
     * NOTE(review): assigning a static field from a constructor means each
     * {@code new MyKafkaProducer()} replaces the shared instance; kept as-is
     * to preserve the demo's public interface.
     */
    public MyKafkaProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "xxx.xx.xx.xxx:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // acks=0: fire-and-forget, no broker acknowledgement (fastest, least durable).
        properties.put("acks", "0");
        properties.put("retries", "0");
        properties.put("batch.size", "16384");
        kafkaProducer = new KafkaProducer<>(properties);
    }

    @Override
    public void run() {
        try {
            main(null);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sends 100 demo messages (i = 200..299) to "topic1", then closes the
     * producer. close() flushes all buffered records, replacing the original
     * Thread.sleep(3000) hack, and releases the client's sockets and threads.
     *
     * @throws InterruptedException declared for interface compatibility with callers
     */
    public static void main(String[] args) throws InterruptedException {
        MyKafkaProducer myKafkaProducer = new MyKafkaProducer();
        String topic = "topic1";
        try {
            for (int i = 200; i < 300; i++) {
                String message = "hello kafka 哈哈哈 i=" + i;
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<String, String>(topic, message);
                kafkaProducer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            // Per the Callback contract, recordMetadata carries no
                            // valid partition/offset on failure — do not dereference
                            // it here (the original code NPE'd on this path).
                            e.printStackTrace();
                            return;
                        }
                        System.out.println("===========partition======" + recordMetadata.partition()
                                + "===========offset==========" + recordMetadata.offset());
                    }
                });
            }
        } finally {
            // Always release the producer, even if a send() call throws.
            kafkaProducer.close();
        }
    }
}
二.创建消费者。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class MyKafkaConsumer implements Runnable {

    /** Shared consumer instance; (re)assigned by every constructor call. */
    private static KafkaConsumer<String, String> kafkaConsumer;

    /**
     * Configures and creates the shared consumer.
     * NOTE(review): writes a static field from a constructor — every
     * {@code new MyKafkaConsumer()} replaces the shared instance; kept as-is
     * to preserve the demo's public interface.
     */
    public MyKafkaConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "xxx.xx.xx.xxx:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "group2");
        // Offsets are committed automatically once per second.
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        // Start from the earliest offset when the group has no committed offset yet.
        properties.put("auto.offset.reset", "earliest");
        kafkaConsumer = new KafkaConsumer<String, String>(properties);
    }

    @Override
    public void run() {
        main(null);
    }

    /** Subscribes to "topic1" and prints each record's offset and value, forever. */
    public static void main(String[] args) {
        new MyKafkaConsumer(); // side effect: initializes the static kafkaConsumer
        kafkaConsumer.subscribe(Arrays.asList("topic1")); // subscribe to the demo topic
        try {
            while (true) {
                // poll(long) is deprecated in kafka-clients 2.0+ in favor of
                // poll(Duration); kept for compatibility with older clients.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("==========" + "offset=" + record.offset()
                            + ",value=" + record.value());
                }
            }
        } finally {
            // The loop only exits via an exception; close() then leaves the
            // consumer group cleanly so a rebalance happens promptly.
            kafkaConsumer.close();
        }
    }
}
三.启动线程,生产者生产数据,同时消费者消费数据。
import static java.lang.Thread.*;
public class MyKafkaBroker {

    /**
     * Demo driver: launches the producer on its own thread, waits three
     * seconds so some messages reach the topic, then launches the consumer.
     *
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        Thread producerThread = new Thread(new MyKafkaProducer());
        producerThread.start();

        // Give the producer a head start before the consumer joins the group.
        Thread.sleep(3000);

        Thread consumerThread = new Thread(new MyKafkaConsumer());
        consumerThread.start();
    }
}