版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/lyzx_in_csdn/article/details/82079293
package com.lyzx.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import java.util.Properties;
/**
* A Kafka client that publishes records to the Kafka cluster.
* The producer is thread safe and sharing a single producer
* instance across threads will generally be faster than having multiple instances
*
* kafka客户端用于发送消息到kafka集群
* producer线程安全,多个线程共享时效率好于每个线程一个producer
*/
public class ProducerTest {
private static String ips = "192.168.29.164:9092";
/**
* producer包含着一个保持没有发送记录的缓冲池也是一个后台IO线程
* 这个线程的任务是转换把记录转换为提交请求并将其发送到集群,失败后如果在使用producer将丢失这些消息
* send()方法是异步的,当调用时会把record放入缓冲并立即返回,这允许生产者成批的有效率的发送
* 每个topic中的消息都有一个或多个副本,replica分为leader replica和follower replica,leader表示用户发过来的消息本体,follower表示由kafka集群复制而来的副本
* 其中有一部分副本(follower)和leader保持完全一致(ISR In-Sync Replica),acks表示ISR中和leader保持完全一致的副本个数
* acks有3个选项 0,1,all 0表示不管有没有副本和leader保持一致都返回,1表示只要有一个保持一致就返回,all代表全部的ISR都一致才返回,all这种慢但是很可靠
* 当然ISR同步数和吞吐量成反比
*
* 有时候由于leader和follower选举导致的瞬间错误可能会导致发送失败
* 如果发送请求失败则producer会自动重新尝试,重试次数有retries参数决定,
*
* producer为每个partition维护一个缓冲池(里面放着为发送记录),这个大小由batch.size指定,即当达到这个值时发送
* 可以指定这个值大一些,可以处理更多的数据,当然也需要更多的内存
*
* 默认情况下,调用send()会立即发送,即使缓冲池还没被用满,如果你想等待一段时间,则使用linger.ms参数指定等待的时间
* 向下面这样,为了等待更多的记录到大缓冲区则等待1毫秒,注意如果记录到达时间很接近,即使linger.ms=0也会一起发送,在很重的
* 压力下会忽略这个参数,设置这个参数会以较小的延迟为代价获取更好的性能
*
* buffer.memory 参数控制着producer缓冲的总大小
* 如果调用send()方法的速度大于发送到服务器的速度则缓冲池会被用光,此时send()会被阻塞,
* 通过设置max.block.ms(最大的阻塞时间) 超过这个时间会抛出TimeoutException
* key.serializer和value.serializer 表示如何把ProducerRecord记录对象转换为键和值
*
*/
@Test
public void basicTest(){
Properties props = new Properties();
props.put("bootstrap.servers",ips);
props.put("acks","all");
props.put("retries",0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++){
producer.send(new ProducerRecord<>("yh1","key_"+i,"v_"+i));
}
producer.close();
}
/**
* 从 Kafka 0.11,支持两种提交模式:幂等模式和事务模式,幂等模式从kafka的至少一次语义而来
*
* 事务模式允许用户发送消息到多个 topic/分区,为了确保幂等性,enable.idempotence必须设置为true
* 如果设置了那么retries参数自动设置为Integer.MAX_VALUE,并且acks默认为all
* 所以有些app不需要修改代码而利用这个特性,为了利用这个特性,用该避免重新发送,
* 如果某个应用是幂等的那么不建议配置retries,因为他将自动是Integer.MAX_VALUE
* 如果发送方法返回了一个错误,那么建议停止并检查消息
*
* 最后producer只能保证在一个会话中是幂等的
* 为了使用事务提交模式必须设置 transactional.id,如果设置了该参数那么幂等随着设置幂等依赖自动启用
* 此外,事务中包含的主题应配置为耐久性。特别地 replication.factor 应该大于等于3,min.insync.replica=2
* 为了确保端到端的可靠consumer必须设置为只读去提交信息,
* transactional.id的目的是为了一个producer的多个会话恢复
*
*
* It would typically be derived from the shard identifier in a partitioned,
* stateful, application. As such, it should be unique to each producer instance running within a partitioned application.
* All the new transactional APIs are blocking and will throw exceptions on failure.
* The example below illustrates how the new APIs are meant to be used. It is similar to the example above,
* except that all 100 messages are part of a single transaction.
*/
@Test
public void basicTest2(){
Properties props = new Properties();
props.put("bootstrap.servers",ips);
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try{
producer.beginTransaction();
for(int i = 200; i < 300; i++){
producer.send(new ProducerRecord<>("yh1","k2_"+i,"v2_"+i));
}
producer.commitTransaction();
}catch(ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
System.out.println("ProducerFencedException,OutOfOrderSequenceException,AuthorizationException");
producer.close();
}catch(KafkaException e){
System.out.println("KafkaException ");
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}catch(Exception e){
System.out.println("Exception ");
producer.abortTransaction();
}
producer.close();
}
}