TensorFlow之队列与多线程(一)

TensorFlow之队列与多线程(一)

学习记录

在TensorFlow的学习过程中,队列与多线程的问题对我而言总是一个比较难搞懂的问题,看了几次总是云里雾里,特此记录一下。对我而言比较难搞懂的问题主要集中在多线程对队列的操作以及通过此方法的训练数据的读取。这部分的学习主要是参照着《TensorFlow实战Google深度学习框架》一书中对应的部分,以下也相当于是相应部分的一个阅读笔记。

队列

在TensorFlow中,队列和变量类似,都是计算图上与有状态的结点,其他的计算节点可以修改它们的状态。下面配合代码做详细叙述。

#创建一个先入先出的队列,初始化队列插入0.1,0.2,0.3三个数字
q = tf.FIFOQueue(3,'float')
init = q.enqueue_many(([0.1,0.2,0.3],))

#定义出队、+1、入队操作
x =q.dequeue()
y = x+1
q_inc = q.enqueue([y])
#然后开启一个会话,执行2次q_inc操作,随后查看队列内容。
with tf.Session() as sess:
    sess.run(init)
    quelen = sess.run(q.size())
    for i in range(2):
        sess.run(q_inc)#执行2次操作,队列中的值变为0.3,1.1,1.2
    quelen = sess.run(q.size())
    for i in range(quelen):
        print(sess.run(q.dequeue()))#输出队列的值

对于队列的创建主要有两种方式:tf.FIFOQueue和tf.RandomShuffleQueue。这两种方式用以创建的队列有所区别。

  1. tf.FIFOQueue(capacity,dtypes)用于创建一个先入先出的队列,capacity表示队列的容量,dtypes表示队列中的数据类型。

  2. tf.RandomShuffleQueue(capacity,min_after_dequeue,dtypes)主要用于创建一个随机的队列,它会将队列中的元素打乱,每次出队列得到的是从当前队列所有元素中随机选择的一个。其中的capacity与dtypes与tf.FIFOQueue中的含义一致,需要注意的是min_after_dequeue这个参数,它限定了队列中存储的数据的最小长度,当队列的长度已经等于min_after_dequeue时,再执行出队操作时程序就会发生阻断,即程序在运行,但是没有任何输出。

对于队列而言,修改队列的操作主要有三种方法:

  1. enqueue_many()
    这个方法主要用于初始化队列中的元素。如上面代码中的init = q.enqueue_many(([0.1,0.2,0.3],))
  2. dequeue()
    出队列操作,从队列中取出一个元素
  3. enqueue()
    入队列操作,向队列中加入一个元素

这么说可能有点抽象,一开始自己也是迷迷糊糊的,后来想想,直观上感觉可以这么理解。先入先出的队列就像是进游乐场游玩时排队得队伍,每个排队的人就像是队列中的元素。入队enqueue和出队dequeue操作就相当于检票和排队。由于空间有限,只能排一定的长度,这个长度就是capacity,排满了,后面的人就站不下了,也就排不进来了。先排队的人可以先检票,也就是可以先出队,后面来排队的只能排在队伍的末尾。这也就是先入先出的机制。

而随机队列感觉有点像双色球的摇奖机,摇奖机的搅拌室内的容量限制就是capacity,入队enqueue和出队dequeue操作就相当于出球和放球进去,搅拌室里的球是搅拌混乱的,每次出来的球都是随机的一个,每次放入的球也会直接和原有的球混到一起。

当然这些只是我自己在辅助自己记忆时的一个类比,可能不太贴切,但我自己记着感觉还行。

多线程

TensorFlow提供了 tf.train.Coordinator和tf.train.QueueRunner两个类来完成多线程协同的的功能。对于多线程的概念问题这里就不细说了,可以参阅其他资料,我的直观理解就是类似排队问题的多个队伍同时可排队检票。

tf.train.Coordinator

