First, install PyKafka:
$ pip install pykafka
Connect to Kafka (the default port is 9092):
>>> from pykafka import KafkaClient
>>> client = KafkaClient(hosts="127.0.0.1:9092")
List the topics:
>>> client.topics
Pick a topic of your own and produce messages to it:
>>> topic = client.topics['my.test']
>>> producer = topic.get_producer()
>>> producer.produce(['test message ' + str(i ** 2) for i in range(4)])
Consume the messages:
>>> consumer = topic.get_simple_consumer()
>>> for message in consumer:
...     if message is not None:
...         print("%s %s" % (message.offset, message.value.decode("utf-8")))
0 test message 0
1 test message 1
2 test message 4
3 test message 9
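The session above comes from Python 2, where `str` and bytes are the same type. On Python 3, Kafka payloads are bytes, so messages generally need to be encoded before producing and decoded after consuming. The following is a minimal sketch of that round trip, runnable without a broker; `build_payloads` and `format_record` are hypothetical helper names (not part of PyKafka), and UTF-8 is an assumed encoding.

```python
def build_payloads(count):
    """Return 'test message <i**2>' for i in range(count), encoded as UTF-8 bytes."""
    return [("test message %d" % (i ** 2)).encode("utf-8") for i in range(count)]


def format_record(offset, value):
    """Render an (offset, bytes value) pair as 'offset text'."""
    return "%d %s" % (offset, value.decode("utf-8"))


if __name__ == "__main__":
    # enumerate() stands in for the offsets a broker would assign, starting at 0
    for offset, payload in enumerate(build_payloads(4)):
        print(format_record(offset, payload))
```

Run on its own, this prints the same four lines as the consumer output above.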
Python scripts:
producer.py:
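# -*- coding: utf-8 -*-
# Note: this script uses the kafka-python package (import name `kafka`), not PyKafka.
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
for i in range(3):
    msg = "msg %d" % i
    print(msg)
    # send() expects bytes unless a value_serializer is configured
    producer.send('test', msg.encode('utf-8'))
# Wait until buffered messages have been delivered, then shut down
producer.flush()
producer.close()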
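The sending loop only needs an object with a `send(topic, value)` method, so it can be factored into a function and exercised without a broker. This is a sketch under assumptions: `send_messages` and `RecordingProducer` are hypothetical names introduced here, and the stand-in producer merely records calls instead of talking to Kafka.

```python
class RecordingProducer(object):
    """Hypothetical stand-in that records send() calls instead of contacting Kafka."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))


def send_messages(producer, topic, count):
    """Send 'msg 0' .. 'msg <count-1>' to `topic` as UTF-8 bytes; return the payloads."""
    payloads = []
    for i in range(count):
        payload = ("msg %d" % i).encode("utf-8")
        producer.send(topic, payload)
        payloads.append(payload)
    return payloads


if __name__ == "__main__":
    fake = RecordingProducer()
    send_messages(fake, "test", 3)
    print(fake.sent)
```

With a real broker, a kafka-python `KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])` instance could be passed in place of `RecordingProducer()`, followed by `flush()` and `close()` on that instance.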
consumer.py:
# -*- coding: utf-8 -*-
from pykafka import KafkaClient
client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics['test']
# Get a consumer; reset_offset_on_start=True ignores the group's committed offset
# and starts from the position given by auto_offset_reset
consumer = topic.get_simple_consumer(consumer_group="test", reset_offset_on_start=True)
for message in consumer:
    print(message)
    if message is not None:
        print(">>>>>>>>>> %s" % message.offset)
        print(">>>>>>>>>> %s" % message.value)
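By default, iterating over a PyKafka simple consumer waits indefinitely for new messages, so the loop in consumer.py does not end on its own. A minimal sketch of a bounded loop follows; `take_messages` and the `Message` tuple are hypothetical names for illustration, and the function works on any iterable whose items expose `offset` and `value`.

```python
from collections import namedtuple

# Hypothetical stand-in for a consumed message; only offset and value are assumed.
Message = namedtuple("Message", ["offset", "value"])


def take_messages(consumer, limit):
    """Collect up to `limit` (offset, text) pairs from `consumer`, skipping None entries."""
    records = []
    if limit <= 0:
        return records
    for message in consumer:
        if message is None:
            continue
        records.append((message.offset, message.value.decode("utf-8")))
        if len(records) >= limit:
            break
    return records


if __name__ == "__main__":
    fake_consumer = [Message(0, b"msg 0"), None, Message(1, b"msg 1"), Message(2, b"msg 2")]
    for offset, text in take_messages(fake_consumer, 2):
        print(">>>>>>>>>> %d %s" % (offset, text))
```

With PyKafka, the object returned by `topic.get_simple_consumer(...)` could be passed as `consumer`; its `consumer_timeout_ms` argument is another way to make the iteration stop after a period with no messages.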