您的位置:首页 > 编程语言 > Python开发

第八课 tensorflow numpy_input_fn 以及队列性质

2017-10-02 12:58 337 查看

numpy_input_fn 以及队列性质

该函数的作用是从numpy的输入数据中,产生读取的featrues和labels数据。这样当我们在使用numpy的数据作为输入的时候就很方便。对于所有的input来说,都是要建立队列来进行读入,所以对于队列的处理就会比较麻烦,而numpy_input的数据将这些对队列的输入封装在一起方便了我们使用.

import tensorflow as tf
import numpy as np
from tensorflow.python.estimator.inputs import numpy_io
from tensorflow.python.training import coordinator


# 构造numpy数据进行测试

age = np.arange(4) * 1.0
height = np.arange(32, 36)
x = {'age': age, 'height': height}
y = np.arange(-32, -28)

print('age shape:', age.shape, 'height shape:', height.shape, 'y shape:', y.shape)


('age shape:', (4,), 'height shape:', (4,), 'y shape:', (4,))


batch_size 与 num_epochs解析

with tf.Session() as session:

# 定义numpy input fn
input_fn = numpy_io.numpy_input_fn(x, y, batch_size=2, shuffle=False, num_epochs=1)

# 运行input_fn, 产生featrue和targets
featrues, targets = input_fn()

coord = coordinator.Coordinator()
threads = tf.train.start_queue_runners(session, coord=coord)

ret_features, ret_targets = session.run([featrues, targets])

print('featrues:', ret_features)
print('targets:', ret_targets)
coord.request_stop()
coord.join(threads)


('featrues:', {'age': array([ 0.,  1.]), 'height': array([32, 33])})
('targets:', array([-32, -31]))


上面结果看到会输出2个值。对应到
input_fn = numpy_io.numpy_input_fn(x, y, batch_size=2, shuffle=False, num_epochs=1)
就是
batch_size=2
. shuffle是表示是否随机的意思.
num_epochs=1
表示对于整个输入数据来说一共灌入到队列中几份。当前的数据大小是4,那么
num_epochs=1
就是说队列中有
4*1=4
分数据。总共是4分数据,
batch_size=2
,那么就是一共能够获取
2=4/2
份数据来进行训练,如果超过2,那么就会报队列溢出的异常。

with tf.Session() as session:

# 定义numpy input fn
input_fn = numpy_io.numpy_input_fn(x, y, batch_size=2, shuffle=False, num_epochs=1)

# 运行input_fn, 产生featrue和targets
featrues, targets = input_fn()

coord = coordinator.Coordinator()
threads = tf.train.start_queue_runners(session, coord=coord)

num_step = 3
for step in range(num_step):
ret_features, ret_targets = session.run([featrues, targets])

print('featrues:', ret_features)
print('targets:', ret_targets)
coord.request_stop()
coord.join(threads)


('featrues:', {'age': array([ 0.,  1.]), 'height': array([32, 33])})
('targets:', array([-32, -31]))('featrues:', {'age': array([ 2.,  3.]), 'height': array([34, 35])})
('targets:', array([-30, -29]))

---------------------------------------------------------------------------

OutOfRangeError Traceback (most recent call last)

<ipython-input-5-b1111e315ccf> in <module>()
12 num_step = 3
13 for step in range(num_step):
---> 14 ret_features, ret_targets = session.run([featrues, targets])
15
16 print('featrues:', ret_features)

/Library/Python/2.7/site-packages/tensorflow/python/client/session.pyc in run(self, fetches, feed_dict, options, run_metadata)
893 try:
894 result = self._run(None, fetches, feed_dict, options_ptr,
--> 895 run_metadata_ptr)
896 if run_metadata:
897 proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)

/Library/Python/2.7/site-packages/tensorflow/python/client/session.pyc in _run(self, handle, fetches, feed_dict, options, run_metadata)
1122 if final_fetches or final_targets or (handle and feed_dict_tensor):
1123 results = self._do_run(handle, final_targets, final_fetches,
-> 1124 feed_dict_tensor, options, run_metadata)
1125 else:
1126 results = []

/Library/Python/2.7/site-packages/tensorflow/python/client/session.pyc in _do_run(self, handle, target_list, fetch_list, feed_dict, options, run_metadata)
1319 if handle is None:
1320 return self._do_call(_run_fn, self._session, feeds, fetches, targets,
-> 1321 options, run_metadata)
1322 else:
1323 return self._do_call(_prun_fn, self._session, handle, feeds, fetches)