tf.train.Coordinator主要用于用于协同多个线程一起停止,并提供了request_stop()、should_stop()和join()这三个函数。

  1. request_stop()
    每个启动的线程都通过调用request_stop()来通知其他线程一起退出。当某一个线程调用了request_stop()函数之后,should_stop函数的返回值将被设置为True,这样其他线程就可以一起停止了。
  2. should_stop()
    启动的每个线程都会一直查询 tf.train.Coordinator类中提供的should_stop()函数,当这个函数的返回值为True时,则需要退出当前的线程
  3. join()
    join()函数的主要作用是阻塞主进程(挡住,无法执行join()以后的语句,知道线程结束),使之专注于执行多线程

下面给出一小段参考代码配合理解:

# 定义线程中运行的程序,每隔5秒判断是否要停止bing打印自己的ID
def MyLoop(coord, worker_id):
    while (not coord.should_stop()):
        # 随机停止所有的线程
        if (np.random.rand() < 0.1):
            print("stoping from id:{}".format(worker_id))
            coord.request_stop()
        else:
            print("working on id:{}".format(worker_id))
        time.sleep(5)
 
# 声明 tf.train.Coordinator类      
coord = tf.train.Coordinator()

# 声明创建5个运行前面定义函数功能的线程
threads = [threading.Thread(target=MyLoop, args=(coord,i,)) for i in range(5)]

# 启动所有线程
for t in threads:
    t.start()

coord.join(threads)

tf.train.QueueRunner

tf.train.QueueRunner主要是用于启动多个线程来操作同一个队列,启动的这些线程可以通过上面介绍的tf.train.Coordinator类来统一管理。
下面给出一小段代码配合理解:

# 声明一个先进先出的队列,队列中最多100个元素,类型为实数
queue = tf.FIFOQueue(100, 'float')

# 定义列表的入队操作
enqueue_op = queue.enqueue([tf.random_normal([1])])

# 使用tf.train.QueueRunner来创建多个线程运行队列的入队操作
# 第一个参数给出了被操作的队列
# [enqueue_op] * 5表示要启动5个线程,每个线程中运行的是enqueue_op操作
qr = tf.train.QueueRunner(queue, [enqueue_op] * 5)

# 将定义过的QueueRunner加入TensorFlow计算图上指定的集合
# tf.train.add_queue_runner没有指定集合,
# 则加入默认集合tf.GraphKeys.QUEUE_RUNNERS
# 下面的函数就是讲刚刚定义的qr加入默认的tf.GraphKeys.QUEUE_RUNNERS集合
tf.train.add_queue_runner(qr)

# 定义出队操作
out_tensor = queue.dequeue()

with tf.Session() as sess:
    # 使用tf.train.Coordinator来协同启动的线程
    coord = tf.train.Coordinator()
    
    # 使用tf.train.QueueRunner时,需要明确强调用tf.train.start_queue_runners
    # 来启动所有的线程。否则因为没有线程进行入队操作,当调用出队操作时,
    # 程序会一直等待入队操作被执行
    # tf.train.start_queue_runners函数会默认启动指定集合中的QueueRunner
    # 所以一般来说,tf.train.add_queue_runner函数
    # 和tf.train.start_queue_runners函数会指向同一个集合
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
    for _ in range(13):
        print(sess.run(out_tensor)[0])
        
    coord.request_stop()
    coord.join(threads)

c)tf.train.QueueRunner()类的应用主要要注意有两个配合使用的函数:

  1. tf.train.add_queue_runner()
    将定义过的QueueRunner加入TensorFlow计算图上指定的集合。
  2. tf.train.start_queue_runners(sess=sess, coord=coord)
    用于启动所有的线程,感觉和上面的for循环启动线程的作用类似。

tf.train.start_queue_runners函数会默认启动指定集合中的QueueRunner,所以一般来说tf.train.add_queue_runner函数和tf.train.start_queue_runners函数会指向同一个集合。

本篇主要介绍多线程对队列的操作的相关内容,通过此方法的训练数据的读取以用于训练的内容将会写在下一篇。

猜你喜欢

转载自blog.csdn.net/weixin_43923472/article/details/89436768