您的位置:首页 > 其它

Tensorflow基础:多线程输入数据处理框架

2017-10-17 23:46 471 查看
图像预处理中介绍了使用Tensorflow对图像数据进行预处理的方法。但是这些复杂的预处理过程也会减慢整个训练过程。为了避免图像预处理成为神经网络模型训练效率的瓶颈,Tensorflow提供了一套多线程处理输入数据的框架。



队列与多线程

在Tensorflow中,队列不仅是一种数据结构,它更提供了多线程机制。队列和变量类似,都是计算图上有状态的节点。其他的计算节点可以修改它们的状态。对于队列,修改队列状态的操作主要有Enqueue、EnqueueMany、Dequeue。以下程序展示了如何使用这些函数来操作一个队列:

import tensorflow as tf

# 创建一个先进先出队列,指定队列中最多可以保存两个元素,并指定类型为整数
q = tf.FIFOQueue(2, "int32")
# 使用enqueue_many函数来初始化队列中的元素。和变量初始化类似,在使用队列之前
# 需要明确的调用这个初始化过程
init = q.enqueue_many(([0, 10], ))
# 使用Dequeue函数将队列中的第一个元素出队列。这个元素的值将被存在变量x中。
x = q.dequeue()
y = x + 1
q_inc = q.enqueue([y])

with tf.Session() as sess:
init.run()
# 运行初始化队列的操作
for _ in range(5):
v, _ = sess.run([x, q_inc])
print(v)


运行结果:

0
10
1
11
2

Process finished with exit code 0


Tensorflow中提供了FIFOQueue和RandomShuffleQueue两种队列。在上面的程序中,已经展示了如何使用FIFOQueue,它的实现是一个先进先出队列。RandomShuffleQueue会将队列中的元素打乱,每次出队列操作得到的是从当前队列所有元素中随机选择的一个。在训练神经网络时,希望每次使用的训练数据尽量随机,RandomShuffleQueue就提供了这样的功能。

在Tensorflow中,队列不仅仅是一种数据结构,还是异步张量取值的一个重要机制。比如多个线程可以同时向一个队列中写元素,或者同时读取一个队列中的元素。

Tensorflow提供了tf.Coordinator和tf.QueueRunner两个类来完成多线程协同的功能。tf.Coordinator主要用于协同多个线程一起停止,并提供了should_stop、request_stop和join三个函数。在启动线程之前,需要先声明一个tf.Coordinator类,并将这个类传入每一个创建的线程中。启动的线程需要一直查询tf.Coordinator类中提供的should_stop函数,当这个函数的返回值为True时,则当前线程也需要退出。每一个启动的线程都可以通过调用request_stop函数来通知其他线程退出。当某一个线程调用request_stop函数之后,should_stop函数的返回值将被设置为True,这样其他的线程就可以同时终止了:

import tensorflow as tf
import numpy as np
import threading
import time

# 线程中运行的程序,这个程序每隔1秒判断是否需要停止并打印自己的ID
def MyLoop(coord, worker_id):
while not coord.should_stop():
#随机停止所有的线程
if np.random.rand() < 0.1:
print("Stoping from id: %d\n" % worker_id)
coord.request_stop()
else:
print("Working on id: %d\n" % worker_id)

time.sleep(1)

coord = tf.train.Coordinator()

threads = [threading.Thread(target=MyLoop, args=(coord, i, )) for i in range(5)]
for t in threads: t.start()
coord.join(threads)


运行结果:

Working on id: 0
Working on id: 1

Working on id: 2

Working on id: 3

Working on id: 4

Working on id: 1

Working on id: 4

Working on id: 0

Working on id: 3

Working on id: 2

Working on id: 1

Working on id: 3

Working on id: 0
Working on id: 2

Stoping from id: 4

Process finished with exit code 0


tf.QueueRunner主要用于启动多个线程来操作同一个队列,启动的这些线程可以通过上面介绍的tf.Coordinator类来统一管理。以下代码展示了如何使用tf.QueueRunner和tf.Coordinator来管理多线程队列操作。

import tensorflow as tf

# 声明一个先进先出的队列,队列中最多100个元素,类型为实数
queue = tf.FIFOQueue(100, "float")
# 定义队列的入队操作
enqueue_op = queue.enqueue([tf.random_normal([1])])

# 使用tf.train.QueueRunner来创建多个线程运行队列的入队操作
qr = tf.train.QueueRunner(queue, [enqueue_op] * 5)

# 将定义过的QueueRunner加入Tensorflow计算图上指定的集合
# tf.train.add_queue_runner函数没有指定集合
# 则加入默认集合tf.Graphkeys.QUEUE_RUNNERS
tf.train.add_queue_runner(qr)

# 定义出队操作
out_tensor = queue.dequeue()

with tf.Session() as sess:
#使用tf.train.Coordinator来协同启动的线程
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
for _ in range(3):
print(sess.run(out_tensor)[0])
coord.request_stop()
coord.join(threads)


运行结果:

0.878208
1.29909
-1.02023

Process finished with exit code 0


输入文件队列

假设所有的输入数据都已经整理成了一个TFRecord格式。虽然一个TFRecord文件中可以存储多个训练样例,但是当训练数据量较大时,可以将数据分成多个TFRecord文件来提供处理效率。Tensorflow提供了tf.train.match_filenames_once函数来获取符合一个正则表达式的所有文件,得到的文件列表可以通过tf.train.string_input_producer函数进行有效地管理。

通过设置shuffle参数,tf.train.string_input_producer函数支持随机打乱文件列表中文件出队的顺序。当shuffle参数为True时,文件在加入队列之前会被打乱顺序,所以出队的顺序也是随机地。随机打乱文件顺序以及加入输入队列的过程会跑在一个单独的线程上,这样不会影响获取文件的速度。tf.train_string_input_producer生成的输入队列可以同时被多个文件读取线程操作,而且输入队列会将队列中的文件均匀地分给不同的线程,不出现有些文件被处理过多次而有些文件还没有被处理过的情况。

