package kafkajavaapi;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Producer callback that prints the outcome of one asynchronous send together with the time
 * elapsed since the send was issued.
 */
class DemoCallBack implements Callback {

    /** Wall-clock time in milliseconds at which the send was started. */
    private final long startTime;
    /** Key of the record this callback belongs to. */
    private final int key;
    /** Value of the record this callback belongs to. */
    private final String message;

    /**
     * @param startTime value of {@link System#currentTimeMillis()} taken just before sending
     * @param key       key of the record that was sent
     * @param message   value of the record that was sent
     */
    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    /**
     * Called when the send request for the record has completed.
     *
     * <p>The exception is inspected first: on failure the metadata may either be null or be a
     * placeholder carrying -1 values, so a non-null metadata alone does not prove success.
     *
     * @param metadata  the metadata of the record that was sent (partition and offset); on failure
     *                  either null or a placeholder with -1 values
     * @param exception the exception thrown while processing the record; null if no error occurred
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (exception != null) {
            exception.printStackTrace();
            return;
        }
        if (metadata == null) {
            // Neither a result nor an error was supplied; report it instead of dereferencing null.
            System.err.println("message(" + key + ", " + message + ") completed without metadata");
            return;
        }
        System.out.println("message(" + key + ", " + message + ") sent to partition("
                + metadata.partition() + "), " + "offset(" + metadata.offset() + ") in "
                + elapsedTime + " ms");
    }
}

package kafkajavaapi;

/**
 * Commonly used Kafka configuration constants.
 *
 * <p>Constants-only holder; the private constructor prevents instantiation.
 */
public final class KafkaProperties {

    // NOTE(review): presumably the ZooKeeper quorum (port 2181) - confirm with the consumers of this constant.
    public static final String ZK_List = "t1:2181,t2:2181,t3:2181";
    /** Broker list used as the producer's bootstrap servers. */
    public static final String BROKER_LIST = "t1:9092,t2:9092,t3:9092";
    /** Topic used by the demo entry point. */
    public static final String TOPIC = "helloworld";
    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
    public static final int CONNECTION_TIMEOUT = 100000;
    public static final String TOPIC2 = "topic2";
    public static final String TOPIC3 = "topic3";
    public static final String CLIENT_ID = "SimpleConsumerDemoClient";

    private KafkaProperties() {
    }
}

package kafkajavaapi;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Kafka producer thread that keeps sending numbered messages ("Message_1", "Message_2", ...) to
 * one topic, either asynchronously with a {@link DemoCallBack} or synchronously by waiting for
 * each send to complete.
 *
 * <p>The loop runs until the thread is interrupted; the underlying producer is closed on exit.
 */
public class MyKafkaProducer extends Thread {

    private final String topic;
    /** True to send asynchronously with a callback, false to block on every send. */
    private final boolean async;
    /** The client used to produce messages. */
    private final KafkaProducer<Integer, String> producer;

    /**
     * Creates a producer connected to {@link KafkaProperties#BROKER_LIST}.
     *
     * @param topic   topic the messages are sent to
     * @param isAsync true for asynchronous sends; false or null for synchronous sends
     */
    public MyKafkaProducer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG = bootstrap.servers
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.BROKER_LIST);
        // ProducerConfig.CLIENT_ID_CONFIG = client.id
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "TestMyKafkaProducer");
        // Serializer class for the record key
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        // Serializer class for the record value
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
        this.topic = topic;
        // Stored as a primitive so a null argument cannot fail on unboxing inside the send loop.
        this.async = Boolean.TRUE.equals(isAsync);
    }

    /**
     * Sends messages with an increasing integer key until the current thread is interrupted, then
     * closes the producer.
     */
    @Override
    public void run() {
        int messageNo = 1;
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String messageStr = "Message_" + messageNo;
                long startTime = System.currentTimeMillis();
                if (async) {
                    // Send asynchronously; the callback reports the outcome.
                    producer.send(new ProducerRecord<>(topic, messageNo, messageStr),
                            new DemoCallBack(startTime, messageNo, messageStr));
                } else {
                    // Send synchronously: block until the broker has answered.
                    try {
                        producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
                        System.out.println("发送------>Sent message: (" + messageNo + ", " + messageStr + ")");
                    } catch (InterruptedException e) {
                        // Restore the flag so the loop condition sees it and the thread stops.
                        Thread.currentThread().interrupt();
                    } catch (ExecutionException e) {
                        // A failed send is reported and the loop carries on with the next message.
                        e.printStackTrace();
                    }
                }
                ++messageNo;
            }
        } finally {
            closeProducer();
        }
    }

    /**
     * Closes the producer. close() blocks and can fail when the calling thread is interrupted, so
     * the interrupt flag is cleared for the duration of the call and restored afterwards.
     */
    private void closeProducer() {
        boolean interrupted = Thread.interrupted();
        try {
            producer.close();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

package kafkajavaapi;

/** Demo entry point: starts a synchronous producer on {@link KafkaProperties#TOPIC}. */
public class TestMykafkaProducer {

    public static void main(String[] args) {
        MyKafkaProducer myKafkaProducer = new MyKafkaProducer(KafkaProperties.TOPIC, false);
        // start() runs the send loop on its own thread; calling run() directly would execute the
        // endless loop on the main thread and the line below would never be reached.
        myKafkaProducer.start();
        System.out.println("hello world");
    }
}