1.producer消息生产者
producer消息生产的两种方式同步,异步
同步demo
package com.hanwan.kafka.demo1;
import com.hanwan.common.ReadPropertity;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.IntStream;
/**
* @ClassName SyncSender
* @Description Producer同步发送
* @Copyright: Copyright (c) 2018</p>
* @Company: www.lowan.com</ p>
* @Author hanwan
* @Date 2018/8/16 14:12
* @Version 1.0
**/
public class SyncSender {
private final static Logger LOGGER = LoggerFactory.getLogger(SyncSender.class);
public static void main(String[] args) {
Properties properties = initProp();
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
IntStream.range(0, 100).forEach(i ->{
ProducerRecord<String, String> record = new ProducerRecord<>("fire_and_forget_sender", String.valueOf(i), "hello"+i);
Future<RecordMetadata> future = producer.send(record);
try {
RecordMetadata metaDate = future.get();
LOGGER.info("this message is send done and the key is {} and offset is {}",i, metaDate.offset());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
producer.flush();
producer.close();
}
private static Properties initProp(){
final Properties prop = new Properties();
prop.put("bootstrap.servers", "120.55.125.58:9092,120.26.198.248:9092,121.40.200.37:9092");
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return prop;
}
}
异步demo
package com.hanwan.kafka.demo1;
import com.hanwan.common.ReadPropertity;
import com.sun.media.jfxmedia.logging.Logger;
import org.apache.kafka.clients.producer.*;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.stream.IntStream;
/**
* @ClassName AsyncSender
* @Description Producer 异步发送
* @Copyright: Copyright (c) 2018</p>
* @Company: www.lowan.com</ p>
* @Author hanwan
* @Date 2018/8/16 15:09
* @Version 1.0
**/
public class AsyncSender {
private final static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AsyncSender.class);
public static void main(String[] args) {
Properties properties = initProp();
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
IntStream.range(0, 10).forEach(i ->{
ProducerRecord<String, String> record = new ProducerRecord<>("fire_and_forget_sender", String.valueOf(i), "hello async " + i);
producer.send(record, (r, e) ->{
if (e==null) {
LOGGER.info("this message is send done and the key is{}, offset {}", i, r.offset());
}
});
});
producer.flush();
producer.close();
}
private static Properties initProp(){
final Properties prop = new Properties();
prop.put("bootstrap.servers", "120.55.125.58:9092,120.26.198.248:9092,121.40.200.37:9092");
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return prop;
}
}
producer参数详解
acks:生产者需要server端在接收到消息后,进行反馈确认的尺度,主要用于消息的可靠性传输;acks=0表示生产者不需要来自server的确认;acks=1表示server端将消息保存后即可发送ack,而不必等到其他follower角色的都收到了该消息;acks=all(or acks=-1)意味着server端将等待所有的副本都被接收后才发送确认。
buffer.memory:生产者缓冲区的大小,保存的是还未来得及发送到server端的消息,如果生产者的发送速度大于消息被提交到server端的速度,该缓冲区将被耗尽。
compression.type:用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好
retries:生产者发送失败后,重试的次数
batch.size:当多条消息发送到同一个partition时,该值控制生产者批量发送消息的大小,批量发送可以减少生产者到服务端的请求数,有助于提高客户端和服务端的性能。
client.id:当向server发出请求时,这个字符串会发送给server。目的是能够追踪请求源头,以此来允许ip/port许可列表之外的一些应用可以发送信息。这项应用可以设置任意字符串,因为没有任何功能性的目的,除了记录和跟踪。
max.in.flight.requests.per.connection :kafka可以在一个connection中发送多个请求,叫作一个flight,这样可以减少开销,但是如果产生错误,可能会造成数据的发送顺序改变,默认是5 (修改)
request.timeout.ms:客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求;超过重试次数将抛异常
timeout.ms:此配置选项控制server等待来自followers的确认的最大时间。如果确认的请求数目在此时间内没有实现,则会返回一个错误。这个超时限制是以server端度量的,没有包含请求的网络延迟
metadata.fetch.timeout.ms:是指我们所获取的一些元素据的第一个时间数据。元素据包含:topic,host,partitions。此项配置是指当等待元素据fetch成功完成所需要的时间,否则会跑出异常给客户端
max.block.ms:控制block的时长,当buffer空间不够或者metadata丢失时产生block
max.request.size:请求的最大字节数。这也是对最大记录尺寸的有效覆盖。注意:server具有自己对消息记录尺寸的覆盖,这些尺寸和这个设置不同。此项设置将会限制producer每次批量发送请求的数目,以防发出巨量的请求。
send.buffer.bytes:发送数据时的缓存空间大小
receive.buffer.bytes:socket的接收缓存空间大小,当阅读数据时使用
测试同样发10W条请求
acks=0
34897 [main] INFO com.hanwan.kafka.demo1.ProducerPerfWithAck - total send time 34366 ms
acks=1
2195163 [main] INFO com.hanwan.kafka.demo1.ProducerPerfWithAck - total send time 2194794 ms
acks=all
2178733 [main] INFO com.hanwan.kafka.demo1.ProducerPerfWithAck - total send time 2178320 ms
acks=all compression.type=snappy
2006740 [main] INFO com.hanwan.kafka.demo1.ProducerPerfWithAck - total send time 2006345 ms
2.partition消息分区
package com.hanwan.kafka.demo1;
import com.hanwan.common.ReadPropertity;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* @ClassName PartitionExample
* @Description partition消息分区
* @Copyright: Copyright (c) 2018</p>
* @Company: www.lowan.com</ p>
* @Author hanwan
* @Date 2018/8/16 20:30
* @Version 1.0
**/
public class PartitionExample {
private final static Logger LOGGER = LoggerFactory.getLogger(PartitionExample.class);
public static void main(String[] args) {
Properties properties = initProps();
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
/* partition 指定分区发送消息
try {
ProducerRecord<String, String> record = new ProducerRecord<>("test_p", "hello", "hello partition");
Future<RecordMetadata> future = producer.send(record);
RecordMetadata recordMetadata = future.get();
LOGGER.info("{}", recordMetadata.partition());
record = new ProducerRecord<>("test_p", "hello","hello partition");
future = producer.send(record);
recordMetadata = future.get();
LOGGER.info("{}", recordMetadata.partition());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
producer.flush();
producer.close();*/
/*partition自定义*/
ProducerRecord<String, String> record = new ProducerRecord<>("test_p", "order","hello mypartition");
Future<RecordMetadata> future = producer.send(record);
RecordMetadata recordMetadata = null;
try {
recordMetadata = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
LOGGER.info("{}",recordMetadata.partition());
producer.flush();
producer.close();
}
private static Properties initProps(){
final Properties prop = new Properties();
prop.put("bootstrap.servers", ReadPropertity.getProperty("bootstrap.servers"));
prop.put("key.serializer", ReadPropertity.getProperty("key.serializer"));
prop.put("value.serializer", ReadPropertity.getProperty("key.serializer"));
//自定义partitioner配置
prop.put("partitioner.class", "com.hanwan.kafka.demo1.MyPartitioner");
return prop;
}
}
自定义partitioner
package com.hanwan.kafka.demo1;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* @ClassName Partitioner
* @Description 自定义partitioner
* @Copyright: Copyright (c) 2018</p>
* @Company: www.lowan.com</ p>
* @Author hanwan
* @Date 2018/8/17 10:58
* @Version 1.0
**/
public class MyPartitioner implements Partitioner {
private final static String[] BIZ_TYPE = new String[]{"LOGIN","LOGOFF","ORDER"};
private final String LOGIN = "LOGIN";
private final String LOGOFF = "LOGOFF";
private final String ORDER = "ORDER";
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null || keyBytes.length == 0) {
throw new IllegalArgumentException("The key is required for Biz.");
}
switch (key.toString().toUpperCase()) {
case LOGIN :
return 0;
case LOGOFF :
return 1;
case ORDER :
return 2;
default :
throw new IllegalArgumentException("The key is invalid.");
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}