tensorflow中线程和队列

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/summer2day/article/details/82497335

队列(queue)本身也是图中的一个节点,是一种有状态的节点,主要包括入队节点(enqueue)和出队节点(dequeue),可以修改它的内容。enqueue操作返回计算图中的一个Operation节点,dequeue操作返回一个Tensor值。Tensor在创建时同样只是一个定义(或称为“声明”),需要放在Session中运行才能获得真正的数值。

FIFOQueue先入先出队列

这里写图片描述

import tensorflow as tf

# 创建一个先入先出的队列,指定队列最多可以保存3个元素,并指定类型为整数
q = tf.FIFOQueue(3, 'int32')
# 初始化队列中的元素,将[0,10,20]3个元素排入此队列
init = q.enqueue_many(([0, 10, 20], ))
# 将队列中的第1个元素出队列,并存入变量x中
x = q.dequeue()
# 将得到的值加1
y = x + 1
# 将加1后的值重新加入队列
q_inc = q.enqueue([y])

with tf.Session() as sess:
    # 队列初始化
    init.run()
    for _ in range(5):
        # 执行数据出队列/出队元素+1/重新加入队列的过程
        v, _ = sess.run([x, q_inc])
        print(v)
输出:
0
10
20
1
11

分析:队列开始有[0,10,20]三个元素,执行5次数据出队列,出队元素+1,重新加入队列的过程中: 
x=0,  y=1,   队列:[10,20,1] 
x=10,  y=11,  队列:[20,1,11] 
x=20,  y=21,  队列:[1,11,21] 
x=1,   y=2,   队列:[11,21,2] 
x=11,  y=12,  队列:[21,2,12]

RandomShuffleQueue随机队列

在出队列时,是以随机的顺序产生元素的。例如,我们在训练一些图像样本时,使用CNN的网络结构,希望可以无序地读入训练样本,就要用RandomShuffleQueue,每次随机产生一个训练样本。

RandomShuffleQueue在Tensorflow使用异步计算时非常重要。因为Tensorflow的会话是支持多线程的,我们可以在主线程里执行训练操作,使用RandomShuffleQueue作为训练输入,开多个线程来准备训练样本,将样本压入队列后,主线程会从队列中每次取出mini-batch的样本进行训练

q=tf.RandomShuffleQueue(capacity=10,min_after_dequeue=2,dtypes="float")#队列最大长度为10,出队后最小长度为2
sess=tf.Session()
for i in range(0,10):#10次入队
    sess.run(q.enqueue(i))
for i in range(0,8):#8次出队
    print(sess.run(q.dequeue()))
输出:顺序不固定的八个数
7.0
8.0
5.0
6.0
9.0
4.0
3.0
0.0

队列管理器

当数据量很大时,入队操作从硬盘中读取数据,放入内存中,主线程需要等待入队操作完成,才能进行训练操作。会话中可以运行多个线程,我们使用线程管理器QueueRunner创建一系列的新线程进行入队操作,让主线程继续使用数据,即训练网络和读取数据是异步的,主线程在训练网络,另一个线程在将数据从硬盘读入内存。

import tensorflow as tf

# 创建一个含有队列的图
q = tf.FIFOQueue(1000,"float")  # 创建一个长度为1000的队列
counter = tf.Variable(0.0)  # 计数器
increment_op = tf.assign_add(counter,tf.constant(1.0))   # 操作:给计数器加1
enqueque_op = q.enqueue(counter)  # 操作:计数器值加入队列

# 创建一个队列管理器 QueueRunner,用这两个操作向队列 q 中添加元素,启动一个线程。
qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueque_op]*1)

# 启动一个会话,从队列管理器qr中创建线程
# 主线程
sess = tf.Session()
sess.run(tf.global_variables_initializer())

enqueue_threads = qr.create_threads(sess,start=True)

# 主线程
for i in range(10):
    print(sess.run(q.dequeue()))
