在kafka集群的单服务器情况下,如何通过consumer消费者确定kafka服务器或者zookeeper服务器是否启动(因为消费者目前是无法判断服务器是否启动的,它只是去轮询获取服务器数据而不报错),如果没有启动,消费者端做出相应的操作来提醒消费者端使用人员进行维护,在这里我提供一个简单的解决方案,可能并不是非常通用,提供一个简单的思路而已。
如果kafka服务器或者zookeeper服务器没有启动,在producer生产者端向服务器发送信息的时候会出现错误(org.apache.kafka.common.errors.TimeoutException),我使用的kafka的maven版本是0.11.0.0的,当有异常错误出现的时候我们可以在生产者端使用一个静态变量来变更记录这个状态,同时在生产者端提供一个接口以供消费者端调用;在consumer消费者端我们提供一个定时任务,如果消费者端在规定的时间里面没有从kafka服务器获取到数据,定义一个静态变量去记录获取数据状态,那么在定时任务里面就会根据这个静态变量的值决定是否去调用producer生产者端的接口,并判断接口返回的状态情况,如果返回的状态表示服务器没有启动则消费者端做出相应的操作。如果在集群多服务器的情况下,客户端可以根据订阅的主题topic判断哪些服务器是存活的。下面提供了生产者producer和消费者consumer的main方法,并没有按照上面的叙述去完成接口,如果需要请自己去实现。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.11.0.0</version> </dependency>
import org.apache.kafka.clients.producer.*; import org.apache.kafka.clients.producer.KafkaProducer; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * @author liaoyubo * @version 1.0 2017/7/25 * @description */ public class KafkaProducerTest { public static void main(String [] args){ Properties properties = new Properties(); //properties.put("zookeeper.connect","localhost:2181"); properties.put("bootstrap.servers", "192.168.201.190:9092"); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String,String> producer = new KafkaProducer<String, String>(properties); for(int i = 0;i < 10;i++){ Future<RecordMetadata> futureRecordMetadata = producer.send(new ProducerRecord<String, String>("myTopic",Integer.toString(i),Integer.toString(i))); try { futureRecordMetadata.get(3, TimeUnit.SECONDS); System.out.println("发送的message:"+i); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { if(e.getMessage().split(":")[0].split("\\.")[5].equals("TimeoutException")){ System.out.println("无法连接到服务器"); } e.printStackTrace(); } catch (TimeoutException e) { System.out.println("无法连接到服务器"); e.printStackTrace(); } } producer.close(); } }
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import java.util.Arrays; import java.util.List; import java.util.Properties; /** * @author liaoyubo * @version 1.0 2017/7/26 * @description */ public class KafkaConsumerClientTest { public static void main(String [] args){ Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.201.190:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); Consumer<String,String> consumer = new KafkaConsumer<String, String>(properties); consumer.subscribe(Arrays.asList("myTopic","myTest")); while (true){ ConsumerRecords<String,String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ //int partition = record.partition(); String topic = record.topic(); List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionInfoList){ Node node = partitionInfo.leader(); System.out.println(node.host()); //获取存活的服务器 Node [] nodes = partitionInfo.replicas(); } System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } }