第八课 tensorflow numpy_input_fn 以及队列性质
2017-10-02 12:58
337 查看
numpy_input_fn 以及队列性质
该函数的作用是从numpy的输入数据中,产生读取的featrues和labels数据。这样当我们在使用numpy的数据作为输入的时候就很方便。对于所有的input来说,都是要建立队列来进行读入,所以对于队列的处理就会比较麻烦,而numpy_input的数据将这些对队列的输入封装在一起方便了我们使用.import tensorflow as tf import numpy as np from tensorflow.python.estimator.inputs import numpy_io from tensorflow.python.training import coordinator
# 构造numpy数据进行测试 age = np.arange(4) * 1.0 height = np.arange(32, 36) x = {'age': age, 'height': height} y = np.arange(-32, -28) print('age shape:', age.shape, 'height shape:', height.shape, 'y shape:', y.shape)
('age shape:', (4,), 'height shape:', (4,), 'y shape:', (4,))
batch_size 与 num_epochs解析
with tf.Session() as session: # 定义numpy input fn input_fn = numpy_io.numpy_input_fn(x, y, batch_size=2, shuffle=False, num_epochs=1) # 运行input_fn, 产生featrue和targets featrues, targets = input_fn() coord = coordinator.Coordinator() threads = tf.train.start_queue_runners(session, coord=coord) ret_features, ret_targets = session.run([featrues, targets]) print('featrues:', ret_features) print('targets:', ret_targets) coord.request_stop() coord.join(threads)
('featrues:', {'age': array([ 0., 1.]), 'height': array([32, 33])}) ('targets:', array([-32, -31]))
上面结果看到会输出2个值。对应到
input_fn = numpy_io.numpy_input_fn(x, y, batch_size=2, shuffle=False, num_epochs=1)就是
batch_size=2. shuffle是表示是否随机的意思.
num_epochs=1表示对于整个输入数据来说一共灌入到队列中几份。当前的数据大小是4,那么
num_epochs=1就是说队列中有
4*1=4分数据。总共是4分数据,
batch_size=2,那么就是一共能够获取
2=4/2份数据来进行训练,如果超过2,那么就会报队列溢出的异常。
with tf.Session() as session: # 定义numpy input fn input_fn = numpy_io.numpy_input_fn(x, y, batch_size=2, shuffle=False, num_epochs=1) # 运行input_fn, 产生featrue和targets featrues, targets = input_fn() coord = coordinator.Coordinator() threads = tf.train.start_queue_runners(session, coord=coord) num_step = 3 for step in range(num_step): ret_features, ret_targets = session.run([featrues, targets]) print('featrues:', ret_features) print('targets:', ret_targets) coord.request_stop() coord.join(threads)
('featrues:', {'age': array([ 0., 1.]), 'height': array([32, 33])}) ('targets:', array([-32, -31]))('featrues:', {'age': array([ 2., 3.]), 'height': array([34, 35])})
('targets:', array([-30, -29]))
---------------------------------------------------------------------------
OutOfRangeError Traceback (most recent call last)
<ipython-input-5-b1111e315ccf> in <module>()
12 num_step = 3
13 for step in range(num_step):
---> 14 ret_features, ret_targets = session.run([featrues, targets])
15
16 print('featrues:', ret_features)
/Library/Python/2.7/site-packages/tensorflow/python/client/session.pyc in run(self, fetches, feed_dict, options, run_metadata)
893 try:
894 result = self._run(None, fetches, feed_dict, options_ptr,
--> 895 run_metadata_ptr)
896 if run_metadata:
897 proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)
/Library/Python/2.7/site-packages/tensorflow/python/client/session.pyc in _run(self, handle, fetches, feed_dict, options, run_metadata)
1122 if final_fetches or final_targets or (handle and feed_dict_tensor):
1123 results = self._do_run(handle, final_targets, final_fetches,
-> 1124 feed_dict_tensor, options, run_metadata)
1125 else:
1126 results = []
/Library/Python/2.7/site-packages/tensorflow/python/client/session.pyc in _do_run(self, handle, target_list, fetch_list, feed_dict, options, run_metadata)
1319 if handle is None:
1320 return self._do_call(_run_fn, self._session, feeds, fetches, targets,
-> 1321 options, run_metadata)
1322 else:
1323 return self._do_call(_prun_fn, self._session, handle, feeds, fetches)
/Library/Python/2.7/site-packages/tensorflow/python/client/session.pyc in _do_call(self, fn, *args)
1338 except KeyError:
1339 pass
-> 1340 raise type(e)(node_def, op, message)
1341
1342 def _extend_graph(self):
OutOfRangeError: FIFOQueue '_2_enqueue_input_1/fifo_queue' is closed and has insufficient elements (requested 2, current size 0)
[[Node: fifo_queue_DequeueUpTo_1 = QueueDequeueUpToV2[component_types=[DT_INT64, DT_DOUBLE, DT_INT64, DT_INT64], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](enqueue_input_1/fifo_queue, fifo_queue_DequeueUpTo_1/n)]]
Caused by op u'fifo_queue_DequeueUpTo_1', defined at:
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 162, in _run_module_as_main
"__main__", fname, loader, pkg_name)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 72, in _run_code
exec code in run_globals
File "/Library/Python/2.7/site-packages/ipykernel/__main__.py", line 3, in <module>
app.launch_new_instance()
File "/Library/Python/2.7/site-packages/traitlets-4.1.0-py2.7.egg/traitlets/config/application.py", line 589, in launch_instance
app.start()
File "/Library/Python/2.7/site-packages/ipykernel/kernelapp.py", line 474, in start
ioloop.IOLoop.instance().start()
File "/Library/Python/2.7/site-packages/zmq/eventloop/ioloop.py", line 162, in start
super(ZMQIOLoop, self).start()
File "/Library/Python/2.7/site-packages/tornado/ioloop.py", line 887, in start
handler_func(fd_obj, events)
File "/Library/Python/2.7/site-packages/tornado/stack_context.py", line 275, in null_wrapper
return fn(*args, **kwargs)
File "/Library/Python/2.7/site-packages/zmq/eventloop/zmqstream.py", line 440, in _handle_events
self._handle_recv()
File "/Library/Python/2.7/site-packages/zmq/eventloop/zmqstream.py", line 472, in _handle_recv
self._run_callback(callback, msg)
File "/Library/Python/2.7/site-packages/zmq/eventloop/zmqstream.py", line 414, in _run_callback
callback(*args, **kwargs)
File "/Library/Python/2.7/site-packages/tornado/stack_context.py", line 275, in null_wrapper
return fn(*args, **kwargs)
File "/Library/Python/2.7/site-packages/ipykernel/kernelbase.py", line 276, in dispatcher
return self.dispatch_shell(stream, msg)
File "/Library/Python/2.7/site-packages/ipykernel/kernelbase.py", line 228, in dispatch_shell
handler(stream, idents, msg)
File "/Library/Python/2.7/site-packages/ipykernel/kernelbase.py", line 390, in execute_request
user_expressions, allow_stdin)
File "/Library/Python/2.7/site-packages/ipykernel/ipkernel.py", line 196, in do_execute
res = shell.run_cell(code, store_history=store_history, silent=silent)
File "/Library/Python/2.7/site-packages/ipykernel/zmqshell.py", line 498, in run_cell
return super(ZMQInteractiveShell, self).run_cell(*args, **kwargs)
File "/Library/Python/2.7/site-packages/ipython-4.1.2-py2.7.egg/IPython/core/interactiveshell.py", line 2723, in run_cell
interactivity=interactivity, compiler=compiler, result=result)
File "/Library/Python/2.7/site-packages/ipython-4.1.2-py2.7.egg/IPython/core/interactiveshell.py", line 2825, in run_ast_nodes
if self.run_code(code, result):
File "/Library/Python/2.7/site-packages/ipython-4.1.2-py2.7.egg/IPython/core/interactiveshell.py", line 2885, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-5-b1111e315ccf>", line 7, in <module>
featrues, targets = input_fn()
File "/Library/Python/2.7/site-packages/tensorflow/python/estimator/inputs/numpy_io.py", line 127, in input_fn
else queue.dequeue_up_to(batch_size))
File "/Library/Python/2.7/site-packages/tensorflow/python/ops/data_flow_ops.py", line 498, in dequeue_up_to
self._queue_ref, n=n, component_types=self._dtypes, name=name)
File "/Library/Python/2.7/site-packages/tensorflow/python/ops/gen_data_flow_ops.py", line 1430, in _queue_dequeue_up_to_v2
timeout_ms=timeout_ms, name=name)
File "/Library/Python/2.7/site-packages/tensorflow/python/framework/op_def_library.py", line 767, in apply_op
op_def=op_def)
File "/Library/Python/2.7/site-packages/tensorflow/python/framework/ops.py", line 2630, in create_op
original_op=self._default_original_op, op_def=op_def)
File "/Library/Python/2.7/site-packages/tensorflow/python/framework/ops.py", line 1204, in __init__
self._traceback = self._graph._extract_stack() # pylint: disable=protected-access
OutOfRangeError (see above for traceback): FIFOQueue '_2_enqueue_input_1/fifo_queue' is closed and has insufficient elements (requested 2, current size 0)
[[Node: fifo_queue_DequeueUpTo_1 = QueueDequeueUpToV2[component_types=[DT_INT64, DT_DOUBLE, DT_INT64, DT_INT64], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](enqueue_input_1/fifo_queue, fifo_queue_DequeueUpTo_1/n)]]
正如上面的分析,这里
num_step=3,每次取2个,所以总量是6个就会队列溢出. 所以如果想要精确的控制,必须知道总的数据集有多大。但是,问题是实际可能我们只是知道大概的数据集大小,因为在离线训练的时候,我们需要不断的更新数据集,那么就会导致,这个数据集的大小是不确定,那么该如何处理呢?直观的做法就是,对队列的溢出异常进行捕获,就好了。
with tf.Session() as session: # 定义numpy input fn input_fn = numpy_io.numpy_input_fn(x, y, batch_size=2, shuffle=False, num_epochs=1) # 运行input_fn, 产生featrue和targets featrues, targets = input_fn() coord = coordinator.Coordinator() threads = tf.train.start_queue_runners(session, coord=coord) num_step = 3 try: for step in range(num_step): # 这一行表示 如果coord 已经stop,就不要再进行下去了, 对应到 coord.request_stop() # 因为 request_stop 不会立马stop, 但是通过should_stop能获取到状态 if coord.should_stop(): break ret_features, ret_targets = session.run([featrues, targets]) print('featrues:', ret_features) print('targets:', ret_targets) except tf.errors.OutOfRangeError as error: print('ignore: ', error) finally: coord.request_stop() coord.join(threads)
('featrues:', {'age': array([ 0., 1.]), 'height': array([32, 33])}) ('targets:', array([-32, -31]))('featrues:', {'age': array([ 2., 3.]), 'height': array([34, 35])})
('targets:', array([-30, -29]))
('ignore: ', OutOfRangeError())
当我们加入这个异常的保护,就可以很轻松的不用管数据集的数量问题了。除了上面方法,还有一种更为优雅的方法,就是队列可以自动循环的读取就好了。这里唯一要做的就是就 将
num_epochs=1=>
num_epochs=None.那么,这种情况下就不会溢出了,队列会自己控制
num_epochs,会循环的读入.
with tf.Session() as session: # 定义numpy input fn, 注意: num_epochs=None,队列会循环的读入. input_fn = numpy_io.numpy_input_fn(x, y, batch_size=2, shuffle=False, num_epochs=None) # 运行input_fn, 产生featrue和targets featrues, targets = input_fn() coord = coordinator.Coordinator() threads = tf.train.start_queue_runners(session, coord=coord) num_step = 3 for step in range(num_step): ret_features, ret_targets = session.run([featrues, targets]) print('featrues:', ret_features) print('targets:', ret_targets) coord.request_stop() coord.join(threads)
('featrues:', {'age': array([ 0., 1.]), 'height': array([32, 33])}) ('targets:', array([-32, -31]))('featrues:', {'age': array([ 2., 3.]), 'height': array([34, 35])})
('targets:', array([-30, -29]))
('featrues:', {'age': array([ 0., 1.]), 'height': array([32, 33])}) ('targets:', array([-32, -31]))
看到上面的最后一行的结果,就是 [0, 1] 这显然是第一个数据.
shuffle与threads
上面介绍了batch_size与num_epochs. 还有另外两个参数shuffle与threads.shuffle: 随机产出。这很好,方便产生随机的数据集
threads: 是指读入数据时候的线程数.
这两个参数,有些情况,需要进行解释:
条件 | 解释 |
---|---|
num_threads > 1 and num_epochs is not None | num_epochs会应用到每一个线程上,所以这就会产生超过你实际设置的epochs的数量,所以如果想要限制epochs的数量,需要使用一个线程. 也就是说如果使用超过1个线程,那么最好是num_epochs设置为None. 否则,你无法控制这个数量 |
shuffle and num_threads > 1 and num_epochs is not None | 多个线程同时读取放入到队列中,这可能会导致不会那么随机 |
if not shuffle and num_threads > 1 | 多线程无法保证顺序性 |
num_threads > 1, shuffle=True, num_epochs = None
num_threads = 1, shuffle=Ture/False, num_epochs = None
相关文章推荐
- TensorFlow Data Input (Part 1): Placeholders, Protobufs & Queues 占位符,原型和队列
- windows切换python版本以及numpy/scipy/tensorflow安装
- TensorFlow Data Input (Part 1): Placeholders, Protobufs & Queues
- 180306 Keras+Tensorflow指定运行显卡以及关闭session空出显存
- Tensorflow Saver & restore 以及报错问题 NotFoundError: "x_x" not found in checkpoint
- tensorflow Bug汇集以及解决
- tensorflow.slice_input_producer
- Windows在pip install tensorflow遇到的问题 一些python安装包的时候,超时问题以及权限问题
- ValueError: setting an array element with a sequence. tensorflow numpy
- Ubuntu16.04LTS+python 2.7安装tensorflow+keras,以及运行实例
- OReilly.Hands-On.Machine.Learning.with.Scikit-Learn.and.TensorFlow.翻译以及读书心得--p41-53
- Tensorflow Lite 编译demo时报错 ERROR: missing input file
- ubuntu安装python3.5 numpy tensorflow1.2
- 基于Anaconda的tensorflow/jupyter notebook/numpy/matplotlib/scipy的安装
- 线性回归 numpy normal equation & tensorflow gradient descent
- load jpeg with tensorflow-FIFOQueue '_1_input_producer' is closed and has insufficient elements()
- OReilly.Hands-On.Machine.Learning.with.Scikit-Learn.and.TensorFlow.翻译以及读书心得--p33-p40
- 『PyTorch x TensorFlow』第八弹_基本nn.Module层函数
- How to do sparse input text classification(dnn) using tensorflow
- tensorflow serving:bazel方式部署模型+docker方式部署模型及提供服务以及使用该服务介绍(总有一款适合你)