diff --git a/DESCRIPTION b/DESCRIPTION
index acfcb3ae..ae25e47e 100644
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -37,6 +37,8 @@ Suggests:
     jose,
     jsonlite,
     knitr,
+    later,
+    promises,
     rmarkdown,
     testthat (>= 3.1.8),
     tibble,
diff --git a/NAMESPACE b/NAMESPACE
index 5bec5c1f..4531d624 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -74,6 +74,7 @@ export(req_options)
 export(req_perform)
 export(req_perform_iterative)
 export(req_perform_parallel)
+export(req_perform_promise)
 export(req_perform_sequential)
 export(req_perform_stream)
 export(req_progress)
diff --git a/NEWS.md b/NEWS.md
index 1ba88a2a..728abcc3 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -1,5 +1,7 @@
 # httr2 (development version)
 
+* New function `req_perform_promise()` allows creating a `promises::promise` for a request that runs in the background (#501, @gergness).
+
 # httr2 1.0.2
 
 * `req_body_file()` now only opens a connection when the request actually needs data. In particular, this makes it work better with `req_perform_parallel()` (#487).
diff --git a/R/httr2-package.R b/R/httr2-package.R
index 1a455726..a88b7dda 100644
--- a/R/httr2-package.R
+++ b/R/httr2-package.R
@@ -15,3 +15,4 @@ the$cache_throttle <- list()
 the$token_cache <- new_environment()
 the$last_response <- NULL
 the$last_request <- NULL
+the$pool_pollers <- new_environment()
diff --git a/R/req-promise.R b/R/req-promise.R
new file mode 100644
index 00000000..096c2474
--- /dev/null
+++ b/R/req-promise.R
@@ -0,0 +1,188 @@
+#' Perform request asynchronously using the promises package
+#'
+#' @description
+#' This variation on [req_perform()] returns a [promises::promise()] object immediately
+#' and then performs the request in the background, returning program control before the request
+#' is finished. See the
+#' [promises package documentation](https://rstudio.github.io/promises/articles/promises_01_motivation.html)
+#' for more details on how to work with the resulting promise object.
+#'
+#' Like with [req_perform_parallel()], exercise caution when using this function;
+#' it's easy to pummel a server with many simultaneous requests. Also, not all servers
+#' can handle more than 1 request at a time, so the responses may still return
+#' sequentially.
+#'
+#' `req_perform_promise()` also has similar limitations to the
+#' [req_perform_parallel()] function, it:
+#'
+#' * Will not retrieve a new OAuth token if it expires after the promised request
+#'   is created but before it is actually requested.
+#' * Does not perform throttling with [req_throttle()].
+#' * Does not attempt retries as described by [req_retry()].
+#' * Only consults the cache set by [req_cache()] when the request is promised.
+#'
+#' @inheritParams req_perform
+#' @inheritParams req_perform_parallel
+#'
+#' @return a [promises::promise()] object which resolves to a [response] if
+#' successful or rejects on the same errors thrown by [req_perform()].
+#' @export
+#'
+#' @examplesIf requireNamespace("promises", quietly = TRUE)
+#' library(promises)
+#' request_base <- request(example_url()) |> req_url_path_append("delay")
+#'
+#' p <- request_base |> req_url_path_append(2) |> req_perform_promise()
+#'
+#' # A promise object, not particularly useful on its own
+#' p
+#'
+#' # Use promise chaining functions to access results
+#' p %...>%
+#'   resp_body_json() %...>%
+#'   print()
+#'
+#'
+#' # Can run two requests at the same time
+#' p1 <- request_base |> req_url_path_append(4) |> req_perform_promise()
+#' p2 <- request_base |> req_url_path_append(1) |> req_perform_promise()
+#'
+#' p1 %...>%
+#'   resp_url_path %...>%
+#'   paste0(., " finished") %...>%
+#'   print()
+#'
+#' p2 %...>%
+#'   resp_url_path %...>%
+#'   paste0(., " finished") %...>%
+#'   print()
+#'
+#' # See the [promises package documentation](https://rstudio.github.io/promises/)
+#' # for more information on working with promises
+req_perform_promise <- function(req,
+                                path = NULL,
+                                pool = NULL) {
+  check_installed(c("promises", "later"))
+  check_string(path, allow_null = TRUE)
+
+  # Capture this function's frame here, outside the promise callback. Calling
+  # `environment()` inside the callback returns the anonymous function's own
+  # frame, so anything reported against `error_call` would point at that
+  # callback rather than at `req_perform_promise()`.
+  error_call <- environment()
+
+  promises::promise(
+    function(resolve, reject) {
+      perf <- PerformancePromise$new(
+        req = req,
+        resolve = resolve,
+        reject = reject,
+        path = path,
+        error_call = error_call
+      )
+
+      perf$submit(pool)
+    }
+  )
+}
+
+# `Performance` subclass whose outcome settles a promise: `resolve` is called
+# with the response on success and `reject` with the condition on failure.
+# NOTE(review): the parent `Performance` class is defined outside this file;
+# the comments here only describe what this subclass itself does.
+PerformancePromise <- R6Class("PerformancePromise", inherit = Performance,
+  public = list(
+    # Settlement callbacks handed over by `promises::promise()`.
+    resolve = NULL,
+    reject = NULL,
+
+    initialize = function(req, resolve, reject, path = NULL, error_call = NULL) {
+      progress <- create_progress_bar(config = FALSE)
+
+      super$initialize(req = req, path = path, progress = progress, error_call = error_call)
+      self$resolve <- resolve
+      self$reject <- reject
+    },
+
+    # Submit via the parent class, then make sure `pool` is being polled.
+    submit = function(pool = NULL) {
+      if (!is.null(self$resp)) {
+        # A response is already available (cached), so settle the promise
+        # immediately without submitting anything.
+        self$resolve(self$resp)
+        return()
+      }
+      super$submit(pool)
+      ensure_pool_poller(pool, self$reject)
+    },
+
+    # Success handler. Conditions signalled by the parent's handler are turned
+    # into rejections so that they surface through the promise.
+    succeed = function(res) {
+      tryCatch(
+        {
+          super$succeed(res)
+          self$resolve(self$resp)
+        },
+        # An `httr2_fail` condition wraps the real error in `$error`.
+        httr2_fail = function(cnd) self$reject(cnd$error),
+        error = function(cnd) self$reject(cnd)
+      )
+    },
+
+    # Failure handler; same condition-to-rejection mapping as `succeed()`.
+    fail = function(msg) {
+      tryCatch(
+        super$fail(msg),
+        httr2_fail = function(cnd) self$reject(cnd$error),
+        error = function(cnd) self$reject(cnd)
+      )
+    }
+  ))
+
+# Start a poller for `pool` unless one is already registered.
+#
+# The poller calls `curl::multi_run()` with a timeout of 0 and, while requests
+# are still pending, reschedules itself on the global later loop after 0.1
+# seconds. It unregisters itself once nothing is pending or if polling throws.
+#
+# NOTE(review): `reject` belongs to the promise that started the poller, so an
+# error caught here rejects only that promise; other promises sharing the pool
+# are not notified from this handler.
+ensure_pool_poller <- function(pool, reject) {
+  monitor <- pool_poller_monitor(pool)
+  if (monitor$already_going()) {
+    return()
+  }
+
+  poll_pool <- function() {
+    tryCatch(
+      {
+        status <- curl::multi_run(0, pool = pool)
+        if (status$pending > 0) {
+          later::later(poll_pool, delay = 0.1, loop = later::global_loop())
+        } else {
+          monitor$ending()
+        }
+      },
+      error = function(cnd) {
+        monitor$ending()
+        reject(cnd)
+      }
+    )
+  }
+
+  monitor$starting()
+  poll_pool()
+}
+
+# Accessors for the "is a poller running?" flag of `pool`, stored in
+# `the$pool_pollers` under the pool's object address.
+pool_poller_monitor <- function(pool) {
+  pool_address <- obj_address(pool)
+  list(
+    already_going = function() env_get(the$pool_pollers, pool_address, default = FALSE),
+    starting = function() env_poke(the$pool_pollers, pool_address, TRUE),
+    ending = function() env_unbind(the$pool_pollers, pool_address)
+  )
+}
diff --git a/_pkgdown.yml b/_pkgdown.yml
index cccf44f6..f2fe2c64 100644
--- a/_pkgdown.yml
+++ b/_pkgdown.yml
@@ -60,6 +60,7 @@ reference:
   contents:
   - req_perform_iterative
   - req_perform_parallel
+  - req_perform_promise
   - req_perform_sequential
   - starts_with("iterate_")
   - starts_with("resps_")
diff --git a/man/req_perform_promise.Rd b/man/req_perform_promise.Rd
new file mode 100644
index 00000000..18b8243b
--- /dev/null
+++ b/man/req_perform_promise.Rd
@@ -0,0 +1,78 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/req-promise.R
+\name{req_perform_promise}
+\alias{req_perform_promise}
+\title{Perform request asynchronously using the promises package}
+\usage{
+req_perform_promise(req, path = NULL, pool = NULL)
+}
+\arguments{
+\item{req}{A \link{request}.}
+
+\item{path}{Optionally, path to save body of the response. This is useful
+for large responses since it avoids storing the response in memory.}
+
+\item{pool}{Optionally, a curl pool made by \code{\link[curl:multi]{curl::new_pool()}}. Supply
+this if you want to override the defaults for total concurrent connections
+(100) or concurrent connections per host (6).}
+}
+\value{
+a \code{\link[promises:promise]{promises::promise()}} object which resolves to a \link{response} if
+successful or rejects on the same errors thrown by \code{\link[=req_perform]{req_perform()}}.
+}
+\description{
+This variation on \code{\link[=req_perform]{req_perform()}} returns a \code{\link[promises:promise]{promises::promise()}} object immediately
+and then performs the request in the background, returning program control before the request
+is finished. See the
+\href{https://rstudio.github.io/promises/articles/promises_01_motivation.html}{promises package documentation}
+for more details on how to work with the resulting promise object.
+ +Like with \code{\link[=req_perform_parallel]{req_perform_parallel()}}, exercise caution when using this function; +it's easy to pummel a server with many simultaneous requests. Also, not all servers +can handle more than 1 request at a time, so the responses may still return +sequentially. + +\code{req_perform_promise()} also has similar limitations to the +\code{\link[=req_perform_parallel]{req_perform_parallel()}} function, it: +\itemize{ +\item Will not retrieve a new OAuth token if it expires after the promised request +is created but before it is actually requested. +\item Does not perform throttling with \code{\link[=req_throttle]{req_throttle()}}. +\item Does not attempt retries as described by \code{\link[=req_retry]{req_retry()}}. +\item Only consults the cache set by \code{\link[=req_cache]{req_cache()}} when the request is promised. +} +} +\examples{ +\dontshow{if (requireNamespace("promises", quietly = TRUE)) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} +library(promises) +request_base <- request(example_url()) |> req_url_path_append("delay") + +p <- request_base |> req_url_path_append(2) |> req_perform_promise() + +# A promise object, not particularly useful on its own +p + +# Use promise chaining functions to access results +p \%...>\% + resp_body_json() \%...>\% + print() + + +# Can run two requests at the same time +p1 <- request_base |> req_url_path_append(4) |> req_perform_promise() +p2 <- request_base |> req_url_path_append(1) |> req_perform_promise() + +p1 \%...>\% + resp_url_path \%...>\% + paste0(., " finished") \%...>\% + print() + +p2 \%...>\% + resp_url_path \%...>\% + paste0(., " finished") \%...>\% + print() + +# See the [promises package documentation](https://rstudio.github.io/promises/) +# for more information on working with promises +\dontshow{\}) # examplesIf} +} diff --git a/tests/testthat/test-req-promise.R b/tests/testthat/test-req-promise.R new file mode 100644 index 00000000..9841cf96 --- /dev/null 
+++ b/tests/testthat/test-req-promise.R
@@ -0,0 +1,128 @@
+# promises and later are only suggested packages, so tests that need them
+# are skipped when either is not installed.
+skip_if_promises_unavailable <- function() {
+  skip_if_not_installed("promises")
+  skip_if_not_installed("later")
+}
+
+# Runs the global later loop until it is empty, then returns the value the
+# promise resolved to, or re-signals the condition it was rejected with.
+# Errors with "Waited too long" if the loop is still busy after `timeout`
+# seconds.
+extract_promise <- function(promise, timeout = 30) {
+  promise_value <- NULL
+  error <- NULL
+  promises::then(
+    promise,
+    onFulfilled = function(value) promise_value <<- value,
+    onRejected = function(reason) {
+      error <<- reason
+    }
+  )
+
+  start <- Sys.time()
+  while (!later::loop_empty()) {
+    if (difftime(Sys.time(), start, units = "secs") > timeout) {
+      stop("Waited too long")
+    }
+    later::run_now()
+    Sys.sleep(0.01)
+  }
+
+  if (!is.null(error)) {
+    cnd_signal(error)
+  } else {
+    promise_value
+  }
+}
+
+test_that("returns a promise that resolves", {
+  skip_if_promises_unavailable()
+
+  p1 <- req_perform_promise(request_test("/delay/:secs", secs = 0.25))
+  p2 <- req_perform_promise(request_test("/delay/:secs", secs = 0.25))
+  expect_s3_class(p1, "promise")
+  expect_s3_class(p2, "promise")
+  p1_value <- extract_promise(p1)
+  expect_equal(resp_status(p1_value), 200)
+  p2_value <- extract_promise(p2)
+  expect_equal(resp_status(p2_value), 200)
+})
+
+test_that("can promise to download files", {
+  skip_if_promises_unavailable()
+
+  req <- request_test("/json")
+  path <- withr::local_tempfile()
+  p <- req_perform_promise(req, path)
+  expect_s3_class(p, "promise")
+  p_value <- extract_promise(p)
+  expect_equal(p_value$body, new_path(path))
+
+  # And check that something was downloaded
+  expect_gt(file.size(path), 0)
+})
+
+test_that("promises can retrieve from cache", {
+  skip_if_promises_unavailable()
+
+  req <- request("http://example.com") %>% req_cache(tempfile())
+  resp <- response(200,
+    headers = "Expires: Wed, 01 Jan 3000 00:00:00 GMT",
+    body = charToRaw("abc")
+  )
+  cache_set(req, resp)
+
+  p <- req_perform_promise(req)
+  expect_s3_class(p, "promise")
+  p_value <- extract_promise(p)
+  expect_equal(p_value, resp)
+})
+
+test_that("both curl and HTTP errors in promises are rejected", {
+  skip_if_promises_unavailable()
+
+  expect_error(
+    extract_promise(
+      req_perform_promise(request_test("/status/:status", status = 404))
+    ),
+    class = "httr2_http_404"
+  )
+  expect_error(
+    extract_promise(
+      req_perform_promise(request("INVALID"))
+    ),
+    class = "httr2_failure"
+  )
+  expect_error(
+    extract_promise(
+      req_perform_promise(request_test("/status/:status", status = 200), pool = "INVALID")
+    ),
+    'inherits\\(pool, "curl_multi"\\) is not TRUE'
+  )
+})
+
+test_that("req_perform_promise doesn't leave behind poller", {
+  skip_if_promises_unavailable()
+  skip_if_not(later::loop_empty(), "later::global_loop not empty when test started")
+
+  p <- req_perform_promise(request_test("/delay/:secs", secs = 0.25))
+  # Before promise is resolved, there should be an operation in our later loop
+  expect_false(later::loop_empty())
+  p_value <- extract_promise(p)
+  # But now that our promise is resolved, we shouldn't still be polling the pool
+  expect_true(later::loop_empty())
+})
+
+test_that("req_perform_promise can use non-default pool", {
+  skip_if_promises_unavailable()
+
+  custom_pool <- curl::new_pool()
+  p1 <- req_perform_promise(request_test("/delay/:secs", secs = 0.25))
+  p2 <- req_perform_promise(request_test("/delay/:secs", secs = 0.25), pool = custom_pool)
+  expect_length(curl::multi_list(custom_pool), 1)
+  p1_value <- extract_promise(p1)
+  expect_equal(resp_status(p1_value), 200)
+  p2_value <- extract_promise(p2)
+  expect_equal(resp_status(p2_value), 200)
+})