版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/liweijie231/article/details/85710495
安装启动zookeeper
#配置文件
dataDir=/usr/local/zookeeper-3.4.12/zookeeper_data
#启动
./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.12/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
#查看Java进程(jps)
jps
40823 Jps
40256 Kafka
3223 Bootstrap
40715 QuorumPeerMain #QuorumPeerMain 是 zookeeper的进程
#查看zookeeper状态
./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: standalone
安装启动kafka
#配置文件 server.properties
port=9092
host.name=172.24.183.190
log.dirs=/usr/local/kafka_2.11-2.0.0/kafka_logs
#启动kafka
bin/kafka-server-start.sh config/server.properties &
kafka生产者发送数据
#开一个窗口,输入数据
bin/kafka-console-producer.sh --broker-list 172.24.183.190:9092 --topic test
kafka 消费者接收数据
#开另一个窗口接收数据
#消费者 接收数据
## from-beginning
bin/kafka-console-consumer.sh --bootstrap-server 172.24.183.190:9092 --topic test --from-beginning
## max-messages
bin/kafka-console-consumer.sh --bootstrap-server 172.24.183.190:9092 --topic test --max-messages 15 --from-beginning
## offset
bin/kafka-console-consumer.sh --bootstrap-server 172.24.183.190:9092 --topic test --partition 0 --offset 100 --max-messages 1
#查看topic某分区偏移量最大(小)值 注: time为-1时表示最大值,time为-2时表示最小值
#查看偏移量最大值
[root@localhost kafka_2.11-2.0.0]# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.24.183.190:9092 --topic test --time -1
test:0:165
#查看偏移量最小值
[root@localhost kafka_2.11-2.0.0]# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.24.183.190:9092 --topic test --time -2
test:0:77
安装kazoo
tar -xzvf kazoo-2.6.0.tar.gz
cd /usr/local/kazoo-2.6.0
python setup.py build
python setup.py install
Installed /usr/local/share/Anaconda/lib/python2.7/site-packages/kazoo-2.6.0-py2.7.egg
Processing dependencies for kazoo==2.6.0
Searching for six==1.10.0
Best match: six 1.10.0
Adding six 1.10.0 to easy-install.pth file
Using /usr/local/share/Anaconda/lib/python2.7/site-packages
Finished processing dependencies for kazoo==2.6.0
/
安装pykafka
tar -xzvf pykafka-1.0.3.tar.gz
cd /usr/local/pykafka-1.0.3
python setup.py build
python setup.py install
Installed /usr/local/share/Anaconda/lib/python2.7/site-packages/pykafka-1.0.3-py2.7.egg
Processing dependencies for pykafka==1.0.3
Searching for kazoo==2.6.0
Best match: kazoo 2.6.0
Processing kazoo-2.6.0-py2.7.egg
kazoo 2.6.0 is already the active version in easy-install.pth
Using /usr/local/share/Anaconda/lib/python2.7/site-packages/kazoo-2.6.0-py2.7.egg
Searching for six==1.10.0
Best match: six 1.10.0
Adding six 1.10.0 to easy-install.pth file
Using /usr/local/share/Anaconda/lib/python2.7/site-packages
Finished processing dependencies for pykafka==1.0.3
pykafka消费数据
# Consume messages from the Kafka topic 'test' with pykafka and print
# "<offset> <value>" for each message received.
from pykafka import KafkaClient

# Connect to the broker used throughout this walkthrough.
client = KafkaClient(hosts="172.24.183.190:9092")
topic = client.topics['test']
# A consumer has to be created from the topic before it can be iterated;
# without this line the loop below fails with NameError on `consumer`.
consumer = topic.get_simple_consumer()
for message in consumer:
    # Skip empty results defensively, as in the original snippet.
    if message is not None:
        # %-formatting yields the same "offset value" text as the Python 2
        # print statement while remaining valid syntax on Python 3.
        print("%s %s" % (message.offset, message.value))
171 xx
172 x
173 x
174 x
175 x
176 hello
177 wrold
178 xxxx
179 eh
180 hell
181 hello world
182 ni hao