接手公司的老项目,公司使用的kafka版本 kafka_2.10-0.8.2.1 , 版本有点老旧,之后的版本和这个版本有很大不同,操作kafka时需要注意。这里简单说下 安装kafka和使用测试的过程。
一,kafka安装
1,下载
下载地址:https://kafka.apache.org/downloads.html
kafka的使用需要依赖于zookeeper,我们这里不用下载zookeeper,zookeeper已经包含在kafka中了,我这里直接下载 kafka_2.10-0.8.2.1 版本。从0.8.2版本到0.9.0有所改动,例如配置消费者接收数据大小的参数。我这里使用的0.8.2.1的kafka版本中还有一个问题,consumer.properties配置文件中配置 fetch.message.max.bytes=6291456 时无法生效,配置文件没有被引用,需要手动在java程序中添加此配置。
fetch.message.max.bytes 消费者接收数据的大小 0.8.2.X版本中
max.partition.fetch.bytes 消费者接收数据的大小 0.9.0版本及以上的版本
2,安装
用xftp工具上传到服务器上,然后解压,这里是单机部署,没有集群。
tar -zxvf xxxxxx.tgz
3,更改配置
config目录下更改service.properties文件
advertised.host.name=服务器外网地址 //原文件这个配置时注释掉的,放开更改即可
log.dirs=/usr/data/kafka/data //kafka中记录的数据存放位置,建议放在kafka目录下,与bin同级
delete.topic.enable=true //允许删除主题,看自己需要是否配置
//别的配置就不用动,我这里测试不需要改别的配置
zookeeper.connect=localhost:2181 //这个配置时默认的,别改,后面会说明
二,测试
1,启动zookeeper和kafka
必须要先启动zookeeper再启动kafka,关闭的时候要先关闭kafka再关闭zookeeper,特别注意别弄错
启动zookeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > ./logs/zookeeper-1.log 2>&1 &
启动kafka
nohup bin/kafka-server-start.sh config/server.properties > ./logs/kafka-1.log 2>&1 &
2,测试
创建主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
查看主题详情
bin/kafka-topics.sh --describe --zookeeper localhost:2181
删除主题
//service.properties中已经配置了能否删除主题
bin/kafka-topics.sh --delete --topic gnss_alarm --zookeeper localhost:2181
生产者发消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
会出现提示信息 WARN Property topic is not valid (kafka.utils.VerifiableProperties)
不用管,回车开始输入你要发的数据,例如:11111111
重新开一个服务器窗口,运行消费者接收消息的命令
消费者接消息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
//需要注意的是0.8.2.1版本中如果更改了消费者配置文件,上面的命令无法生效,consumer.properties没有被引用
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --consumer.config config/consumer.properties
注意:
在输入生产者和消费者命令是要注意端口号不同,同时命令中zookeeper后面跟的地址要是localhost的,因为你service.properties中默认配置localhost,我测试过程中使用的是外网地址代替localhost,无法消费消息,不用用外网地址。
3,kafka发送接收大数据
如果kafka需要发送接收大数据,数据超过1M,需要增加配置
另外:0.8.2.1版本中配置了fetch.message.max.bytes的大小无法生效,配置文件没有被引用,需要在java程序中添加此配置
Properties properties = new Properties();
properties.put("fetch.message.max.bytes","6291456");
server.properties增加配置
message.max.bytes=6291456
replica.fetch.max.bytes=7340032
producer.properties增加配置
max.request.size=5242880
consumer.properties增加配置
fetch.message.max.bytes=6291456 //对0.8.2.x及其以下版本
或
max.partition.fetch.bytes=6291456//针对0.9.0及其以上版本
注意配置数据的大小顺序,消费者的数据大小大于或等于message.max.bytes即可
max.request.size < message.max.bytes < replica.fetch.max.bytes
三,kafka的其他信息
kafka详细信息参照:https://www.jianshu.com/p/61b6220a9ef2
- kafka的版本和一些概念
https://www.cnblogs.com/huxi2b/p/6223228.html - kafka和JVM的配置
https://blog.csdn.net/chen88358323/article/details/51824232
https://blog.csdn.net/xzj9581/article/details/72866225 - kafka内存考虑
https://www.cloudera.com/documentation/kafka/latest/topics/kafka_performance.html#concept_exp_hzk_br__d22306e79
http://www.cnblogs.com/doubletree/p/4264969.html - kafka的复制
https://blog.csdn.net/lizhitao/article/details/51718185