当一个输入队列中的所有文件都被处理完之后,它会将初始化时提供的文件列表中的文件全部重新加入队列。tf.train.string_input_producer函数可以设置num_epochs参数来限制加载初始文件列表的最大轮数。当所有文件都已经被使用了设定的轮数后,如果继续尝试读取新的文件,输入队列会报OutOfRange的错误。

在测试神经网络模型时,因为所有测试数据只需要使用一次,所以可以将num_epochs参数设置为1。这样在计算完一轮之后程序将自动停止。在展示tf.train.match_filenames_once和tf.train.string_input_producer函数的使用方法之前,下面先给出一个简单的程序来生成样例数据:

import tensorflow as tf

# 创建TFRecord文件的帮助函数
def _int64_feature(value):
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# 模拟海量数据情况下将数据写入不同的文件。num_shards定义了总共写入多少个文件,num_shards定义了总共写入多少个文件,
# instances_per_shard定义了每个文件中有多少个数据
num_shards = 2
instances_per_shard = 2

for i in range(num_shards):
# 将数据分为多个文件时,可以将不同文件以类似000n-of-000m的后缀区分。其中m表示了数据总共被存了多少个文件中,n表示当前文件
# 的编号。这样的方式既方便了通过正则表达式获取文件列表,又在文件名中加入了更多的信息。
filename = ('.\data.tfrecords-%.5d-of-%.5d' % (i, num_shards))
writer = tf.python_io.TFRecordWriter(filename)
# 将数据封装成Example结构并写入TFRecord文件。
for j in range(instances_per_shard):
# Example结构仅包含当前样例属于第几个文件以及是当前文件的第几个样本
example = tf.train.Example(features=tf.train.Features(feature={
'i': _int64_feature(i),
'j': _int64_feature(j)
}) )
writer.write(example.SerializeToString())
writer.close()


程序运行之后,在指定的目录下将生成两个文件:data.tfrecords-00000-of-00002和data.tfrecords-00001-of-00002。每一个文件中存储了两个样例。在生成了样例数据之后,一下代码展示了tf.train.match_filenames_once函数和tf.train_input_producer函数的使用方法:

import tensorflow as tf

# 使用tf.match_filename_once函数获取文件列表
files = tf.train.match_filenames_once('.\data.tfrecords-*')

# 通过tf.train.string_input_producer函数创建输入队列,输入队列中的文件列表为tf.train.match_filenames_once函数获取的文件列表
# 这里将shuffle参数设置为False来避免随机打乱读文件的顺序。但一般在解决真实问题时,会将shuffle参数设置为True

filename_queue = tf.train.string_input_producer(files, shuffle=False)

reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(serialized_example,
features={
'i': tf.FixedLenFeature([], tf.int64),
'j': tf.FixedLenFeature([], tf.int64),
})

with tf.Session() as sess:
#虽然在本段程序中没有声明任何变量,但使用tf.train.match_filenames_once函数时需要初始化一些变量
tf.initialize_all_variables().run()
print(sess.run(files))

# 声明tf.train.Coordinator类来协同不同线程,并启动线程
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

# 多次执行获取数据的操作
for i in range(6):
print(sess.run([features['i'], features['j']]))
coord.request_stop()
coord.join(threads)


组合训练数据(batch)

将多个输入样例组织成一个batch可以提高模型训练的效率。所以在得到单个样例的预处理结果之后,还需要将它们组织成batch,然后再提供给神经网络的输入层。Tensorflow提供了tf.train.batch和tf.train.shuffle_batch函数来将单个的样例组织成batch的形式输出。这两个函数都会生成一个队列,队列的入队操作是生成单个样例的方法,而每次出队得到的是一个batch的样例。以下代码展示了这两个函数的使用方法:

import tensorflow as tf

# 使用tf.match_filename_once函数获取文件列表
files = tf.train.match_filenames_once('.\data.tfrecords-*')

# 通过tf.train.string_input_producer函数创建输入队列,输入队列中的文件列表为tf.train.match_filenames_once函数获取的文件列表
# 这里将shuffle参数设置为False来避免随机打乱读文件的顺序。但一般在解决真实问题时,会将shuffle参数设置为True

filename_queue = tf.train.string_input_producer(files, shuffle=False)

reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(serialized_example,
features={
'i': tf.FixedLenFeature([], tf.int64),
'j': tf.FixedLenFeature([], tf.int64),
})

example, label = features['i'], features['j']

batch_size = 3
# 组合样例的队列中最多可以存储的样例个数。这个队列如果太大,那么需要占用很多内存资料;如果太小,那么出队操作可能会因为没有数据
# 而被阻碍(block),从而导致训练效率降低。
capacity = 1000 + 3 * batch_size

# 使用tf.train.batch函数来组合样例。[example, label]参数给出了需要组合的元素,一般example和label分别代表训练样本和这个样本对应的正确标签。
# 当队列长度等于容量时,Tensorflow将暂停入队操作,而只是等待出队。当元素个数小于容量时,Tensorflow将自动启动入队操作
example_batch, label_batch = tf.train.batch([example, label], batch_size=batch_size, capacity=capacity)

with tf.Session() as sess:
tf.global_variables_initializer().run()
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
# 获取并打印组合之后的样例。在真实问题中,这个输出一般会作为神经网络的输入
for i in range(2):
cur_example_batch, cur_label_batch = sess.run([example_batch, label_batch])
print(cur_example_batch, cur_label_batch)
coord.request_stop()
coord.join(threads)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: