tensorfrom—文件读取
import tensorflow as tf
# 模拟一下同步数据处理
# 1.首先定义队列
Q = tf.FIFOQueue(3, tf.float32)
# 1.1 放入一些数据
enq_many = Q.enqueue_many([[0.1, 0.2, 0.4],])
# 2.定义一些数据读取,
out_q = Q.dequeue() # 从Q中读取数据
data = out_q + 1
en_q = Q.enqueue(data) # 将一个元素加入此队列
with tf.Session() as sess:
sess.run(enq_many)
# 处理数据
for i in range(3):
sess.run(en_q)
for i in range(Q.size().eval()):
print(sess.run(Q.dequeue()))
1.1
1.2
1.4
队列管理器—创建线程
api
# 定义线程
tf.train.QueueRunner(queue,enqueue_ops=None)
queue:写名字
enqueue_ops=None:添加线程的队列操作列表,[]*2指定2个线程
# session
线程协调器
# k开启线程管理器
coord = tf.train.Coordinator()
# 回收你,回来吧皮卡丘
coord.request_stop()
coord.join(th)
import tensorflow as tf
# 模拟异步的数据
# 1.首先定义队列 1000
Q = tf.FIFOQueue(1000, tf.float32)
# 2.定义子线程干的事情,循环 值+1 放入队列中
var = tf.Variable(0.0)
# 实现一个自增
data = tf.assign_add(var, tf.constant(1.0))
en_q = Q.enqueue(data)
# 3.定义队列管理器op,指定多少子线程,子线程该干嘛
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q]*2)
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
sess.run(init_op)
# k开启线程管理器
coord = tf.train.Coordinator()
# 真正开启子线程
th = qr.create_threads(sess, coord=coord, start=True)
#
for i in range(300):
print(sess.run(Q.dequeue()))
# 回收你,回来吧皮卡丘
coord.request_stop()
coord.join(th)
文件读取
csv文件读取:
- 先找到文件
- 构件文件队列表
- 构造阅读器,读取队列内容(一行)
- 解码内容
- 批处理(多个样本)
1.文件读取Api
tf.train.string_inport_producer(string_tenor,shuffle=True)
讲输出字符串(一般为文件名),输入到管道队列
read(file_queue):从队列中指定数量的内容返回一个Tensors元祖(key,文件名称;value,默认的内容(行,字节))
# 3. 解码器
tf.decode_csv()
将文本文件转换成一个张量
import tensorflow as tf
import os
# # 模拟一下同步数据处理
#
# # 1.首先定义队列
# Q = tf.FIFOQueue(3, tf.float32)
# # 1.1 放入一些数据
# enq_many = Q.enqueue_many([[0.1, 0.2, 0.4],])
#
# # 2.定义一些数据读取,
#
# out_q = Q.dequeue() # 从Q中读取数据
#
# data = out_q + 1
# en_q = Q.enqueue(data) # 将一个元素加入此队列
#
# with tf.Session() as sess:
# sess.run(enq_many)
#
# # 处理数据
# for i in range(3):
# sess.run(en_q)
#
# for i in range(Q.size().eval()):
# print(sess.run(Q.dequeue()))
# AA(filelist)
def AA(filelist):
"""
读取csv文件
parm:filelist
"""
# 1.构造文件队列
file_queue = tf.train.string_input_producer(filelist) # 这是一个队列
# 2.构造csv阅读器读取队列数据(安行)
reader = tf.TextLineReader()
key, value = reader.read(file_queue)
# 3.对每行进行解码
# record_defaults=指定每一个样本每一列的类型(指定这一列解码用什么类型),指定默认值[[2]],以int类型来读取,并且空缺位置是2
records = [['None'], ['None']]
example, label = tf.decode_csv(value, record_defaults=records)
# print(example, label)
# 4.想读取多个数据,就需批处理,batch_size=8 每次读多少数据读8个样本,num_threads=1开几个线程,capacity=9队列有9个数据
#example_batch , label_batch = tf.train.batch([example, label], batch_size=8, num_threads=1,capacity=9)
# print(example, label)
return example, label
if __name__ == '__main__':
# 1.找到文件,放入列表, 路径-->名字 -->类表当中
file_name = os.listdir('./data/')
filelist = [os.path.join('./data/', file) for file in file_name]
example, label = AA(filelist)
# 开启绘画,运行结果
with tf.Session() as sess:
# 定义线程协调器
coord = tf.train.Coordinator()
# 开启读文件的线程
th = tf.train.start_queue_runners(sess, coord=coord)
print(sess.run([example, label]))
# 回收子线程
coord.request_stop()
coord.join(th)