Step 1: Download the 0.10.2.0 release archive and extract it
Download it from the official site, http://kafka.apache.org/
> tar -xzf kafka_2.11-0.10.2.0.tgz
> cd kafka_2.11-0.10.2.0
Step 2: Start the servers
Start the ZooKeeper server
> bin/zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka server
> bin/kafka-server-start.sh config/server.properties
Step 3: Create a topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> bin/kafka-topics.sh --list --zookeeper localhost:2181
Step 4: Send messages to the topic
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
Step 5: Start a consumer to receive the messages
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
The Java demo focuses on creating a topic and sending messages, both of which are handled by the Kafka producer (Producer).
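import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class producerTester {
    public static void main(String[] args) {
        // Connection settings
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Producer interface and its implementation class
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            // Send a message.
            // ProducerRecord(String topic, K key, V value) creates a record to be sent to Kafka.
            producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
        }
        // Flush any buffered records and release resources.
        producer.close();
    }
}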
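The numeric producer settings above are plain byte counts and milliseconds, which are easy to misread. The following is a minimal sketch that only makes the sizes concrete; it uses the JDK alone (no Kafka client), and the class and method names are hypothetical, chosen for illustration.

```java
import java.util.Properties;

// Hypothetical helper class; uses only the JDK, no Kafka client required.
class ProducerSizingSketch {

    // Same numeric settings as the producer configuration above.
    static Properties producerSizing() {
        Properties props = new Properties();
        props.put("batch.size", 16384);        // bytes per batch
        props.put("linger.ms", 1);             // milliseconds to wait for more records
        props.put("buffer.memory", 33554432);  // total bytes available for buffering
        return props;
    }

    // Converts a byte count to whole KiB.
    static long toKiB(long bytes) {
        return bytes / 1024;
    }

    // Number of batch.size chunks that fit into buffer.memory.
    static long batchesInBuffer(Properties props) {
        long batch = ((Number) props.get("batch.size")).longValue();
        long buffer = ((Number) props.get("buffer.memory")).longValue();
        return buffer / batch;
    }

    public static void main(String[] args) {
        Properties props = producerSizing();
        System.out.println("batch.size    = " + toKiB(16384) + " KiB");
        System.out.println("buffer.memory = " + toKiB(33554432) / 1024 + " MiB");
        System.out.println("batches that fit in the buffer = " + batchesInBuffer(props));
    }
}
```

With these values, batch.size is 16 KiB and buffer.memory is 32 MiB, so the buffer has room for roughly 2048 full batches before further sends have to wait for space.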
Pulling messages from the server requires a Consumer.
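import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class consumerTester {
    public static void main(String[] args) {
        // Connection settings; offsets are committed automatically every second.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            // Wait up to 100 ms for new records, then print each one.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}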
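To see what one printed line of the consumer loop looks like without starting a broker, the format string can be exercised on its own. This is a small sketch using only the JDK; the class name, method name, and sample values are hypothetical. The parameter types mirror ConsumerRecord, whose partition() returns an int and offset() a long.

```java
// Hypothetical helper class; uses only the JDK, no Kafka client required.
class RecordLineSketch {

    // Same format string as the printf call in the consumer loop,
    // without the trailing %n so the result is a single line of text.
    static String formatRecord(String topic, int partition, long offset,
                               String key, String value) {
        return String.format(
                "topic = %s, partition = %d, offset = %d, key = %s, value = %s",
                topic, partition, offset, key, value);
    }

    public static void main(String[] args) {
        // Sample values chosen for illustration only.
        System.out.println(formatRecord("my-topic", 0, 300L, "0", "0"));
    }
}
```

Keeping the formatting in a separate method like this also makes it easy to swap the console output for a logger later.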
Run the consumerTester class from its main method first, then start producerTester; the received messages appear in the console.
--- exec-maven-plugin:1.2.1:exec (default-cli) @ KafkaDemo ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details
topic = my-topic, partition = 0, offset = 300, key = 0, value = 0
topic = my-topic, partition = 0, offset = 301, key = 1, value = 1
topic = my-topic, partition = 0, offset = 302, key = 2, value = 2
topic = my-topic, partition = 0, offset = 303, key = 3, value = 3
topic = my-topic, partition = 0, offset = 304, key = 4, value = 4
topic = my-topic, partition = 0, offset = 305, key = 5, value = 5
topic = my-topic, partition = 0, offset = 306, key = 6, value = 6
topic = my-topic, partition = 0, offset = 307, key = 7, value = 7
topic = my-topic, partition = 0, offset = 308, key = 8, value = 8
topic = my-topic, partition = 0, offset = 309, key = 9, value = 9
For the detailed configuration, see the attached Maven project.