tensorflow中关于队列使用的实验
2016-11-28 00:36
405 查看
Queue:
队列本身也是图中的一个节点。其他节点(enqueue, dequeue)可以修改队列节点中的内容。
#-*- coding:utf-8 -*-
import tensorflow as tf
#创建的图:一个先入先出队列,以及初始化,出队,+1,入队操作
q = tf.FIFOQueue(3, "float")
init = q.enqueue_many(([0.1, 0.2, 0.3],))
x = q.dequeue()
y = x + 1
q_inc = q.enqueue([y])
#开启一个session,session是会话,会话的潜在含义是状态保持,各种tensor的状态保持
with tf.Session() as sess:
sess.run(init)
for i in range(2):
sess.run(q_inc)
quelen = sess.run(q.size())
for i in range(quelen):
print (sess.run(q.dequeue()))
queueRunner
之前的例子中,入队操作都在主线程中进行,Session中可以多个线程一起运行。 在数据输入的应用场景中,入队操作从硬盘上读取,入队操作是从硬盘中读取输入,放到内存当中,速度较慢。 使用
例子1:
#-*- coding:utf-8 -*-
import tensorflow as tf
import sys
#创建稍微复杂一点的队列作为例子
q = tf.FIFOQueue(1000,"float")
#计数器
counter = tf.Variable(0.0)
#操作:给计数器加一
increment_op = tf.assign_add(counter,tf.constant(1.0))
#操作:将计数器加入队列
enqueue_op = q.enqueue(counter)
#创建一个队列管理器QueueRunner,用这两个操作向q中添加元素。目前我们只使用一个线程:
qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueue_op]*1)
#从队列管理器中创建线程,并启动:
#主线程
with tf.Session() as sess:
sess.run(tf.initialize_all_variables())
enqueue_threads = qr.create_threads(sess, start=True) # 启动入队线程
#主线程
for i in range(10):
print "----------------------:"
print (sess.run(q.dequeue()))
说明:能正确输出结果,但是最后会报错,ERROR:tensorflow:Exception in QueueRunner: Attempted to use a closed Session.ERROR:tensorflow:Exception in QueueRunner: Session has been closed.也就是说,当循环结束后,该Session就会自动关闭,相当于main函数已经结束了
例子2:
#-*- coding:utf-8 -*-
import tensorflow as tf
import sys
#创建稍微复杂一点的队列作为例子
q = tf.FIFOQueue(1000,"float")
#计数器
counter = tf.Variable(0.0)
#操作:给计数器加一
increment_op = tf.assign_add(counter,tf.constant(1.0))
#操作:将计数器加入队列
enqueue_op = q.enqueue(counter)
#创建一个队列管理器QueueRunner,用这两个操作向q中添加元素。目前我们只使用一个线程:
qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueue_op]*1)
# 主线程
sess = tf.Session()
sess.run(tf.initialize_all_variables())
enqueue_threads = qr.create_threads(sess, start=True) # 启动入队线程
# 主线程
for i in range(0, 10):
print(sess.run(q.dequeue()))
说明:例子2中不使用with tf.Session,那么Session就不会自动关闭,输出的结果是3.0
6.0
9.0
13.0
并不是我们设想的1,2,3,4,本质原因是+1操作和入队操作不同步,可能+1操作执行了很多次之后,才会进行一次入队操作.并且出队结束后,本应程序要结束,但是因为入队线程没有显示结束,所以,整个程序就跟挂起一样,也结束不了。
经验:因为tensorflow是在图上进行计算,要驱动一张图进行计算,必须要送入数据,如果说数据没有送进去,那么sess.run(),就无法执行,tf也不会主动报错,提示没有数据送进去,其实tf也不能主动报错,因为tf的训练过程和读取数据的过程其实是异步的。tf会一直挂起,等待数据准备好。现象就是tf的程序不报错,但是一直不动,跟挂起类似。
例子如下:
#-*- coding:utf-8 -*-
import tensorflow as tf
import sys
#创建稍微复杂一点的队列作为例子
q = tf.FIFOQueue(1000,"float")
#计数器
counter = tf.Variable(0.0)
#操作:给计数器加一
increment_op = tf.assign_add(counter,tf.constant(1.0))
#操作:将计数器加入队列
enqueue_op = q.enqueue(counter)
#创建一个队列管理器QueueRunner,用这两个操作向q中添加元素。目前我们只使用一个线程:
qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueue_op]*1)
# 主线程
sess = tf.Session()
sess.run(tf.initialize_all_variables())
#enqueue_threads = qr.create_threads(sess, start=True) # 启动入队线程
# 主线程
for i in range(0, 10):
print "-------------------------"
print(sess.run(q.dequeue()))
说明:上图将生成数据的线程注释掉,程序就会卡在sess.run(q.dequeue()),等待数据的到来。queueRunner是用来启动入队线程用的。
在读入数据例子中,使用
由于没有显式地返回
说明:
队列本身也是图中的一个节点。其他节点(enqueue, dequeue)可以修改队列节点中的内容。
#-*- coding:utf-8 -*-
import tensorflow as tf
#创建的图:一个先入先出队列,以及初始化,出队,+1,入队操作
q = tf.FIFOQueue(3, "float")
init = q.enqueue_many(([0.1, 0.2, 0.3],))
x = q.dequeue()
y = x + 1
q_inc = q.enqueue([y])
#开启一个session,session是会话,会话的潜在含义是状态保持,各种tensor的状态保持
with tf.Session() as sess:
sess.run(init)
for i in range(2):
sess.run(q_inc)
quelen = sess.run(q.size())
for i in range(quelen):
print (sess.run(q.dequeue()))
queueRunner
之前的例子中,入队操作都在主线程中进行,Session中可以多个线程一起运行。 在数据输入的应用场景中,入队操作从硬盘上读取,入队操作是从硬盘中读取输入,放到内存当中,速度较慢。 使用
QueueRunner可以创建一系列新的线程进行入队操作,让主线程继续使用数据。如果在训练神经网络的场景中,就是训练网络和读取数据是异步的,主线程在训练网络,另一个线程在将数据从硬盘读入内存。
例子1:
#-*- coding:utf-8 -*-
import tensorflow as tf
import sys
#创建稍微复杂一点的队列作为例子
q = tf.FIFOQueue(1000,"float")
#计数器
counter = tf.Variable(0.0)
#操作:给计数器加一
increment_op = tf.assign_add(counter,tf.constant(1.0))
#操作:将计数器加入队列
enqueue_op = q.enqueue(counter)
#创建一个队列管理器QueueRunner,用这两个操作向q中添加元素。目前我们只使用一个线程:
qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueue_op]*1)
#从队列管理器中创建线程,并启动:
#主线程
with tf.Session() as sess:
sess.run(tf.initialize_all_variables())
enqueue_threads = qr.create_threads(sess, start=True) # 启动入队线程
#主线程
for i in range(10):
print "----------------------:"
print (sess.run(q.dequeue()))
说明:能正确输出结果,但是最后会报错,ERROR:tensorflow:Exception in QueueRunner: Attempted to use a closed Session.ERROR:tensorflow:Exception in QueueRunner: Session has been closed.也就是说,当循环结束后,该Session就会自动关闭,相当于main函数已经结束了
例子2:
#-*- coding:utf-8 -*-
import tensorflow as tf
import sys
#创建稍微复杂一点的队列作为例子
q = tf.FIFOQueue(1000,"float")
#计数器
counter = tf.Variable(0.0)
#操作:给计数器加一
increment_op = tf.assign_add(counter,tf.constant(1.0))
#操作:将计数器加入队列
enqueue_op = q.enqueue(counter)
#创建一个队列管理器QueueRunner,用这两个操作向q中添加元素。目前我们只使用一个线程:
qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueue_op]*1)
# 主线程
sess = tf.Session()
sess.run(tf.initialize_all_variables())
enqueue_threads = qr.create_threads(sess, start=True) # 启动入队线程
# 主线程
for i in range(0, 10):
print(sess.run(q.dequeue()))
说明:例子2中不使用with tf.Session,那么Session就不会自动关闭,输出的结果是3.0
6.0
9.0
13.0
并不是我们设想的1,2,3,4,本质原因是+1操作和入队操作不同步,可能+1操作执行了很多次之后,才会进行一次入队操作.并且出队结束后,本应程序要结束,但是因为入队线程没有显示结束,所以,整个程序就跟挂起一样,也结束不了。
经验:因为tensorflow是在图上进行计算,要驱动一张图进行计算,必须要送入数据,如果说数据没有送进去,那么sess.run(),就无法执行,tf也不会主动报错,提示没有数据送进去,其实tf也不能主动报错,因为tf的训练过程和读取数据的过程其实是异步的。tf会一直挂起,等待数据准备好。现象就是tf的程序不报错,但是一直不动,跟挂起类似。
例子如下:
#-*- coding:utf-8 -*-
import tensorflow as tf
import sys
#创建稍微复杂一点的队列作为例子
q = tf.FIFOQueue(1000,"float")
#计数器
counter = tf.Variable(0.0)
#操作:给计数器加一
increment_op = tf.assign_add(counter,tf.constant(1.0))
#操作:将计数器加入队列
enqueue_op = q.enqueue(counter)
#创建一个队列管理器QueueRunner,用这两个操作向q中添加元素。目前我们只使用一个线程:
qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueue_op]*1)
# 主线程
sess = tf.Session()
sess.run(tf.initialize_all_variables())
#enqueue_threads = qr.create_threads(sess, start=True) # 启动入队线程
# 主线程
for i in range(0, 10):
print "-------------------------"
print(sess.run(q.dequeue()))
说明:上图将生成数据的线程注释掉,程序就会卡在sess.run(q.dequeue()),等待数据的到来。queueRunner是用来启动入队线程用的。
启动所有线程
在读入数据例子中,使用tf.train.string_input_produecer和
tf.train.shuffle_batch把两个
QueueRunner添加到全局图中。
由于没有显式地返回
QueueRunner来用
create_threads创建并启动线程,必须这样做:
tf.train.start_queue_runners(sess=sess)启动
tf.GraphKeys.QUEUE_RUNNERS集合中的所有队列线程。
#-*- coding:utf-8 -*- import tensorflow as tf import sys #创建稍微复杂一点的队列作为例子 q = tf.FIFOQueue(1000,"float") #计数器 counter = tf.Variable(0.0) #操作:给计数器加一 increment_op = tf.assign_add(counter,tf.constant(1.0)) enqueue_op = q.enqueue(counter) # 操作:计数器值加入队列 #操作:将计数器加入队列 qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 1) # 主线程 sess = tf.Session() sess.run(tf.initialize_all_variables()) #Coordinator:协调器,协调线程间的关系,可以视为一种信号量,用来做同步 coord = tf.train.Coordinator() ## 启动入队线程, Coordinator是线程的参数 enqueue_threads = qr.create_threads(sess, coord = coord,start=True) # 启动入队线程 # 主线程 for i in range(0, 10): print "-------------------------" print(sess.run(q.dequeue())) #通知其他线程关闭 coord.request_stop() #其他所有线程关闭之后,这一函数才能返回 #join操作经常用在线程当中,其作用是等待某线程结束 coord.join(enqueue_threads)
说明:
QueueRunner的例子有一个问题:由于入队线程自顾自地执行,在需要的出队操作完成之后,程序没法结束。使用
tf.train.Coordinator来终止其他线程。其实可以认为是做一些线程间的同步关系。
相关文章推荐
- 关于RowNum使用的一些实验
- 关于消息队列的使用
- 关于消息队列的使用
- 关于MQ 消息队列的通俗理解和 rabbitMQ 使用
- 关于TensorFlow使用的一些笔记
- 关于消息队列的使用
- 关于堆栈与队列使用的小思考(1)
- 关于消息队列的使用----ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
- Linux 使用 tensorflow 框架搭建与实验
- 关于消息队列的使用
- 关于GCD创建多线程时使用不同队列的说明
- [架构设计]关于消息队列的使用
- 实验四第4题:关于switch...case分支语句的使用
- 关于消息队列的使用(转载)
- 关于消息队列的使用
- 关于泛型队列的简单建立与使用
- 关于消息队列的使用
- 关于消息队列的使用
- TensorFlow学习系列(五):如何使用队列和多线程优化输入管道
- 关于消息队列的使用