/Library/Python/2.7/site-packages/tensorflow/python/client/session.pyc in _do_call(self, fn, *args)
1338 except KeyError:
1339 pass
-> 1340 raise type(e)(node_def, op, message)
1341
1342 def _extend_graph(self):

OutOfRangeError: FIFOQueue '_2_enqueue_input_1/fifo_queue' is closed and has insufficient elements (requested 2, current size 0)
[[Node: fifo_queue_DequeueUpTo_1 = QueueDequeueUpToV2[component_types=[DT_INT64, DT_DOUBLE, DT_INT64, DT_INT64], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](enqueue_input_1/fifo_queue, fifo_queue_DequeueUpTo_1/n)]]

Caused by op u'fifo_queue_DequeueUpTo_1', defined at:
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 162, in _run_module_as_main
"__main__", fname, loader, pkg_name)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 72, in _run_code
exec code in run_globals
File "/Library/Python/2.7/site-packages/ipykernel/__main__.py", line 3, in <module>
app.launch_new_instance()
File "/Library/Python/2.7/site-packages/traitlets-4.1.0-py2.7.egg/traitlets/config/application.py", line 589, in launch_instance
app.start()
File "/Library/Python/2.7/site-packages/ipykernel/kernelapp.py", line 474, in start
ioloop.IOLoop.instance().start()
File "/Library/Python/2.7/site-packages/zmq/eventloop/ioloop.py", line 162, in start
super(ZMQIOLoop, self).start()
File "/Library/Python/2.7/site-packages/tornado/ioloop.py", line 887, in start
handler_func(fd_obj, events)
File "/Library/Python/2.7/site-packages/tornado/stack_context.py", line 275, in null_wrapper
return fn(*args, **kwargs)
File "/Library/Python/2.7/site-packages/zmq/eventloop/zmqstream.py", line 440, in _handle_events
self._handle_recv()
File "/Library/Python/2.7/site-packages/zmq/eventloop/zmqstream.py", line 472, in _handle_recv
self._run_callback(callback, msg)
File "/Library/Python/2.7/site-packages/zmq/eventloop/zmqstream.py", line 414, in _run_callback
callback(*args, **kwargs)
File "/Library/Python/2.7/site-packages/tornado/stack_context.py", line 275, in null_wrapper
return fn(*args, **kwargs)
File "/Library/Python/2.7/site-packages/ipykernel/kernelbase.py", line 276, in dispatcher
return self.dispatch_shell(stream, msg)
File "/Library/Python/2.7/site-packages/ipykernel/kernelbase.py", line 228, in dispatch_shell
handler(stream, idents, msg)
File "/Library/Python/2.7/site-packages/ipykernel/kernelbase.py", line 390, in execute_request
user_expressions, allow_stdin)
File "/Library/Python/2.7/site-packages/ipykernel/ipkernel.py", line 196, in do_execute
res = shell.run_cell(code, store_history=store_history, silent=silent)
File "/Library/Python/2.7/site-packages/ipykernel/zmqshell.py", line 498, in run_cell
return super(ZMQInteractiveShell, self).run_cell(*args, **kwargs)
File "/Library/Python/2.7/site-packages/ipython-4.1.2-py2.7.egg/IPython/core/interactiveshell.py", line 2723, in run_cell
interactivity=interactivity, compiler=compiler, result=result)
File "/Library/Python/2.7/site-packages/ipython-4.1.2-py2.7.egg/IPython/core/interactiveshell.py", line 2825, in run_ast_nodes
if self.run_code(code, result):
File "/Library/Python/2.7/site-packages/ipython-4.1.2-py2.7.egg/IPython/core/interactiveshell.py", line 2885, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-5-b1111e315ccf>", line 7, in <module>
featrues, targets = input_fn()
File "/Library/Python/2.7/site-packages/tensorflow/python/estimator/inputs/numpy_io.py", line 127, in input_fn
else queue.dequeue_up_to(batch_size))
File "/Library/Python/2.7/site-packages/tensorflow/python/ops/data_flow_ops.py", line 498, in dequeue_up_to
self._queue_ref, n=n, component_types=self._dtypes, name=name)
File "/Library/Python/2.7/site-packages/tensorflow/python/ops/gen_data_flow_ops.py", line 1430, in _queue_dequeue_up_to_v2
timeout_ms=timeout_ms, name=name)
File "/Library/Python/2.7/site-packages/tensorflow/python/framework/op_def_library.py", line 767, in apply_op
op_def=op_def)
File "/Library/Python/2.7/site-packages/tensorflow/python/framework/ops.py", line 2630, in create_op
original_op=self._default_original_op, op_def=op_def)
File "/Library/Python/2.7/site-packages/tensorflow/python/framework/ops.py", line 1204, in __init__
self._traceback = self._graph._extract_stack() # pylint: disable=protected-access

