R interface to TensorFlow Dataset API

Overview

The TensorFlow Dataset API provides various facilities for creating scalable input pipelines for TensorFlow models, including:

  • Reading data from a variety of formats including CSV files and TFRecords files (the standard binary format for TensorFlow training data).

  • Transforming datasets in a variety of ways including mapping arbitrary functions against them.

  • Shuffling, batching, and repeating datasets over a number of epochs.

  • Streaming interface to data for reading arbitrarily large datasets.

  • Reading and transforming data are TensorFlow graph operations, so they are executed in C++ and in parallel with model training.

The R interface to TensorFlow datasets provides access to the Dataset API, including high-level convenience functions for easy integration with the keras R package.

Installation

To use tfdatasets you need to install both the R package and TensorFlow itself.

First, install the tfdatasets R package from GitHub as follows:

devtools::install_github("rstudio/tfdatasets")

Then, use the install_tensorflow() function to install TensorFlow:

library(tfdatasets)
install_tensorflow()

Creating a Dataset

To create a dataset, use one of the dataset creation functions. Datasets can be created from delimited text files, TFRecords files, and in-memory data.

Text Files

For example, to create a dataset from a text file, first create a specification for how records will be decoded from the file, then call text_line_dataset() with the file to be read and the specification:

library(tfdatasets)

# create specification for parsing records from an example file
iris_spec <- csv_record_spec("iris.csv")

# read the dataset
dataset <- text_line_dataset("iris.csv", record_spec = iris_spec)

# take a glimpse at the dataset
str(dataset)
<MapDataset shapes: {Sepal.Length: (), Sepal.Width: (), Petal.Length: (), Petal.Width: (),
Species: ()}, types: {Sepal.Length: tf.float32, Sepal.Width: tf.float32, Petal.Length:
tf.float32, Petal.Width: tf.float32, Species: tf.int32}>
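
The example above assumes that an iris.csv file already exists, with a header row and with Species stored as an integer. The following is a minimal sketch of one way to produce such a file from R's built-in iris data frame. The zero-based species codes and the temporary file location are assumptions made for illustration; the file used in the original example may have been prepared differently.

```r
# Start from R's built-in iris data frame.
iris_coded <- iris

# Recode the Species factor as zero-based integers (0, 1, 2), an assumption
# chosen so the codes can later be used directly as one-hot indices.
iris_coded$Species <- as.integer(iris$Species) - 1L

# Write to a temporary location; change csv_path to "iris.csv" to place the
# file in the working directory instead.
csv_path <- file.path(tempdir(), "iris.csv")
write.csv(iris_coded, csv_path, row.names = FALSE)

# Peek at the header and the first two records.
readLines(csv_path, n = 3)
```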

In the example above, the csv_record_spec() function is passed an example file which is used to automatically detect column names and types (done by reading up to the first 1,000 lines of the file). You can also provide explicit column names and/or data types using the names and types parameters (note that in this case we don’t pass an example file):

# provide column names and types explicitly
iris_spec <- csv_record_spec(
  names = c("SepalLength", "SepalWidth", "PetalLength", "PetalWidth", "Species"),
  types = c("double", "double", "double", "double", "integer"), 
  skip = 1
)

# read the dataset
dataset <- text_line_dataset("iris.csv", record_spec = iris_spec)

Note that we’ve also specified skip = 1 to indicate that the first row of the CSV that contains column names should be skipped.

Supported column types are integer, double, and character. You can also provide types in a more compact form using single-letter abbreviations (e.g. types = "dddi"). For example:

mtcars_spec <- csv_record_spec("mtcars.csv", types = "dididddiiii")
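
To make the compact form easier to read, here is a small sketch in plain R that expands a type string into full type names and lines them up with column names. The expand_types() helper is hypothetical (it is not part of tfdatasets), the "c" abbreviation for character is an assumption by analogy with "d" and "i", and the sketch assumes that mtcars.csv has the same column order as R's built-in mtcars data frame.

```r
# Hypothetical helper: expand a compact type string into full type names.
expand_types <- function(compact) {
  # "d" and "i" appear in the text above; "c" for character is an assumption.
  abbreviations <- c(d = "double", i = "integer", c = "character")
  letters_used <- strsplit(compact, "")[[1]]
  unname(abbreviations[letters_used])
}

col_types <- expand_types("dididddiiii")

# Pair each type with the column names of the built-in mtcars data frame.
setNames(col_types, names(mtcars))
```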

Parallel Decoding

Decoding lines of text into a record can be computationally expensive. You can parallelize these computations using the parallel_records parameter. For example:

dataset <- text_line_dataset("iris.csv", record_spec = iris_spec, parallel_records = 4)

You can also parallelize the reading of data from storage by requesting that a buffer of records be prefetched. You do this with the dataset_prefetch() function. For example:

dataset <- text_line_dataset("iris.csv", record_spec = iris_spec, parallel_records = 4) %>% 
  dataset_batch(128) %>% 
  dataset_prefetch(1)

This code will result in the prefetching of a single batch of data on a background thread (i.e. in parallel with training operations).

If you have multiple input files, you can also parallelize reading of these files across multiple machines (sharding), on multiple threads per machine (parallel reads with interleaving), or both. See the section on Reading Multiple Files below for additional details.

TFRecords Files

You can read datasets from TFRecords files using the tfrecord_dataset() function.

In many cases you’ll want to map the records in the dataset into a set of named columns. You can do this using the dataset_map() function along with the tf$parse_single_example() function. For example:

# Creates a dataset that reads all of the examples from two files, and extracts
# the image and label features.
filenames <- c("/var/data/file1.tfrecord", "/var/data/file2.tfrecord")
dataset <- tfrecord_dataset(filenames) %>%
  dataset_map(function(example_proto) {
    features <- list(
      image = tf$FixedLenFeature(shape(), tf$string),
      label = tf$FixedLenFeature(shape(), tf$int32)
    )
    tf$parse_single_example(example_proto, features)
  })

You can parallelize reading of TFRecord files using the num_parallel_reads option, for example:

filenames <- c("/var/data/file1.tfrecord", "/var/data/file2.tfrecord")
dataset <- tfrecord_dataset(filenames, num_parallel_reads = 4)

SQLite Databases

You can read datasets from SQLite databases using the sqlite_dataset() function. To use sqlite_dataset() you provide the filename of the database, a SQL query to execute, and a sql_record_spec() that describes the names and TensorFlow types of the columns returned by the query. For example:

library(tfdatasets)

record_spec <- sql_record_spec(
  names = c("disp", "drat", "vs", "gear", "mpg", "qsec", "hp", "am", "wt",  "carb", "cyl"),
  types = c(tf$float64, tf$int32, tf$float64, tf$int32, tf$float64, tf$float64,
            tf$float64, tf$int32, tf$int32, tf$int32, tf$int32)
)

dataset <- sqlite_dataset(
  "data/mtcars.sqlite3",
  "select * from mtcars",
  record_spec
)

dataset
<MapDataset shapes: {disp: (), drat: (), vs: (), gear: (), mpg: (), qsec: (), hp: (), am: (),
wt: (), carb: (), cyl: ()}, types: {disp: tf.float64, drat: tf.int32, vs: tf.float64, gear:
tf.int32, mpg: tf.float64, qsec: tf.float64, hp: tf.float64, am: tf.int32, wt: tf.int32, carb:
tf.int32, cyl: tf.int32}>

Note that for floating point data you must use tf$float64 (reading tf$float32 is not supported for SQLite databases).

Transformations

Mapping

You can map arbitrary transformation functions onto dataset records using the dataset_map() function. For example, to transform the “Species” column into a one-hot encoded vector you would do this:

dataset <- dataset %>% 
  dataset_map(function(record) {
    record$Species <- tf$one_hot(record$Species, 3L)
    record
  })

Note that while dataset_map() is defined using an R function, there are some special constraints on this function which allow it to execute not within R but rather within the TensorFlow graph.

For a dataset created with the csv_dataset() function, the passed record will be a named list of tensors (one for each column of the dataset). The return value should be another set of tensors which were created from TensorFlow functions (e.g. tf$one_hot as illustrated above). This function will be converted to a TensorFlow graph operation that performs the transformation within native code.
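
To see what the one-hot transformation produces, here is a sketch in plain R (not TensorFlow) that encodes a few zero-based species codes with a depth of 3. The one_hot() helper is hypothetical and only imitates the result for in-range codes; inside dataset_map() you would still use tf$one_hot so that the work happens in the TensorFlow graph.

```r
# Hypothetical plain-R imitation of one-hot encoding for zero-based codes.
one_hot <- function(codes, depth) {
  encoded <- matrix(0, nrow = length(codes), ncol = depth)
  # Set column (code + 1) to 1 in each row, since R indexing is one-based.
  encoded[cbind(seq_along(codes), codes + 1L)] <- 1
  encoded
}

species_codes <- c(0L, 2L, 1L, 0L)
encoded <- one_hot(species_codes, 3L)

# Each row contains a single 1 in the position given by the species code.
encoded
```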

Parallel Mapping

If these transformations are computationally expensive they can be executed on multiple threads using the num_parallel_calls parameter. For example:

dataset <- dataset %>% 
  dataset_map(num_parallel_calls = 4, function(record) {
    record$Species <- tf$one_hot(record$Species, 3L)
    record
  })

You can control the maximum number of processed elements that will be buffered when processing in parallel using the dataset_prefetch() transformation. For example:

dataset <- dataset %>% 
  dataset_map(num_parallel_calls = 4, function(record) {
    record$Species <- tf$one_hot(record$Species, 3L)
    record
  }) %>% 
  dataset_prefetch(1)

If you are batching your data for training, you can optimize performance using the dataset_map_and_batch() function (which fuses together the map and batch operations). For example:

dataset <- dataset %>% 
  dataset_map_and_batch(batch_size = 128, function(record) {
    record$Species <- tf$one_hot(record$Species, 3L)
    record
  }) %>% 
  dataset_prefetch(1)

Filtering

You can filter the elements of a dataset using the dataset_filter() function, which takes a predicate function that returns a boolean tensor for records that should be included. For example:

dataset <- csv_dataset("mtcars.csv") %>%
  dataset_filter(function(record) {
    record$mpg >= 20
  })

dataset <- csv_dataset("mtcars.csv") %>%
  dataset_filter(function(record) {
    record$mpg >= 20 & record$cyl >= 6L
  })

Note that the functions used inside the predicate must be tensor operations (e.g. tf$not_equal, tf$less, etc.). R generic methods for relational operators (e.g. <, >, <=, etc.) and logical operators (e.g. !, &, |, etc.) are provided so you can use shorthand syntax for most common comparisons (as illustrated above).
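
As a point of comparison, the second predicate can be tried on R's built-in mtcars data frame. This sketch runs entirely in plain R and assumes that mtcars.csv holds the same values as the built-in data; it is only meant to show which records such a filter keeps, not how dataset_filter() is implemented.

```r
# The same elementwise comparison and logical "and", on ordinary R vectors.
keep <- mtcars$mpg >= 20 & mtcars$cyl >= 6

# Rows for which the predicate is TRUE are the ones a filter would retain.
kept <- mtcars[keep, c("mpg", "cyl")]
kept
```

Checking a predicate this way on a small in-memory sample can help you confirm that it keeps enough records before you build it into a pipeline.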

Features and Response

A common transformation is taking a column-oriented dataset (e.g. one created by csv_dataset() or tfrecord_dataset()) and transforming it into a two-element list with features (“x”) and response (“y”). You can use the dataset_prepare() function to do this type of transformation. For example:

mtcars_dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>% 
  dataset_prepare(x = c(mpg, disp), y = cyl)

iris_dataset <- text_line_dataset("iris.csv", record_spec = iris_spec) %>% 
  dataset_prepare(x = -Species, y = Species)

The dataset_prepare() function also accepts standard R formula syntax for defining features and response:

mtcars_dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>% 
  dataset_prepare(cyl ~ mpg + disp)

If you are batching your data for training, you can add a batch_size parameter to fuse together the dataset_prepare() and dataset_batch() steps (which generally results in faster training). For example:

mtcars_dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>% 
  dataset_prepare(cyl ~ mpg + disp, batch_size = 16)
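
The column selections used above can be mirrored on ordinary data frames. The following plain-R sketch shows what the features and response would contain for each form of selection; it uses the built-in mtcars and iris data frames as stand-ins for the CSV files and does not reflect how dataset_prepare() works internally.

```r
# x = c(mpg, disp), y = cyl
x <- as.matrix(mtcars[, c("mpg", "disp")])
y <- mtcars$cyl

# x = -Species, y = Species (every column except the response)
iris_x <- as.matrix(iris[, setdiff(names(iris), "Species")])
iris_y <- iris$Species

# cyl ~ mpg + disp: the left-hand side names the response and the
# right-hand side names the features.
f <- cyl ~ mpg + disp
response <- all.vars(f[[2]])
features <- all.vars(f[[3]])

str(list(x = x, y = y))
```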

Shuffling and Batching

There are several functions which control how batches are drawn from the dataset. For example, the following specifies that data will be drawn in batches of 128 from a shuffled window of 1000 records, and that the dataset will be repeated for 10 epochs:

dataset <- dataset %>% 
  dataset_shuffle(1000) %>%
  dataset_repeat(10) %>% 
  dataset_batch(128)

Note that you can optimize performance by fusing the shuffle and repeat operations into a single step using the dataset_shuffle_and_repeat() function. For example:

dataset <- dataset %>% 
  dataset_shuffle_and_repeat(buffer_size = 1000, count = 10) %>%
  dataset_batch(128)
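
To build some intuition for the shuffle window and for how batches are cut from a repeated dataset, here is a sketch in plain R. The shuffle_with_buffer() helper is hypothetical and follows the commonly described buffer scheme (fill a buffer, emit a random element, replace it with the next record); TensorFlow performs this in native code and its details may differ. The 32-record input and the buffer size of 8 are illustrative values.

```r
# Hypothetical plain-R model of shuffling through a fixed-size buffer.
shuffle_with_buffer <- function(records, buffer_size) {
  buffer <- records[0]
  out <- records[0]
  for (r in records) {
    if (length(buffer) < buffer_size) {
      # Fill the buffer before emitting anything.
      buffer <- c(buffer, r)
    } else {
      # Emit a random buffered record and put the new record in its slot.
      slot <- sample.int(buffer_size, 1)
      out <- c(out, buffer[slot])
      buffer[slot] <- r
    }
  }
  # Input exhausted: drain what is left in random order.
  c(out, buffer[sample.int(length(buffer))])
}

set.seed(1)
records <- 1:32
shuffled <- shuffle_with_buffer(records, buffer_size = 8)

# Repeating 32 records for 10 epochs and then batching by 128 cuts the
# stream of 320 records into batches that can span epoch boundaries.
epochs <- rep(records, times = 10)
batch_id <- ceiling(seq_along(epochs) / 128)
batch_sizes <- lengths(split(epochs, batch_id))
batch_sizes
```

In this model a record can only move forward by a limited distance, so a buffer that is small relative to the dataset gives only a local shuffle, and a buffer of size 1 leaves the order unchanged.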

Prefetching

Earlier we alluded to the dataset_prefetch() function, which enables you to ensure that a given number of records (or batches of records) are prefetched in parallel so they are ready to go when the next batch is processed. For example:

dataset <- dataset %>% 
  dataset_map_and_batch(batch_size = 128, function(record) {
    record$Species <- tf$one_hot(record$Species, 3L)
    record
  }) %>% 
  dataset_prefetch(1)

If you are using a GPU for training, you can also use the dataset_prefetch_to_device() function to specify that the parallel prefetch operation stage the data directly into GPU memory. For example:

dataset <- dataset %>% 
  dataset_map_and_batch(batch_size = 128, function(record) {
    record$Species <- tf$one_hot(record$Species, 3L)
    record
  }) %>% 
  dataset_prefetch_to_device("/gpu:0")

In this case the buffer size for prefetches is determined automatically (you can manually specify it using the buffer_size parameter).

Complete Example

Here’s a complete example of using the various dataset transformation functions together. We’ll read the mtcars dataset from a CSV, filter it on some threshold values, map it into x and y components for modeling, and specify desired shuffling and batch iteration behavior:

dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>%
  dataset_filter(function(record) {
    record$mpg >= 20 & record$cyl >= 6L
  }) %>% 
  dataset_shuffle_and_repeat(buffer_size = 1000, count = 10) %>% 
  dataset_prepare(cyl ~ mpg + disp, batch_size = 128) %>% 
  dataset_prefetch(1)

Reading Datasets

The method for reading data from a TensorFlow Dataset varies depending upon which API you are using to build your models. If you are using the keras package, then TensorFlow Datasets can be used much like in-memory R matrices and arrays. If you are using the lower-level tensorflow core API then you’ll use explicit dataset iteration functions.

The sections below provide additional details and examples for each of the supported APIs.

keras package

IMPORTANT NOTE: Using TensorFlow Datasets with Keras requires that you are running the very latest versions of Keras (v2.2) and TensorFlow (v1.9). You can ensure that you have the latest versions of the core Keras and TensorFlow libraries by calling the install_keras() function from the keras R package.

Keras models are often trained by passing in-memory arrays directly to the fit function. For example:

model %>% fit(
  x_train, y_train, 
  epochs = 30, 
  batch_size = 128
)

However, this requires loading data into an R data frame or matrix before calling fit. You can use the train_on_batch() function to stream data one batch at a time; however, the reading and processing of the input data is still done serially and outside of native code.

Alternatively, Keras enables you to pass a dataset directly as the x argument to fit() and evaluate(). Here’s a complete example that uses datasets to read from TFRecord files containing MNIST digits:

library(keras)
library(tfdatasets)

batch_size <- 128
steps_per_epoch <- 500

# function to read and preprocess mnist dataset
mnist_dataset <- function(filename) {
  dataset <- tfrecord_dataset(filename) %>%
    dataset_map(function(example_proto) {

      # parse record
      features <- tf$parse_single_example(
        example_proto,
        features = list(
          image_raw = tf$FixedLenFeature(shape(), tf$string),
          label = tf$FixedLenFeature(shape(), tf$int64)
        )
      )

      # preprocess image
      image <- tf$decode_raw(features$image_raw, tf$uint8)
      image <- tf$cast(image, tf$float32) / 255

      # convert label to one-hot
      label <- tf$one_hot(tf$cast(features$label, tf$int32), 10L)

      # return
      list(image, label)
    }) %>%
    dataset_repeat() %>%
    dataset_shuffle(1000) %>%
    dataset_batch(batch_size, drop_remainder = TRUE) %>%
    dataset_prefetch(1)
}

model <- keras_model_sequential() %>%
  layer_dense(units = 256, activation = 'relu', input_shape = c(784)) %>%
  layer_dropout(rate = 0.4) %>%
  layer_dense(units = 128, activation = 'relu') %>%
  layer_dropout(rate = 0.3) %>%
  layer_dense(units = 10, activation = 'softmax')

model %>% compile(
  loss = 'categorical_crossentropy',
  optimizer = optimizer_rmsprop(),
  metrics = c('accuracy')
)

history <- model %>% fit(
  mnist_dataset("mnist/train.tfrecords"),
  steps_per_epoch = steps_per_epoch,
  epochs = 20,
  validation_data = mnist_dataset("mnist/validation.tfrecords"),
  validation_steps = steps_per_epoch
)

score <- model %>% evaluate(
  mnist_dataset("mnist/test.tfrecords"),
  steps = steps_per_epoch
)

print(score)

Note that all data preprocessing (e.g. one-hot encoding of the response variable) is done within the dataset_map() operation.

Also note that we pass drop_remainder = TRUE to the dataset_batch() function (this is to make sure that all batches are of equal size, a requirement for Keras tensor inputs).
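
The effect of drop_remainder on a finite dataset can be sketched with simple arithmetic in plain R. The batch_sizes() helper and the figure of 1000 records are hypothetical and chosen only for illustration.

```r
# Hypothetical helper: sizes of the batches cut from n_records records.
batch_sizes <- function(n_records, batch_size, drop_remainder = FALSE) {
  full <- n_records %/% batch_size
  remainder <- n_records %% batch_size
  sizes <- rep(batch_size, full)
  # Keep the short final batch only when drop_remainder is FALSE.
  if (remainder > 0 && !drop_remainder) sizes <- c(sizes, remainder)
  sizes
}

# The final batch is shorter than the others.
batch_sizes(1000, 128)

# The short final batch is dropped, so every batch has the same size.
batch_sizes(1000, 128, drop_remainder = TRUE)
```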

tensorflow package

You read batches of data from a dataset by using tensors that yield the next batch. You can obtain this tensor from a dataset via the make_iterator_one_shot() and iterator_get_next() functions. For example:

dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>% 
  dataset_prepare(cyl ~ mpg + disp) %>% 
  dataset_shuffle(20) %>% 
  dataset_batch(5)

iter <- make_iterator_one_shot(dataset)
next_batch <- iterator_get_next(iter)
next_batch
$x
Tensor("IteratorGetNext_13:0", shape=(?, 2), dtype=float32)

$y
Tensor("IteratorGetNext_13:1", shape=(?,), dtype=int32)

As you can see next_batch isn’t the data itself but rather a tensor that will yield the next batch of data when it is evaluated:

sess <- tf$Session()
sess$run(next_batch)
$x
     [,1] [,2]
[1,] 21.0  160
[2,] 21.0  160
[3,] 22.8  108
[4,] 21.4  258
[5,] 18.7  360

$y
[1] 6 6 4 6 8

If you are iterating over a dataset using these functions, you will need to determine at what point to stop iteration. One approach to this is to use the dataset_repeat() function to create a dataset that yields values infinitely. For example:

library(tfdatasets)

sess <- tf$Session()

mtcars_spec <- csv_record_spec("mtcars.csv")
dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>% 
  dataset_shuffle(5000) %>% 
  dataset_repeat() %>% # repeat infinitely
  dataset_prepare(x = c(mpg, disp), y = cyl) %>% 
  dataset_batch(128)

iter <- make_iterator_one_shot(dataset)
next_batch <- iterator_get_next(iter)

steps <- 200
for (i in 1:steps) {
  
  # use next_batch for training, etc. 
  
  # (note that you need to actually use next_batch, e.g. by passing it to a
  # function that consumes a tensor or by running it explicitly, in order to
  # advance to the next batch)
}

In this case the steps variable is used to determine when to stop drawing new batches of training data (we could have equally included code to detect a learning plateau or any other custom method of determining when to stop training).

Another approach is to detect when all batches have been yielded from the dataset. When a dataset iterator reaches the end, an out of range runtime error will occur. You can catch and ignore the error when it occurs by using out_of_range_handler as the error argument to tryCatch(). For example:

library(tfdatasets)

sess <- tf$Session()

mtcars_spec <- csv_record_spec("mtcars.csv")
dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>% 
  dataset_prepare(x = c(mpg, disp), y = cyl) %>% 
  dataset_batch(128) %>% 
  dataset_repeat(10)
  
iter <- make_iterator_one_shot(dataset)
next_batch <- iterator_get_next(iter)

tryCatch({
  while(TRUE) {
    batch <- sess$run(next_batch)
    str(batch)
  }
}, error = out_of_range_handler)

You can write this iteration more elegantly using the until_out_of_range() function, which automatically handles the error and provides the while(TRUE) around an expression:

until_out_of_range({
  batch <- sess$run(next_batch)
  str(batch)
})
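
The control flow behind this pattern can be imitated in plain R without TensorFlow. In the sketch below, make_counter() and the "out_of_range" condition class are hypothetical stand-ins for a dataset iterator and its end-of-sequence error; the actual implementation of until_out_of_range() may differ.

```r
# Hypothetical stand-in for an iterator: yields 1..n, then signals a
# custom "out_of_range" error condition.
make_counter <- function(n) {
  i <- 0
  function() {
    if (i >= n) {
      stop(structure(
        class = c("out_of_range", "error", "condition"),
        list(message = "end of sequence", call = NULL)
      ))
    }
    i <<- i + 1
    i
  }
}

next_value <- make_counter(3)
seen <- c()

# Loop "forever"; the handler for the custom condition class ends the
# loop quietly, while any other error would still propagate.
tryCatch({
  while (TRUE) {
    seen <- c(seen, next_value())
  }
}, out_of_range = function(e) invisible(NULL))

seen
```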

When running under eager execution, you organize the code a bit differently (since you don’t need to explicitly run() tensors):

iter <- make_iterator_one_shot(dataset)

until_out_of_range({
  batch <- iterator_get_next(iter)
  str(batch)
})

Reading Multiple Files

If you have multiple input files you can process them in parallel across machines (sharding), on multiple threads per machine (parallel reads with interleaving), or both. The read_files() function provides a high-level interface to parallel file reading.

The read_files() function takes a set of files and a read function along with various options to orchestrate parallel reading. For example, the following code reads all CSV files in a directory using the text_line_dataset() function:

dataset <- read_files("data/*.csv", text_line_dataset, record_spec = mtcars_spec,
                      parallel_files = 4, parallel_interleave = 16) %>% 
  dataset_prefetch(5000) %>% 
  dataset_shuffle_and_repeat(buffer_size = 1000, count = 3) %>% 
  dataset_batch(128)

The parallel_files argument requests that 4 files be processed in parallel and the parallel_interleave argument requests that blocks of 16 consecutive records from each file be interleaved in the resulting dataset.
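
To picture what interleaving blocks of consecutive records means, here is a deterministic sketch in plain R with two small "files" and a block length of 2. The interleave_blocks() helper and the record labels are hypothetical; with real parallel reads the exact ordering may differ from this simplified model.

```r
# Hypothetical plain-R model: take block_length consecutive records from
# each file in turn until every file is exhausted.
interleave_blocks <- function(files, block_length) {
  out <- character(0)
  positions <- rep(1, length(files))
  while (any(positions <= lengths(files))) {
    for (f in seq_along(files)) {
      start <- positions[f]
      if (start > length(files[[f]])) next
      end <- min(start + block_length - 1, length(files[[f]]))
      out <- c(out, files[[f]][start:end])
      positions[f] <- end + 1
    }
  }
  out
}

files <- list(a = paste0("a", 1:4), b = paste0("b", 1:4))
interleaved <- interleave_blocks(files, block_length = 2)

# Records alternate between files in blocks of two.
interleaved
```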

Note that because we are processing files in parallel we do not pass the parallel_records argument to text_line_dataset(), since we are already parallelizing at the file level.

Multiple Machines

If you are training on multiple machines and the training supervisor passes a shard index to your training script, you can also parallelize reading by sharding the file list. For example:

# command line flags for training script (shard info is passed by training 
# supervisor that executes the script)
FLAGS <- flags(
  flag_integer("num_shards", 1),
  flag_integer("shard_index", 1)
)

# forward shard info to read_files
dataset <- read_files("data/*.csv", text_line_dataset, record_spec = mtcars_spec,
                      parallel_files = 4, parallel_interleave = 16,
                      num_shards = FLAGS$num_shards, shard_index = FLAGS$shard_index) %>% 
  dataset_shuffle_and_repeat(buffer_size = 1000, count = 3) %>% 
  dataset_batch(128) %>% 
  dataset_prefetch(1)
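
As a rough illustration of what sharding a file list means, the following plain-R sketch assigns files to shards in round-robin order. The shard_files() helper and the file names are hypothetical, and the zero-based shard index is an assumption made for this sketch; check how your training supervisor numbers its shards before relying on it.

```r
# Hypothetical file list.
files <- sprintf("data/part-%02d.csv", 0:6)

# Hypothetical helper: round-robin assignment of files to shards, assuming
# shard_index counts from 0.
shard_files <- function(files, num_shards, shard_index) {
  files[(seq_along(files) - 1) %% num_shards == shard_index]
}

# Each of three machines reads a disjoint subset of the files.
shards <- lapply(0:2, function(i) shard_files(files, num_shards = 3, shard_index = i))
shards
```

Because the subsets are disjoint and together cover every file, each record is read by exactly one machine per pass over the data.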