From d29941517790e57c6f366d1b42b510ae05b4e463 Mon Sep 17 00:00:00 2001 From: talegari Date: Sun, 10 Sep 2023 23:09:27 +0530 Subject: [PATCH] to v2, adds support to remote tables --- DESCRIPTION | 7 +- NEWS.md | 4 + R/mutate.R | 883 +++++++++++------- README.Rmd | 22 +- README.md | 44 +- docs/404.html | 2 +- docs/LICENSE.html | 2 +- docs/authors.html | 2 +- docs/index.html | 47 +- docs/news/index.html | 7 +- docs/pkgdown.yml | 2 +- docs/reference/index.html | 2 +- docs/reference/mutate.html | 121 ++- docs/reference/mutate_.html | 128 ++- .../remove_common_nested_columns.html | 2 +- docs/search.json | 2 +- man/mutate.Rd | 63 +- man/mutate_.Rd | 89 +- tests/testthat/tests_tidier.R | 54 ++ 19 files changed, 955 insertions(+), 528 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 7e52679..f599de5 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,12 +1,12 @@ Package: tidier Title: Enhanced 'mutate' -Version: 0.1.0 +Version: 0.2.0 Authors@R: person("Srikanth", "Komala Sheshachala",, "sri.teach@gmail.com", role = c("aut", "cre")) -Description: Provides 'Apache Spark' style window aggregation for R dataframes via 'mutate' in 'dplyr' flavour. +Description: Provides 'Apache Spark' style window aggregation for R dataframes and remote 'dbplyr' tables via 'mutate' in 'dplyr' flavour. Imports: dplyr (>= 1.1.0), tidyr (>= 1.3.0), @@ -15,10 +15,13 @@ Imports: slider (>= 0.2.2), magrittr (>= 1.5), furrr (>= 0.3.0), + dbplyr (>= 2.3.1), Suggests: lubridate, stringr, testthat, + RSQLite, + tibble, URL: https://github.com/talegari/tidier License: GPL (>= 3) Encoding: UTF-8 diff --git a/NEWS.md b/NEWS.md index 996676c..02af8d1 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,7 @@ +# tidier 0.2.0 + +* `tidier`'s mutate now supports same syntax over 'dbplyr' tbls. + # tidier 0.1.0 (on github: 2023-06-01) * Exposed slider's `.complete` argument in `tidier::mutate` diff --git a/R/mutate.R b/R/mutate.R index 327ee29..fcaaca1 100644 --- a/R/mutate.R +++ b/R/mutate.R @@ -1,27 +1,29 @@ -# mutate_ ---- +# mutate_ ---------------------------------------------------------------------- #' @name mutate_ #' @title Drop-in replacement for \code{\link[dplyr]{mutate}} #' @description Provides supercharged version of \code{\link[dplyr]{mutate}} #' with `group_by`, `order_by` and aggregation over arbitrary window frame -#' around a row. This function allows some arguments to be passed as strings -#' instead of expressions. +#' around a row for dataframes and lazy (remote) `tbl`s of class `tbl_lazy`. #' @seealso mutate #' @details A window function returns a value for every input row of a dataframe -#' based on a group of rows (frame) in the neighborhood of the input row. This -#' function implements computation over groups (`partition_by` in SQL) in a -#' predefined order (`order_by` in SQL) across a neighborhood of rows (frame) -#' defined by a (up, down) where +#' or `lazy_tbl` based on a group of rows (frame) in the neighborhood of the +#' input row. This function implements computation over groups (`partition_by` +#' in SQL) in a predefined order (`order_by` in SQL) across a neighborhood of +#' rows (frame) defined by a (up, down) where #' #' - up/down are number of rows before and after the corresponding row #' -#' - up/down are interval objects (ex: `c(days(2), days(1))`) +#' - up/down are interval objects (ex: `c(days(2), days(1))`). +#' Interval objects are currently supported for dataframe only. (not +#' `tbl_lazy`) #' #' This implementation is inspired by spark's [window #' API](https://www.databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html). #' +#' **Implementation Details**: #' -#' Implementation Details: +#' For dataframe input: #' #' - Iteration per row over the window is implemented using the versatile #' [`slider`](https://cran.r-project.org/package=slider). @@ -34,45 +36,51 @@ #' #' - function subsumes regular usecases of \code{\link[dplyr]{mutate}} #' -#' @param x (data.frame) +#' For `tbl_lazy` input: +#' +#' - Uses `dbplyr::window_order` and `dbplyr::window_frame` to translate to +#' `partition_by` and window frame specification. +#' +#' @param x (`data.frame` or `tbl_lazy`) #' @param ... expressions to be passed to \code{\link[dplyr]{mutate}} -#' @param .by (character vector, optional: Yes) columns to group by -#' @param .order_by (character vector, optional: Yes) columns to order by +#' @param .by (character vector, optional: Yes) Columns to group by +#' @param .order_by (string, optional: Yes) Columns to order by #' @param .frame (vector, optional: Yes) Vector of length 2 indicating the #' number of rows to consider before and after the current row. When argument #' `.index` is provided (typically a column of type date or datetime), before #' and after can be #' [interval](https://lubridate.tidyverse.org/reference/interval.html) -#' objects. See examples. -#' @param .index (string, optional: Yes) name of index column -#' @param .desc (logical_vector, default: FALSE) bool or logical vector of same -#' length as `.order_by`. +#' objects. See examples. When input is `tbl_lazy`, only number of rows as +#' vector of length 2 is supported. +#' @param .index (string, optional: Yes, default: NULL) index column. This is +#' supported when input is a dataframe only. +#' @param .desc (flag, default: FALSE) Whether to order in descending order #' @param .complete (flag, default: FALSE) This will be passed to #' `slider::slide` / `slider::slide_vec`. Should the function be evaluated on -#' complete windows only? If FALSE, the default, then partial computations -#' will be allowed. -#' @return data.frame +#' complete windows only? If FALSE or NULL, the default, then partial +#' computations will be allowed. This is supported when input is a dataframe +#' only. +#' @return `data.frame` or `tbl_lazy` #' @importFrom magrittr %>% #' @importFrom utils tail -#' @export #' #' @examples #' library("magrittr") -#' # example 1 +#' # example 1 (simple case with dataframe) #' # Using iris dataset, #' # compute cumulative mean of column `Sepal.Length` #' # ordered by `Petal.Width` and `Sepal.Width` columns #' # grouped by `Petal.Length` column #' #' iris %>% -#' mutate_(sl_mean = mean(Sepal.Length), -#' .order_by = c("Petal.Width", "Sepal.Width"), -#' .by = "Petal.Length", -#' .frame = c(Inf, 0), -#' ) %>% +#' tidier::mutate_(sl_mean = mean(Sepal.Length), +#' .order_by = c("Petal.Width", "Sepal.Width"), +#' .by = "Petal.Length", +#' .frame = c(Inf, 0), +#' ) %>% #' dplyr::slice_min(n = 3, Petal.Width, by = Species) #' -#' # example 2 +#' # example 2 (detailed case with dataframe) #' # Using a sample airquality dataset, #' # compute mean temp over last seven days in the same month for every row #' @@ -84,210 +92,293 @@ #' dplyr::slice_sample(prop = 0.8) %>% #' dplyr::arrange(date_col) %>% #' # compute mean temperature over last seven days in the same month -#' tidier::mutate(avg_temp_over_last_week = mean(Temp, na.rm = TRUE), -#' .order_by = "Day", -#' .by = "Month", -#' .frame = c(lubridate::days(7), # 7 days before current row +#' tidier::mutate_(avg_temp_over_last_week = mean(Temp, na.rm = TRUE), +#' .order_by = "Day", +#' .by = "Month", +#' .frame = c(lubridate::days(7), # 7 days before current row #' lubridate::days(-1) # do not include current row #' ), -#' .index = "date_col" -#' ) +#' .index = "date_col" +#' ) +#' # example 3 +#' airquality %>% +#' # create date column as character +#' dplyr::mutate(date_col = +#' as.character(lubridate::make_date(1973, Month, Day)) +#' ) %>% +#' tibble::as_tibble() %>% +#' # as `tbl_lazy` +#' dbplyr::memdb_frame() %>% +#' mutate_(avg_temp = mean(Temp), +#' .by = "Month", +#' .order_by = "date_col", +#' .frame = c(3, 3) +#' ) %>% +#' dplyr::collect() %>% +#' dplyr::select(Ozone, Solar.R, Wind, Temp, Month, Day, date_col, avg_temp) #' @export - -mutate_ = function(x, ..., .by, .order_by, .frame, .index, - .desc = FALSE, .complete = FALSE +mutate_ = function(x, + ..., + .by, + .order_by, + .frame, + .index, + .desc = FALSE, + .complete = FALSE ){ + checkmate::assert_multi_class(x, c("data.frame", "tbl_lazy")) + + if (inherits(x, "data.frame")){ + # capture expressions -------------------------------------------------------- + ddd = rlang::enquos(...) + + # assertions ----------------------------------------------------------------- + order_by_is_missing = missing(.order_by) + by_is_missing = missing(.by) + frame_is_missing = missing(.frame) + index_is_missing = missing(.index) + + if (!order_by_is_missing) { + checkmate::assert_character(.order_by, + unique = TRUE, + any.missing = FALSE, + min.len = 1 + ) + checkmate::assert_subset(.order_by, choices = colnames(x)) + checkmate::assert_logical(.desc, any.missing = FALSE, min.len = 1) + len_desc = length(.desc) + checkmate::assert(len_desc == length(.order_by) || len_desc == 1) + } - # capture expressions -------------------------------------------------------- - ddd = rlang::enquos(...) - - # assertions ----------------------------------------------------------------- - order_by_is_missing = missing(.order_by) - by_is_missing = missing(.by) - frame_is_missing = missing(.frame) - index_is_missing = missing(.index) - - if (!order_by_is_missing) { - checkmate::assert_character(.order_by, - unique = TRUE, - any.missing = FALSE, - min.len = 1 - ) - checkmate::assert_subset(.order_by, choices = colnames(x)) - checkmate::assert_logical(.desc, any.missing = FALSE, min.len = 1) - len_desc = length(.desc) - checkmate::assert(len_desc == length(.order_by) || len_desc == 1) - } - - if (!by_is_missing) { - checkmate::assert_character(.by, - unique = TRUE, - any.missing = FALSE, - min.len = 1, - ) - checkmate::assert_subset(.by, choices = colnames(x)) - } + if (!by_is_missing) { + checkmate::assert_character(.by, + unique = TRUE, + any.missing = FALSE, + min.len = 1, + ) + checkmate::assert_subset(.by, choices = colnames(x)) + } - if (!frame_is_missing) { - checkmate::assert(length(.frame) == 2) - checkmate::assert(inherits(.frame, c("numeric", "Period"))) - checkmate::assert_true(all(class(.frame[[1]]) == class(.frame[[2]]))) - if (!index_is_missing) { - checkmate::assert_string(.index) - checkmate::assert_subset(.index, choices = colnames(x)) - } else { - .index = NULL + if (!frame_is_missing) { + checkmate::assert(length(.frame) == 2) + checkmate::assert(inherits(.frame, c("numeric", "Period"))) + checkmate::assert_true(all(class(.frame[[1]]) == class(.frame[[2]]))) + if (!index_is_missing) { + checkmate::assert_string(.index) + checkmate::assert_subset(.index, choices = colnames(x)) + } else { + .index = NULL + } } - } - # order before mutate -------------------------------------------------------- - if (!order_by_is_missing){ + # order before mutate -------------------------------------------------------- + if (!order_by_is_missing){ - if (len_desc == 1){ - .desc = rep(.desc, length(.order_by)) - } - row_order = do.call(order, - c(lapply(.order_by, function(.x) x[[.x]]), - list(decreasing = .desc) + if (len_desc == 1){ + .desc = rep(.desc, length(.order_by)) + } + row_order = do.call(order, + c(lapply(.order_by, function(.x) x[[.x]]), + list(decreasing = .desc) + ) ) - ) - x_copy = x[row_order, ] + x_copy = x[row_order, ] - } else { - x_copy = x - } + } else { + x_copy = x + } - # mutate core operation ------------------------------------------------------ - # for cran checks - data__ = NULL - slide_output__ = NULL + # mutate core operation --------------------------------------------------- + # for cran checks + data__ = NULL + slide_output__ = NULL - if (by_is_missing){ - # without groups ---- - if (frame_is_missing){ - # simple mutate without slide - x_copy = dplyr::mutate(x_copy, !!!ddd) + if (by_is_missing){ + # without groups ---- + if (frame_is_missing){ + # simple mutate without slide + x_copy = dplyr::mutate(x_copy, !!!ddd) + } else { + # without groups and with slide + if (index_is_missing){ + x_copy = x_copy %>% + dplyr::mutate(slide_output__ = + slider::slide( + x_copy, + .f = ~ as.list(dplyr::summarise(.x, !!!ddd)), + .before = .frame[1], + .after = .frame[2], + .complete = .complete + ) + ) %>% + remove_common_nested_columns(slide_output__) %>% + tidyr::unnest_wider(slide_output__) + } else { + x_copy = x_copy %>% + dplyr::mutate(slide_output__ = + slider::slide_index( + x_copy, + .f = ~ as.list(dplyr::summarise(.x, !!!ddd)), + .i = x_copy[[.index]], + .before = .frame[1], + .after = .frame[2], + .complete = .complete + ) + ) %>% + remove_common_nested_columns(slide_output__) %>% + tidyr::unnest_wider(slide_output__) + } + } } else { - # without groups and with slide - if (index_is_missing){ + # with groups ---- + if (frame_is_missing){ + # groupby mutate x_copy = x_copy %>% - dplyr::mutate(slide_output__ = - slider::slide( - x_copy, - .f = ~ as.list(dplyr::summarise(.x, !!!ddd)), - .before = .frame[1], - .after = .frame[2], - .complete = .complete - ) - ) %>% - remove_common_nested_columns(slide_output__) %>% - tidyr::unnest_wider(slide_output__) + dplyr::group_by(dplyr::across(dplyr::all_of(.by))) %>% + dplyr::mutate(!!!ddd) %>% + dplyr::ungroup() + } else { - x_copy = x_copy %>% - dplyr::mutate(slide_output__ = - slider::slide_index( - x_copy, - .f = ~ as.list(dplyr::summarise(.x, !!!ddd)), - .i = x_copy[[.index]], - .before = .frame[1], - .after = .frame[2], - .complete = .complete + # with groups and with slide + fun_per_chunk = function(chunk, ...){ + if (index_is_missing) { + out = chunk %>% + dplyr::mutate(slide_output__ = + slider::slide( + chunk, + .f = ~ as.list(dplyr::summarise(.x, !!!ddd)), + .before = .frame[1], + .after = .frame[2], + .complete = .complete + ) + ) + } else { + out = chunk %>% + dplyr::mutate(slide_output__ = + slider::slide_index( + chunk, + .f = ~ as.list(dplyr::summarise(.x, !!!ddd)), + .i = chunk[[.index]], + .before = .frame[1], + .after = .frame[2], + .complete = .complete + ) ) - ) %>% + } + + # remove groupby columns (if they exist) + for (acol in .by){ + out[[acol]] = NULL + } + + return(out) + } + + x_copy = x_copy %>% + tidyr::nest(data__ = dplyr::everything(), + .by = dplyr::all_of(.by) + ) %>% + dplyr::ungroup() %>% + dplyr::mutate(data__ = furrr::future_map(data__, fun_per_chunk)) %>% + tidyr::unnest(data__) %>% remove_common_nested_columns(slide_output__) %>% tidyr::unnest_wider(slide_output__) + } } - + # reorder the output before return ------------------------------------------- + if (!order_by_is_missing){ x_copy = x_copy[order(row_order), ] } + res = x_copy } else { - # with groups ---- - if (frame_is_missing){ - # groupby mutate - x_copy = x_copy %>% - dplyr::group_by(dplyr::across(dplyr::all_of(.by))) %>% - dplyr::mutate(!!!ddd) %>% - dplyr::ungroup() + # capture expressions ----------------------------------------------------- + ddd = rlang::enquos(...) + + # assertions -------------------------------------------------------------- + if (!missing(.index)) { + stop(paste0("When input is `tbl_lazy`,", + " `.index` argument is not supported.", + " `.index` should missing" + ) + ) + } - } else { - # with groups and with slide - fun_per_chunk = function(chunk, ...){ - if (index_is_missing) { - out = chunk %>% - dplyr::mutate(slide_output__ = - slider::slide( - chunk, - .f = ~ as.list(dplyr::summarise(.x, !!!ddd)), - .before = .frame[1], - .after = .frame[2], - .complete = .complete - ) - ) - } else { - out = chunk %>% - dplyr::mutate(slide_output__ = - slider::slide_index( - chunk, - .f = ~ as.list(dplyr::summarise(.x, !!!ddd)), - .i = chunk[[.index]], - .before = .frame[1], - .after = .frame[2], - .complete = .complete - ) - ) - } + if (!missing(.complete)) { + stop(paste0("When input is `tbl_lazy`,", + " `.complete` argument is not supported.", + " `.complete` should be missing" + ) + ) + } - # remove groupby columns (if they exist) - for (acol in .by){ - out[[acol]] = NULL - } + order_by_is_missing = missing(.order_by) + by_is_missing = missing(.by) + frame_is_missing = missing(.frame) - return(out) - } + # declare res ------------------------------------------------------------- + res = x + + # group by before mutate ------------------------------------------------- + if (!by_is_missing) { + res = dplyr::group_by(res, dplyr::pick(dplyr::all_of(.by))) + } - x_copy = x_copy %>% - tidyr::nest(data__ = dplyr::everything(), - .by = dplyr::all_of(.by) - ) %>% - dplyr::ungroup() %>% - dplyr::mutate(data__ = furrr::future_map(data__, fun_per_chunk)) %>% - tidyr::unnest(data__) %>% - remove_common_nested_columns(slide_output__) %>% - tidyr::unnest_wider(slide_output__) + # apply frame before mutate ----------------------------------------------- + if (!frame_is_missing) { + checkmate::assert(length(.frame) == 2) + checkmate::assert_numeric(.frame) + res = dbplyr::window_frame(res, from = -.frame[1], to = .frame[2]) } - } - # reorder the output before return ------------------------------------------- - if (!order_by_is_missing){ x_copy = x_copy[order(row_order), ] } + # order before mutate ----------------------------------------------------- + if (!order_by_is_missing){ + checkmate::assert_string(.order_by) + checkmate::assert_subset(.order_by, colnames(x)) + checkmate::assert_flag(.desc) + + if (.desc){ + res = dbplyr::window_order(res, dplyr::desc(!!rlang::sym(.order_by))) + } else { + res = dbplyr::window_order(res, !!rlang::sym(.order_by)) + } + } + + # core mutate operation --------------------------------------------------- + res = dplyr::mutate(res, !!!ddd) + res = dplyr::ungroup(res) + + } # return --------------------------------------------------------------------- - return(x_copy) + return(res) } -# mutate ---- +# mutate ----------------------------------------------------------------------- #' @name mutate #' @title Drop-in replacement for \code{\link[dplyr]{mutate}} #' @description Provides supercharged version of \code{\link[dplyr]{mutate}} #' with `group_by`, `order_by` and aggregation over arbitrary window frame -#' around a row. +#' around a row for dataframes and lazy (remote) `tbl`s of class `tbl_lazy`. #' @seealso mutate_ #' @details A window function returns a value for every input row of a dataframe -#' based on a group of rows (frame) in the neighborhood of the input row. This -#' function implements computation over groups (`partition_by` in SQL) in a -#' predefined order (`order_by` in SQL) across a neighborhood of rows (frame) -#' defined by a (up, down) where +#' or `lazy_tbl` based on a group of rows (frame) in the neighborhood of the +#' input row. This function implements computation over groups (`partition_by` +#' in SQL) in a predefined order (`order_by` in SQL) across a neighborhood of +#' rows (frame) defined by a (up, down) where #' #' - up/down are number of rows before and after the corresponding row #' -#' - up/down are interval objects (ex: `c(days(2), days(1))`) +#' - up/down are interval objects (ex: `c(days(2), days(1))`). +#' Interval objects are currently supported for dataframe only. (not +#' `tbl_lazy`) #' #' This implementation is inspired by spark's [window #' API](https://www.databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html). #' +#' **Implementation Details**: #' -#' -#' Implementation Details: +#' For dataframe input: #' #' - Iteration per row over the window is implemented using the versatile #' [`slider`](https://cran.r-project.org/package=slider). @@ -300,30 +391,36 @@ mutate_ = function(x, ..., .by, .order_by, .frame, .index, #' #' - function subsumes regular usecases of \code{\link[dplyr]{mutate}} #' -#' @param x (data.frame) +#' For `tbl_lazy` input: +#' +#' - Uses `dbplyr::window_order` and `dbplyr::window_frame` to translate to +#' `partition_by` and window frame specification. +#' +#' @param x (`data.frame` or `tbl_lazy`) #' @param ... expressions to be passed to \code{\link[dplyr]{mutate}} -#' @param .by (expression, optional: Yes) columns to group by -#' @param .order_by (expression, optional: Yes) columns to order by +#' @param .by (expression, optional: Yes) Columns to group by +#' @param .order_by (expression, optional: Yes) Columns to order by #' @param .frame (vector, optional: Yes) Vector of length 2 indicating the #' number of rows to consider before and after the current row. When argument #' `.index` is provided (typically a column of type date or datetime), before #' and after can be #' [interval](https://lubridate.tidyverse.org/reference/interval.html) -#' objects. See examples. -#' @param .index (expression, optional: Yes) index column +#' objects. See examples. When input is `tbl_lazy`, only number of rows as +#' vector of length 2 is supported. +#' @param .index (expression, optional: Yes, default: NULL) index column. This +#' is supported when input is a dataframe only. #' @param .complete (flag, default: FALSE) This will be passed to #' `slider::slide` / `slider::slide_vec`. Should the function be evaluated on -#' complete windows only? If FALSE, the default, then partial computations -#' will be allowed. -#' @return data.frame +#' complete windows only? If FALSE or NULL, the default, then partial +#' computations will be allowed. This is supported when input is a dataframe +#' only. +#' @return `data.frame` or `tbl_lazy` #' @importFrom magrittr %>% #' @importFrom utils tail #' -#' @export -#' #' @examples #' library("magrittr") -#' # example 1 +#' # example 1 (simple case with dataframe) #' # Using iris dataset, #' # compute cumulative mean of column `Sepal.Length` #' # ordered by `Petal.Width` and `Sepal.Width` columns @@ -337,7 +434,7 @@ mutate_ = function(x, ..., .by, .order_by, .frame, .index, #' ) %>% #' dplyr::slice_min(n = 3, Petal.Width, by = Species) #' -#' # example 2 +#' # example 2 (detailed case with dataframe) #' # Using a sample airquality dataset, #' # compute mean temp over last seven days in the same month for every row #' @@ -357,190 +454,288 @@ mutate_ = function(x, ..., .by, .order_by, .frame, .index, #' ), #' .index = date_col #' ) +#' # example 3 +#' airquality %>% +#' # create date column as character +#' dplyr::mutate(date_col = +#' as.character(lubridate::make_date(1973, Month, Day)) +#' ) %>% +#' tibble::as_tibble() %>% +#' # as `tbl_lazy` +#' dbplyr::memdb_frame() %>% +#' mutate(avg_temp = mean(Temp), +#' .by = Month, +#' .order_by = date_col, +#' .frame = c(3, 3) +#' ) %>% +#' dplyr::collect() %>% +#' dplyr::select(Ozone, Solar.R, Wind, Temp, Month, Day, date_col, avg_temp) #' @export -mutate = function(x, ..., .by, .order_by, .frame, .index, .complete = FALSE){ - - # capture expressions -------------------------------------------------------- - ddd = rlang::enquos(...) - - # assertions ----------------------------------------------------------------- - order_by_is_missing = missing(.order_by) - by_is_missing = missing(.by) - frame_is_missing = missing(.frame) - index_is_missing = missing(.index) +mutate = function(x, + ..., + .by, + .order_by, + .frame, + .index, + .complete = FALSE + ){ + + checkmate::assert_multi_class(x, c("data.frame", "tbl_lazy")) + + if (inherits(x, "data.frame")){ + # capture expressions ---------------------------------------------------- + ddd = rlang::enquos(...) + + # assertions -------------------------------------------------------------- + order_by_is_missing = missing(.order_by) + by_is_missing = missing(.by) + frame_is_missing = missing(.frame) + index_is_missing = missing(.index) + + if (!by_is_missing) { + by = rlang::enexpr(.by) + + if (rlang::is_call(by)){ + # case: starts with 'c' + first_thing = by[[1]] + if (! (rlang::as_string(first_thing) == "c")) { + stop("expression to .by is not parsable") + } - if (!by_is_missing) { - by = rlang::enexpr(.by) + by_str = lapply(by, identity) %>% + tail(-1) %>% + vapply(rlang::as_string, character(1)) - if (rlang::is_call(by)){ - # case: starts with 'c' - first_thing = by[[1]] - if (! (rlang::as_string(first_thing) == "c")) { - stop("expression to .by is not parsable") + } else { + # case: direct columns + by_str = rlang::as_string(by) } + } - by_str = lapply(by, identity) %>% - tail(-1) %>% - vapply(rlang::as_string, character(1)) - - } else { - # case: direct columns - by_str = rlang::as_string(by) + if (!frame_is_missing) { + checkmate::assert(length(.frame) == 2) + checkmate::assert(inherits(.frame, c("numeric", "Period"))) + checkmate::assert_true(all(class(.frame[[1]]) == class(.frame[[2]]))) + if (!index_is_missing) { + .index = rlang::as_string(rlang::enexpr(.index)) + } else { + .index = NULL + } } - } - if (!frame_is_missing) { - checkmate::assert(length(.frame) == 2) - checkmate::assert(inherits(.frame, c("numeric", "Period"))) - checkmate::assert_true(all(class(.frame[[1]]) == class(.frame[[2]]))) - if (!index_is_missing) { - .index = rlang::as_string(rlang::enexpr(.index)) - } else { - .index = NULL + # `.complete` is TRUE or FALSE (same as NULL) + checkmate::check_flag(.complete, null.ok = TRUE) + if (is.null(.complete)){ + .complete = FALSE } - } - # order before mutate -------------------------------------------------------- - if (!order_by_is_missing){ - order_by = rlang::enexpr(.order_by) + # order before mutate ---------------------------------------------------- + if (!order_by_is_missing){ + order_by = rlang::enexpr(.order_by) - if (rlang::is_call(order_by)){ - # case: starts with 'c' or 'desc' - first_thing = order_by[[1]] - if(! (rlang::as_string(first_thing) %in% c("c", "desc"))) { - stop("expression to arrange is not parsable") - } + if (rlang::is_call(order_by)){ + # case: starts with 'c' or 'desc' + first_thing = order_by[[1]] + if(! (rlang::as_string(first_thing) %in% c("c", "desc"))) { + stop("expression to arrange is not parsable") + } - if (first_thing == "c"){ - x_copy = x %>% - dplyr::mutate(rn_ = dplyr::row_number()) %>% - dplyr::arrange(!!!tail(lapply(order_by, identity), -1)) + if (first_thing == "c"){ + res = x %>% + dplyr::mutate(rn_ = dplyr::row_number()) %>% + dplyr::arrange(!!!tail(lapply(order_by, identity), -1)) + } else { + # proto: desc(Sepal.Length) + res = x %>% + dplyr::mutate(rn_ = dplyr::row_number()) %>% + dplyr::arrange(!!order_by) + } } else { - # proto: desc(Sepal.Length) - x_copy = x %>% + # case: direct columns + res = x %>% dplyr::mutate(rn_ = dplyr::row_number()) %>% dplyr::arrange(!!order_by) } - } else { - # case: direct columns - x_copy = x %>% - dplyr::mutate(rn_ = dplyr::row_number()) %>% - dplyr::arrange(!!order_by) - } - row_order = x_copy[["rn_"]] - x_copy[["rn_"]] = NULL + row_order = res[["rn_"]] + res[["rn_"]] = NULL - } else { - # order is missing - x_copy = x - } + } else { + # order is null + res = x + } - # mutate core operation ------------------------------------------------------ - # for cran checks - data__ = NULL - slide_output__ = NULL + # mutate core operation -------------------------------------------------- + # for cran checks + data__ = NULL + slide_output__ = NULL - if (by_is_missing){ - # without groups ---- - if (frame_is_missing){ - # simple mutate without slide - x_copy = dplyr::mutate(x_copy, !!!ddd) + if (by_is_missing){ + # without groups ---- + if (frame_is_missing){ + # simple mutate without slide + res = dplyr::mutate(res, !!!ddd) + } else { + # without groups and with slide + if (index_is_missing){ + res = res %>% + dplyr::mutate(slide_output__ = + slider::slide( + res, + .f = ~ as.list(dplyr::summarise(.x, !!!ddd)), + .before = .frame[1], + .after = .frame[2], + .complete = .complete + ) + ) %>% + remove_common_nested_columns(slide_output__) %>% + tidyr::unnest_wider(slide_output__) + } else { + res = res %>% + dplyr::mutate(slide_output__ = + slider::slide_index( + res, + .f = ~ as.list(dplyr::summarise(.x, !!!ddd)), + .i = res[[.index]], + .before = .frame[1], + .after = .frame[2], + .complete = .complete + ) + ) %>% + remove_common_nested_columns(slide_output__) %>% + tidyr::unnest_wider(slide_output__) + } + } } else { - # without groups and with slide - if (index_is_missing){ - x_copy = x_copy %>% - dplyr::mutate(slide_output__ = - slider::slide( - x_copy, - .f = ~ as.list(dplyr::summarise(.x, !!!ddd)), - .before = .frame[1], - .after = .frame[2], - .complete = .complete - ) - ) %>% - remove_common_nested_columns(slide_output__) %>% - tidyr::unnest_wider(slide_output__) + # with groups ---- + if (frame_is_missing){ + # groupby mutate + res = res %>% + dplyr::group_by(dplyr::pick({{.by}})) %>% + dplyr::mutate(!!!ddd) %>% + dplyr::ungroup() + } else { - x_copy = x_copy %>% - dplyr::mutate(slide_output__ = - slider::slide_index( - x_copy, - .f = ~ as.list(dplyr::summarise(.x, !!!ddd)), - .i = x_copy[[.index]], - .before = .frame[1], - .after = .frame[2], - .complete = .complete + # with groups and with slide + fun_per_chunk = function(chunk, ...){ + if (index_is_missing) { + out = chunk %>% + dplyr::mutate(slide_output__ = + slider::slide( + chunk, + .f = ~ as.list(dplyr::summarise(.x, !!!ddd)), + .before = .frame[1], + .after = .frame[2], + .complete = .complete + ) ) - ) %>% + } else { + out = chunk %>% + dplyr::mutate(slide_output__ = + slider::slide_index( + chunk, + .f = ~ as.list(dplyr::summarise(.x, !!!ddd)), + .i = chunk[[.index]], + .before = .frame[1], + .after = .frame[2], + .complete = .complete + ) + ) + } + + # remove groupby columns (if they exist) + for (acol in by_str){ + out[[acol]] = NULL + } + return(out) + } + + res = res %>% + tidyr::nest(data__ = dplyr::everything(), + .by = {{.by}} + ) %>% + dplyr::ungroup() %>% + dplyr::mutate(data__ = furrr::future_map(data__, fun_per_chunk)) %>% + tidyr::unnest(data__) %>% remove_common_nested_columns(slide_output__) %>% tidyr::unnest_wider(slide_output__) + } } } else { - # with groups ---- - if (frame_is_missing){ - # groupby mutate - x_copy = x_copy %>% - dplyr::group_by(dplyr::pick({{.by}})) %>% - dplyr::mutate(!!!ddd) %>% - dplyr::ungroup() + # capture expressions ----------------------------------------------------- + ddd = rlang::enquos(...) + + # assertions -------------------------------------------------------------- + if (!missing(.index)) { + stop(paste0("When input is `tbl_lazy`,", + " `.index` argument is not supported.", + " `.index` should missing" + ) + ) + } - } else { - # with groups and with slide - fun_per_chunk = function(chunk, ...){ - if (index_is_missing) { - out = chunk %>% - dplyr::mutate(slide_output__ = - slider::slide( - chunk, - .f = ~ as.list(dplyr::summarise(.x, !!!ddd)), - .before = .frame[1], - .after = .frame[2], - .complete = .complete - ) - ) - } else { - out = chunk %>% - dplyr::mutate(slide_output__ = - slider::slide_index( - chunk, - .f = ~ as.list(dplyr::summarise(.x, !!!ddd)), - .i = chunk[[.index]], - .before = .frame[1], - .after = .frame[2], - .complete = .complete - ) - ) - } + if (!missing(.complete)) { + stop(paste0("When input is `tbl_lazy`,", + " `.complete` argument is not supported.", + " `.complete` should be missing" + ) + ) + } - # remove groupby columns (if they exist) - for (acol in by_str){ - out[[acol]] = NULL + order_by_is_missing = missing(.order_by) + by_is_missing = missing(.by) + frame_is_missing = missing(.frame) + + # declare res ------------------------------------------------------------- + res = x + + # group by before mutate ------------------------------------------------- + if (!by_is_missing) { + res = dplyr::group_by(res, dplyr::pick({{.by}})) + } + + # apply frame before mutate ----------------------------------------------- + if (!frame_is_missing) { + checkmate::assert(length(.frame) == 2) + checkmate::assert_numeric(.frame) + + res = dbplyr::window_frame(res, from = -.frame[1], to = .frame[2]) + } + + # order before mutate ----------------------------------------------------- + if (!order_by_is_missing){ + order_by = rlang::enexpr(.order_by) + + if (rlang::is_call(order_by)){ + # case: starts with 'c' or 'desc' + first_thing = order_by[[1]] + if(! (rlang::as_string(first_thing) %in% c("c", "desc"))) { + stop("expression to arrange is not parsable") } - return(out) - } - x_copy = x_copy %>% - tidyr::nest(data__ = dplyr::everything(), - .by = {{.by}} - ) %>% - dplyr::ungroup() %>% - dplyr::mutate(data__ = furrr::future_map(data__, fun_per_chunk)) %>% - tidyr::unnest(data__) %>% - remove_common_nested_columns(slide_output__) %>% - tidyr::unnest_wider(slide_output__) + if (first_thing == "c"){ + res = dbplyr::window_order(res, + !!!tail(lapply(order_by, identity), -1) + ) + } else { + # proto: desc(Sepal.Length) + res = dbplyr::window_order(res, !!order_by) + } + } else { # not call + # case: direct columns + res = dbplyr::window_order(res, !!order_by) + } } - } - # reorder the output before return ------------------------------------------- - if (!order_by_is_missing){ - x_copy = x_copy[row_order, ] + # core mutate operation --------------------------------------------------- + res = dplyr::mutate(res, !!!ddd) + res = dplyr::ungroup(res) } - # return --------------------------------------------------------------------- - return(x_copy) + + return(res) } # remove_common_nested_columns ---- @@ -569,3 +764,5 @@ remove_common_nested_columns = function(df, list_column){ return(df) } + + diff --git a/README.Rmd b/README.Rmd index 43b05bf..528ab5c 100644 --- a/README.Rmd +++ b/README.Rmd @@ -24,7 +24,7 @@ devtools::load_all() -`tidier` package provides '[Apache Spark](https://spark.apache.org/)' style window aggregation for R dataframes via '[mutate](https://dplyr.tidyverse.org/reference/mutate.html)' in '[dplyr](https://dplyr.tidyverse.org/index.html)' flavour. +`tidier` package provides '[Apache Spark](https://spark.apache.org/)' style window aggregation for R dataframes and remote `dbplyr` tbls via '[mutate](https://dplyr.tidyverse.org/reference/mutate.html)' in '[dplyr](https://dplyr.tidyverse.org/index.html)' flavour. ## Example @@ -54,9 +54,9 @@ airquality |> - `.by` (group by), - `.order_by` (order by), - `.frame` (endpoints of window frame), - - `.index` (identify index column like date column), - - `.complete` (whether to compute over incomplete window). -- `mutate` automatically uses a future backend (via [`furrr`](https://furrr.futureverse.org/)). + - `.index` (identify index column like date column, in df version only), + - `.complete` (whether to compute over incomplete window, in df version only). +- `mutate` automatically uses a future backend (via [`furrr`](https://furrr.futureverse.org/), in df version only). ## Motivation @@ -64,7 +64,7 @@ This implementation is inspired by Apache Spark's [`windowSpec`](https://spark.a ## Ecosystem -1. [`dbplyr`](https://dbplyr.tidyverse.org/) implements this via [`dbplyr::win_over`](https://dbplyr.tidyverse.org/reference/win_over.html?q=win_over#null) enabling [`sparklyr`](https://spark.rstudio.com/) users to write window computations. Also see, [`dbplyr::window_order`/`dbplyr::window_frame`](https://dbplyr.tidyverse.org/reference/window_order.html?q=window_fr#ref-usage). +1. [`dbplyr`](https://dbplyr.tidyverse.org/) implements this via [`dbplyr::win_over`](https://dbplyr.tidyverse.org/reference/win_over.html?q=win_over#null) enabling [`sparklyr`](https://spark.rstudio.com/) users to write window computations. Also see, [`dbplyr::window_order`/`dbplyr::window_frame`](https://dbplyr.tidyverse.org/reference/window_order.html?q=window_fr#ref-usage). `tidier`'s `mutate` wraps this functionality via uniform syntax across dataframes and remote tbls. 2. [`tidypyspark`](https://talegari.github.io/tidypyspark/_build/html/index.html) python package implements `mutate` style window computation API for pyspark. @@ -75,7 +75,7 @@ This implementation is inspired by Apache Spark's [`windowSpec`](https://spark.a ## Acknowledgements -`tidier` package is deeply indebted to two amazing packages and people behind it. +`tidier` package is deeply indebted to three amazing packages and people behind it. 1. [`dplyr`](https://cran.r-project.org/package=dplyr): @@ -95,3 +95,13 @@ Grammar of Data Manipulation_. R package version 1.1.0, Vaughan D (2021). _slider: Sliding Window Functions_. R package version 0.2.2, . ``` + +3. [`dbplyr`](https://cran.r-project.org/package=dbplyr): + + + +``` +Wickham H, Girlich M, Ruiz E (2023). _dbplyr: A 'dplyr' Back End + for Databases_. R package version 2.3.2, + . +``` diff --git a/README.md b/README.md index f860a3e..b1513b2 100644 --- a/README.md +++ b/README.md @@ -8,12 +8,12 @@ [![CRAN status](https://www.r-pkg.org/badges/version/tidier)](https://CRAN.R-project.org/package=tidier) [![R-CMD-check](https://github.com/talegari/tidier/actions/workflows/R-CMD-check.yaml/badge.svg)](https://github.com/talegari/tidier/actions/workflows/R-CMD-check.yaml) -[![](https://img.shields.io/badge/devel%20version-0.1.0-blue.svg)](https://github.com/talegari/tidier) +[![](https://img.shields.io/badge/devel%20version-0.2.0-blue.svg)](https://github.com/talegari/tidier) `tidier` package provides ‘[Apache Spark](https://spark.apache.org/)’ -style window aggregation for R dataframes via +style window aggregation for R dataframes and remote `dbplyr` tbls via ‘[mutate](https://dplyr.tidyverse.org/reference/mutate.html)’ in ‘[dplyr](https://dplyr.tidyverse.org/index.html)’ flavour. @@ -41,16 +41,16 @@ airquality |> #> # A tibble: 122 × 8 #> Month Ozone Solar.R Wind Temp Day date_col avg_temp_over_last_week #> -#> 1 6 NA 332 13.8 80 14 1973-06-14 87.2 -#> 2 5 28 NA 14.9 66 6 1973-05-06 66 -#> 3 5 6 78 18.4 57 18 1973-05-18 65.2 -#> 4 8 45 212 9.7 79 24 1973-08-24 76.5 -#> 5 5 36 118 8 72 2 1973-05-02 NaN -#> 6 9 24 238 10.3 68 19 1973-09-19 73 -#> 7 9 16 201 8 82 20 1973-09-20 71.7 -#> 8 6 NA 186 9.2 84 4 1973-06-04 72.5 -#> 9 8 78 NA 6.9 86 4 1973-08-04 81.3 -#> 10 8 168 238 3.4 81 25 1973-08-25 76.5 +#> 1 6 NA 286 8.6 78 1 1973-06-01 NaN +#> 2 6 NA 242 16.1 67 3 1973-06-03 78 +#> 3 6 NA 186 9.2 84 4 1973-06-04 72.5 +#> 4 6 NA 264 14.3 79 6 1973-06-06 76.3 +#> 5 6 29 127 9.7 82 7 1973-06-07 77 +#> 6 6 NA 273 6.9 87 8 1973-06-08 78 +#> 7 6 NA 259 10.9 93 11 1973-06-11 83 +#> 8 6 NA 250 9.2 92 12 1973-06-12 85.2 +#> 9 6 23 148 8 82 13 1973-06-13 86.6 +#> 10 6 NA 332 13.8 80 14 1973-06-14 87.2 #> # ℹ 112 more rows ``` @@ -60,10 +60,12 @@ airquality |> - `.by` (group by), - `.order_by` (order by), - `.frame` (endpoints of window frame), - - `.index` (identify index column like date column), - - `.complete` (whether to compute over incomplete window). + - `.index` (identify index column like date column, in df version + only), + - `.complete` (whether to compute over incomplete window, in df + version only). - `mutate` automatically uses a future backend (via - [`furrr`](https://furrr.futureverse.org/)). + [`furrr`](https://furrr.futureverse.org/), in df version only). ## Motivation @@ -81,6 +83,8 @@ and enabling [`sparklyr`](https://spark.rstudio.com/) users to write window computations. Also see, [`dbplyr::window_order`/`dbplyr::window_frame`](https://dbplyr.tidyverse.org/reference/window_order.html?q=window_fr#ref-usage). + `tidier`’s `mutate` wraps this functionality via uniform syntax + across dataframes and remote tbls. 2. [`tidypyspark`](https://talegari.github.io/tidypyspark/_build/html/index.html) python package implements `mutate` style window computation API for @@ -93,7 +97,7 @@ and ## Acknowledgements -`tidier` package is deeply indebted to two amazing packages and people +`tidier` package is deeply indebted to three amazing packages and people behind it. 1. [`dplyr`](https://cran.r-project.org/package=dplyr): @@ -110,3 +114,11 @@ behind it. Vaughan D (2021). _slider: Sliding Window Functions_. R package version 0.2.2, . + +3. [`dbplyr`](https://cran.r-project.org/package=dbplyr): + + + + Wickham H, Girlich M, Ruiz E (2023). _dbplyr: A 'dplyr' Back End + for Databases_. R package version 2.3.2, + . diff --git a/docs/404.html b/docs/404.html index c52ada2..c60144a 100644 --- a/docs/404.html +++ b/docs/404.html @@ -24,7 +24,7 @@ tidier - 0.1.0 + 0.2.0