版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u011342403/article/details/89283932
安装kafka
使用homebrew 安装kafka非常简单
brew install kafka
即可安装成功
启动服务
启动默认的kafka服务
先启动默认的kafka内置的zookeeper服务
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
然后启动kafka服务
kafka-server-start /usr/local/etc/kafka/server.properties
以上均在kafka安装目录的bin目录下执行。默认路径是
/usr/local/Cellar/kafka/2.1.1/bin
创建生产者
默认连接到本机的localhost的9092端口。此端口在上述启动kafka服务的文件中有修改位置。即/usr/local/etc/kafka/server.properties,可自行修改
旧API方式创建
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class OldProducer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("metadata.broker.list", "localhost:9092");
properties.put("request.required.acks", "1");
properties.put("serializer.class", "kafka.serializer.StringEncoder");
Producer<Integer, String> producer = new Producer<Integer,String>(new ProducerConfig(properties));
KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world");
producer.send(message );
}
}
新API方式创建
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class Newproducer {
public static void main(String[] args) {
Properties props = new Properties();
// Kafka服务端的主机名和端口号
props.put("bootstrap.servers", "localhost:9092");
// 等待所有副本节点的应答
props.put("acks", "all");
// 消息发送最大尝试次数
props.put("retries", 0);
// 一批消息处理大小
props.put("batch.size", 16384);
// 请求延时
props.put("linger.ms", 1);
// 发送缓存区内存大小
props.put("buffer.memory", 33554432);
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 50; i++) {
producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
}
producer.close();
}
}
使用命令创建消费者
kafka-console-consumer --bootstrap-server localhost:9092 --topic first
上述命令即是启动消费者服务在localhost的9092端口上的first主题
先后启动上述生产者的老API和新API方式,即可在消费者控制台上看到如下结果
至此简单的安装就到此结束了。
参考文章:https://www.cnblogs.com/zhangliwei/p/9816937.html
参考书籍《深入理解Kafka核心设计与实践原理》,作者博客地址:https://blog.csdn.net/u013256816