OutOfRangeError (see above for traceback): FIFOQueue '_2_enqueue_input_1/fifo_queue' is closed and has insufficient elements (requested 2, current size 0)
[[Node: fifo_queue_DequeueUpTo_1 = QueueDequeueUpToV2[component_types=[DT_INT64, DT_DOUBLE, DT_INT64, DT_INT64], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](enqueue_input_1/fifo_queue, fifo_queue_DequeueUpTo_1/n)]]


正如上面的分析,这里
num_step=3
,每次取2个,所以总量是6个就会队列溢出. 所以如果想要精确的控制,必须知道总的数据集有多大。但是,问题是实际可能我们只是知道大概的数据集大小,因为在离线训练的时候,我们需要不断的更新数据集,那么就会导致,这个数据集的大小是不确定,那么该如何处理呢?直观的做法就是,对队列的溢出异常进行捕获,就好了。

with tf.Session() as session:

# 定义numpy input fn
input_fn = numpy_io.numpy_input_fn(x, y, batch_size=2, shuffle=False, num_epochs=1)

# 运行input_fn, 产生featrue和targets
featrues, targets = input_fn()

coord = coordinator.Coordinator()
threads = tf.train.start_queue_runners(session, coord=coord)

num_step = 3

try:

for step in range(num_step):
# 这一行表示 如果coord 已经stop,就不要再进行下去了, 对应到 coord.request_stop()
# 因为 request_stop 不会立马stop, 但是通过should_stop能获取到状态
if coord.should_stop():
break

ret_features, ret_targets = session.run([featrues, targets])

print('featrues:', ret_features)
print('targets:', ret_targets)
except tf.errors.OutOfRangeError as error:
print('ignore: ', error)
finally:
coord.request_stop()
coord.join(threads)


('featrues:', {'age': array([ 0.,  1.]), 'height': array([32, 33])})
('targets:', array([-32, -31]))('featrues:', {'age': array([ 2.,  3.]), 'height': array([34, 35])})
('targets:', array([-30, -29]))
('ignore: ', OutOfRangeError())


当我们加入这个异常的保护,就可以很轻松的不用管数据集的数量问题了。除了上面方法,还有一种更为优雅的方法,就是队列可以自动循环的读取就好了。这里唯一要做的就是就 将
num_epochs=1
=>
num_epochs=None
.那么,这种情况下就不会溢出了,队列会自己控制
num_epochs
,会循环的读入.

with tf.Session() as session:

# 定义numpy input fn, 注意: num_epochs=None,队列会循环的读入.
input_fn = numpy_io.numpy_input_fn(x, y, batch_size=2, shuffle=False, num_epochs=None)

# 运行input_fn, 产生featrue和targets
featrues, targets = input_fn()

coord = coordinator.Coordinator()
threads = tf.train.start_queue_runners(session, coord=coord)

num_step = 3
for step in range(num_step):
ret_features, ret_targets = session.run([featrues, targets])

print('featrues:', ret_features)
print('targets:', ret_targets)
coord.request_stop()
coord.join(threads)


('featrues:', {'age': array([ 0.,  1.]), 'height': array([32, 33])})
('targets:', array([-32, -31]))('featrues:', {'age': array([ 2.,  3.]), 'height': array([34, 35])})
('targets:', array([-30, -29]))
('featrues:', {'age': array([ 0., 1.]), 'height': array([32, 33])}) ('targets:', array([-32, -31]))


看到上面的最后一行的结果,就是 [0, 1] 这显然是第一个数据.

shuffle与threads

上面介绍了batch_size与num_epochs. 还有另外两个参数shuffle与threads.

shuffle: 随机产出。这很好,方便产生随机的数据集

threads: 是指读入数据时候的线程数.

这两个参数,有些情况,需要进行解释:

条件解释
num_threads > 1 and num_epochs is not Nonenum_epochs会应用到每一个线程上,所以这就会产生超过你实际设置的epochs的数量,所以如果想要限制epochs的数量,需要使用一个线程. 也就是说如果使用超过1个线程,那么最好是num_epochs设置为None. 否则,你无法控制这个数量
shuffle and num_threads > 1 and num_epochs is not None多个线程同时读取放入到队列中,这可能会导致不会那么随机
if not shuffle and num_threads > 1多线程无法保证顺序性
所以通过以上总结,最佳的参数,有两个组

num_threads > 1, shuffle=True, num_epochs = None

num_threads = 1, shuffle=Ture/False, num_epochs = None


                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