sparktf is a sparklyr extension that allows
writing of Spark DataFrame
s to TFRecord
, the
recommended format for persisting data to be used in training with
TensorFlow.
You can install sparktf from CRAN with:
install.packages("sparktf")
You can install the development version of sparktf from GitHub with:
::install_github("rstudio/sparktf") devtools
We first attach the required packages and establish a Spark connection.
library(sparktf)
library(sparklyr)
library(keras)
use_implementation("tensorflow")
library(tensorflow)
tfe_enable_eager_execution()
library(tfdatasets)
<- spark_connect(master = "local") sc
Copied a sample dataset to Spark then write it to disk via
spark_write_tfrecord()
.
<- file.path(tempdir(), "iris")
data_path <- sdf_copy_to(sc, iris)
iris_tbl
%>%
iris_tbl ft_string_indexer_model(
"Species", "label",
labels = c("setosa", "versicolor", "virginica")
%>%
) spark_write_tfrecord(
path = data_path,
write_locality = "local"
)
We now read the saved TFRecord
file and parse the
contents to create a dataset object. For details, refer to the package
website for tfdatasets.
<- tfrecord_dataset(list.files(data_path, full.names = TRUE)) %>%
dataset dataset_map(function(example_proto) {
<- list(
features label = tf$FixedLenFeature(shape(), tf$float32),
Sepal_Length = tf$FixedLenFeature(shape(), tf$float32),
Sepal_Width = tf$FixedLenFeature(shape(), tf$float32),
Petal_Length = tf$FixedLenFeature(shape(), tf$float32),
Petal_Width = tf$FixedLenFeature(shape(), tf$float32)
)
<- tf$parse_single_example(example_proto, features)
features <- list(
x $Sepal_Length, features$Sepal_Width,
features$Petal_Length, features$Petal_Width
features
)<- tf$one_hot(tf$cast(features$label, tf$int32), 3L)
y list(x, y)
%>%
}) dataset_shuffle(150) %>%
dataset_batch(16)
Now, we can define a Keras model using the keras package and fit it by
feeding the dataset
object defined above.
<- keras_model_sequential() %>%
model layer_dense(32, activation = "relu", input_shape = 4) %>%
layer_dense(3, activation = "softmax")
%>%
model compile(loss = "categorical_crossentropy", optimizer = tf$train$AdamOptimizer())
<- model %>%
history fit(dataset, epochs = 100, verbose = 0)
Finally, we can use the trained model to make some predictions.
<- tf$constant(c(4.9, 3.2, 1.4, 0.2), shape = c(1, 4))
new_data model(new_data)
#> tf.Tensor([[0.69612664 0.13773003 0.1661433 ]], shape=(1, 3), dtype=float32)