Kafka配置
linux jdk配置
看这里吧,这个没有难度就不再讲解
https://www.cnblogs.com/kuoAT/p/7555298.html
linux ZooKeeper配置
1.这里下载zookeeper的tar包
https://www.apache.org/dyn/closer.cgi/zookeeper/
我这里下载的3.4.13
2.拷贝到linux系统目录下,我是root权限于是拷到opt下,cd到opt目录下运行
tar -xzvf zookeeper-3.4.13.tar.gz
然后可见解压后的内容
3.接着进入conf文件夹下,打开将zoo_sample.cfg复制一份并重命名为zoo.cfg
确保如下几行配置处于非注释状态
tickTime=2000
dataDir=/path/to/zookeeper/data1
clientPort=2181
initLimit=5
syncLimit=2
4.cd到zookeeper根目录下输入如下指令运行zookeeper
bin/zkServer.sh start
5.这时我们在命令行输入jps看一下
有quorumpeermain说明zookeeper运行正常
linux kafka配置
1.去这里下载kafka的tar包
http://kafka.apache.org/downloads
我这里用kafka_2.12-2.2.0.tgz
与装zookeeper一样进行tar指令解压
2.然后就可以看见如下目录
3.进入config文件夹打开server.properties
确保以下几行处于非注释状态,其他保持默认状态即可
(把下放代码中0.0.0.0改成你的linux主机的ip,其他不用动。//不是注释的意思不要删掉)
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://0.0.0.0:9092
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
4.启动kakfa (路径根据你自己放的位置自行调整)
/opt/kafka/kafka_2.12-2.2.0/bin/kafka-server-start.sh /opt/kafka/kafka_2.12-2.2.0/config/server.properties
python生产及消费实例
简洁如python!
生产者
from kafka import KafkaProducer
import json
msg_dict = {
"sleep_time": 10,
"msg": "Hello World"
}
msg = json.dumps(msg_dict)
producer = KafkaProducer(bootstrap_servers=['0.0.0.1:9092']) #此处ip可以是多个['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]
for i in range(3):
producer.send('test', msg.encode('utf-8'))
producer.close()
消费者
from kafka import KafkaConsumer
import time
consumer = KafkaConsumer('test', bootstrap_servers=['39.105.126.10:9092'])
while True:
msg = consumer.poll(timeout_ms=5) #从kafka拉取消息
print(msg)
time.sleep(1)
亲测有效,消费者运行时可正常拉取生产者push上去的内容
提示:
python3.7与kafka的python库有兼容性问题详见StackOverflow网站上有人已经遇到,目前解决方法只能等待kafka适配,换用3.6版本即可解决。