注意:用代码测试时,脚本的执行顺序会影响结果。我之前先执行生产者脚本,可以正常生产消息;但随后执行消费者脚本时却消费不到任何消息。困惑了很久才发现自己犯了一个很低级的错误:消费者写的是实时消费(只读取启动之后产生的新消息),而生产者脚本已经先执行完毕,等到再执行消费者脚本时,生产者生产的数据已经变成历史数据了。因此需要让消费者消费历史数据,即设置参数 auto_offset_reset='earliest',这样脚本才能测试通过。
#!/usr/bin/env python3
import json
#import scrapy
from kafka import KafkaProducer
from pykafka import KafkaClient
class Demo01():
    """Producer demos that publish test messages to the ``topic528`` topic.

    ``exe`` and ``t`` use kafka-python's ``KafkaProducer``; ``tr`` uses
    pykafka's ``KafkaClient``. All three connect to the broker at
    ``10.72.15.102:9092`` and need that broker to be reachable.
    """

    @staticmethod
    def exe():
        """Send ten ``b'hello'`` messages and print the result of each send."""
        print('###########Demo01################')
        kafka_host = '10.72.15.102'  # Kafka broker address
        kafka_port = 9092  # Kafka broker port
        producer = KafkaProducer(bootstrap_servers=['{kafka_host}:{kafka_port}'.format(
            kafka_host=kafka_host,
            kafka_port=kafka_port
        )])
        try:
            # Loop ten times so that ten messages are sent; the previous
            # range(1, 10) only produced nine.
            for _ in range(10):
                # Publish the payload to the topic named 'topic528'.
                response = producer.send('topic528', bytes('hello', 'utf-8'))
                # Flush before printing so the send has been pushed out.
                producer.flush()
                print(response)
        finally:
            # Release the producer's connections even if a send fails.
            producer.close()

    @staticmethod
    def t():
        """Send a single ``b'nihao1-new'`` message and print the send result."""
        producer = KafkaProducer(bootstrap_servers='10.72.15.102:9092')
        try:
            response = producer.send('topic528', bytes('nihao1-new', 'utf-8'))
            producer.flush()
            print(response)
        finally:
            producer.close()

    @staticmethod
    def tr():
        """Send the same JSON-encoded message ten times through pykafka."""
        host = '10.72.15.102:9092'
        client = KafkaClient(hosts=host)
        print("--------method is tr-----------")
        topic = client.topics[b'topic528']
        producer = topic.get_producer()
        producer.start()
        try:
            # Build the message once: a JSON object encoded as UTF-8 bytes.
            msg_dict = {
                "sleep_time": 10,
                "table": "msg",
                "msg": "Hello World"
            }
            msg = json.dumps(msg_dict)
            msg = bytes(msg, encoding="utf8")
            for _ in range(10):
                producer.produce(msg)
        finally:
            # Always stop the producer, even if produce() raises.
            producer.stop()
if __name__ == "__main__":
    # Run the single-message demo by default; call Demo01.exe() here
    # instead to run the looped demo.
    Demo01.t()
--------------------------------------------
# # -*- coding: utf-8 -*-
import sys
import time
import json
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
# Kafka broker host, port and topic name; these are the same values the
# producer demo hard-codes.
# NOTE(review): "KAFAKA" looks like a misspelling of "KAFKA"; the names are
# left unchanged because other code may reference them.
KAFAKA_HOST = "10.72.15.102"
KAFAKA_PORT = 9092
KAFAKA_TOPIC = "topic528"
def main():
    """Consume messages from the demo topic and print each raw payload.

    Connects to the broker described by ``KAFAKA_HOST`` / ``KAFAKA_PORT``,
    subscribes to ``KAFAKA_TOPIC`` as consumer group ``'topic528'``, prints
    ``message.value`` for every message received, and closes the consumer
    when iteration ends.
    """
    consumer = KafkaConsumer(
        KAFAKA_TOPIC,
        group_id='topic528',
        bootstrap_servers='{host}:{port}'.format(host=KAFAKA_HOST, port=KAFAKA_PORT),
        # Consume historical messages that have not been consumed yet, so
        # messages produced before this script started are still read.
        auto_offset_reset='earliest',
        # When no group_id is given, enable_auto_commit=False must be
        # added as well.
        # Stop iterating once no message has arrived for 1000 ms.
        consumer_timeout_ms=1000,
    )
    try:
        for message in consumer:
            content = message.value
            print(content)
    finally:
        # Close the consumer so its connections are released.
        consumer.close()
if __name__ == '__main__':
    # Run the consumer only when this file is executed as a script.
    main()