创建生产者
import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import com.lzl.kafka.partition.CustomPartition; public class CustomProducer { public static void main(String[] args) { Properties properties = new Properties(); properties.put("bootstrap.servers", "liangzelei002:9092"); properties.put("acks", "all"); properties.put("buffer.memory", 33554432); properties.put("retries", 0); properties.put("linger.ms", 1); properties.put("batch.size", 16384); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("partitioner.class", "com.lzl.kafka.partition.CustomPartition"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); for (int i=0;i<50;i++) { // new ProducerRecord<String, String>("second", 1, "key=>"+i,"asdfasdfa..."+i) partition key value // new ProducerRecord<String, String>("second","key=>"+i,"asdfasdfa..."+i) key value // new ProducerRecord<String, String>("second","asdfasdfa..."+i) value kafkaProducer.send(new ProducerRecord<String, String>("second","key=>"+i,"liangzelei..."+i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception!=null) { System.out.println(exception.getMessage()); } else { System.out.println("topic=>"+metadata.topic()); System.out.println("offset=>"+metadata.partition()); } } }); } kafkaProducer.close(); } } |
自定义分区器（供生产者使用）
package com.lzl.kafka.partition;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * Demo partitioner that logs every record it is asked to route and always
 * selects partition 0.
 */
public class CustomPartition implements Partitioner {

    /** No configuration is read by this partitioner. */
    @Override
    public void configure(Map<String, ?> configs) {
    }

    /**
     * Prints the record's topic, key and value (object and serialized forms)
     * and routes the record to partition 0.
     *
     * <p>Key and value, and their serialized byte arrays, may be {@code null}
     * (for example a record sent without a key); such values are printed as
     * {@code null} instead of raising a {@link NullPointerException}.
     *
     * @param topic      topic the record is being sent to
     * @param key        record key, possibly {@code null}
     * @param keyBytes   serialized key, possibly {@code null}
     * @param value      record value, possibly {@code null}
     * @param valueBytes serialized value, possibly {@code null}
     * @param cluster    current cluster metadata (unused)
     * @return always {@code 0}
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
            Cluster cluster) {
        System.out.println("topic==>" + topic);
        // String concatenation renders a null reference as "null" and otherwise calls toString().
        System.out.println("key==>" + key);
        System.out.println("keyBytes==>" + decode(keyBytes));
        System.out.println("value==>" + value);
        System.out.println("valueBytes==>" + decode(valueBytes));
        return 0;
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }

    /**
     * Decodes serialized bytes as UTF-8 text for logging, or returns "null" when absent.
     * UTF-8 is used explicitly rather than the platform default charset.
     * NOTE(review): assumes the producer's serializer emits UTF-8 (StringSerializer's default).
     */
    private static String decode(byte[] bytes) {
        return bytes == null ? "null" : new String(bytes, StandardCharsets.UTF_8);
    }
}
Kafka消费者
import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class CustomConsumer { public static void main(String[] args) { Properties properties = new Properties(); properties.put("bootstrap.servers", "liangzelei002:9092"); properties.put("group.id", "lzltest2"); properties.put("enable.auto.commit", "true"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 订阅topic kafkaConsumer.subscribe(Arrays.asList("second")); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { if (kafkaConsumer!=null) { kafkaConsumer.close(); } } })); // 拉消息 while(true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100); for(ConsumerRecord<String, String> record : consumerRecords) { System.out.println("key-------------"+record.key()); System.out.println("value-------------"+record.value()); } } } } |