您的位置:首页 > 其它

tensorflow使用range_input_producer多线程读取数据

2018-03-02 22:28 1111 查看
先放关键代码:[python] view plain copyi = tf.train.range_input_producer(NUM_EXPOCHES, num_epochs=1, shuffle=False).dequeue()  
inputs = tf.slice(array, [i * BATCH_SIZE], [BATCH_SIZE])  
原理解析:第一行会产生一个队列,队列包含0到NUM_EXPOCHES-1的元素,如果num_epochs有指定,则每个元素只产生num_epochs次,否则循环产生。shuffle指定是否打乱顺序,这里shuffle=False表示队列的元素是按0到NUM_EXPOCHES-1的顺序存储。在Graph运行的时候,每个线程从队列取出元素,假设值为i,然后按照第二行代码切出array的一小段数据作为一个batch。例如NUM_EXPOCHES=3,如果num_epochs=2,则队列的内容是这样子;0,1,2,0,1,2队列只有6个元素,这样在训练的时候只能产生6个batch,迭代6次以后训练就结束。如果num_epochs不指定,则队列内容是这样子:0,1,2,0,1,2,0,1,2,0,1,2...
队列可以一直生成元素,训练的时候可以产生无限的batch,需要自己控制什么时候停止训练。下面是完整的演示代码。import tensorflow as tf
import codecs

BATCH_SIZE = 6
NUM_EXPOCHES = 5

def input_producer():
array = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30]
i = tf.train.range_input_producer(NUM_EXPOCHES, num_epochs=1, shuffle=False).dequeue()
inputs = tf.slice(array, [i * BATCH_SIZE], [BATCH_SIZE])
return inputs

class Inputs(object):
def __init__(self):
self.inputs = input_producer()

def main(*args, **kwargs):
inputs = Inputs()
init = tf.group(tf.global_variables_initializer(),tf.local_variables_initializer())
sess = tf.Session()
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
sess.run(init)
try:
index = 0
while not coord.should_stop() and index < 10:
datalines = sess.run(inputs.inputs)
index += 1
print("step: %d, batch data: %s" % (index, str(datalines)))
except tf.errors.OutOfRangeError:
print("Done traing:-------Epoch limit reached")
except KeyboardInterrupt:
print("keyboard interrput detected, stop training")
finally:
coord.request_stop()
coord.join(threads)
sess.close()
del sess

if __name__ == "__main__":
main()
输出:[html] view plain copystep: 1, batch data: ['1' '2' '3' '4' '5' '6']  
step: 2, batch data: ['7' '8' '9' '10' '11' '12']  
step: 3, batch data: ['13' '14' '15' '16' '17' '18']  
step: 4, batch data: ['19' '20' '21' '22' '23' '24']  
step: 5, batch data: ['25' '26' '27' '28' '29' '30']  
Done traing:-------Epoch limit reached  

如果range_input_producer去掉参数num_epochs=1,则输出:
[html] view plain copystep: 1, batch data: ['1' '2' '3' '4' '5' '6']  
step: 2, batch data: ['7' '8' '9' '10' '11' '12']  
step: 3, batch data: ['13' '14' '15' '16' '17' '18']  
step: 4, batch data: ['19' '20' '21' '22' '23' '24']  
step: 5, batch data: ['25' '26' '27' '28' '29' '30']  
step: 6, batch data: ['1' '2' '3' '4' '5' '6']  
step: 7, batch data: ['7' '8' '9' '10' '11' '12']  
step: 8, batch data: ['13' '14' '15' '16' '17' '18']  
step: 9, batch data: ['19' '20' '21' '22' '23' '24']  
step: 10, batch data: ['25' '26' '27' '28' '29' '30']  

有一点需要注意,文件总共有35条数据,BATCH_SIZE = 6表示每个batch包含6条数据,NUM_EXPOCHES = 5表示产生5个batch,如果NUM_EXPOCHES =6,则总共需要36条数据,就会报如下错误:[html] view plain copyInvalidArgumentError (see above for traceback): Expected size[0] in [0, 5], but got 6  
     [[Node: Slice = Slice[Index=DT_INT32, T=DT_STRING, _device="/job:localhost/replica:0/task:0/cpu:0"](Slice/input, Slice/begin/_5, Slice/size)]]  

错误信息的意思是35/BATCH_SIZE=5,即NUM_EXPOCHES 的取值能只能在0到5之间。

From http://blog.csdn.net/lyg5623/article/details/69387917
最后是关于queue_runner的一些介绍:

创建线程并使用
QueueRunner
对象来预取

简单来说:使用上面列出的许多
tf.train
函数添加
QueueRunner
到你的数据流图中。在你运行任何训练步骤之前,需要调用
tf.train.start_queue_runners
函数,否则数据流图将一直挂起。
tf.train.start_queue_runners
 这个函数将会启动输入管道的线程,填充样本到队列中,以便出队操作可以从队列中拿到样本。这种情况下最好配合使用一个
