Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add function req_perform_promise (fixes #501) #505

Merged
merged 12 commits into from
Jul 16, 2024
2 changes: 2 additions & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ Suggests:
jose,
jsonlite,
knitr,
later,
promises,
rmarkdown,
testthat (>= 3.1.8),
tibble,
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export(req_options)
export(req_perform)
export(req_perform_iterative)
export(req_perform_parallel)
export(req_perform_promise)
export(req_perform_sequential)
export(req_perform_stream)
export(req_progress)
Expand Down
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# httr2 (development version)

* New function `req_perform_promise()` allows creating a `promises::promise` for a request that runs in the background (#501, @gergness).
* `req_perform_stream()` no longer applies the `callback` to unsuccessful
responses, instead creating a regular response. It also now sets `last_request()` and `last_response()` (#479).
* `req_cache()` now defaults the `debug` argument to the `httr2_cache_debug` option to make it easier to debug caching buried in other people's code (#486).
Expand Down
1 change: 1 addition & 0 deletions R/httr2-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ the$cache_throttle <- list()
the$token_cache <- new_environment()
the$last_response <- NULL
the$last_request <- NULL
the$pool_pollers <- new_environment()
154 changes: 154 additions & 0 deletions R/req-promise.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#' Perform request asynchronously using the promises package
#'
#' @description
#' This variation on [req_perform()] returns a [promises::promise()] object immediately
#' and then performs the request in the background, returning program control before the request
#' is finished. See the
#' [promises package documentation](https://rstudio.github.io/promises/articles/promises_01_motivation.html)
#' for more details on how to work with the resulting promise object.
#'
#' Like with [req_perform_parallel()], exercise caution when using this function;
#' it's easy to pummel a server with many simultaneous requests. Also, not all servers
#' can handle more than 1 request at a time, so the responses may still return
#' sequentially.
#'
#' `req_perform_promise()` also has similar limitations to the
#' [req_perform_parallel()] function, it:
#'
#' * Will not retrieve a new OAuth token if it expires after the promised request
#' is created but before it is actually requested.
#' * Does not perform throttling with [req_throttle()].
#' * Does not attempt retries as described by [req_retry()].
#' * Only consults the cache set by [req_cache()] when the request is promised.
#'
#' @inheritParams req_perform
#' @inheritParams req_perform_parallel
#'
#' @return a [promises::promise()] object which resolves to a [response] if
#' successful or rejects on the same errors thrown by [req_perform()].
#' @export
#'
#' @examplesIf requireNamespace("promises", quietly = TRUE)
#' library(promises)
#' request_base <- request(example_url()) |> req_url_path_append("delay")
#'
#' p <- request_base |> req_url_path_append(2) |> req_perform_promise()
#'
#' # A promise object, not particularly useful on its own
#' p
#'
#' # Use promise chaining functions to access results
#' p %...>%
#' resp_body_json() %...>%
#' print()
#'
#'
#' # Can run two requests at the same time
#' p1 <- request_base |> req_url_path_append(4) |> req_perform_promise()
#' p2 <- request_base |> req_url_path_append(1) |> req_perform_promise()
#'
#' p1 %...>%
#' resp_url_path %...>%
#' paste0(., " finished") %...>%
#' print()
#'
#' p2 %...>%
#' resp_url_path %...>%
#' paste0(., " finished") %...>%
#' print()
#'
#' # See the [promises package documentation](https://rstudio.github.io/promises/)
#' # for more information on working with promises
req_perform_promise <- function(req,
path = NULL,
pool = NULL) {
check_installed(c("promises", "later"))
check_string(path, allow_null = TRUE)

promises::promise(
function(resolve, reject) {
perf <- PerformancePromise$new(
req = req,
resolve = resolve,
reject = reject,
path = path,
error_call = environment()
)

perf$submit(pool)
}
)
}

PerformancePromise <- R6Class("PerformancePromise", inherit = Performance,
public = list(
resolve = NULL,
reject = NULL,

initialize = function(req, resolve, reject, path = NULL, error_call = NULL) {
progress <- create_progress_bar(config = FALSE)

super$initialize(req = req, path = path, progress = progress, error_call = error_call)
self$resolve <- resolve
self$reject <- reject
},

submit = function(pool = NULL) {
if (!is.null(self$resp)) {
# cached
self$resolve(self$resp)
return()
}
super$submit()
gergness marked this conversation as resolved.
Show resolved Hide resolved
ensure_pool_poller(pool, self$reject)

Check warning on line 103 in R/req-promise.R

View check run for this annotation

Codecov / codecov/patch

R/req-promise.R#L102-L103

Added lines #L102 - L103 were not covered by tests
},

succeed = function(res) {
tryCatch({
super$succeed(res)
self$resolve(self$resp)
},
httr2_fail = function(cnd) self$reject(cnd$error),
error = function(cnd) self$reject(cnd)
)

Check warning on line 113 in R/req-promise.R

View check run for this annotation

Codecov / codecov/patch

R/req-promise.R#L107-L113

Added lines #L107 - L113 were not covered by tests
gergness marked this conversation as resolved.
Show resolved Hide resolved
},

fail = function(msg) {
tryCatch(
super$fail(msg),
httr2_fail = function(cnd) self$reject(cnd$error),
error = function(cnd) self$reject(cnd)
)

Check warning on line 121 in R/req-promise.R

View check run for this annotation

Codecov / codecov/patch

R/req-promise.R#L117-L121

Added lines #L117 - L121 were not covered by tests
}
))

ensure_pool_poller <- function(pool, reject) {
monitor <- pool_poller_monitor(pool)
if (monitor$already_going()) return()

Check warning on line 127 in R/req-promise.R

View check run for this annotation

Codecov / codecov/patch

R/req-promise.R#L126-L127

Added lines #L126 - L127 were not covered by tests

poll_pool <- function() {
tryCatch({
status <- curl::multi_run(0, pool = pool)
if (status$pending > 0) {
later::later(poll_pool, delay = 0.1, loop = later::global_loop())
} else {
monitor$ending()
}
}, error = function(cnd) {
monitor$ending()
reject(cnd)
})
}

Check warning on line 141 in R/req-promise.R

View check run for this annotation

Codecov / codecov/patch

R/req-promise.R#L129-L141

Added lines #L129 - L141 were not covered by tests

monitor$starting()
poll_pool()

Check warning on line 144 in R/req-promise.R

View check run for this annotation

Codecov / codecov/patch

R/req-promise.R#L143-L144

Added lines #L143 - L144 were not covered by tests
}

pool_poller_monitor <- function(pool) {
pool_address <- obj_address(pool)
list(
already_going = function() env_get(the$pool_pollers, pool_address, default = FALSE),
starting = function() env_poke(the$pool_pollers, pool_address, TRUE),
ending = function() env_unbind(the$pool_pollers, pool_address)
)

Check warning on line 153 in R/req-promise.R

View check run for this annotation

Codecov / codecov/patch

R/req-promise.R#L148-L153

Added lines #L148 - L153 were not covered by tests
}
1 change: 1 addition & 0 deletions _pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ reference:
contents:
- req_perform_iterative
- req_perform_parallel
- req_perform_promise
- req_perform_sequential
- starts_with("iterate_")
- starts_with("resps_")
Expand Down
78 changes: 78 additions & 0 deletions man/req_perform_promise.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

94 changes: 94 additions & 0 deletions tests/testthat/test-req-promise.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# promises package test helper
extract_promise <- function(promise, timeout = 30) {
promise_value <- NULL
error <- NULL
promises::then(
promise,
onFulfilled = function(value) promise_value <<- value,
onRejected = function(reason) {
error <<- reason
}
)

start <- Sys.time()
while (!later::loop_empty()) {
if (difftime(Sys.time(), start, units = "secs") > timeout) {
stop("Waited too long")
}
later::run_now()
Sys.sleep(0.01)
}

if (!is.null(error)) {
cnd_signal(error)
} else
promise_value
}

test_that("returns a promise that resolves", {
p1 <- req_perform_promise(request_test("/delay/:secs", secs = 0.25))
p2 <- req_perform_promise(request_test("/delay/:secs", secs = 0.25))
expect_s3_class(p1, "promise")
expect_s3_class(p2, "promise")
p1_value <- extract_promise(p1)
expect_equal(resp_status(p1_value), 200)
p2_value <- extract_promise(p2)
expect_equal(resp_status(p2_value), 200)
})

test_that("can promise to download files", {
req <- request_test("/json")
path <- withr::local_tempfile()
p <- req_perform_promise(req, path)
expect_s3_class(p, "promise")
p_value <- extract_promise(p)
expect_equal(p_value$body, new_path(path))

# And check that something was downloaded
expect_gt(file.size(path), 0)
})

test_that("promises can retrieve from cache", {
req <- request("http://example.com") %>% req_cache(tempfile())
resp <- response(200,
headers = "Expires: Wed, 01 Jan 3000 00:00:00 GMT",
body = charToRaw("abc")
)
cache_set(req, resp)

p <- req_perform_promise(req)
expect_s3_class(p, "promise")
p_value <- extract_promise(p)
expect_equal(p_value, resp)
})

test_that("both curl and HTTP errors in promises are rejected", {
expect_error(
extract_promise(
req_perform_promise(request_test("/status/:status", status = 404))
),
class = "httr2_http_404"
)
expect_error(
extract_promise(
req_perform_promise(request("INVALID"))
),
class = "httr2_failure"
)
expect_error(
extract_promise(
req_perform_promise(request_test("/status/:status", status = 200), pool = "INVALID")
),
'inherits\\(pool, "curl_multi"\\) is not TRUE'
)
})

test_that("req_perform_promise doesn't leave behind poller", {
skip_if_not(later::loop_empty(), "later::global_loop not empty when test started")
p <- req_perform_promise(request_test("/delay/:secs", secs = 0.25))
# Before promise is resolved, there should be an operation in our later loop
expect_false(later::loop_empty())
p_value <- extract_promise(p)
# But now that that our promise is resolved, we shouldn't still be polling the pool
expect_true(later::loop_empty())
})
Loading