页面树结构

2017-11-09 ApacheCN 开源组织,第二期邀请成员活动,一起走的更远 : http://www.apachecn.org/member/209.html


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

获取数据到TensorFlow程序有三种主要方法:

  • Feeding:Python代码在运行每个步骤时提供数据。
  • 从文件读取:输入流水线从TensorFlow图开头的文件中读取数据。
  • 预加载数据:TensorFlow图中的常量或变量保留所有数据(对于小数据集)。

Feeding

TensorFlow的进给机制允许您在计算图中将数据注入任何Tensor。因此,蟒蛇计算可以将数据直接馈送到图中。

通过feed_dict参数将Feed数据提供给启动计算的run()或eval()调用。

with tf.Session():
  input = tf.placeholder(tf.float32)
  classifier = ...
  print(classifier.eval(feed_dict={input: my_python_preprocessing_fn()}))
虽然您可以使用Feed数据(包括变量和常量)替换任何Tensor,但最佳做法是使用 tf.placeholder节点。Aplaceholder仅仅作为饲料的目标而存在。它未初始化并且不包含任何数据。占位符如果在没有Feed的情况下执行,则会生成错误,因此您不会忘记将其提供。

MNIST教程placeholder中可以找到使用和馈送在MNIST数据上进行训练 的示例tensorflow/examples/tutorials/mnist/fully_connected_feed.py,并在MNIST教程中进行描述

 

从文件读取

从文件读取记录的典型管道有以下几个阶段:

  1. 文件名列表
  2. 可选文件名shuffling
  3. 可选时期限制
  4. 文件名队列
  5. 文件格式的读者
  6. 用于由读取器读取的记录的解码器
  7. 可选的预处理
  8. 示例队列

文件名,洗牌和时代限制

对于文件名列表,请使用常量字符串Tensor(like ["file0", "file1"][("file%d" % i) for i in range(2)])或 tf.train.match_filenames_once函数。

将文件名列表传递给tf.train.string_input_producer函数。 string_input_producer创建一个用于保存文件名的FIFO队列,直到读者需要它们。

string_input_producer具有混洗和设置最大数量的时期的选项。队列转轮将文件名的整个列表添加到队列中一次,用于每个时期,在一个时期内混洗文件名,如果shuffle=True。该过程提供了文件的统一采样,以便相对于彼此不相关或过度采样。

队列运行程序与从读取器分离的线程工作,从队列中提取文件名,因此混洗和排队过程不会阻止读者。

文件格式

选择符合输入文件格式的阅读器,并将文件名队列传递给读者的阅读方法。读取方法输出一个标识文件和记录的键(如果您有一些奇怪的记录,则用于调试)和标量字符串值。使用一个(或多个)解码器和转换ops将该字符串解码为组成示例的张量。

CSV文件

要在阅读文本文件逗号分隔值(CSV)格式,使用 tf.TextLineReader与 tf.decode_csv操作。例如:

filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"])

reader = tf.TextLineReader()
key, value = reader.read(filename_queue)

# Default values, in case of empty columns. Also specifies the type of the
# decoded result.
record_defaults = [[1], [1], [1], [1], [1]]
col1, col2, col3, col4, col5 = tf.decode_csv(
    value, record_defaults=record_defaults)
features = tf.stack([col1, col2, col3, col4])

with tf.Session() as sess:
  # Start populating the filename queue.
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  for i in range(1200):
    # Retrieve a single instance:
    example, label = sess.run([features, col5])

  coord.request_stop()
  coord.join(threads) 
每次执行read从文件读取一行。然后 decode_csv操作将结果解析成张量列表。该 record_defaults参数决定了所得张量的类型,并设置一个值在输入字符串中缺少使用默认值。

 tf.train.start_queue_runners在调用runeval执行之前,必须调用填充队列read。否则read将在等待队列中的文件名时阻止。

固定长度记录

要读取每个记录是固定数量字节的二进制文件,请 tf.FixedLengthRecordReader 使用该tf.decode_raw操作。该decode_rawOP从字符串到UINT8张量转换。

例如,CIFAR-10数据集 使用一种文件格式,其中每个记录使用固定数量的字节表示:标签的1字节,后跟3072字节的图像数据。一旦你有一个uint8张量,标准操作就可以分割出每个部分,并根据需要重新格式化。对于CIFAR-10,您可以tensorflow_models/tutorials/image/cifar10/cifar10_input.py 在 本教程中看到如何进行阅读和解码 。

标准TensorFlow格式

另一种方法是将任何数据转换成支持的格式。这种方法可以更容易地混合和匹配数据集和网络架构。对于TensorFlow建议的格式是一个 TFRecords文件 包含 tf.train.Example协议缓冲区 (其含有 Features 作为场)。您编写一个获取数据的小程序,将其 Example填充到协议缓冲区中,将协议缓冲区序列化为字符串,然后使用该文件将该字符串写入TFRecords文件 tf.python_io.TFRecordWriter。例如,tensorflow/examples/how_tos/reading_data/convert_to_records.py 将MNIST数据转换为此格式。

要阅读TFRecords的文件,使用 tf.TFRecordReadertf.parse_single_example 解码器。该parse_single_example运算的示例协议缓冲器解码成张量。使用生成的数据的MNIST示例convert_to_records可以在tensorflow/examples/how_tos/reading_data/fully_connected_reader.py其中找到 ,您可以与fully_connected_feed版本进行比较。

预处理

然后,您可以对所需的这些示例进行任何预处理。这将是任何不依赖于可训练参数的处理。示例包括数据的归一化,选择随机切片,添加噪声或失真等。参见 tensorflow_models/tutorials/image/cifar10/cifar10_input.py 示例。

Batching

在管道结束时,我们使用另一个队列将训练,评估或推断的示例进行批处理。为此,我们使用一个随机化的示例顺序的队列 tf.train.shuffle_batch

例:

def read_my_file_format(filename_queue):
  reader = tf.SomeReader()
  key, record_string = reader.read(filename_queue)
  example, label = tf.some_decoder(record_string)
  processed_example = some_processing(example)
  return processed_example, label

