Kafka 消费比生产慢,而且生产一停,消费也跟着停。如果数据已经写入 Kafka,消费端本应继续消费积压的消息;既然没有,说明数据并没有进到 Kafka。那么数据在哪呢?
只可能还在缓存里,被阻塞了。
import base64
import datetime
import json
import logging
import os
import time

from pykafka import KafkaClient

# Comma-separated Kafka broker addresses handed to KafkaClient.
host2 = '192.168.00.00:9092,192.168.00.01:9092'


class Kafka_client():
    """Thin wrapper around a pykafka synchronous producer bound to one topic."""

    def __init__(self, f_log=None):
        """Connect to the brokers in ``host2`` and open a producer on ``test2``.

        Args:
            f_log: Not used by this class. It is optional so that both
                ``Kafka_client()`` (as the script below calls it) and
                ``Kafka_client(f_log)`` work.
        """
        # The original passed an undefined name ``host`` here (NameError);
        # ``host2`` is the only broker list this module defines.
        client = KafkaClient(hosts=host2)
        print("conn kfaka ok")
        self.topic = client.topics[b'test2']
        # NOTE(review): per the pykafka docs, block_on_queue_full=False makes
        # produce() raise when the internal queue is full instead of
        # blocking -- confirm this is the intended back-pressure behaviour.
        self.producer = self.topic.get_sync_producer(block_on_queue_full=False)
        # NOTE(review): pykafka producers appear to start themselves on
        # construction; this explicit call is kept from the original.
        self.producer.start()

    def send(self, data):
        """Produce one message.

        Args:
            data: Message payload passed straight to ``producer.produce``
                (the script below sends base64-encoded ``bytes``).
        """
        self.producer.produce(data)

    def stop(self):
        """Stop the underlying producer."""
        self.producer.stop()


if __name__ == '__main__':
    kafka = Kafka_client()
    path = r"D:\data\Images/"
    index = 0
    try:
        for file in os.listdir(path):
            full_path = os.path.join(path, file)
            # listdir also returns sub-directories; opening one would raise.
            if not os.path.isfile(full_path):
                continue
            # ``with`` closes each image file; the original leaked handles.
            with open(full_path, 'rb') as img:
                image_data = img.read()
            kafka.send(base64.b64encode(image_data))
            index += 1
            print(str(index))
            # Same cut-off as the original: stop once the count exceeds 2000.
            if index > 2000:
                break
    finally:
        # Always stop the producer, even if reading or sending failed.
        kafka.stop()