第一步:创建两个Topic
命令:
bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic student
-----------------------------------------------------------------------------------------------------------------------------------
bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic teacher
第二步:创建StreamApi(使用IDEA),将 student 中的消息转换为大写后写入 teacher
/**
 * Kafka Streams demo: reads each record from the "student" topic, converts its
 * value to upper case and writes the result to the "teacher" topic.
 */
public class SteamApi {
    /**
     * Configures the application, builds the topology and starts it.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
        // Default serdes: both keys and values are handled as strings.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        // Explicit <String, String> matches the default serdes above, so the value
        // can be used as a String directly instead of going through toString().
        streamsBuilder.<String, String>stream("student")
                // Pass null values through untouched rather than throwing, and use
                // Locale.ROOT so the result does not depend on the JVM default locale.
                .mapValues(line -> line == null ? null : line.toUpperCase(java.util.Locale.ROOT))
                .to("teacher");

        final Topology topology = streamsBuilder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);

        // Close the streams instance when the JVM shuts down (e.g. on Ctrl+C).
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
第三步:创建消费者Consumer
命令:
bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic student --from-beginning
--------------------------------------------------------------------------------------------
bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic teacher --from-beginning
第四步:创建生产者Producer
/**
 * Sends ten test messages ("bbbb_0" .. "bbbb_9") to the "student" topic.
 */
public class Producer {
    /**
     * Configures a string producer, sends the messages and closes the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even if send() throws.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // Value-only record (no key), typed to match the configured serializers.
                ProducerRecord<String, String> record = new ProducerRecord<>("student", "bbbb_" + i);
                kafkaProducer.send(record);
            }
        }
    }
}
结果:也可以用控制台生产者向 student 发送消息,teacher 的消费者窗口会显示转换为大写后的内容:
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic student