What is sparklyr?

How does Spark work?

Notebook setup

Clone this repo

$ git clone https://github.com/aurora-mareviv/sparklyr_start
# Warn if the running R is older than the version the tutorial was built with.
R_version_current <- paste0(version$major, ".", version$minor)
R_version_tutorial <- "3.5.1"
# BUG FIX: comparing version strings with `<` is lexicographic, so e.g.
# "3.10.0" < "3.5.1" would be TRUE. compareVersion() parses the components
# numerically and returns -1 / 0 / 1.
if (utils::compareVersion(R_version_current, R_version_tutorial) < 0) {
  message("Your R version is not updated; please go to: https://cran.r-project.org")
}
## Your R version is not updated; please go to: https://cran.r-project.org
# Installs missing libraries on render!
list.of.packages <- c("sparklyr", "rmarkdown", "dplyr", "Rcpp", "knitr", "ggplot2")
new.packages <- list.of.packages[!(list.of.packages %in% installed.packages()[, "Package"])]
if (length(new.packages) > 0) {
  install.packages(new.packages, repos = "https://cran.rstudio.com/")
}
# Attach the packages used throughout the notebook.
library(dplyr)
library(ggplot2)
library(sparklyr)
# Large penalty against scientific notation so numbers print in fixed form.
options(scipen=999)
source("./assets/rscripts/test.R", echo=FALSE) # if you want to source some R script
# directory where the notebook is
# directory where the notebook is
wdir <- getwd()
# directory where data are imported from & saved to
datadir <- file.path(wdir, "data") # better than datadir <- paste(wdir, "/data", sep="")
# directory where external images are imported from
imgdir <- file.path(wdir, "img")
# directory where plots are saved to
outdir <- file.path(wdir, "out")
# "/<basename>" suffix of the working directory (kept for compatibility).
# BUG FIX: the original prefixed "\\" (a literal backslash); R paths use
# forward slashes on every platform, so the gsub() below never matched.
Up <- paste0("/", basename(wdir))
# parent of the working directory; dirname() is the idiomatic (and robust) way
wdirUp <- dirname(wdir)

Spark setup

From within RStudio, we can easily install Spark:

spark_install() # one-time download & install of a local Spark distribution

Loading spark context (RDDs)

Currently there are three types of contexts:

We’ll set some session variables

# Locate the Spark installation; `<-` (not `=`) for top-level assignment.
SPARK_HOME <- Sys.getenv("SPARK_HOME") # but the path to Spark may be different in some cases!
# SPARK_HOME <- ""
# SPARK_HOME <- "~/spark/spark-2.3.1-bin-hadoop2.7" # macOS
# SPARK_HOME <- "/usr/lib/spark" # GNU/Linux

Create a Spark context:

# Open a connection to a local (single-machine) Spark instance; `sc` is the
# handle used by all subsequent spark_* and dplyr operations.
#sc <- spark_connect(master = "local")
sc <- spark_connect(master = "local", spark_home = SPARK_HOME)

The connection sc will let us interact with Spark. The general pipeline we’ll use will be:

Upload data to HDFS

In case we are dealing with an HDFS cluster, we must first upload our data to our HDFS HOME

In local mode, this is not necessary, as we are not leaving our PC.

Reading JSON into Spark context: jscars.json

# Register the JSON file in the Spark context as table "jscars".
# `jscars` is a remote tbl_spark reference, not a local data frame.
jscars <- spark_read_json(sc, name = "jscars", path = file.path(datadir, "jscars.json") )
# collect() materialises the 6-row head from Spark into a local tibble.
jscars %>%
  head(6) %>%
  collect()
   # A tibble: 6 x 11
        am  carb   cyl  disp  drat  gear    hp   mpg  qsec    vs    wt
     <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
   1     1     4     6   160  3.9      4   110  21    16.5     0  2.62
   2     1     4     6   160  3.9      4   110  21    17.0     0  2.88
   3     1     1     4   108  3.85     4    93  22.8  18.6     1  2.32
   4     0     1     6   258  3.08     3   110  21.4  19.4     1  3.22
   5     0     2     8   360  3.15     3   175  18.7  17.0     0  3.44
   6     0     1     6   225  2.76     3   105  18.1  20.2     1  3.46

Basic data wrangling

We can make good use of magrittr::%>% pipes for data wrangling or window operations:
The warnings issued here are usually not important. They depend on the sparklyr version; see the sparklyr package documentation and release notes for more info.

# The dplyr verbs below are translated to Spark SQL and run remotely;
# rank() becomes a window function evaluated within each `vs` group.
jscars %>%
  group_by(vs) %>%
  filter(gear == 3, hp > 100) %>%
  mutate(horsepower_by_gear = rank(desc(hp / gear))) %>%
  mutate(mpg_rank = rank(mpg)) %>%
  select(gear, mpg_rank, horsepower_by_gear) %>% 
  head(6) %>%
  collect()
   # A tibble: 6 x 4
        vs  gear mpg_rank horsepower_by_gear
     <dbl> <dbl>    <int>              <int>
   1     0     3        1                  4
   2     0     3        1                  5
   3     0     3        3                  1
   4     0     3        4                  1
   5     0     3        5                  3
   6     0     3        6                  6

Graphs

# Pull the full table into a local tibble, then plot weight vs. fuel
# efficiency: points coloured by gear count and sized by horsepower,
# with a loess trend line on top.
ggplot(collect(jscars), aes(wt, mpg)) +
  geom_point(aes(colour = factor(gear), size = hp)) +
  geom_smooth(colour = "#737373", alpha = 0.3) +
  theme_bw()

Models: K-means

