版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/m0_37789876/article/details/81270752
队列的作用
一、TensorFlow 提供了 2 种队列
- 第一种队列是 tf.FIFOQueue(2, tf.int32),先入先出
- 第二种队列是 tf.RandomShuffleQueue(capacity, min_after_dequeue, dtypes),出队顺序是随机的,与入队顺序无关
二、TensorFlow 提供了 2 种队列操作
- 入队操作 enqueue()、enqueue_many(([0, 10],))
- 出队操作 dequeue()
三、队列的意义
- 能被多个线程同时操作
四、队列如何进行多线程操作
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""CIFAR dataset input module.
"""
import tensorflow as tf
def build_input(dataset, data_path, batch_size, mode):
    """Build a queue-fed pipeline of CIFAR image and label batches.

    Args:
        dataset: Either 'cifar10' or 'cifar100'.
        data_path: Glob pattern matching the binary data file(s).
        batch_size: Input batch size.
        mode: 'train' enables augmentation and a shuffling queue; any other
            value (e.g. 'eval') takes the un-augmented FIFO path.

    Returns:
        images: Batches of images. [batch_size, image_size, image_size, 3]
        labels: One-hot batches of labels. [batch_size, num_classes]

    Raises:
        ValueError: when the specified dataset is not supported.
    """
    # Dataset parameters.
    image_size = 32  # Images are 32x32.
    if dataset == 'cifar10':
        label_bytes = 1
        label_offset = 0  # Bytes to skip before the label inside a record.
        num_classes = 10
    elif dataset == 'cifar100':
        label_bytes = 1
        label_offset = 1  # Bytes to skip before the label inside a record.
        num_classes = 100
    else:
        # Format the message eagerly; passing `dataset` as a second argument
        # would leave the '%s' placeholder unfilled.
        raise ValueError('Not supported dataset %s' % dataset)

    # Record layout: [label_offset bytes][label_bytes][image_bytes].
    depth = 3
    image_bytes = image_size * image_size * depth
    record_bytes = label_offset + label_bytes + image_bytes

    # Expand the glob into a list of file names and wrap it in a shuffled
    # file-name queue.
    data_files = tf.gfile.Glob(data_path)
    file_queue = tf.train.string_input_producer(data_files, shuffle=True)

    # Read one fixed-length raw record from the file-name queue.
    reader = tf.FixedLengthRecordReader(record_bytes=record_bytes)
    _, value = reader.read(file_queue)

    # Decode the record string into a uint8 vector and split label / image.
    record = tf.reshape(tf.decode_raw(value, tf.uint8), [record_bytes])
    label = tf.cast(
        tf.slice(record, [label_offset], [label_bytes]), tf.int32)

    # The image bytes start after BOTH the skipped prefix and the label.
    # Starting at `label_bytes` alone would be off by `label_offset` bytes
    # for 'cifar100'.
    depth_major = tf.reshape(
        tf.slice(record, [label_offset + label_bytes], [image_bytes]),
        [depth, image_size, image_size])
    # Convert [depth, height, width] to [height, width, depth] as float32.
    image = tf.cast(tf.transpose(depth_major, [1, 2, 0]), tf.float32)

    if mode == 'train':
        # Augmentation: pad by 4 pixels, random crop back to the original
        # size, then random horizontal flip.
        image = tf.image.resize_image_with_crop_or_pad(
            image, image_size + 4, image_size + 4)
        image = tf.random_crop(image, [image_size, image_size, 3])
        image = tf.image.random_flip_left_right(image)
        # Per-image standardization.
        image = tf.image.per_image_standardization(image)

        # Shuffling example queue for training.
        example_queue = tf.RandomShuffleQueue(
            capacity=16 * batch_size,
            min_after_dequeue=8 * batch_size,
            dtypes=[tf.float32, tf.int32],
            shapes=[[image_size, image_size, depth], [1]])
        # Number of enqueue threads.
        num_threads = 16
    else:
        # Evaluation: no augmentation, only per-image standardization.
        image = tf.image.resize_image_with_crop_or_pad(
            image, image_size, image_size)
        image = tf.image.per_image_standardization(image)

        # First-in-first-out example queue for evaluation.
        example_queue = tf.FIFOQueue(
            3 * batch_size,
            dtypes=[tf.float32, tf.int32],
            shapes=[[image_size, image_size, depth], [1]])
        # Number of enqueue threads.
        num_threads = 1

    # Register one enqueue op per thread with the default queue-runner
    # collection; the threads start when tf.train.start_queue_runners runs.
    example_enqueue_op = example_queue.enqueue([image, label])
    tf.train.add_queue_runner(tf.train.queue_runner.QueueRunner(
        example_queue, [example_enqueue_op] * num_threads))

    # Dequeue a whole batch of images and labels.
    images, labels = example_queue.dequeue_many(batch_size)

    # Convert class indices to one-hot rows, e.g. with 5 classes:
    #   [2,          [[0,0,1,0,0]
    #    4,           [0,0,0,0,1]
    #    0]    -->    [1,0,0,0,0]]
    # The (row, class) index pairs are built by concatenating the row number
    # with the label.
    labels = tf.reshape(labels, [batch_size, 1])
    indices = tf.reshape(tf.range(0, batch_size, 1), [batch_size, 1])
    labels = tf.sparse_to_dense(
        tf.concat(values=[indices, labels], axis=1),
        [batch_size, num_classes], 1.0, 0.0)

    # Sanity-check the static shapes.
    assert len(images.get_shape()) == 4
    assert images.get_shape()[0] == batch_size
    assert images.get_shape()[-1] == 3
    assert len(labels.get_shape()) == 2
    assert labels.get_shape()[0] == batch_size
    assert labels.get_shape()[1] == num_classes

    # Export the image batch as a summary for visualization.
    tf.summary.image('images', images)
    return images, labels
看懂以上代码之后,就知道如何手动开启多线程的入队操作了。
下面介绍如何利用 TensorFlow 自带的多线程输入数据处理框架,以多线程方式生成文件名队列、读取队列,并生成批数据队列:
import tensorflow as tf
def get_data(batch_size, mode, data_path):
    """Create and return a Data pipeline for the given batch size, mode and path."""
    pipeline = Data(batch_size=batch_size, mode=mode, data_path=data_path)
    return pipeline
class Data(object):
    """Queue-based TFRecord input pipeline that owns its session and threads.

    Constructing an instance builds the input graph, creates a session, runs
    the variable initializers and starts the queue-runner threads once.
    build_input() then evaluates one batch per call, and close() releases the
    threads and the session.
    """

    # Modes for which init_data knows how to build a file-name queue.
    _MODES = ("train", "test")

    def __init__(self, batch_size, mode, data_path):
        self.init_data(batch_size, mode, data_path)

    def build_input(self):
        """Evaluate and return the next batch.

        Returns:
            A list [image_matrix_batch, class_name_batch, coord, threads,
            sess]: the two evaluated batch values followed by the
            Coordinator, the queue-runner thread list and the Session held
            by this object.
        """
        image_matrix_batch, class_name_batch = self.sess.run(
            [self.image_matrix_batch, self.class_name_batch])
        return [image_matrix_batch, class_name_batch,
                self.coord, self.threads, self.sess]

    def close(self):
        """Stop the queue-runner threads and close the session."""
        self.coord.request_stop()
        self.coord.join(self.threads)
        self.sess.close()

    def init_data(self, batch_size, mode, data_path=""):
        """Build the input graph, start the session and the queue runners.

        Args:
            batch_size: Number of examples per batch.
            mode: "train" (shuffled file names, unlimited epochs) or "test"
                (ordered file names, a single epoch).
            data_path: Glob pattern of TFRecord files. An empty string
                selects the built-in default pattern for the given mode.

        Raises:
            ValueError: if mode is neither "train" nor "test".
        """
        # Reject unknown modes before touching the graph; otherwise no
        # file-name queue would be created and the failure would surface
        # later as an unrelated unbound-variable error.
        if mode not in self._MODES:
            raise ValueError(
                "Unsupported mode %r, expected one of %r" % (mode, self._MODES))

        # Fall back to the default UCI HAR TFRecord locations.
        if data_path == "":
            if mode == "train":
                data_path = r"C:\Users\Administrator\Desktop\UCI HAR Dataset\UCI HAR Dataset\cutten_train_signal_images_fft2shift_plus2_TFRecord\*\traindata.tfrecords-*"
            else:
                data_path = r"C:\Users\Administrator\Desktop\UCI HAR Dataset\UCI HAR Dataset\cutten_test_signal_images_fft2shift_plus2_TFRecord\*\testdata.tfrecords-*"

        # Files matching the pattern.
        data_files = tf.train.match_filenames_once(data_path)

        # File-name queue. Creating it does not fill it: nothing is enqueued
        # until the queue runners are started below, and any read before
        # that would block.
        # See https://www.cnblogs.com/qianblue/p/6971435.html
        if mode == "train":
            file_queue = tf.train.string_input_producer(
                data_files, shuffle=True)
        else:
            file_queue = tf.train.string_input_producer(
                data_files, shuffle=False, num_epochs=1)

        # Read one serialized example from the file-name queue and parse it.
        reader = tf.TFRecordReader()
        _, serialized_example = reader.read(file_queue)
        features = tf.parse_single_example(
            serialized_example,
            features={
                'class': tf.FixedLenFeature([], tf.int64),
                'image_matrix': tf.FixedLenFeature([], tf.string),
            })

        # The image is stored as raw uint8 bytes; decoding yields a flat
        # vector without a usable static shape, so it is reshaped explicitly
        # before batching.
        image_matrix = tf.decode_raw(features["image_matrix"], tf.uint8)
        image_matrix = tf.cast(image_matrix, tf.float32)
        image_matrix = tf.reshape(image_matrix, [36, 68])
        class_name = tf.cast(features["class"], tf.int32)

        # Size of the shuffling buffer queue.
        capacity = 6000 + 3 * batch_size
        image_matrix_batch, class_name_batch = tf.train.shuffle_batch(
            [image_matrix, class_name],
            batch_size=batch_size,
            capacity=capacity,
            min_after_dequeue=1000,
            num_threads=4)

        # tf.summary.image expects [batch, height, width, channels]; add a
        # single channel axis to the [batch, 36, 68] batch for the summary
        # only, leaving the returned batch shape unchanged.
        tf.summary.image('images', tf.expand_dims(image_matrix_batch, -1))

        sess = tf.Session()
        # Both initializers are run: match_filenames_once and num_epochs
        # rely on local variables according to the TF 1.x documentation.
        sess.run([tf.global_variables_initializer(),
                  tf.local_variables_initializer()])

        # The coordinator lets the threads be stopped together; start every
        # registered queue runner (file-name queue and batch queue).
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)

        # Keep the handles on the instance so this setup runs only once.
        self.image_matrix_batch = image_matrix_batch
        self.class_name_batch = class_name_batch
        self.sess = sess
        self.coord = coord
        self.threads = threads