拦截器一:为消息添加指定前缀
package com.aura.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
 * Producer interceptor that prepends a configurable prefix to every record value.
 *
 * <p>The prefix is read from the {@code "prefix"} entry of the configuration map
 * passed to {@link #configure(Map)}. When that entry is absent the prefix is empty
 * and records pass through unchanged.
 */
public class MyInterceptor implements ProducerInterceptor<String, String> {

    /** Text prepended to each record value; never {@code null}. */
    private String prefix = "";

    /**
     * Returns a copy of the record whose value carries the configured prefix.
     *
     * <p>Topic, partition, timestamp, key and headers are copied from the incoming
     * record. A record with a {@code null} value is returned as-is so that it is not
     * turned into the literal text {@code "<prefix>null"}.
     *
     * @param producerRecord the record about to be sent
     * @return the record to hand on to the producer
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        String value = producerRecord.value();
        if (value == null || prefix.isEmpty()) {
            // Nothing to prepend: keep the original record untouched.
            return producerRecord;
        }
        return new ProducerRecord<>(
                producerRecord.topic(),
                producerRecord.partition(),
                producerRecord.timestamp(),
                producerRecord.key(),
                prefix + value,
                producerRecord.headers()
        );
    }

    /** No-op: this interceptor does not react to acknowledgements. */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
    }

    /** No-op: this interceptor holds no resources. */
    @Override
    public void close() {
    }

    /**
     * Reads the {@code "prefix"} entry from the configuration.
     *
     * @param map the configuration map; a missing {@code "prefix"} entry yields an
     *            empty prefix, and a non-string value is converted with
     *            {@code toString()}
     */
    @Override
    public void configure(Map<String, ?> map) {
        Object configured = map.get("prefix");
        prefix = configured == null ? "" : configured.toString();
    }
}
拦截器二:统计消息发送成功和失败的数量
package com.aura.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Producer interceptor that counts acknowledged and failed sends and prints both
 * totals to standard output when it is closed.
 *
 * <p>The counters are atomic because the Kafka {@code ProducerInterceptor} contract
 * states that callbacks may be invoked from multiple threads; a plain {@code int++}
 * could lose updates under concurrent calls.
 */
public class CounterInterceptor implements ProducerInterceptor<String, String> {

    /** Number of acknowledgements received without an exception. */
    private final AtomicLong success = new AtomicLong();

    /** Number of acknowledgements received with an exception. */
    private final AtomicLong failed = new AtomicLong();

    /**
     * Passes the record through unchanged.
     *
     * @param producerRecord the record about to be sent
     * @return the same record instance
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        return producerRecord;
    }

    /**
     * Counts the outcome of one send.
     *
     * @param recordMetadata metadata of the record; not used here
     * @param e              {@code null} when the send succeeded, otherwise the failure
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            success.incrementAndGet();
        } else {
            failed.incrementAndGet();
        }
    }

    /** Prints the success and failure totals to standard output. */
    @Override
    public void close() {
        System.out.println("成功了" + success.get() + "条");
        System.out.println("失败了" + failed.get() + "条");
    }

    /** No-op: this interceptor takes no configuration. */
    @Override
    public void configure(Map<String, ?> map) {
    }
}
生产者
package com.aura.interceptor;
import org.apache.kafka.clients.producer.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
/**
 * Demo producer that sends 20 string records to the {@code first} topic through
 * the {@code MyInterceptor} and {@code CounterInterceptor} interceptors.
 */
public class ProducerWithInterceptor {

    /**
     * Configures the producer, sends the records and closes the producer.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Interceptors are listed in the order they were registered in the original setup:
        // the prefixing interceptor first, then the counting one.
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.aura.interceptor.MyInterceptor");
        interceptors.add("com.aura.interceptor.CounterInterceptor");

        Properties prop = new Properties();
        prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        prop.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        // The interceptor list is a List, not a String, so it must go in via put().
        prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        // Custom entry read by MyInterceptor.configure().
        prop.setProperty("prefix", "log-");

        // try-with-resources closes the producer even if a send throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {
            for (int i = 0; i < 20; i++) {
                producer.send(
                        new ProducerRecord<>("first", Integer.toString(i), "first" + i),
                        new Callback() {
                            @Override
                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                                if (e == null) {
                                    System.out.println(recordMetadata.toString());
                                    System.out.println();
                                } else {
                                    // Report failures instead of dropping them silently.
                                    System.err.println("Failed to send record: " + e);
                                }
                            }
                        }
                );
            }
        }
    }
}