# Fit a K-means model (k = 3) on weight and fuel efficiency with Spark ML.
# NOTE(review): `y` is not a column of jscars; K-means is unsupervised, so the
# left-hand side appears to be ignored here, but `~ wt + mpg` would be clearer
# -- confirm against the installed sparklyr version's ml_kmeans() docs.
kmeans_model <- jscars %>%
  select(wt, mpg) %>%
  ml_kmeans(y ~ wt + mpg, k = 3)

# Prints the cluster centres and within-set sum of squared errors.
print(kmeans_model)
   K-means clustering with 3 clusters
   
   Cluster centers:
           wt      mpg
   1 1.873000 30.06667
   2 4.058667 14.45833
   3 3.072143 20.64286
   
   Within Set Sum of Squared Errors =  154.0229
# kmeans_model %>% str()
# predict the associated class (cluster) for every row
# ml_predict() supersedes sdf_predict(), deprecated since sparklyr 1.0
# (the version this notebook was rendered with, per sessionInfo below).
predicts <- ml_predict(kmeans_model, jscars)
predicted <- collect(predicts)
# `predicted` is already a local tibble, so wrapping head() in a second
# collect() was redundant
head(predicted)
   # A tibble: 6 x 13
        am  carb   cyl  disp  drat  gear    hp   mpg  qsec    vs    wt
     <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
   1     1     4     6   160  3.9      4   110  21    16.5     0  2.62
   2     1     4     6   160  3.9      4   110  21    17.0     0  2.88
   3     1     1     4   108  3.85     4    93  22.8  18.6     1  2.32
   4     0     1     6   258  3.08     3   110  21.4  19.4     1  3.22
   5     0     2     8   360  3.15     3   175  18.7  17.0     0  3.44
   6     0     1     6   225  2.76     3   105  18.1  20.2     1  3.46
   # … with 2 more variables: features <list>, prediction <int>
# Cross-tabulate transmission type (am: 0 = automatic, 1 = manual per the
# mtcars-style columns above) against the predicted cluster id.
base::table(predicted$am, predicted$prediction)
      
        0  1  2
     0  0 10  9
     1  6  2  5
# plot cluster membership
# ml_predict() supersedes sdf_predict(), deprecated since sparklyr 1.0;
# the stray leading indent on the first pipeline line is also removed.
ml_predict(kmeans_model, jscars) %>%
  collect() %>%
  ggplot(aes(wt, mpg)) +
    # one point per car, coloured by predicted cluster (shifted to 1-based)
    geom_point(aes(wt, mpg, col = factor(prediction + 1)),
               size = 2, alpha = 0.5) + 
    # overlay the fitted cluster centres as large crosses
    geom_point(data = kmeans_model$centers, aes(wt, mpg),
               col = scales::muted(c("red", "green", "blue")),
               pch = 'x', size = 12) +
    scale_color_discrete(name = "Predicted Cluster",
                         labels = paste("Cluster", 1:3)) +
    labs(
      x = "wt",
      y = "mpg",
      title = "K-Means Clustering",
      subtitle = "Use Spark.ML to predict cluster membership with the jscars dataset."
    ) +
    theme_bw()

Session Info

# Record library search paths, the R installation home, and full session
# details (R version, platform, package versions) for reproducibility.
.libPaths()
Sys.getenv("R_HOME")
sessionInfo()
   R version 3.4.3 (2017-11-30)
   Platform: x86_64-apple-darwin15.6.0 (64-bit)
   Running under: OS X El Capitan 10.11.6
   
   Matrix products: default
   BLAS: /Library/Frameworks/R.framework/Versions/3.4/Resources/lib/libRblas.0.dylib
   LAPACK: /Library/Frameworks/R.framework/Versions/3.4/Resources/lib/libRlapack.dylib
   
   locale:
   [1] es_ES.UTF-8/es_ES.UTF-8/es_ES.UTF-8/C/es_ES.UTF-8/es_ES.UTF-8
   
   attached base packages:
   [1] stats     graphics  grDevices utils     datasets  methods   base     
   
   other attached packages:
   [1] sparklyr_1.0.0     ggplot2_3.2.0.9000 dplyr_0.8.0.1     
   
   loaded via a namespace (and not attached):
    [1] Rcpp_1.0.1       dbplyr_1.4.0     pillar_1.4.2     compiler_3.4.3  
    [5] r2d3_0.2.3       base64enc_0.1-3  tools_3.4.3      zeallot_0.1.0   
    [9] digest_0.6.20    jsonlite_1.6     evaluate_0.14    tibble_2.1.3    
   [13] gtable_0.3.0     pkgconfig_2.0.2  rlang_0.4.0      cli_1.1.0       
   [17] rstudioapi_0.10  DBI_1.0.0        parallel_3.4.3   yaml_2.2.0      
   [21] xfun_0.8         withr_2.1.2      stringr_1.4.0    httr_1.4.0      
   [25] knitr_1.23       vctrs_0.2.0      askpass_1.1      rappdirs_0.3.1  
   [29] generics_0.0.2   htmlwidgets_1.3  rprojroot_1.3-2  grid_3.4.3      
   [33] tidyselect_0.2.5 glue_1.3.1       forge_0.2.0      R6_2.4.0        
   [37] fansi_0.4.0      rmarkdown_1.12   purrr_0.3.2      magrittr_1.5    
   [41] ellipsis_0.2.0.1 backports_1.1.4  scales_1.0.0     htmltools_0.3.6 
   [45] assertthat_0.2.1 colorspace_1.4-1 labeling_0.3     config_0.3      
   [49] utf8_1.1.4       stringi_1.4.3    openssl_1.3      lazyeval_0.2.2  
   [53] munsell_0.5.0    crayon_1.3.4

References