Update pkgcache, so it does not import
gaborcsardi committed Nov 21, 2023
1 parent 9db3122 commit 7d786ab
Showing 9 changed files with 77 additions and 133 deletions.
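The pattern repeated across these files: drop the roxygen `@importFrom` directives (and the `importFrom()` entries they generate in NAMESPACE) and call callr, cli, curl, filelock, and processx through fully qualified `pkg::fun()` syntax instead. A minimal sketch of the difference, using a hypothetical helper rather than code from this diff:

# Before: importFrom(curl, new_handle) in NAMESPACE makes curl a
# load-time dependency; its namespace must be loadable whenever
# pkgcache itself is loaded.
#' @importFrom curl new_handle
make_handle <- function(url) new_handle(url = url)

# After: the qualified call defers loading curl's namespace until
# make_handle() is first executed.
make_handle <- function(url) curl::new_handle(url = url)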
src/library/pkgcache/DESCRIPTION (2 changes: 1 addition & 1 deletion)
@@ -25,7 +25,7 @@ Language: en-US
Roxygen: list(markdown = TRUE, r6 = FALSE)
RoxygenNote: 7.2.3
NeedsCompilation: yes
-Packaged: 2023-11-21 10:49:19 UTC; gaborcsardi
+Packaged: 2023-11-21 14:59:49 UTC; gaborcsardi
Author: Gábor Csárdi [aut, cre],
Posit Software, PBC [cph, fnd]
Maintainer: Gábor Csárdi <[email protected]>
src/library/pkgcache/NAMESPACE (28 changes: 0 additions & 28 deletions)
@@ -48,34 +48,6 @@ export(repo_status)
export(with_repo)
if (getRversion() >= "4.0.0") importFrom(tools, R_user_dir)
importFrom(R6,R6Class)
-importFrom(callr,r_process)
-importFrom(callr,r_process_options)
-importFrom(callr,r_session)
-importFrom(callr,rcmd_safe_env)
-importFrom(cli,cli_alert_info)
-importFrom(cli,cli_process_done)
-importFrom(cli,cli_process_start)
-importFrom(cli,cli_status)
-importFrom(cli,cli_status_clear)
-importFrom(cli,cli_status_update)
-importFrom(cli,get_spinner)
-importFrom(cli,hash_obj_md5)
-importFrom(curl,handle_data)
-importFrom(curl,handle_setheaders)
-importFrom(curl,handle_setopt)
-importFrom(curl,multi_add)
-importFrom(curl,multi_cancel)
-importFrom(curl,multi_fdset)
-importFrom(curl,multi_list)
-importFrom(curl,multi_run)
-importFrom(curl,multi_set)
-importFrom(curl,new_handle)
-importFrom(curl,new_pool)
-importFrom(curl,parse_headers_list)
-importFrom(filelock,lock)
-importFrom(filelock,unlock)
-importFrom(processx,conn_get_fileno)
-importFrom(processx,process)
importFrom(tools,file_ext)
importFrom(utils,URLencode)
importFrom(utils,getSrcDirectory)
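After this hunk the only remaining imports are R6 and, conditionally, tools. A quick way to confirm from a session where this pkgcache build is installed (an illustrative check, not part of the commit):

loadNamespace("pkgcache")
# names() gives the exporting packages; expect "base", "R6" and,
# on R >= 4.0.0, "tools".
unique(names(getNamespaceImports("pkgcache")))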
src/library/pkgcache/R/aaa-async.R (80 changes: 31 additions & 49 deletions)
@@ -1757,8 +1757,6 @@ el_init <- function(self, private) {
invisible(self)
}

-#' @importFrom curl multi_add parse_headers_list handle_data
-
el_add_http <- function(self, private, handle, callback, progress, file,
data) {
self; private; handle; callback; progress; outfile <- file; data
@@ -1770,7 +1768,7 @@ el_add_http <- function(self, private, handle, callback, progress, file,

content <- NULL

-multi_add(
+curl::multi_add(
handle = handle,
pool = private$pool,
done = function(response) {
@@ -1852,13 +1850,11 @@ el_add_next_tick <- function(self, private, func, callback, data) {
private$next_ticks <- c(private$next_ticks, id)
}

-#' @importFrom curl multi_cancel
-
el_cancel <- function(self, private, id) {
private$next_ticks <- setdiff(private$next_ticks, id)
private$timers <- private$timers[setdiff(names(private$timers), id)]
if (id %in% names(private$tasks) && private$tasks[[id]]$type == "http") {
-multi_cancel(private$tasks[[id]]$data$handle)
+curl::multi_cancel(private$tasks[[id]]$data$handle)
} else if (id %in% names(private$tasks) &&
private$tasks[[id]]$type %in% c("process", "r-process")) {
private$tasks[[id]]$data$process$kill()
@@ -1870,11 +1866,9 @@ el_cancel <- function(self, private, id) {
invisible(self)
}

-#' @importFrom curl multi_cancel multi_list
-
el_cancel_all <- function(self, private) {
-http <- multi_list(pool = private$pool)
-lapply(http, multi_cancel)
+http <- curl::multi_list(pool = private$pool)
+lapply(http, curl::multi_cancel)
private$next_ticks <- character()
private$timers <- Sys.time()[numeric()]
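el_cancel_all() now spells out both curl calls. As a standalone sketch of that API: multi_list() returns the handles currently attached to a pool, and multi_cancel() detaches a handle, interrupting any pending transfer.

pool <- curl::new_pool()
# A no-op on an empty pool; in the event loop this cancels every
# in-flight HTTP task at once.
for (h in curl::multi_list(pool = pool)) curl::multi_cancel(h)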

@@ -1969,8 +1963,6 @@ el__run_pending <- function(self, private) {
length(next_ticks) > 0 || finished_pool
}

-#' @importFrom curl multi_run multi_fdset
-
el__io_poll <- function(self, private, timeout) {

types <- vcapply(private$tasks, "[[", "type")
@@ -2026,7 +2018,7 @@
}

if (!is.null(private$curl_timer) && private$curl_timer <= private$time) {
-multi_run(timeout = 0L, poll = TRUE, pool = private$pool)
+curl::multi_run(timeout = 0L, poll = TRUE, pool = private$pool)
private$curl_timer <- NULL
}
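For reference on the call above: timeout = 0 makes multi_run() do whatever work is already possible and return immediately, and poll = TRUE makes a blocking call return as soon as one transfer completes, so this drives the pool without stalling the event loop. A runnable no-op:

pool <- curl::new_pool()
# Returns counts of successful, failed, and still-pending transfers.
curl::multi_run(timeout = 0, poll = TRUE, pool = pool)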

@@ -2038,7 +2030,7 @@
## Any HTTP?
if (private$curl_poll &&
pollables$ready[match("curl", pollables$type)] == "event") {
-multi_run(timeout = 0L, poll = TRUE, pool = private$pool)
+curl::multi_run(timeout = 0L, poll = TRUE, pool = private$pool)
}

## Any processes
@@ -2111,8 +2103,6 @@ el__create_task <- function(self, private, callback, data, ..., id, type) {
id
}

-#' @importFrom curl new_pool
-
el__ensure_pool <- function(self, private) {
getopt <- function(nm) {
anm <- paste0("async_http_", nm)
@@ -2126,22 +2116,20 @@ el__ensure_pool <- function(self, private) {
host_con = getopt("host_con") %||% 6,
multiplex = getopt("multiplex") %||% TRUE
)
-private$pool <- new_pool(
+private$pool <- curl::new_pool(
total_con = private$http_opts$total_con,
host_con = private$http_opts$host_con,
multiplex = private$http_opts$multiplex
)
}
}

-#' @importFrom curl multi_set
-
el_http_setopt <- function(self, private, total_con, host_con, multiplex) {
private$ensure_pool()
if (!is.null(total_con)) private$http_opts$total_con <- total_con
if (!is.null(host_con)) private$http_opts$host_con <- host_con
if (!is.null(multiplex)) private$http_opts$multiplex <- multiplex
-multi_set(
+curl::multi_set(
pool = private$pool,
total_con = private$http_opts$total_con,
host_con = private$http_opts$host_con,
@@ -2195,10 +2183,8 @@ el__update_time <- function(self, private) {
private$time <- Sys.time()
}

-#' @importFrom curl multi_fdset
-#'
el__update_curl_data <- function(self, private) {
-private$curl_fdset <- multi_fdset(private$pool)
+private$curl_fdset <- curl::multi_fdset(private$pool)
num_fds <- length(unique(unlist(private$curl_fdset[1:3])))
private$curl_poll <- num_fds > 0
private$curl_timer <- if ((t <- private$curl_fdset$timeout) != -1) {
@@ -2536,7 +2522,6 @@ async_reject <- mark_as_async(async_reject)
#'
#' @family asyncronous HTTP calls
#' @noRd
-#' @importFrom curl new_handle handle_setheaders
#' @examples
#' \donttest{
#' afun <- async(function() {
@@ -2555,8 +2540,8 @@ http_get <- function(url, headers = character(), file = NULL,
make_deferred_http(
function() {
assert_that(is_string(url))
-handle <- new_handle(url = url)
-handle_setheaders(handle, .list = headers)
+handle <- curl::new_handle(url = url)
+curl::handle_setheaders(handle, .list = headers)

if (!is.null(on_progress)) {
options$noprogress <- FALSE
@@ -2575,7 +2560,7 @@ http_get <- function(url, headers = character(), file = NULL,
reg.finalizer(handle, function(...) fun, onexit = TRUE)
}

-handle_setopt(handle, .list = options)
+curl::handle_setopt(handle, .list = options)
list(handle = handle, options = options)
},
file
@@ -2591,7 +2576,6 @@ http_get <- mark_as_async(http_get)
#'
#' @family asyncronous HTTP calls
#' @noRd
-#' @importFrom curl handle_setopt
#' @examples
#' \donttest{
#' afun <- async(function() {
@@ -2618,9 +2602,9 @@ http_head <- function(url, headers = character(), file = NULL,
make_deferred_http(
function() {
assert_that(is_string(url))
-handle <- new_handle(url = url)
-handle_setheaders(handle, .list = headers)
-handle_setopt(handle, customrequest = "HEAD", nobody = TRUE,
+handle <- curl::new_handle(url = url)
+curl::handle_setheaders(handle, .list = headers)
+curl::handle_setopt(handle, customrequest = "HEAD", nobody = TRUE,
.list = options)
list(handle = handle, options = options)
},
@@ -2676,9 +2660,9 @@ http_post <- function(url, data = NULL, data_file = NULL,
make_deferred_http(
function() {
assert_that(is_string(url))
-handle <- new_handle(url = url)
-handle_setheaders(handle, .list = headers)
-handle_setopt(handle, customrequest = "POST",
+handle <- curl::new_handle(url = url)
+curl::handle_setheaders(handle, .list = headers)
+curl::handle_setopt(handle, customrequest = "POST",
postfieldsize = length(data), postfields = data,
.list = options)
list(handle = handle, options = options)
@@ -2696,9 +2680,9 @@ http_delete <- function(url, headers = character(), file = NULL,
make_deferred_http(
function() {
assert_that(is_string(url))
-handle <- new_handle(url = url)
-handle_setheaders(handle, .list = headers)
-handle_setopt(handle, customrequest = "DELETE", .list = options)
+handle <- curl::new_handle(url = url)
+curl::handle_setheaders(handle, .list = headers)
+curl::handle_setopt(handle, customrequest = "DELETE", .list = options)
list(handle = handle, options = options)
},
file
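All four verbs build their handle the same way; only the handle_setopt() arguments differ. The setup extracted as a self-contained sketch (the URL and header are illustrative):

h <- curl::new_handle(url = "https://example.org")
curl::handle_setheaders(h, .list = list(accept = "application/json"))
# customrequest overrides the HTTP verb, nobody = TRUE skips the
# response body (as in http_head() above), and .list would merge in
# extra curl options the way `options` does in this file.
curl::handle_setopt(h, customrequest = "HEAD", nobody = TRUE)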
@@ -2985,7 +2969,10 @@ async_map_limit <- function(.x, .f, ..., .args = list(), .limit = Inf) {
## nocov start

.onLoad <- function(libname, pkgname) {
-if (requireNamespace("debugme", quietly = TRUE)) debugme::debugme()
+if (Sys.getenv("DEBUGME") != "" &&
+requireNamespace("debugme", quietly = TRUE)) {
+debugme::debugme()
+}
}

## nocov end
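The new guard consults debugme only when the DEBUGME environment variable is non-empty, so the optional dependency is never touched in normal sessions. Typical activation, following debugme's documented convention (illustrative, not part of this diff):

# Set before loading; the "!DEBUG ..." string literals seen further
# down in this diff then become log output at run time.
Sys.setenv(DEBUGME = "pkgcache")
library(pkgcache)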
@@ -3002,7 +2989,6 @@ async_map_limit <- function(.x, .f, ..., .args = list(), .limit = Inf) {
#'
#' @family asynchronous external processes
#' @noRd
-#' @importFrom processx process
#' @examples
#' \dontrun{
#' afun <- function() {
@@ -3029,7 +3015,7 @@ run_process <- function(command = NULL, args = character(),
reject <- environment(resolve)$private$reject
stdout <- tempfile()
stderr <- tempfile()
-px <- process$new(command, args = args,
+px <- processx::process$new(command, args = args,
stdout = stdout, stderr = stderr, poll_connection = TRUE,
env = env, cleanup = TRUE, cleanup_tree = TRUE, wd = wd,
encoding = encoding, ...)
@@ -3055,7 +3041,6 @@ run_process <- mark_as_async(run_process)
#'
#' @inheritParams callr::r_bg
#' @noRd
-#' @importFrom callr r_process_options r_process rcmd_safe_env
#'
#' @examples
#' \dontrun{
@@ -3068,7 +3053,7 @@ run_process <- mark_as_async(run_process)
run_r_process <- function(func, args = list(), libpath = .libPaths(),
repos = c(getOption("repos"), c(CRAN = "https://cloud.r-project.org")),
cmdargs = c("--no-site-file", "--slave", "--no-save", "--no-restore"),
-system_profile = FALSE, user_profile = FALSE, env = rcmd_safe_env()) {
+system_profile = FALSE, user_profile = FALSE, env = callr::rcmd_safe_env()) {

func; args; libpath; repos; cmdargs; system_profile; user_profile; env
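run_r_process() now qualifies every callr symbol it uses. The same API in a minimal standalone form:

opts <- callr::r_process_options(func = function() Sys.getpid())
rx <- callr::r_process$new(opts)  # starts a background R session
rx$wait()                         # block until it exits
rx$get_result()                   # the return value of func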

@@ -3081,13 +3066,13 @@ run_r_process <- function(func, args = list(), libpath = .libPaths(),
reject <- environment(resolve)$private$reject
stdout <- tempfile()
stderr <- tempfile()
-opts <- r_process_options(
+opts <- callr::r_process_options(
func = func, args = args, libpath = libpath, repos = repos,
cmdargs = cmdargs, system_profile = system_profile,
user_profile = user_profile, env = env, stdout = stdout,
stderr = stderr, extra = list(cleanup_tree = TRUE))

-rx <- r_process$new(opts)
+rx <- callr::r_process$new(opts)
pipe <- rx$get_poll_connection()
id <<- get_default_event_loop()$add_r_process(
list(pipe),
@@ -4278,9 +4263,6 @@ wp_init <- function(self, private) {
invisible(self)
}

-#' @importFrom callr r_session
-#' @importFrom processx conn_get_fileno
-
wp_start_workers <- function(self, private) {
num <- worker_pool_size()

@@ -4289,8 +4271,8 @@ wp_start_workers <- function(self, private) {

## Yeah, start some more
to_start <- num - NROW(private$workers)
-sess <- lapply(1:to_start, function(x) r_session$new(wait = FALSE))
-fd <- viapply(sess, function(x) conn_get_fileno(x$get_poll_connection()))
+sess <- lapply(1:to_start, function(x) callr::r_session$new(wait = FALSE))
+fd <- viapply(sess, function(x) processx::conn_get_fileno(x$get_poll_connection()))
new_workers <- data.frame(
stringsAsFactors = FALSE,
session = I(sess),
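wp_start_workers() boots each worker session without blocking and extracts the file descriptor its poller watches. One worker's worth of those calls, standalone:

sess <- callr::r_session$new(wait = FALSE)  # do not block on startup
fd <- processx::conn_get_fileno(sess$get_poll_connection())
sess$close()                                # shut the worker down again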
src/library/pkgcache/R/archive.R (12 changes: 6 additions & 6 deletions)
@@ -264,7 +264,7 @@ cac_cleanup <- function(self, private, force) {
rep_etag <- paste0(rep_rds, "-etag")
unlink(c(rep_rds, rep_etag), recursive = TRUE, force = TRUE)
private$data <- NULL
-cli_alert_info("Cleaning up archive cache in {.path {pri_rds}}.")
+cli::cli_alert_info("Cleaning up archive cache in {.path {pri_rds}}.")
unlink(c(pri_rds, pri_etag, pri_lock), recursive = TRUE, force = TRUE)
invisible(self)
}
@@ -331,9 +331,9 @@ cac__load_primary <- function(self, private, max_age) {

pri_lock <- paste0(pri_file, "-lock")
mkdirp(dirname(pri_lock))
-l <- lock(pri_lock, exclusive = FALSE, private$lock_timeout)
+l <- filelock::lock(pri_lock, exclusive = FALSE, private$lock_timeout)
if (is.null(l)) stop("Cannot acquire lock to copy RDS")
-on.exit(unlock(l), add = TRUE)
+on.exit(filelock::unlock(l), add = TRUE)

if (!file.exists(pri_file)) stop("No primary RDS file in cache")
time <- file_get_time(pri_file)
Expand All @@ -345,7 +345,7 @@ cac__load_primary <- function(self, private, max_age) {
rep_etag <- paste0(rep_file, "-etag")
file_copy_with_time(pri_etag, rep_etag)

-unlock(l)
+filelock::unlock(l)

private$data <- readRDS(rep_file)
private$data_time <- time
@@ -422,9 +422,9 @@ cac__update_primary <- function(self, private, lock) {
if (lock) {
pri_lock <- paste0(pri_file, "-lock")
mkdirp(dirname(pri_lock))
-l <- lock(pri_lock, exclusive = FALSE, private$lock_timeout)
+l <- filelock::lock(pri_lock, exclusive = FALSE, private$lock_timeout)
if (is.null(l)) stop("Cannot acquire lock to copy RDS")
-on.exit(unlock(l), add = TRUE)
+on.exit(filelock::unlock(l), add = TRUE)
}

file_copy_with_time(rep_file, pri_file)
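All three filelock sites in this file share one shape: take a shared (read) lock with a timeout, fail if lock() returns NULL, and release via on.exit(). The pattern as a self-contained sketch:

with_shared_lock <- function(path, code) {
  l <- filelock::lock(path, exclusive = FALSE, timeout = 1000)
  if (is.null(l)) stop("Cannot acquire lock")  # NULL signals a timeout
  on.exit(filelock::unlock(l), add = TRUE)
  code
}
with_shared_lock(tempfile(fileext = "-lock"), "critical section here")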
src/library/pkgcache/R/async-http.R (5 changes: 2 additions & 3 deletions)
@@ -67,7 +67,6 @@ update_async_timeouts <- function(options) {
#' * `etag_file`: The file the ETag was written to, or `NULL` otherwise
#'
#' @family async HTTP tools
-#' @importFrom curl parse_headers_list
#' @noRd
#' @section Examples:
#' ```
@@ -122,7 +121,7 @@ download_file <- function(url, destfile, etag_file = NULL,
then(function(resp) {
"!DEBUG downloaded `url`"
file.rename(tmp_destfile, destfile)
-etag <- parse_headers_list(resp$headers)[["etag"]] %||% NA_character_
+etag <- curl::parse_headers_list(resp$headers)[["etag"]] %||% NA_character_
if (!is.null(etag_file) && !is.na(etag[1])) {
mkdirp(dirname(etag_file))
writeLines(etag, etag_file)
@@ -245,7 +244,7 @@ download_if_newer <- function(url, destfile, etag_file = NULL,
} else if (resp$status_code == 200 || resp$status_code == 0) {
"!DEBUG downloaded `url`"
file.rename(tmp_destfile, destfile)
-etag <- parse_headers_list(resp$headers)[["etag"]] %||% NA_character_
+etag <- curl::parse_headers_list(resp$headers)[["etag"]] %||% NA_character_
if (!is.null(etag_file) && !is.na(etag[1])) {
mkdirp(dirname(etag_file))
writeLines(etag, etag_file)
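A note on the lookup key: curl::parse_headers_list() normalizes header field names to lowercase, which is why both download helpers index the result with "etag". A self-contained check:

hdr <- "HTTP/1.1 200 OK\r\nETag: \"abc123\"\r\nContent-Type: text/plain\r\n\r\n"
curl::parse_headers_list(hdr)[["etag"]]  # the value keeps its quotes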
(4 more changed files not shown)
