Skip to content

Commit

Permalink
feat: custom SQL statement for todbDense conversion
Browse files Browse the repository at this point in the history
---
- AFAIK dbplyr does not have a convenient way to write temporary VIEWs with `compute`.
- Here we use a CTE to take the SQL generated from an upstream lazy dbMatrix object and construct a TEMPORARY VIEW from it. This allows for cleaner SQL code in the `todbDense` conversion as the lazy table will be named 'dbDenseMatrix_randomid'
  • Loading branch information
Ed2uiz committed Sep 18, 2024
1 parent beab704 commit e56e052
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 51 deletions.
8 changes: 7 additions & 1 deletion NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ exportMethods("dimnames<-")
exportMethods("rownames<-")
exportMethods(Arith)
exportMethods(Ops)
exportMethods(as.matrix)
exportMethods(castNumeric)
exportMethods(colMeans)
exportMethods(colSds)
exportMethods(colSums)
exportMethods(colTypes)
exportMethods(colnames)
exportMethods(compute)
exportMethods(dbList)
exportMethods(dbReconnect)
exportMethods(dim)
exportMethods(dimnames)
exportMethods(head)
exportMethods(load)
exportMethods(log)
exportMethods(mean)
exportMethods(ncol)
Expand All @@ -31,6 +35,7 @@ exportMethods(rowMeans)
exportMethods(rowSds)
exportMethods(rowSums)
exportMethods(rownames)
exportMethods(sum)
exportMethods(t)
exportMethods(tail)
importFrom(MatrixGenerics,colMeans)
Expand All @@ -40,3 +45,4 @@ importFrom(MatrixGenerics,rowMeans)
importFrom(MatrixGenerics,rowSds)
importFrom(MatrixGenerics,rowSums)
importFrom(data.table,":=")
importFrom(dplyr,compute)
72 changes: 36 additions & 36 deletions R/dbMatrix.R
Original file line number Diff line number Diff line change
Expand Up @@ -552,20 +552,28 @@ toDbDense <- function(db_sparse){
)
}

key <- precomp |>
dplyr::filter(i <= n_rows, j <= n_cols) |> # filter out rows and cols that are not in db_sparse
dplyr::mutate(x = 0)