tf.train.Coordinator
,这样可以在发生错误的情况下正确地关闭这些线程。如果你对训练迭代数做了限制,那么需要使用一个训练迭代数计数器,并且需要被初始化。推荐的代码模板如下:
# Create the graph, etc.
init_op = tf.initialize_all_variables()

# Create a session for running operations in the Graph.
sess = tf.Session()

# Initialize the variables (like the epoch counter).
sess.run(init_op)

# Start input enqueue threads.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

try:
while not coord.should_stop():
# Run training steps or whatever
sess.run(train_op)

except tf.errors.OutOfRangeError:
print 'Done training -- epoch limit reached'
finally:
# When done, ask the threads to stop.
coord.request_stop()

# Wait for threads to finish.
coord.join(threads)
sess.close()

疑问: 这是怎么回事?

首先,我们先创建数据流图,这个数据流图由一些流水线的阶段组成,阶段间用队列连接在一起。第一阶段将生成文件名,我们读取这些文件名并且把他们排到文件名队列中。第二阶段从文件中读取数据(使用
Reader
),产生样本,而且把样本放在一个样本队列中。根据你的设置,实际上也可以拷贝第二阶段的样本,使得他们相互独立,这样就可以从多个文件中并行读取。在第二阶段的最后是一个排队操作,就是入队到队列中去,在下一阶段出队。因为我们是要开始运行这些入队操作的线程,所以我们的训练循环会使得样本队列中的样本不断地出队。

tf.train
中要创建这些队列和执行入队操作,就要添加
tf.train.QueueRunner
到一个使用
tf.train.add_queue_runner
函数的数据流图中。每个
QueueRunner
负责一个阶段,处理那些需要在线程中运行的入队操作的列表。一旦数据流图构造成功,
tf.train.start_queue_runners
函数就会要求数据流图中每个
QueueRunner
去开始它的线程运行入队操作。如果一切顺利的话,你现在可以执行你的训练步骤,同时队列也会被后台线程来填充。如果您设置了最大训练迭代数,在某些时候,样本出队的操作可能会得到一个
tf.OutOfRangeError
的错误。这其实是TensorFlow的“文件结束”(EOF) ———— 这就意味着已经达到了最大训练迭代数,已经没有更多可用的样本了。最后一个因素是
Coordinator
。这是负责在收到任何关闭信号的时候,让所有的线程都知道。最常用的是在发生异常时这种情况就会呈现出来,比如说其中一个线程在运行某些操作时出现错误(或一个普通的Python异常)。想要了解更多的关于threading, queues, QueueRunners, and Coordinators的内容可以看这里.

疑问: 在达到最大训练迭代数的时候如何清理关闭线程?

想象一下,你有一个模型并且设置了最大训练迭代数。这意味着,生成文件的那个线程将只会在产生
OutOfRange
错误之前运行许多次。该
QueueRunner
会捕获该错误,并且关闭文件名的队列,最后退出线程。关闭队列做了两件事情:如果还试着对文件名队列执行入队操作时将发生错误。任何线程不应该尝试去这样做,但是当队列因为其他错误而关闭时,这就会有用了。
任何当前或将来出队操作要么成功(如果队列中还有足够的元素)或立即失败(发生
OutOfRange
错误)。它们不会防止等待更多的元素被添加到队列中,因为上面的一点已经保证了这种情况不会发生。
关键是,当在文件名队列被关闭时候,有可能还有许多文件名在该队列中,这样下一阶段的流水线(包括reader和其它预处理)还可以继续运行一段时间。 一旦文件名队列空了之后,如果后面的流水线还要尝试从文件名队列中取出一个文件名(例如,从一个已经处理完文件的reader中),这将会触发
OutOfRange
错误。在这种情况下,即使你可能有一个QueueRunner关联着多个线程。如果这不是在QueueRunner中的最后那个线程,
OutOfRange
错误仅仅只会使得一个线程退出。这使得其他那些正处理自己的最后一个文件的线程继续运行,直至他们完成为止。 (但如果假设你使用的是
tf.train.Coordinator
,其他类型的错误将导致所有线程停止)。一旦所有的reader线程触发
OutOfRange
错误,然后才是下一个队列,再是样本队列被关闭。同样,样本队列中会有一些已经入队的元素,所以样本训练将一直持续直到样本队列中再没有样本为止。如果样本队列是一个
RandomShuffleQueue
,因为你使用了
shuffle_batch
 或者 
shuffle_batch_join
,所以通常不会出现以往那种队列中的元素会比
min_after_dequeue
 定义的更少的情况。 然而,一旦该队列被关闭,
min_after_dequeue
设置的限定值将失效,最终队列将为空。在这一点来说,当实际训练线程尝试从样本队列中取出数据时,将会触发
OutOfRange
错误,然后训练线程会退出。一旦所有的培训线程完成,
tf.train.Coordinator.join
会返回,你就可以正常退出了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: