Rとビッグデータ:Replyrの使用

replyr



- RのビッグデータのREモートPLY ing( Rのビッグデータのリモート処理)の略。



replyr



価値があるのはなぜですか? なぜなら、リモートデータ(データベースまたはSpark )に標準の作業アプローチを適用できるからです。



ローカルdata.frame



と同じ方法で作業できます。 replyr



replyr



機能を提供します。





ほとんどの場合、これらすべてをデータでローカルに実行するため、これらの機能によりSpark



およびsparklyr



はるかに簡単になります。



replyr



は、多くの顧客のアプリケーションソリューションでRを使用し、フィードバックを収集し、欠陥を修正した集合的な経験の製品です。



以下に例を示します。



すべてが急速に変化しているため、例として開発中のパッケージバージョンを使用します。



 base::date() ## [1] "Thu Jul 6 15:56:28 2017" # devtools::install_github('rstudio/sparklyr') # devtools::install_github('tidyverse/dplyr') # devtools::install_github('tidyverse/dbplyr') # install.packages("replyr") suppressPackageStartupMessages(library("dplyr")) packageVersion("dplyr") ## [1] '0.7.1.9000' packageVersion("dbplyr") ## [1] '1.1.0.9000' library("tidyr") packageVersion("tidyr") ## [1] '0.6.3' library("replyr") packageVersion("replyr") ## [1] '0.4.2' suppressPackageStartupMessages(library("sparklyr")) packageVersion("sparklyr") ## [1] '0.5.6.9012' #  ,    https://github.com/rstudio/sparklyr/issues/783 config <- spark_config() config[["sparklyr.shell.driver-memory"]] <- "8G" sc <- sparklyr::spark_connect(version='2.1.0', hadoop_version = '2.7', master = "local", config = config)
      
      





まとめ



Spark



で実行できない標準のsummary()



およびglance()







 mtcars_spark <- copy_to(sc, mtcars) #  ,    summary(mtcars_spark) ## Length Class Mode ## src 1 src_spark list ## ops 2 op_base_remote list packageVersion("broom") ## [1] '0.4.2' broom::glance(mtcars_spark) ## Error: glance doesn't know how to deal with data of class tbl_sparktbl_sqltbl_lazytbl
      
      





replyr_summary



機能しています。



 replyr_summary(mtcars_spark) %>% select(-lexmin, -lexmax, -nunique, -index) ## column class nrows nna min max mean sd ## 1 mpg numeric 32 0 10.400 33.900 20.090625 6.0269481 ## 2 cyl numeric 32 0 4.000 8.000 6.187500 1.7859216 ## 3 disp numeric 32 0 71.100 472.000 230.721875 123.9386938 ## 4 hp numeric 32 0 52.000 335.000 146.687500 68.5628685 ## 5 drat numeric 32 0 2.760 4.930 3.596563 0.5346787 ## 6 wt numeric 32 0 1.513 5.424 3.217250 0.9784574 ## 7 qsec numeric 32 0 14.500 22.900 17.848750 1.7869432 ## 8 vs numeric 32 0 0.000 1.000 0.437500 0.5040161 ## 9 am numeric 32 0 0.000 1.000 0.406250 0.4989909 ## 10 gear numeric 32 0 3.000 5.000 3.687500 0.7378041 ## 11 carb numeric 32 0 1.000 8.000 2.812500 1.6152000
      
      





集約/分配



tidyr



は主にローカルデータで動作します。



 mtcars2 <- mtcars %>% mutate(car = row.names(mtcars)) %>% copy_to(sc, ., 'mtcars2') #  mtcars2 %>% tidyr::gather('fact', 'value') ## Error in UseMethod("gather_"): no applicable method for 'gather_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')" mtcars2 %>% replyr_moveValuesToRows(nameForNewKeyColumn= 'fact', nameForNewValueColumn= 'value', columnsToTakeFrom= colnames(mtcars), nameForNewClassColumn= 'class') %>% arrange(car, fact) ## # Source: lazy query [?? x 4] ## # Database: spark_connection ## # Ordered by: car, fact ## car fact value class ## ## 1 AMC Javelin am 0.00 numeric ## 2 AMC Javelin carb 2.00 numeric ## 3 AMC Javelin cyl 8.00 numeric ## 4 AMC Javelin disp 304.00 numeric ## 5 AMC Javelin drat 3.15 numeric ## 6 AMC Javelin gear 3.00 numeric ## 7 AMC Javelin hp 150.00 numeric ## 8 AMC Javelin mpg 15.20 numeric ## 9 AMC Javelin qsec 17.30 numeric ## 10 AMC Javelin vs 0.00 numeric ## # ... with 342 more rows
      
      