data <- key |>
dplyr::left_join(db_sparse[], by = c("i", "j"), suffix = c("", ".dgc")) |>
dplyr::mutate(x = ifelse(is.na(x.dgc), x, x.dgc)) |>
dplyr::select(-x.dgc)
db_sparse_sql <- dbplyr::sql_render(db_sparse[])
precomp_name <- dbplyr::remote_name(precomp)
db_dense_name <- unique_table_name(prefix = "dbDenseMatrix")
sql <- glue::glue("
CREATE TEMPORARY VIEW '{db_dense_name}' AS
WITH {remote_name} AS ({db_sparse_sql})
SELECT
p.i,
p.j,
COALESCE(main.{remote_name}.x, 0.0) AS x
FROM '{precomp_name}' p
LEFT JOIN main.{remote_name} ON p.i = main.{remote_name}.i AND p.j = main.{remote_name}.j
WHERE p.i <= {n_rows} AND p.j <= {n_cols}
")
DBI::dbExecute(con, sql)
data <- dplyr::tbl(con, db_dense_name)

# Create new dbMatrix object
db_dense <- new(
Class = "dbDenseMatrix",
value = data,
name = remote_name,
name = NA_character_,
dims = dims,
dim_names = dim_names,
init = TRUE
Expand Down Expand Up @@ -640,21 +648,24 @@ to_ijx_disk <- function(con, name){
# return(res)
}

#' as_matrix
## as.matrix ####
#' Convert dbMatrix to in-memory matrix
#'
#' @param x A dbMatrix object (dbSparseMatrix or dbDenseMatrix)
#' @param ... Additional arguments (not used)
#'
#' @param x dbSparseMatrix
#' @param output "matrix"
#' @description
#' Set output to matrix to cast dbSparseMatrix into matrix
#' Converts a dbMatrix object into an in-memory matrix or sparse matrix.
#'
#' @details
#' this is a helper function to convert dbMatrix to dgCMatrix or matrix
#' Warning: can cause memory issues if the input matrix is large
#' This method converts a dbMatrix object (dbSparseMatrix or dbDenseMatrix) to an in-memory
#' Matrix::dgCMatrix (for sparse) or base R matrix (for dense).
#' Warning: This can cause memory issues if the input matrix is large.
#'
#'
#' @return dgCMatrix or matrix
#' @noRd
as_matrix <- function(x, output){
#' @return A Matrix::dgCMatrix or base R matrix
#' @export
#' @concept transform
setMethod("as.matrix", "dbMatrix", function(x, ...) {
con <- dbplyr::remote_con(x[])
.check_con(con)
dims <- dim(x)
Expand All @@ -668,7 +679,7 @@ as_matrix <- function(x, output){
"Warning: Converting large dbMatrix to in-memory Matrix.")
}

if(class(x) == "dbSparseMatrix"){
if(is(x, "dbSparseMatrix")){
max_i <- x[] |> dplyr::summarise(max_i = max(i)) |> dplyr::pull(max_i)
max_j <- x[] |> dplyr::summarise(max_j = max(j)) |> dplyr::pull(max_j)

Expand Down Expand Up @@ -706,11 +717,8 @@ as_matrix <- function(x, output){
dimnames(mat) = dim_names
dim(mat) = dims
unlink(temp_file, recursive = TRUE, force = TRUE)
if(!missing(output) && output == "matrix"){
mat <- as.matrix(mat)
}
}
else if(class(x) == "dbDenseMatrix"){
else if(is(x, "dbDenseMatrix")){
# Create a temporary file to store the matrix
temp_file <- tempfile(tmpdir = getwd(), fileext = ".parquet")

Expand All @@ -721,25 +729,17 @@ as_matrix <- function(x, output){

dt <- arrow::read_parquet(temp_file)

# Create a sparse matrix
mat <- Matrix::sparseMatrix(
i = dt$i,
j = dt$j,
x = dt$x,
index1 = TRUE
)
# Create a dense matrix
mat <- matrix(0, nrow = n_rows, ncol = n_cols)
mat[cbind(dt$i, dt$j)] <- dt$x
dimnames(mat) = dim_names
dim(mat) = dims

# Convert sparse matrix to dense matrix in-memory
mat <- as.matrix(mat)

# Clean up temp files
unlink(temp_file, recursive = TRUE, force = TRUE)
}

return(mat)
}
})

#' @title as_ijx
#' @param x dgCMatrix or matrix
Expand Down
18 changes: 7 additions & 11 deletions R/generics.R
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# dbData object interactions ####
setGeneric('colTypes', function(x, ...) standardGeneric('colTypes'))
setGeneric('castNumeric', function(x, col, ...) standardGeneric('castNumeric'))

#' @importFrom MatrixGenerics colMeans colSums rowMeans rowSums colSds rowSds
#' @importFrom dplyr compute
NULL

.onLoad <- function(libname, pkgname) {
Expand All @@ -14,18 +14,14 @@ NULL
if (!isGeneric("ncol")) methods::setGeneric("ncol")
}

# dbMatrix specific ####
# setGeneric('colSds', function(x, ...) standardGeneric('colSds'))
# setGeneric('colMeans', function(x, ...) standardGeneric('colMeans'))
# setGeneric('colSums', function(x, ...) standardGeneric('colSums'))
# setGeneric('rowSds', function(x, ...) standardGeneric('rowSds'))
# setGeneric('rowMeans', function(x, ...) standardGeneric('rowMeans'))
# setGeneric('rowSums', function(x, ...) standardGeneric('rowSums'))
# dbMatrix ####
setGeneric('load', function(conn, name, class, ...) standardGeneric('load'))

# dbData ops ####
# setGeneric('t', function(x, ...) standardGeneric('t'))
# setGeneric('mean', function(x, ...) standardGeneric('mean'))
# dbData ####
setGeneric("dbReconnect", function(x, ...) standardGeneric("dbReconnect"))
setGeneric("dbList", function(conn, ...) standardGeneric("dbList"))

# DBI ####
# setGeneric('dbDisconnect', function(x, ...) standardGeneric('dbDisconnect'))
# setGeneric('dbListTables', function(x, ...) standardGeneric('dbListTables'))

3 changes: 0 additions & 3 deletions R/precompute.R
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
#' con = DBI::dbConnect(duckdb::duckdb(), ":memory:")
#' precompute(con = con , m = 100, n = 100, name = "precomputed_table")
precompute <- function(conn, m, n, name, verbose = TRUE){
# create a random sufix with precomp_ as suffix if name is NULL using
# dbplyr internal random name generator

# input validation
.check_con(conn = conn)
.check_name(name = name)
Expand Down
Loading

0 comments on commit e56e052

Please sign in to comment.