输出:
1.0
3.0
4.0
5.0
6.0
6.0
7.0
7.0
8.0
10.0
分析:这个本质是+1操作和入队操作是异步的,也就是说如果加1操作执行了很多次之后,才执行一次入队的话,就会出现入队不是按我们预想的顺序那样;反过来,当我执行几次入队之后,才执行一次加1操作就会出现一个数重复入队的情况。

如何解决:

方法一:把两个操作变成列表中的一个元素

import tensorflow as tf

q = tf.FIFOQueue(1000,"float")  
counter = tf.Variable(0.0)  
increment_op = tf.assign_add(counter,tf.constant(1.0))  
enqueque_op = q.enqueue(counter) 

# 把加1和入队变成列表中的一个元素
# 原 :qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueque_op]*1)
qr = tf.train.QueueRunner(q,enqueue_ops=[[increment_op,enqueque_op]]*1)

sess = tf.Session()
sess.run(tf.global_variables_initializer())

enqueue_threads = qr.create_threads(sess,start=True)

for i in range(10):
    print(sess.run(q.dequeue()))

方法二:把加1操作变成入队操作的依赖,只有加1才能入队

import tensorflow as tf

q = tf.FIFOQueue(1000,"float")  
counter = tf.Variable(0.0)  
increment_op = tf.assign_add(counter,tf.constant(1.0))  
# 原 enqueque_op = q.enqueue(counter) 

# 把加一操作变成入队操作的依赖
with tf.control_dependencies([increment_op]):
    enqueque_op = q.enqueue(counter)
# 由于将加1变成了入队的依赖,所以入队操作只需要传入enqueque_op就行了
qr = tf.train.QueueRunner(q,enqueue_ops=[enqueque_op]*1)

sess = tf.Session()
sess.run(tf.global_variables_initializer())

enqueue_threads = qr.create_threads(sess,start=True)

for i in range(10):
    print(sess.run(q.dequeue()))

方法三:把加1和入队变成空操作的依赖

import tensorflow as tf

q = tf.FIFOQueue(1000,"float")  
counter = tf.Variable(0.0)  
increment_op = tf.assign_add(counter,tf.constant(1.0))  
enqueque_op = q.enqueue(counter) 

# 把两个操作变成空操作的依赖
with tf.control_dependencies([increment_op,enqueque_op]):
    void_op = tf.no_op()
# 由于将两个操作变成了空操作的依赖,所以入队操作只需要传入void_op就行了
qr = tf.train.QueueRunner(q,enqueue_ops=[void_op]*1)

sess = tf.Session()
sess.run(tf.global_variables_initializer())

enqueue_threads = qr.create_threads(sess,start=True)

for i in range(10):
    print(sess.run(q.dequeue()))

方法四:将两个操作组合起来

import tensorflow as tf

q = tf.FIFOQueue(1000,"float")  
counter = tf.Variable(0.0)  
increment_op = tf.assign_add(counter,tf.constant(1.0))  
enqueque_op = q.enqueue(counter) 

# 原 :qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueque_op]*1)
# 用tf.group()把两个操作组合起来
qr = tf.train.QueueRunner(q,enqueue_ops=[tf.group(increment_op,enqueque_op)]*1)

sess = tf.Session()
sess.run(tf.global_variables_initializer())

enqueue_threads = qr.create_threads(sess,start=True)

for i in range(10):
    print(sess.run(q.dequeue()))

可能会遇到的错误:

ERROR:tensorflow:Exception in QueueRunner: Run call was cancelled

ERROR:tensorflow:Exception in QueueRunner: Session has been closed.
Exception in thread Thread-22:
Traceback (most recent call last):
  File "C:\Anaconda3\envs\tensorflow-gpu\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
  File "C:\Anaconda3\envs\tensorflow-gpu\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\training\queue_runner_impl.py", line 238, in _run
    enqueue_callable()
  File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\client\session.py", line 1245, in _single_tensor_run
    fetch_list_as_strings, [], status, None)
  File "C:\Anaconda3\envs\tensorflow-gpu\lib\contextlib.py", line 66, in __exit__
    next(self.gen)
  File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\framework\errors_impl.py", line 466, in raise_exception_on_not_ok_status
    pywrap_tensorflow.TF_GetCode(status))