ラインバインディング



dplyr bind_rows



union



union_all



現在Spark



は適用されません。 replyr::replyr_union_all()



およびreplyr::replyr_bind_rows()



は実行可能な代替手段です。



bind_rows()



 db1 <- copy_to(sc, data.frame(x=1:2, y=c('a','b'), stringsAsFactors=FALSE), name='db1') db2 <- copy_to(sc, data.frame(y=c('c','d'), x=3:4, stringsAsFactors=FALSE), name='db2') #  -     ,    bind_rows(list(db1, db2)) ## Error in bind_rows_(x, .id): Argument 1 must be a data frame or a named atomic vector, not a tbl_spark/tbl_sql/tbl_lazy/tbl
      
      





union_all



 #          union_all(db1, db2) ## # Source: lazy query [?? x 2] ## # Database: spark_connection ## xy ## ## 1 1 a ## 2 2 b ## 3 3 c ## 4 4 d
      
      





組合



 #          #  ,     union(db1, db2) ## # Source: lazy query [?? x 2] ## # Database: spark_connection ## xy ## ## 1 4 d ## 2 1 a ## 3 3 c ## 4 2 b
      
      





replyr_bind_rows



replyr::replyr_bind_rows



は、複数のreplyr::replyr_bind_rows



をバインドできます。



 replyr_bind_rows(list(db1, db2)) ## # Source: table [?? x 2] ## # Database: spark_connection ## xy ## ## 1 1 a ## 2 2 b ## 3 3 c ## 4 4 d
      
      





dplyr :: do



この例では、集約されたデータセットの各グループから数行を取得します。 注意:配列を使用して明示的に順序を設定しないため、異なるデータソース(DBまたはSpark



)で結果が一致することを常に期待できるわけではありません。



dplyr::do



ローカルデータで実行



help('do', package='dplyr')



からhelp('do', package='dplyr')







 by_cyl <- group_by(mtcars, cyl) do(by_cyl, head(., 2)) ## # A tibble: 6 x 11 ## # Groups: cyl [3] ## mpg cyl disp hp drat wt qsec vs am gear carb ## ## 1 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1 ## 2 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2 ## 3 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4 ## 4 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4 ## 5 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2 ## 6 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4
      
      





dplyr::do



on Spark





 by_cyl <- group_by(mtcars_spark, cyl) do(by_cyl, head(., 2)) ## # A tibble: 3 x 2 ## cyl V2 ## ## 1 6 ## 2 4 ## 3 8
      
      





使用できるものがまったく得られません。



replyr



分割/マージ



 mtcars_spark %>% replyr_split('cyl', partitionMethod = 'extract') %>% lapply(function(di) head(di, 2)) %>% replyr_bind_rows() ## # Source: table [?? x 11] ## # Database: spark_connection ## mpg cyl disp hp drat wt qsec vs am gear carb ## ## 1 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4 ## 2 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4 ## 3 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1 ## 4 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2 ## 5 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2 ## 6 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4
      
      





replyr gapply





 mtcars_spark %>% gapply('cyl', partitionMethod = 'extract', function(di) head(di, 2)) ## # Source: table [?? x 11] ## # Database: spark_connection ## mpg cyl disp hp drat wt qsec vs am gear carb ## ## 1 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4 ## 2 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4 ## 3 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1 ## 4 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2 ## 5 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2 ## 6 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4
      
      





replyr::replyr_apply_f_mapped





受け取りたいもの:コードに対応する名前のデータ(つまり、コードではなくデータを変更します)。



ある考えでは、データの変更がデータではなく機能の環境に関連付けられている場合、これを実現できます。 つまり、対応する監視機能が実行されるまでデータが変更されます。 私たちの場合、この関数はreplyr::replyr_apply_f_mapped()



であり、次のように機能します。



使用する操作が、私たちの制御下にないソース(パッケージなど)から取得したダウングレード関数であるとします。 これは単純な関数かもしれませんが(以下のように)、変更せずに使用したいとします( wrapr::let()



導入のように、非常に小さな関数も除外します)。



 #          DecreaseRankColumnByOne <- function(d) { d$RankColumn <- d$RankColumn - 1 d }
      
      





この関数をd



(列名が期待どおりではない!)に適用するには、 replyr::replyr_apply_f_mapped()



を使用して、新しいパラメーター化されたアダプターを作成します。



 #   d <- data.frame(Sepal_Length = c(5.8,5.7), Sepal_Width = c(4.0,4.4), Species = 'setosa', rank = c(1,2)) #     DecreaseRankColumnByOneNamed <- function(d, ColName) { replyr::replyr_apply_f_mapped(d, f = DecreaseRankColumnByOne, nmap = c(RankColumn = ColName), restrictMapIn = FALSE, restrictMapOut = FALSE) } #  dF <- DecreaseRankColumnByOneNamed(d, 'rank') print(dF) ## Sepal_Length Sepal_Width Species rank ## 1 5.8 4.0 setosa 0 ## 2 5.7 4.4 setosa 1
      
      





