Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add function req_perform_promise (fixes #501) #505

Merged
merged 12 commits into from
Jul 16, 2024
2 changes: 2 additions & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ Suggests:
jose,
jsonlite,
knitr,
later,
promises,
rmarkdown,
testthat (>= 3.1.8),
tibble,
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export(req_options)
export(req_perform)
export(req_perform_iterative)
export(req_perform_parallel)
export(req_perform_promise)
export(req_perform_sequential)
export(req_perform_stream)
export(req_progress)
Expand Down
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# httr2 (development version)

* New function `req_perform_promise()` allows creating a `promises::promise` for a request that runs in the background (#501, @gergness).
* `req_body_file()` now only opens a connection when the request actually needs data. In particular, this makes it work better with `req_perform_parallel()` (#487).
* `req_cache()` no longer fails if the `rds` files are somehow corrupted and now defaults the `debug` argument to the `httr2_cache_debug` option to make it easier to debug caching buried in other people's code (#486).
* `req_oauth_password()` now only asks for your password once (#498).
Expand Down
1 change: 1 addition & 0 deletions R/httr2-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ the$cache_throttle <- list()
the$token_cache <- new_environment()
the$last_response <- NULL
the$last_request <- NULL
# Registry of running pool pollers used by req_perform_promise(): keyed by the
# address of the curl pool, with an entry present only while a poller is active.
the$pool_pollers <- new_environment()
157 changes: 157 additions & 0 deletions R/req-promise.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#' Perform a request asynchronously, returning a promise
#'
#' @description
#' A variant of [req_perform()] that hands back a [promises::promise()]
#' straight away and carries out the request in the background, so control
#' returns to your program before the request has finished. The
#' [promises package documentation](https://rstudio.github.io/promises/articles/promises_01_motivation.html)
#' explains how to work with the promise you get back.
#'
#' As with [req_perform_parallel()], use this function with care: it is easy
#' to pummel a server with many simultaneous requests. Not every server can
#' handle more than one request at a time, so responses may still come back
#' sequentially.
#'
#' `req_perform_promise()` shares a number of limitations with
#' [req_perform_parallel()]. It:
#'
#' * Will not retrieve a new OAuth token if it expires after the promised
#'   request is created but before it is actually requested.
#' * Does not perform throttling with [req_throttle()].
#' * Does not attempt retries as described by [req_retry()].
#' * Only consults the cache set by [req_cache()] when the request is promised.
#'
#' @inheritParams req_perform
#' @inheritParams req_perform_parallel
#'
#' @return A [promises::promise()] object that resolves to a [response] on
#'   success, or rejects with the same errors that [req_perform()] throws.
#' @export
#'
#' @examplesIf requireNamespace("promises", quietly = TRUE)
#' library(promises)
#' request_base <- request(example_url()) |> req_url_path_append("delay")
#'
#' p <- request_base |> req_url_path_append(2) |> req_perform_promise()
#'
#' # The promise object itself is not very informative
#' p
#'
#' # Chain onto the promise to work with the eventual response
#' p %...>%
#'   resp_body_json() %...>%
#'   print()
#'
#'
#' # Two requests can be in flight at the same time
#' p1 <- request_base |> req_url_path_append(4) |> req_perform_promise()
#' p2 <- request_base |> req_url_path_append(1) |> req_perform_promise()
#'
#' p1 %...>%
#'   resp_url_path %...>%
#'   paste0(., " finished") %...>%
#'   print()
#'
#' p2 %...>%
#'   resp_url_path %...>%
#'   paste0(., " finished") %...>%
#'   print()
#'
#' # For more on working with promises, see the promises package
#' # documentation: https://rstudio.github.io/promises/
req_perform_promise <- function(req,
                                path = NULL,
                                pool = NULL) {
  check_installed(c("promises", "later"))
  check_string(path, allow_null = TRUE)

  # Action handed to promises::promise(): builds the performance object wired
  # to the promise's resolve/reject callbacks and submits it to the pool.
  start_request <- function(resolve, reject) {
    performance <- PerformancePromise$new(
      req = req,
      resolve = resolve,
      reject = reject,
      path = path,
      error_call = environment()
    )

    performance$submit(pool)
  }

  promises::promise(start_request)
}

# Performance subclass that reports its outcome through a promise's
# `resolve` / `reject` callbacks rather than by returning or throwing.
PerformancePromise <- R6Class("PerformancePromise", inherit = Performance,
  public = list(
    # Callbacks supplied by promises::promise()
    resolve = NULL,
    reject = NULL,

    # Set up the parent performance (with a progress bar created via
    # `config = FALSE`) and remember the promise callbacks.
    initialize = function(req, resolve, reject, path = NULL, error_call = NULL) {
      bar <- create_progress_bar(config = FALSE)

      super$initialize(req = req, path = path, progress = bar, error_call = error_call)
      self$resolve <- resolve
      self$reject <- reject
    },

    # Submit the request to `pool` and make sure something is polling that
    # pool; a response that is already available (cached) resolves at once.
    submit = function(pool = NULL) {
      if (is.null(self$resp)) {
        super$submit(pool)
        ensure_pool_poller(pool, self$reject)
      } else {
        # cached
        self$resolve(self$resp)
        NULL
      }
    },

    # Success callback: let the parent process the result, then resolve the
    # promise; any failure signalled along the way rejects it instead.
    succeed = function(res) {
      tryCatch(
        {
          super$succeed(res)
          self$resolve(self$resp)
        },
        httr2_fail = function(failure) self$reject(failure$error),
        error = function(err) self$reject(err)
      )
    },

    # Failure callback: the parent builds and signals the failure, which is
    # converted here into a promise rejection.
    fail = function(msg) {
      tryCatch(
        super$fail(msg),
        httr2_fail = function(failure) self$reject(failure$error),
        error = function(err) self$reject(err)
      )
    }
  )
)

# Start polling `pool` unless a poller is already registered for it.
#
# The poller runs curl::multi_run() without blocking and, while transfers are
# still pending, reschedules itself every 0.1 seconds on later's global loop.
# Once nothing is pending it deregisters itself. If polling throws, the poller
# is deregistered and the error is passed to `reject`.
ensure_pool_poller <- function(pool, reject) {
  monitor <- pool_poller_monitor(pool)
  if (monitor$already_going()) {
    return()
  }

  on_poll_error <- function(cnd) {
    monitor$ending()
    reject(cnd)
  }

  poll_once <- function() {
    tryCatch(
      {
        run_state <- curl::multi_run(0, pool = pool)
        if (run_state$pending > 0) {
          later::later(poll_once, delay = 0.1, loop = later::global_loop())
        } else {
          monitor$ending()
        }
      },
      error = on_poll_error
    )
  }

  monitor$starting()
  poll_once()
}

# Build a small set of closures that track whether a poller is running for
# `pool`. State lives in `the$pool_pollers`, keyed by the pool's address.
pool_poller_monitor <- function(pool) {
  key <- obj_address(pool)

  # TRUE while a poller is registered for this pool, FALSE otherwise
  is_polling <- function() env_get(the$pool_pollers, key, default = FALSE)
  # Register a poller for this pool
  mark_started <- function() env_poke(the$pool_pollers, key, TRUE)
  # Remove the registration once polling has stopped
  mark_finished <- function() env_unbind(the$pool_pollers, key)

  list(
    already_going = is_polling,
    starting = mark_started,
    ending = mark_finished
  )
}
1 change: 1 addition & 0 deletions _pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ reference:
contents:
- req_perform_iterative
- req_perform_parallel
- req_perform_promise
- req_perform_sequential
- starts_with("iterate_")
- starts_with("resps_")
Expand Down
78 changes: 78 additions & 0 deletions man/req_perform_promise.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

106 changes: 106 additions & 0 deletions tests/testthat/test-req-promise.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Test helper for the promises package: drains the later event loop (giving up
# after `timeout` seconds), then returns the value the promise was fulfilled
# with, or re-signals the condition it was rejected with.
extract_promise <- function(promise, timeout = 30) {
  fulfilled <- NULL
  failure <- NULL
  promises::then(
    promise,
    onFulfilled = function(result) fulfilled <<- result,
    onRejected = function(reason) failure <<- reason
  )

  started_at <- Sys.time()
  repeat {
    if (later::loop_empty()) {
      break
    }
    if (difftime(Sys.time(), started_at, units = "secs") > timeout) {
      stop("Waited too long")
    }
    later::run_now()
    Sys.sleep(0.01)
  }

  if (is.null(failure)) {
    fulfilled
  } else {
    cnd_signal(failure)
  }
}

test_that("returns a promise that resolves", {
  # promises and later are only Suggested packages: skip rather than fail
  # when they are not available.
  skip_if_not_installed("promises")
  skip_if_not_installed("later")

  p1 <- req_perform_promise(request_test("/delay/:secs", secs = 0.25))
  p2 <- req_perform_promise(request_test("/delay/:secs", secs = 0.25))
  expect_s3_class(p1, "promise")
  expect_s3_class(p2, "promise")

  # Both promises should eventually resolve to successful responses
  p1_value <- extract_promise(p1)
  expect_equal(resp_status(p1_value), 200)
  p2_value <- extract_promise(p2)
  expect_equal(resp_status(p2_value), 200)
})

test_that("can promise to download files", {
  # promises and later are only Suggested packages: skip rather than fail
  # when they are not available.
  skip_if_not_installed("promises")
  skip_if_not_installed("later")

  req <- request_test("/json")
  path <- withr::local_tempfile()
  p <- req_perform_promise(req, path)
  expect_s3_class(p, "promise")

  # The resolved response body should point at the download path
  p_value <- extract_promise(p)
  expect_equal(p_value$body, new_path(path))

  # And check that something was downloaded
  expect_gt(file.size(path), 0)
})

test_that("promises can retrieve from cache", {
  # promises and later are only Suggested packages: skip rather than fail
  # when they are not available.
  skip_if_not_installed("promises")
  skip_if_not_installed("later")

  # Seed the cache with a response that does not expire for a long time
  req <- request("http://example.com") %>% req_cache(tempfile())
  resp <- response(200,
    headers = "Expires: Wed, 01 Jan 3000 00:00:00 GMT",
    body = charToRaw("abc")
  )
  cache_set(req, resp)

  # The promise should resolve to the cached response
  p <- req_perform_promise(req)
  expect_s3_class(p, "promise")
  p_value <- extract_promise(p)
  expect_equal(p_value, resp)
})

test_that("both curl and HTTP errors in promises are rejected", {
  # promises and later are only Suggested packages: skip rather than fail
  # when they are not available.
  skip_if_not_installed("promises")
  skip_if_not_installed("later")

  # HTTP error status
  expect_error(
    extract_promise(
      req_perform_promise(request_test("/status/:status", status = 404))
    ),
    class = "httr2_http_404"
  )
  # Failure to perform the request at all
  expect_error(
    extract_promise(
      req_perform_promise(request("INVALID"))
    ),
    class = "httr2_failure"
  )
  # Invalid pool object
  expect_error(
    extract_promise(
      req_perform_promise(request_test("/status/:status", status = 200), pool = "INVALID")
    ),
    'inherits\\(pool, "curl_multi"\\) is not TRUE'
  )
})

test_that("req_perform_promise doesn't leave behind poller", {
  # promises and later are only Suggested packages: skip rather than fail
  # when they are not available (the next skip itself calls into later).
  skip_if_not_installed("promises")
  skip_if_not_installed("later")
  skip_if_not(later::loop_empty(), "later::global_loop not empty when test started")

  p <- req_perform_promise(request_test("/delay/:secs", secs = 0.25))
  # Before promise is resolved, there should be an operation in our later loop
  expect_false(later::loop_empty())
  p_value <- extract_promise(p)
  # But now that our promise is resolved, we shouldn't still be polling the pool
  expect_true(later::loop_empty())
})


test_that("req_perform_promise can use non-default pool", {
  # promises and later are only Suggested packages: skip rather than fail
  # when they are not available.
  skip_if_not_installed("promises")
  skip_if_not_installed("later")

  custom_pool <- curl::new_pool()
  p1 <- req_perform_promise(request_test("/delay/:secs", secs = 0.25))
  p2 <- req_perform_promise(request_test("/delay/:secs", secs = 0.25), pool = custom_pool)

  # Only the request given the custom pool should be registered on it
  expect_length(curl::multi_list(custom_pool), 1)

  p1_value <- extract_promise(p1)
  expect_equal(resp_status(p1_value), 200)
  p2_value <- extract_promise(p2)
  expect_equal(resp_status(p2_value), 200)
})
Loading