Working with Kafka from Python

First, install PyKafka:

$ pip install pykafka

Connect to Kafka (the default port is 9092):

>>> from pykafka import KafkaClient
>>> client = KafkaClient(hosts="127.0.0.1:9092")

List the topics:

>>> client.topics
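
`client.topics` behaves like a dictionary keyed by topic name. As a minimal sketch, assuming the keys may come back as either bytes or str depending on the Python version, a small helper can turn them into a sorted list of readable names. `topic_names` is a hypothetical helper, not part of PyKafka, and `fake_topics` is a stand-in so the snippet runs without a broker.

```python
def topic_names(topics):
    """Return sorted topic names as str, accepting bytes or str keys."""
    names = []
    for key in topics:
        names.append(key.decode('utf-8') if isinstance(key, bytes) else key)
    return sorted(names)


# Stand-in for client.topics so the helper can be tried without a broker.
fake_topics = {b'test': object(), b'my.test': object()}
print(topic_names(fake_topics))  # ['my.test', 'test']
```

With a live connection, the call would simply be `topic_names(client.topics)`.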

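Pick a topic and produce messages to it. The number has to be converted with str() before concatenation, and on Python 3 the payload must be bytes:

>>> topic = client.topics['my.test']
>>> with topic.get_sync_producer() as producer:
...     for i in range(4):
...         producer.produce(('test message ' + str(i ** 2)).encode('utf-8'))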
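
Concatenating a str with an int raises a TypeError in Python, so the number has to be formatted into the text before the message is sent. The following is a minimal sketch of that step, assuming a PyKafka topic object and its `get_sync_producer()` method; `build_payloads` and `produce_all` are hypothetical helper names introduced only for illustration.

```python
def build_payloads(count):
    """Build 'test message <i squared>' payloads as UTF-8 bytes."""
    return [('test message %d' % (i ** 2)).encode('utf-8') for i in range(count)]


def produce_all(topic, payloads):
    """Send each payload through a synchronous PyKafka producer.

    `topic` is assumed to be a pykafka Topic, e.g. client.topics['my.test'].
    """
    with topic.get_sync_producer() as producer:
        for payload in payloads:
            producer.produce(payload)


print(build_payloads(4))
```

With a broker running, `produce_all(client.topics['my.test'], build_payloads(4))` would send the four messages; the synchronous producer waits for each delivery before returning.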

Consume the messages:

>>> consumer = topic.get_simple_consumer()
>>> for message in consumer:
...     if message is not None:
...         print(message.offset, message.value.decode('utf-8'))
0 test message 0
1 test message 1
2 test message 4
3 test message 9
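
Iterating over a simple consumer keeps waiting for new messages unless a timeout is configured, so in a script it can help to stop after a fixed number of messages. Here is a minimal sketch of that idea; `read_messages` and `FakeMessage` are hypothetical names, and the fake messages only stand in for what a real consumer would yield.

```python
from collections import namedtuple

# Stand-in for a consumed message: only the two attributes used above.
FakeMessage = namedtuple('FakeMessage', ['offset', 'value'])


def read_messages(consumer, limit):
    """Collect up to `limit` (offset, text) pairs from an iterable consumer."""
    results = []
    if limit <= 0:
        return results
    for message in consumer:
        if message is None:
            continue
        results.append((message.offset, message.value.decode('utf-8')))
        if len(results) >= limit:
            break
    return results


demo = [FakeMessage(0, b'test message 0'), None, FakeMessage(1, b'test message 1'),
        FakeMessage(2, b'test message 4')]
print(read_messages(demo, 2))  # [(0, 'test message 0'), (1, 'test message 1')]
```

Against a real broker, the consumer could be created with `topic.get_simple_consumer(consumer_timeout_ms=5000)` so that iteration also ends after five seconds without new messages.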

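Python scripts:

producer.py:

# -*- coding: utf-8 -*-
# Note: this script uses the kafka-python package (pip install kafka-python), not PyKafka.

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
for i in range(3):
    msg = "msg %d" % i
    print(msg)
    # Without a value_serializer, kafka-python expects the value as bytes.
    producer.send('test', msg.encode('utf-8'))
# Block until all buffered messages have been sent, then shut down.
producer.flush()
producer.close()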
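
kafka-python's `KafkaProducer` accepts a `value_serializer` callable, so the encoding step can live in one place instead of at every `send` call. A minimal sketch under that assumption follows; `to_bytes`, `make_producer`, and `send_all` are hypothetical helpers, and the `kafka` import is deferred so the serializer can be tried without the package or a broker.

```python
def to_bytes(value):
    """Encode values as UTF-8 text; pass bytes through unchanged."""
    if isinstance(value, bytes):
        return value
    return str(value).encode('utf-8')


def make_producer(servers):
    """Create a kafka-python producer that serializes values with to_bytes."""
    from kafka import KafkaProducer  # requires: pip install kafka-python
    return KafkaProducer(bootstrap_servers=servers, value_serializer=to_bytes)


def send_all(producer, topic, count):
    """Send 'msg 0' .. 'msg <count-1>', then flush the buffered records."""
    for i in range(count):
        producer.send(topic, 'msg %d' % i)
    producer.flush()


print(to_bytes('msg 0'))  # b'msg 0'
```

With a broker on 127.0.0.1:9092, `send_all(make_producer(['127.0.0.1:9092']), 'test', 3)` would publish the same three messages as the script.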

consumer.py:

# -*- coding: utf-8 -*-
from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics['test']
# Get a simple consumer in group "test"; reset_offset_on_start=True makes it
# start from the reset position on each run instead of the last committed offset.
consumer = topic.get_simple_consumer(consumer_group="test", reset_offset_on_start=True)
for message in consumer:
    print(message)
    if message is not None:
        print(">>>>>>>>>>", message.offset)
        print(">>>>>>>>>>", message.value)
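
Because the consumer in this script passes `reset_offset_on_start=True`, it resets its position every time it starts. If a script should instead resume where its group left off, one option is to drop that flag and commit offsets periodically. The following is a minimal sketch of that pattern, assuming a PyKafka simple consumer created with a `consumer_group` and its `commit_offsets()` method; `consume_and_commit` and `handle` are hypothetical names.

```python
def consume_and_commit(consumer, handle, commit_every=100):
    """Pass each message to `handle` and commit offsets every N messages."""
    processed = 0
    for message in consumer:
        if message is None:
            continue
        handle(message)
        processed += 1
        if processed % commit_every == 0:
            consumer.commit_offsets()
    # Reached only when iteration ends, e.g. with consumer_timeout_ms set.
    if processed % commit_every != 0:
        consumer.commit_offsets()  # commit the final partial batch
    return processed


# Possible usage against a real broker (not executed here):
#   consumer = topic.get_simple_consumer(consumer_group="test",
#                                        consumer_timeout_ms=5000)
#   consume_and_commit(consumer, lambda m: print(m.offset, m.value))
```

Committing in batches trades a little re-processing after a crash for fewer commit requests; a smaller `commit_every` narrows that window.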

Reposted from blog.csdn.net/boke14122621/article/details/103102460