设置参数
props.put("bootstrap.servers", "10.176.2.170:9092,10.176.1.97:9092,10.176.7.57:9092");
//producer用于压缩数据的压缩类型。默认是无压缩
props.put("compression.type", "gzip");
//增加延迟
props.put("linger.ms", "50");
//这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
props.put("acks", "all");
props.put("batch.size","16384");
props.put("buffer.memory", "33554432");
//设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
props.put("retries ", 30);
props.put("reconnect.backoff.ms", 20000);
props.put("retry.backoff.ms", 20000);
props.put("transactional.id", "my-transactional-id");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
while(i<100000)
{
String personInfo=listPerson.get(new Random().nextInt(40)).dataToString();
ProducerRecord<String, String> msg = new ProducerRecord<String, String>(topic, personInfo);
producer.send(msg, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset()+" 这条数据id是: "+i);
}
}
});
System.out.println("Producer发送第 "+i+" 数据:" + personInfo);
/*try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
i++;
}
//producer.flush很关键
producer.flush();
//列出topic的相关信息
List<PartitionInfo> partitions = new ArrayList<PartitionInfo>() ;
partitions = producer.partitionsFor(topic);
for(PartitionInfo p:partitions)
{
System.out.println(p);
}
System.out.println("send message over.");
producer.close(100,TimeUnit.MILLISECONDS);
所以,造成上面生产不成功的原因就是虽然调用了 producer.send()
,但是数据还没来得及生产到 Kafka 集群 主程序就挂掉了,于是数据就没有生产到 Kafka 集群中了~~
三、解决方法
如果对性能要求不高的话,可以再 producer.send()
方法调用后再调用 producer.flush()
方法,该方法会将数据全部生产到Kafka,否则就会阻塞。对于 producer.flush()
方法,源码原话如下:
"Flush any accumulated records form the producer. Blocks until all sends are complete."
但是这个方法有一点局限性,就是对性能的影响有点大,这个是要注意的地方~
如果对性能要求比较高,同时也想把数据确切的生产到集群的话,推荐将 linger.ms
参数设置一个比 0
大的值(默认是 0
),batch.size
也可以设置一下(默认是16384),同时用 producer.send(ProducerRecord<K,V>, Callback)
来将数据生产到集群中,其中 Callback 匿名内部类中的 onCompletion()
方法用来处理 “确认生产到集群” 的逻辑~~