replyr::replyr_apply_f_mapped()



は、 replyr::replyr_apply_f_mapped()



列の名前を変更し(一致はnmap



で指定されますreplyr::replyr_apply_f_mapped()



replyr::replyr_apply_f_mapped()



適用し、結果を返す前に元の名前に戻します。



中間追跡



Sparklyr



多くのタスクには、中間または一時テーブルの作成が含まれます。 これは、 dplyr::copy_to()



およびdplyr::compute()



を使用してdplyr::copy_to()



できます。 これらの方法は、リソースを大量に消費する可能性があります。



replyr



には、プロセスを制御できるようにする関数があります。データ自体を変更しない一時的な名前ジェネレーター(パッケージ自体の内部でも使用されます)。



関数自体は非常に簡単です。



 print(replyr::makeTempNameGenerator) ## function (prefix, suffix = NULL) ## { ## force(prefix) ## if ((length(prefix) != 1) || (!is.character(prefix))) { ## stop("repyr::makeTempNameGenerator prefix must be a string") ## } ## if (is.null(suffix)) { ## alphabet <- c(letters, toupper(letters), as.character(0:9)) ## suffix <- paste(base::sample(alphabet, size = 20, replace = TRUE), ## collapse = "") ## } ## count <- 0 ## nameList <- list() ## function(..., peek = FALSE, dumpList = FALSE, remove = NULL) { ## if (length(list(...)) > 0) { ## stop("replyr::makeTempNameGenerator tempname generate unexpected argument") ## } ## if (peek) { ## return(names(nameList)) ## } ## if (dumpList) { ## v <- names(nameList) ## nameList <<- list() ## return(v) ## } ## if (!is.null(remove)) { ## victims <- intersect(remove, names(nameList)) ## nameList[victims] <<- NULL ## return(victims) ## } ## nm <- paste(prefix, suffix, sprintf("%010d", count), ## sep = "_") ## nameList[[nm]] <<- 1 ## count <<- count + 1 ## nm ## } ## } ## ##
      
      





たとえば、複数のテーブルを組み合わせる場合、一部のデータソースの適切なソリューションは、各結合後にcompute



を呼び出すことです(そうしないと、結果のSQLが長くなり、理解および保守が困難になる場合があります)。 コードは次のようになります。



 #     names <- paste('table', 1:5, sep='_') tables <- lapply(names, function(ni) { di <- data.frame(key= 1:3) di[[paste('val',ni,sep='_')]] <- runif(nrow(di)) copy_to(sc, di, ni) }) #     tmpNamGen <- replyr::makeTempNameGenerator('JOINTMP') #      joined <- tables[[1]] for(i in seq(2,length(tables))) { ti <- tables[[i]] if(i<length(tables)) { joined <- compute(left_join(joined, ti, by='key'), name= tmpNamGen()) } else { #    joined <- compute(left_join(joined, ti, by='key'), name= 'joinres') } } #    temps <- tmpNamGen(dumpList = TRUE) print(temps) ## [1] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000000" ## [2] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000001" ## [3] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000002" for(ti in temps) { db_drop_table(sc, ti) } #  print(joined) ## # Source: table [?? x 6] ## # Database: spark_connection ## key val_table_1 val_table_2 val_table_3 val_table_4 val_table_5 ## ## 1 1 0.8045418 0.5006293 0.8656174 0.5248073 0.8611796 ## 2 2 0.1593121 0.5802938 0.9722113 0.4532369 0.7429018 ## 3 3 0.4853835 0.5313043 0.6224256 0.1843134 0.1125551
      
      





時間データの正確な導入と管理により、リソース(時間と場所の両方)を節約し、結果を大幅に改善できます。 一時的な名前ジェネレーターを明示的に設定し、それをすべてのSparklyr



変換にSparklyr



、結果がそれらに依存しなくなっSparklyr



一時的な値をまとめてクリアすることをおSparklyr



します。



おわりに



R



使用してSpark



またはデータベースのデータ処理を慎重に制御する場合は、 replyr



replyr



に加えてdplyr



sparklyr







 sparklyr::spark_disconnect(sc) rm(list=ls()) gc() ## used (Mb) gc trigger (Mb) max used (Mb) ## Ncells 821292 43.9 1442291 77.1 1168576 62.5 ## Vcells 1364897 10.5 2552219 19.5 1694265 13.0
      
      






All Articles