开启kafka服务
- 首选开启kafka服务前需要开启zookeeper。
apache-zookeeper-3.6.0-bin\bin\zkserver
- 开启kafaka服务后面是启动配置文件。
kafka-server-start.bat ./config/server.properties
window使用kafka
- 首选下载kafka后如果使用windows版进入bin\window目录下,使用此处命令。如果是linux系统下在bin目录中的.sh文件。
创建主题
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看主题
kafka-topics.bat --list --zookeeper localhost:2181
测试生产者
kafka-console-producer.bat --broker-list localhost:9092 --topic test
测试消费者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
删除主题
在server.properties中增加设置,默认未开启
delete.topic.enable=true
kafka-topics --delete --topic test --zookeeper localhost:2181
删除数据
如果想保留主题,只删除主题现有数据(log)。可以通过修改数据保留时间实现:
kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=3000
//修改保留时间为三秒,但不是修改后三秒就马上删掉,kafka是采用轮训的方式,轮训到这个主题发现三秒前的数据都是删掉。时间由自己在server.properties里面设置,设置见下面。
执行输出:Completed Updating config for entity: topic 'test'.
注意:修改保留时间为10秒钟,并不是10秒钟就马上删掉。kafka是采用轮询的方式,轮询到这个topic时,删除10秒钟前的数据。
时间由server.properties里面的log.retention.check.interval.ms选项为主
假设说 log.retention.check.interval.ms 值为1分钟,那么等待70秒,这个topic的数据就会自动被删除
再次查看topic
kafka-configs.bat --zookeeper zookeeper-1.default.svc.cluster.localhost:2181 --describe --entity-type topics --entity-name test
kafka配置
在 server.properties 文件中配置的是全局策略,针对每一个topic
server.properties
分段策略属性
属性名 | 含义 | 默认值 |
---|---|---|
log.roll.{hours,ms} | 日志滚动的周期时间,到达指定周期时间时,强制生成一个新的segment | 168(7day) |
log.segment.bytes | 每个segment的最大容量。到达指定容量时,将强制生成一个新的segment | 1G(-1为不限制) |
log.retention.check.interval.ms | 日志片段文件检查的周期时间 | 60000 |
日志刷新策略
Kafka的日志实际上是开始是在缓存中的,然后根据策略定期一批一批写入到日志文件中去,以提高吞吐率。
属性 | 说明 | 默认值 |
---|---|---|
log.flush.interval.messages | 消息达到多少条时将数据写入到日志文件 | 10000 |
advertised.listeners | Kafka集群无法外网访问问题解决方法,此处填入你的本地内网ip | PLAINTEXT://your.host.name:9092 |
log.flush.interval.ms | 当达到该时间时,强制执行一次flush | null |
log.flush.scheduler.interval.ms | 周期性检查,是否需要将信息flush | 很大的值 |
日志保存清理策略
属性名 | 含义 | 默认值 |
---|---|---|
log.cleanup.polict | 日志清理保存的策略只有delete和compact两种 | delete |
log.retention.hours | 日志保存的时间,可以选择hours,minutes和ms | 168(7day) |
log.retention.bytes | 删除前日志文件允许保存的最大值 | -1 |
log.segment.delete.delay.ms | 日志文件被真正删除前的保留时间 | 60000 |
log.cleanup.interval.mins | 每隔一段时间多久调用一次清理的步骤 | 10 |
log.retention.check.interval.ms | 周期性检查是否有日志符合删除的条件(新版本使用) | 300000 |
这里特别说明一下,日志的真正清楚时间。当删除的条件满足以后,日志将被“删除”,但是这里的删除其实只是将该日志进行了“delete”标注,文件只是无法被索引到了而已。但是文件本身,仍然是存在的,只有当过了log.segment.delete.delay.ms 这个时间以后,文件才会被真正的从文件系统中删除。