package com.yy.fastcustom; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Arrays; import java.util.Properties; /** * Created by zzq on 2019/6/13/013. */ public class KafkaTest implements Runnable { public void read(String clientId) { producer.send(new ProducerRecord<String, String>("read", clientId)); System.out.println("=========" + clientId); System.out.println(); } public volatile Producer<String, String> producer; public volatile KafkaConsumer<String, String> customer; //消费方法 @Override public void run() { customer.subscribe(Arrays.asList("read")); for (; ; ) { ConsumerRecords<String, String> records = customer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("偏移量 = %d, 值 = %s", record.offset(), record.value()); System.out.println(); } } } public Producer<String, String> producer() { Properties properties = new Properties(); properties.put("bootstrap.servers", "10.10.210.123:9092"); // 生产者需要server接收到数据之后,要发出一个确认接收的信号 // 0 producer不需要等待任何确认的消息,吞吐量最高 // 1 意味着至少要等待leader已经成功将数据写入本地log,并不意味着所有follower已经写入 // all 意味着leader需要等待所有备份都成功写入到日志中 properties.put("acks", "0"); properties.put("retries", 0);// 重试次数 properties.put("batch.size", 16384);// producer试图批量处理消息记录。目的是减少请求次数,改善客户端和服务端之间的性能。这个配置是控制批量处理消息的字节数。如果设置为0,则禁用批处理。如果设置过大,会占用内存空间. properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432);// 缓存大小 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = null; producer = new KafkaProducer<String, String>(properties); return producer; } public KafkaConsumer<String, String> customer() { Properties properties = new Properties(); properties.put("bootstrap.servers", "10.10.210.123:9092"); properties.put("group.id", "read"); properties.put("enable.auto.commit", "true");//consumer所接收到的消息的offset将会自动同步到zookeeper properties.put("auto.commit.interval.ms", "1000");//consumer向zookeeper提交offset的频率,单位是毫秒 properties.put("auto.offset.reset", "earliest");//如果zk中没有offset记录就从最初的位置开始消费 properties.put("session.timeout.ms", "30000"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); return kafkaConsumer; } }
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.1</version> </dependency>