def input_pipeline(filenames, batch_size, num_epochs=None):
  filename_queue = tf.train.string_input_producer(
      filenames, num_epochs=num_epochs, shuffle=True)
  example, label = read_my_file_format(filename_queue)
  # min_after_dequeue defines how big a buffer we will randomly sample
  #   from -- bigger means better shuffling but slower start up and more
  #   memory used.
  # capacity must be larger than min_after_dequeue and the amount larger
  #   determines the maximum we will prefetch.  Recommendation:
  #   min_after_dequeue + (num_threads + a small safety margin) * batch_size
  min_after_dequeue = 10000
  capacity = min_after_dequeue + 3 * batch_size
  example_batch, label_batch = tf.train.shuffle_batch(
      [example, label], batch_size=batch_size, capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  return example_batch, label_batch 
如果您需要更多的并行性或文件之间的抽样,请使用多个读取器实例 tf.train.shuffle_batch_join。例如:
def read_my_file_format(filename_queue):
  # Same as above

def input_pipeline(filenames, batch_size, read_threads, num_epochs=None):
  filename_queue = tf.train.string_input_producer(
      filenames, num_epochs=num_epochs, shuffle=True)
  example_list = [read_my_file_format(filename_queue)
                  for _ in range(read_threads)]
  min_after_dequeue = 10000
  capacity = min_after_dequeue + 3 * batch_size
  example_batch, label_batch = tf.train.shuffle_batch_join(
      example_list, batch_size=batch_size, capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  return example_batch, label_batch 
您仍然只使用所有读者共享的单个文件名队列。这样,我们确保不同的读者使用不同的文件从同一个纪元,直到所有的纪元的文件已经开始。(填充文件名队列的线程通常也足够)

另一种方法是使用通过一个单一的读者 tf.train.shuffle_batch 有num_threads一次大于1,这将使它从一个单一的文件在同一时间读取(但比1个线程快),而不是N个文件。这可能很重要:

  • 如果你有更多的阅读线程而不是输入文件,为了避免你有两个线程从彼此相邻的文件中读取相同的例子的风险。
  • 或者如果并行读取N个文件会导致太多的磁盘寻找。

你需要多少个线程?的tf.train.shuffle_batch*功能添加到概要,其指示例如队列有多满的曲线图。如果您有足够的阅读主题,该摘要将保持在零以上。您可以 使用TensorBoard查看您的摘要作为培训进度

创建线程以使用QueueRunner对象进行预取

简短版本:tf.train上面列出的许多功能将tf.train.QueueRunner对象添加 到图形中。这些要求您tf.train.start_queue_runners 在运行任何培训或推理步骤之前打电话 ,否则将永久挂起。这将启动运行输入管道的线程,填充示例队列,以使出现的队列成功。这是最好的组合与 tf.train.Coordinator干净地关闭这些线程当有错误。如果你设置一个限制数量的纪元,那将使用一个需要初始化的时期计数器。结合这些建议的代码模式是:

# Create the graph, etc.
init_op = tf.global_variables_initializer()

# Create a session for running operations in the Graph.
sess = tf.Session()

# Initialize the variables (like the epoch counter).
sess.run(init_op)

# Start input enqueue threads.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

try:
    while not coord.should_stop():
        # Run training steps or whatever
        sess.run(train_op)

except tf.errors.OutOfRangeError:
    print('Done training -- epoch limit reached')
finally:
    # When done, ask the threads to stop.
    coord.request_stop()

# Wait for threads to finish.
coord.join(threads)
sess.close() 

旁边:这里发生了什么?

首先我们创建图。它将具有通过队列连接的几个流水线阶段。第一阶段将生成文件名以读取并将其排入文件名队列。第二阶段消耗文件名(使用a Reader),生成示例,并将它们排列 在示例队列中。根据您的设置方式,您实际上可以拥有第二阶段的几个独立副本,以便您可以并行读取多个文件。在这些阶段结束时,是一个入队操作,进入队列,下一个队列出队。我们希望启动运行这些入队操作的线程,以便我们的训练循环可以从示例队列中出示示例。

 tf.train创建这些队列和入队操作的帮助者tf.train.QueueRunner使用该tf.train.add_queue_runner 功能添加到图形中 。每个QueueRunner都负责一个阶段,并保存需要在线程中运行的入队操作列表。构建图形后,该tf.train.start_queue_runners 函数将在图中查询 每个QueueRunner,以启动其运行入队操作的线程。

如果一切顺利,您现在可以运行您的培训步骤,队列将被后台线程填充。如果你设置了一个时期限制,在某个时候,尝试出示示例将会得到一个 tf.errors.OutOfRangeError。这是TensorFlow相当于“文件结束”(EOF) - 这意味着已经达到了时代限制,没有更多的例子可用。

最后的成分是 tf.train.Coordinator。这是负责让所有的线程知道是否有任何信号表示关闭。最常见的是,这是因为引发了异常,例如,一个线程在运行某些操作(或普通的Python异常)时出错。

有关线程,队列,QueueRunners和协调器的更多信息, 请参见此处

旁边:限制时代的干净关闭是如何工作的

想象一下,你有一个模型,限制了要训练的时代的数量。这意味着生成文件名的线程只会在生成OutOfRange错误之前运行多次。QueueRunner将捕获该错误,关闭文件名队列,并退出该线程。关闭队列有两件事情:

  • 任何将来尝试排入文件名队列时都会产生错误。在这一点上,不应该有任何线程尝试这样做,但是当队列由于其他错误而关闭时,这是有帮助的。
  • 任何当前或未来的出队都将成功(如果剩下足够的元素)或失败(有OutOfRange错误)。他们不会阻止等待更多元素进入,因为以前的观点是不可能的。

关键是当文件名队列关闭时,该队列中可能仍然有很多文件名,因此管道的下一个阶段(读者和其他预处理)可能会持续运行一段时间。一旦文件名队列耗尽,下一次尝试将一个文件名排队(例如,从读取器中完成文件正在处理的文件)将触发 OutOfRange错误。在这种情况下,您可能会有多个线程与单个QueueRunner关联。如果这不是QueueRunner中的最后一个线程,OutOfRange错误只会导致一个线程退出。这允许其他仍然完成最后一个文件的线程继续进行,直到它们完成。(假设您正在使用 tf.train.Coordinator,其他类型的错误将导致所有线程停止。OutOfRange 

再次,示例队列将有一些元素排队,所以训练将持续到那些用尽。如果示例队列是a tf.RandomShuffleQueue,说是因为你正在使用,shuffle_batch或者shuffle_batch_join通常会避免比其min_after_dequeue缓冲的attr元素少。但是,一旦队列关闭,限制将被解除,队列最终将被清空。在这一点上,实际的训练线程,当他们尝试从示例队列出队时,将开始获取OutOfRange错误并退出。一旦所有的训练线程完成, tf.train.Coordinator.join 将返回,您可以彻底退出。

过滤记录或每个记录生成多个示例

而不是使用形状的例子[x, y, z],您将生成一批具有形状的示例[batch, x, y, z]。如果要将此记录过滤(可能是在一个保留集合中),则批量大小可以为0,如果要为每个记录生成多个示例,则大于1。然后enqueue_many=True 在调用其中一个批处理函数(例如shuffle_batch或 shuffle_batch_join)时简单地设置。

稀疏输入数据

稀疏传感器不能很好地排队。如果你使用SparseTensors你必须使用字符串的记录解码 后的 batching(而不是使用batching之前)。tf.parse_example   tf.parse_single_example

 

预装数据

这仅用于可以完全加载到内存中的小型数据集。有两种方法:

  • 将数据存储在常量中。
  • 将数据存储在变量中,即初始化,然后再不变。

使用常数有点简单,但使用更多的内存(因为常量被内联存储在图形数据结构中,这可能会被重复几次)。

training_data = ...
training_labels = ...
with tf.Session():
  input_data = tf.constant(training_data)
  input_labels = tf.constant(training_labels)
  ... 
为了使用变量,您还需要在构建图形之后对其进行初始化。
training_data = ...
training_labels = ...
with tf.Session() as sess:
  data_initializer = tf.placeholder(dtype=training_data.dtype,
                                    shape=training_data.shape)
  label_initializer = tf.placeholder(dtype=training_labels.dtype,
                                     shape=training_labels.shape)
  input_data = tf.Variable(data_initializer, trainable=False, collections=[])
  input_labels = tf.Variable(label_initializer, trainable=False, collections=[])
  ...
  sess.run(input_data.initializer,
           feed_dict={data_initializer: training_data})
  sess.run(input_labels.initializer,
           feed_dict={label_initializer: training_labels})

设置trainable=False将变量GraphKeys.TRAINABLE_VARIABLES保留在图形中的 集合中,因此我们不会在进行培训时尝试更新。设置collections=[]将变量保存在 GraphKeys.GLOBAL_VARIABLES用于保存和还原检查点的集合中。

无论哪种方式, tf.train.slice_input_producer 都可以一次使用来生成切片。这样可以在整个时期将这些例子进行洗牌,所以当批量化是不希望的时候进一步洗牌。所以shuffle_batch我们使用plain tf.train.batch函数来代替使用这些 函数。要使用多个预处理线程,请将参数设置为num_threads大于1的数字。

用于预先加载使用常量数据的MNIST例子可以发现tensorflow/examples/how_tos/reading_data/fully_connected_preloaded.py,和一个用于预先加载使用变量的数据可以发现 tensorflow/examples/how_tos/reading_data/fully_connected_preloaded_var.py,你可以用比较这些fully_connected_feed与 fully_connected_reader以上的版本。

 

多输入管道

通常,您将需要对一个数据集进行训练,并对另一个数据集进行评估(或“eval”)。实现这一目标的一个方法是实际上有两个独立的过程:

  • 训练过程读取训练输入数据,并定期向所有训练变量写入检查点文件。
  • 评估过程将检查点文件恢复为读取验证输入数据的推理模型。

这是 CIFAR-10示例中所做的。这有几个好处:

  • eval在训练变量的单个快照上执行。
  • 即使训练完成并退出,您也可以执行该评估。

您可以在同一个过程中的同一个图表中列车和eval,并分享他们训练有素的变量。请参阅 共享变量教程

 

  • 无标签