页面树结构

2017-11-09 ApacheCN 开源组织,第二期邀请成员活动,一起走的更远 : http://www.apachecn.org/member/209.html


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

队列是使用TensorFlow进行异步计算的一种强大机制。

像TensorFlow中的所有组件一样,队列是TensorFlow图中的一个节点。它是一个有状态的节点,像变量一样:其他节点可以修改其内容。特别地,节点可以将新元素插入队列,或者从现有队列中取出队列。

为了更形象的理解队列,我们来看一个简单的例子。我们将创建一个“先进先出”队列(FIFOQueue)并用零初始化。然后,我们将构建一个将元素从队列中取出的图形,将“1”添加到该元素位置,并将其放回队列的末尾。慢慢地,队列上的数字增加。

EnqueueEnqueueMany以及Dequeue都是特殊节点。它们指向队列而不是正常值,允许更改它。我们建议您将它们视为队列的方法。实际上,在Python API中,它们是队列对象的方法(例如q.enqueue(...))。

N.B.队列方法(如q.enqueue(...)必须在与队列运行在相同的设备上。创建这些操作时,不兼容的设备布局指令将被忽略。

现在你有了一点队列的直观感觉,让我们深入细节...

队列使用情况概述

队列,例如tf.FIFOQueue 和tf.RandomShuffleQueue,是重要的TensorFlow对象,用于在图中异步计算张量。

例如,典型的输入架构是使用 RandomShuffleQueue用于训练模型的输入:

  • 多线程准备训练样本并将其推入队列。
  • 一个训练线程执行一个训练操作,使队列中的小批量样本出队。

这种架构有许多好处,如 Reading data how to中所强调的 ,这也给出了简化输入流结构函数的概述。

TensorFlow Session对象是多线程的,所以多个线程可以方便地使用相同的会话并行运行操作。但是,实现一个如上所述的驱动线程的Python程序并不容易。所有线程必须能够同步停止,异常必须被捕获和报告,队列必须在停止时被正确关闭。

TensorFlow提供了两个类来帮助实现:tf.train.Coordinatortf.train.QueueRunner。这两个类被设计为一起使用。Coordinator类能同时停止多个线程,并报告异常给等待他们停止的程序。QueueRunner类被用来协调多个线程并把张量推进同一队列中。

 

Coordinator

Coordinator类有助于多线程同步停止。

其关键方法是:

首先创建一个Coordinator对象,然后创建一些使用coordinator的线程。线程通常一直循环运行,直到should_stop()返回True时停止。

任何线程都可以决定计算应该何时停止。它只需要调用 request_stop(),由于此时should_stop()返回True,其他线程将停止。


# Thread body: loop until the coordinator indicates a stop was requested.
# If some condition becomes true, ask the coordinator to stop.
def MyLoop(coord):
  while not coord.should_stop():
    ...do something...
    if ...some condition...:
      coord.request_stop()

# Main thread: create a coordinator.
coord = tf.train.Coordinator()

# Create 10 threads that run 'MyLoop()'
threads = [threading.Thread(target=MyLoop, args=(coord,)) for i in xrange(10)]

# Start the threads and wait for all of them to stop.
for t in threads:
  t.start()
coord.join(threads)
显然,coordinator可以管理线程完成不同的事情。它们不一定要和上面的例子完全相同。coordinator还
支持捕获和报告异常。

更多详细信息,请参阅tf.train.Coordinator 文档。

QueueRunner

QueueRunner类创建一些反复运行enqueue操作的线程。这些线程可以使用coordinator来停止。另外,如果向coordinator报告异常,则queue runner会运行一个closer thread ,该线程会自动关闭队列。

您可以使用queue runner来实现上述架构。

首先构建一个使用TensorFlow队列(例如tf.RandomShuffleQueue)作为输入示例的图。增加处理样本并将样本推入队列中的操作。添加训练操作开始从队列移除样本。

 

example = ...ops to create one example...
# Create a queue, and an op that enqueues examples one at a time in the queue.
queue = tf.RandomShuffleQueue(...)
enqueue_op = queue.enqueue(example)
# Create a training graph that starts by dequeuing a batch of examples.
inputs = queue.dequeue_many(batch_size)
train_op = ...use 'inputs' to build the training part of the graph...

在Python训练程序中,创建一个QueueRunner运行几个线程来处理和排队样本。创建一个Coordinator请求队列开始运行coordinator线程。编写一个循环训练,使用coordinator控制终端。

# Create a queue runner that will run 4 threads in parallel to enqueue
# examples.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)

# Launch the graph.
sess = tf.Session()
# Create a coordinator, launch the queue runner threads.
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
# Run the training loop, controlling termination with the coordinator.
for step in xrange(1000000):
    if coord.should_stop():
        break
    sess.run(train_op)
# When done, ask the threads to stop.
coord.request_stop()
# And wait for them to actually do it.
coord.join(enqueue_threads)

处理异常

queue runners开始的线程不仅仅是运行入队操作。它们还捕获并处理由队列生成的异常,包括tf.errors.OutOfRangeError异常,用于报告队列已经关闭。

使用coordinator的训练过程必须在其主循环中同样捕获并报告异常。

这是上面的训练循环的改进版本。


try:
    for step in xrange(1000000):
        if coord.should_stop():
            break
        sess.run(train_op)
except Exception, e:
    # Report exceptions to the coordinator.
    coord.request_stop(e)
finally:
    # Terminate as usual. It is safe to call `coord.request_stop()` twice.
    coord.request_stop()
    coord.join(threads)
  • 无标签