用相同的GroupId消费已经消费过的信息,可以进行回滚。
import time
import redis
import json
import math
import Queue
import os
def cont(t):
time_local = time.localtime(t)
dt = time.strftime("%Y-%m-%d %H:%M:%S",time_local)
return dt
def seek2ts(ts,topic,consumer):
x= [0,0,0,0,0,0 ]
ts*=1000
m=[]
for y in range(6):
i=TopicPartition(topic, y)
x[y] = consumer.offsets_for_times({i:ts})[i][0]
m.append(i)
consumer.assign(m)
for y in range(6):
i=TopicPartition(topic, y)
consumer.seek(i,x[y])
return x
def main():
consumer = KafkaConsumer(bootstrap_servers=[#集群IP] , group_id ="")
seek2ts(1530748800#回滚到的时间戳,"#TOPIC",consumer)
consumer.poll()
consumer.commit()
print "done"
main()