import threading import redis #pool = redis.ConnectionPool(host="192.168.1.100", port=int(6379), db=1) #redis_cache = redis.Redis(connection_pool=pool) redis_cache = redis.StrictRedis(host="192.168.1.100",port=int(6379),db=1) class TestThread(threading.Thread): def __init__(self,redis_cache): threading.Thread.__init__(self) self.redis_cache = redis_cache def run(self): while True: obj = self.redis_cache.get("uid:1002") if not obj: self.redis_cache.set("uid:1002","fdsjkfldsjlkdsflkds") self.redis_cache.expire("uid:1002", 60) print self.getName()+'set' #print self.redis_cache.connection_pool._created_connections for i in xrange(8): t = TestThread(redis_cache) t.start()
测试redis-py的线程安全问题
猜你喜欢
转载自san-yun.iteye.com/blog/1674939
今日推荐
周排行