kafka生产者:
/**
 * Example Kafka producer: every two seconds sends an incrementing counter
 * (as the record value, with no key) to the topic {@code topic_name} and waits
 * for the broker to acknowledge each record before sending the next one.
 *
 * <p>Runs until a send fails or the thread is interrupted; the producer is
 * always closed on the way out so its resources are released.
 *
 * @param args unused
 */
public static void main(String[] args) {
    KafkaProducer<String, String> producer = null;
    try {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "node06:9092,node07:9092,node08:9092");
        // Require acknowledgement from all in-sync replicas; failed sends are not retried.
        properties.put("acks", "all");
        properties.put("retries", "0");
        // NOTE(review): a 1-byte batch with no linger presumably exists to push every
        // record out immediately; confirm before tuning for throughput.
        properties.put("batch.size", "1");
        properties.put("linger.ms", "0");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // NOTE(review): block.on.buffer.full is deprecated/removed in newer clients
        // (superseded by max.block.ms); kept because older clients still honour it.
        properties.put("block.on.buffer.full", "true");
        // Consumer-only keys (auto.commit.interval.ms, key/value.deserializer) and the
        // legacy "producer.type" key were dropped: KafkaProducer does not recognise them.

        producer = new KafkaProducer<String, String>(properties);

        int i = 1;
        while (true) {
            Thread.sleep(2000);
            // get() blocks until the send completes and rethrows any send failure.
            producer.send(new ProducerRecord<String, String>("topic_name", String.valueOf(i))).get();
            i++;
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Release the producer even when the loop is left through an exception.
        if (producer != null) {
            producer.close();
        }
    }
}
kafka消费者:
/**
 * Example Kafka consumer: subscribes to the topic {@code al_logs}, prints the
 * value of every record received, and commits offsets synchronously after each
 * polled batch has been printed.
 *
 * <p>Runs until polling or committing fails. On failure the stack trace is
 * printed, the loop ends, and the consumer is closed exactly once.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "node06:9092,node07:9092,node08:9092");
    props.put("group.id", "AL_LOGS_DC_WUSGCNDHGFJSLKSMO2");
    // Disable automatic offset commits; offsets are committed manually below.
    // (auto.commit.interval.ms was removed: it is ignored when auto-commit is off.)
    props.put("enable.auto.commit", "false");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    try {
        consumer.subscribe(Arrays.asList("al_logs"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Consumer:" + record.value());
            }
            // Commit only after the whole batch has been handled.
            consumer.commitSync();
        }
    } catch (Exception e) {
        // The try wraps the loop, so a failure ends consumption instead of
        // polling a consumer that has already been closed.
        e.printStackTrace();
    } finally {
        consumer.close();
    }
}
PS:Kafka broker 的主机名(如 node06、node07、node08)需要在客户端机器的 hosts 文件中配置对应的 IP 地址,否则客户端无法解析并连接到 broker。