tensorflow.python.framework.errors_impl.CancelledError: Run call was cancelled

Exception in thread Thread-23:
Traceback (most recent call last):
  File "C:\Anaconda3\envs\tensorflow-gpu\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
  File "C:\Anaconda3\envs\tensorflow-gpu\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\training\queue_runner_impl.py", line 238, in _run
    enqueue_callable()
  File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\client\session.py", line 1235, in _single_operation_run
    target_list_as_strings, status, None)
  File "C:\Anaconda3\envs\tensorflow-gpu\lib\contextlib.py", line 66, in __exit__
    next(self.gen)
  File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\framework\errors_impl.py", line 466, in raise_exception_on_not_ok_status
    pywrap_tensorflow.TF_GetCode(status))
tensorflow.python.framework.errors_impl.CancelledError: Session has been closed.

使用sess = tf.Session(),而不用with自动关闭

协调器Coordinator

入队线程自顾自的执行,在需要的出队操作完成之后,进程没法结束。这样就要使用 tf.train.Coordinator 来实现线程间的同步,终止其他线程。

主要方法为:

should_stop():如果线程应该停止则返回True。

request_stop(): 请求该线程停止。

join():等待被指定的线程终止。**

首先创建一个Coordinator对象,然后建立一些使用Coordinator对象的线程。这些线程通常一直循环运行,一直到should_stop()返回True时停止。 任何线程都可以决定计算什么时候应该停止。它只需要调用request_stop(),同时其他线程的should_stop()将会返回True,然后都停下来

import tensorflow as tf

q = tf.FIFOQueue(1000,"float")  
counter = tf.Variable(0.0)  
increment_op = tf.assign_add(counter,tf.constant(1.0))  
enqueque_op = q.enqueue(counter) 

qr = tf.train.QueueRunner(q,enqueue_ops=[[increment_op,enqueque_op]]*1)

# 主线程
sess = tf.Session()
sess.run(tf.global_variables_initializer())

#coordinator:协调器,协调线程间的关系可以被当做一种信号量,起同步作用
coord = tf.train.Coordinator()

# 启动入队线程,协调器是线程的参数
enqueue_threads = qr.create_threads(sess,coord=coord,start=True)

# 主线程
for i in range(0,10):
    print(sess.run(q.dequeue()))

coord.request_stop() # 通知其他线程关闭
 # join操作等待其他线程结束,其他所有的线程关闭后,这个函数才能返回
coord.join(enqueue_threads)

如果在关闭队列线程后,再执行出队操作,就会抛出 tf.errors.OutOfRange 错误。这种情况就需要使用 tf.errors.OutOfRangeError 来捕捉错误,终止循环:

import tensorflow as tf

q = tf.FIFOQueue(1000,"float")  
counter = tf.Variable(0.0)  
increment_op = tf.assign_add(counter,tf.constant(1.0))  
enqueque_op = q.enqueue(counter) 

qr = tf.train.QueueRunner(q,enqueue_ops=[[increment_op,enqueque_op]]*1)

# 主线程
sess = tf.Session()
sess.run(tf.global_variables_initializer())

#coordinator:协调器,协调线程间的关系可以被当做一种信号量,起同步作用
coord = tf.train.Coordinator()

# 启动入队线程,协调器是线程的参数
enqueue_threads = qr.create_threads(sess,coord=coord,start=True)

coord.request_stop() # 通知其他线程关闭

# 主线程
for i in range(0,10):
    try:
        print("i : ",i)
        print(sess.run(q.dequeue()))
    except tf.errors.OutOfRangeError:
        print('finish')
        break

# join操作等待其他线程结束,其他所有的线程关闭后,这个函数才能返回
coord.join(enqueue_threads)
i :  0
finish
分析:将请求线程关闭放置在出队的前面,也就是说我还没有出队之前就请求将线程关闭了,但关闭线程需要一定的时间,所以后来在遍历出队是还是可以执行的线程关闭后,如果不抛异常的话就像上个例子那样会报错,所以这里执行了异常,并打印出了“finish

猜你喜欢

转载自blog.csdn.net/summer2day/article/details/82497335