Skip to content

Commit

Permalink
Manage our own pushback state (#535)
Browse files Browse the repository at this point in the history
Fixes #532. Includes webfakes helper to make testing/coverage easier. Fixes #544

---------

Co-authored-by: Joe Cheng <[email protected]>
  • Loading branch information
hadley and jcheng5 committed Sep 6, 2024
1 parent a241b04 commit acaaa9a
Show file tree
Hide file tree
Showing 17 changed files with 899 additions and 372 deletions.
95 changes: 95 additions & 0 deletions R/req-perform-connection.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@

#' Perform a request and return a streaming connection
#'
#' @description
#' `r lifecycle::badge("experimental")`
#'
#' Use `req_perform_connection()` to perform a request if you want to stream the
#' response body. A response returned by `req_perform_connection()` includes a
#' connection as the body. You can then use [resp_stream_raw()],
#' [resp_stream_lines()], or [resp_stream_sse()] to retrieve data a chunk at a
#' time. Always finish up by closing the connection by calling
#' `close(response)`.
#'
#' This is an alternative interface to [req_perform_stream()] that returns a
#' [connection][base::connections] that you can use to pull the data, rather
#' than providing callbacks that the data is pushed to. This is useful if you
#' want to do other work in between handling inputs from the stream.
#'
#' @inheritParams req_perform_stream
#' @param blocking When retrieving data, should the connection block and wait
#' for the desired information or immediately return what it has (possibly
#' nothing)?
#' @export
#' @examples
#' req <- request(example_url()) |>
#' req_url_path("/stream-bytes/32768")
#' resp <- req_perform_connection(req)
#'
#' length(resp_stream_raw(resp, kb = 16))
#' length(resp_stream_raw(resp, kb = 16))
#' # When the stream has no more data, you'll get an empty result:
#' length(resp_stream_raw(resp, kb = 16))
#'
#' # Always close the response when you're done
#' close(resp)
req_perform_connection <- function(req, blocking = TRUE) {
  check_request(req)
  check_bool(blocking)

  req_prep <- req_prepare(req)
  handle <- req_handle(req_prep)
  # Record the request up front and reset the stored last response
  the$last_request <- req
  the$last_response <- NULL

  tries <- 0
  delay <- 0
  max_tries <- retry_max_tries(req)
  deadline <- Sys.time() + retry_max_seconds(req)
  resp <- NULL
  while (tries < max_tries && Sys.time() < deadline) {
    sys_sleep(delay, "for retry backoff")

    # Close the response left over from the previous attempt before retrying
    if (!is.null(resp)) {
      close(resp)
    }
    resp <- req_perform_connection1(req, handle, blocking = blocking)

    if (retry_is_transient(req, resp)) {
      tries <- tries + 1
      delay <- retry_after(req, resp, tries)
      signal(class = "httr2_retry", tries = tries, delay = delay)
    } else {
      break
    }
  }
  req_completed(req)

  if (error_is_error(req, resp)) {
    # Read full body if there's an error. `finally` guarantees that the
    # connection is closed even when reading the body itself fails, so the
    # connection is never leaked on this path.
    conn <- resp$body
    resp$body <- tryCatch(read_con(conn), finally = close(conn))
  }
  the$last_response <- resp
  handle_resp(req, resp)

  resp
}

# Perform a single attempt: create a curl connection for `req$url`, open it,
# and wrap the connection in a response object (the body is the open stream).
req_perform_connection1 <- function(req, handle, blocking = TRUE) {
  stream <- curl::curl(req$url, handle = handle)

  # Until the stream has been handed to the caller inside a response, we own
  # it: if opening (or anything after it) fails, close it so the connection
  # is not leaked. Cleanup is best-effort so it never masks the real error.
  handed_off <- FALSE
  on.exit(if (!handed_off) try(close(stream), silent = TRUE), add = TRUE)

  # Must open the stream in order to initiate the connection
  open(stream, "rbf", blocking = blocking)
  curl_data <- curl::handle_data(handle)

  resp <- new_response(
    method = req_method_get(req),
    url = curl_data$url,
    status_code = curl_data$status_code,
    headers = as_headers(curl_data$headers),
    body = stream,
    request = req
  )

  handed_off <- TRUE
  resp
}
256 changes: 0 additions & 256 deletions R/req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -79,250 +79,8 @@ req_perform_stream <- function(req,
resp
}

#' Perform a request and return a streaming connection
#'
#' @description
#' `r lifecycle::badge("experimental")`
#'
#' Use `req_perform_connection()` to perform a request if you want to stream the
#' response body. A response returned by `req_perform_connection()` includes a
#' connection as the body. You can then use [resp_stream_raw()],
#' [resp_stream_lines()], or [resp_stream_sse()] to retrieve data a chunk at a
#' time. Always finish up by closing the connection by calling
#' `close(response)`.
#'
#' This is an alternative interface to [req_perform_stream()] that returns a
#' [connection][base::connections] that you can use to pull the data, rather
#' than providing callbacks that the data is pushed to. This is useful if you
#' want to do other work in between handling inputs from the stream.
#'
#' @inheritParams req_perform_stream
#' @param mode The mode that should be used for opening the connection.
#' @param blocking When retrieving data, should the connection block and wait
#' for the desired information or immediately return what it has (possibly
#' nothing)?
#' @export
#' @examples
#' req <- request(example_url()) |>
#' req_url_path("/stream-bytes/32768")
#' resp <- req_perform_connection(req)
#'
#' length(resp_stream_raw(resp, kb = 16))
#' length(resp_stream_raw(resp, kb = 16))
#' # When the stream has no more data, you'll get an empty result:
#' length(resp_stream_raw(resp, kb = 16))
#'
#' # Always close the response when you're done
#' close(resp)
req_perform_connection <- function(req,
                                   mode = c("binary", "text"),
                                   blocking = TRUE) {
  check_request(req)
  check_bool(blocking)
  mode <- arg_match(mode)
  # Translate the user-facing mode into the mode string passed to `open()`
  con_mode <- if (mode == "text") "rf" else "rbf"

  req_prep <- req_prepare(req)
  handle <- req_handle(req_prep)
  the$last_request <- req

  tries <- 0
  delay <- 0
  max_tries <- retry_max_tries(req)
  deadline <- Sys.time() + retry_max_seconds(req)
  resp <- NULL
  while (tries < max_tries && Sys.time() < deadline) {
    sys_sleep(delay, "for retry backoff")

    # Close the connection left over from the previous attempt before retrying
    if (!is.null(resp)) {
      close(resp$body)
    }
    resp <- req_perform_connection1(req, handle, con_mode, blocking = blocking)

    if (retry_is_transient(req, resp)) {
      tries <- tries + 1
      delay <- retry_after(req, resp, tries)
    } else {
      break
    }
  }

  req_completed(req_prep)

  if (error_is_error(req, resp)) {
    # Read full body if there's an error. `finally` guarantees that the
    # connection is closed even when reading the body itself fails, so the
    # connection is never leaked on this path.
    conn <- resp$body
    resp$body <- tryCatch(read_con(conn), finally = close(conn))
  }
  the$last_response <- resp
  handle_resp(req, resp)

  resp
}

# Perform a single attempt: create a curl connection for `req$url`, open it
# with `con_mode`, and wrap the connection in a response object (the body is
# the open stream).
req_perform_connection1 <- function(req, handle, con_mode = "rbf", blocking = TRUE) {
  stream <- curl::curl(req$url, handle = handle)

  # Until the stream has been handed to the caller inside a response, we own
  # it: if opening (or anything after it) fails, close it so the connection
  # is not leaked. Cleanup is best-effort so it never masks the real error.
  handed_off <- FALSE
  on.exit(if (!handed_off) try(close(stream), silent = TRUE), add = TRUE)

  # Must open the stream in order to initiate the connection
  open(stream, con_mode, blocking = blocking)
  curl_data <- curl::handle_data(handle)

  resp <- new_response(
    method = req_method_get(req),
    url = curl_data$url,
    status_code = curl_data$status_code,
    headers = as_headers(curl_data$headers),
    body = stream,
    request = req
  )

  handed_off <- TRUE
  resp
}

#' Read a streaming body a chunk at a time
#'
#' @description
#' `r lifecycle::badge("experimental")`
#'
#' * `resp_stream_raw()` retrieves bytes (`raw` vectors).
#' * `resp_stream_lines()` retrieves lines of text (`character` vectors).
#' * `resp_stream_sse()` retrieves [server-sent
#' events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
#' from the stream. It currently only works with text mode connections so when calling
#' `req_perform_connection()` you must use `mode = "text"`.
#'
#' @returns
#' * `resp_stream_raw()`: a raw vector.
#' * `resp_stream_lines()`: a character vector.
#' * `resp_stream_sse()`: a list with components `type`, `data`, and `id`; or
#' `NULL`, signifying that the end of the stream has been reached or--if in
#' nonblocking mode--that no event is currently available.
#' @export
#' @param resp,con A streaming [response] created by [req_perform_connection()].
#' @param kb How many kilobytes (1024 bytes) of data to read.
resp_stream_raw <- function(resp, kb = 32) {
  # Reject anything that is not a still-open streaming response
  check_streaming_response(resp)

  # `kb` is expressed in kilobytes; readBin() wants a count of raw elements
  readBin(resp$body, raw(), kb * 1024)
}

#' @export
#' @rdname resp_stream_raw
#' @param lines How many lines to read
resp_stream_lines <- function(resp, lines = 1) {
  # Reject anything that is not a still-open streaming response
  check_streaming_response(resp)

  # Pull at most `lines` lines of text from the body connection
  readLines(resp$body, n = lines)
}

#' @export
#' @rdname resp_stream_raw
# TODO: max_size
resp_stream_sse <- function(resp) {
  check_streaming_response(resp)
  conn <- resp$body

  # Events are read line by line, so the connection must be in text mode
  is_text <- identical(summary(conn)$text, "text")
  if (!is_text) {
    cli::cli_abort(c(
      "{.arg resp} must have a text mode connection.",
      i = 'Use {.code mode = "text"} when calling {.fn req_perform_connection}.'
    ))
  }

  # Accumulate the lines of one event until the blank separator line is seen
  pending <- character(0)
  repeat {
    line <- readLines(conn, n = 1)
    if (length(line) == 0) {
      # Nothing more to read right now
      break
    }
    if (line == "") {
      # \n\n detected, end of event
      return(parse_event(pending))
    }
    pending <- c(pending, line)
  }

  # Only part of an event arrived: push the lines back onto the connection
  # so they are read again on the next call.
  if (length(pending) > 0) {
    pushBack(pending, conn)
  }

  NULL
}

#' @export
#' @param ... Not used; included for compatibility with generic.
#' @rdname resp_stream_raw
close.httr2_response <- function(con, ...) {
  check_response(con)

  body <- con$body
  # Responses whose body is not a connection have nothing to close
  if (!inherits(body, "connection")) {
    return(invisible())
  }

  # Skip connections that have already been closed
  if (isValid(body)) {
    close(body)
  }

  invisible()
}

# Helpers ----------------------------------------------------------------------

# Abort unless `resp` is a response whose body is a streaming connection that
# is still valid (i.e. has not been closed). `arg` and `call` are forwarded to
# the error helpers so the error points at the caller's argument.
check_streaming_response <- function(resp, arg = caller_arg(resp), call = caller_env()) {
  check_response(resp, arg = arg, call = call)

  is_stream <- resp_body_type(resp) == "stream"
  if (!is_stream) {
    stop_input_type(
      resp,
      "a streaming HTTP response object",
      allow_null = FALSE,
      arg = arg,
      call = call
    )
  }

  still_open <- isValid(resp$body)
  if (!still_open) {
    cli::cli_abort("{.arg {arg}} has already been closed.", call = call)
  }
}

# `isOpen()` is not usable for this check, for two reasons:
# 1. It throws an error when `con` has been closed, instead of returning FALSE.
# 2. It returns TRUE when `con` has been closed and a new connection has since
#    been opened.
#
# Instead, look the connection up again by its number and compare the result
# with the original object. This works because connections have an
# undocumented external pointer.
isValid <- function(con) {
  same_connection <- function() {
    identical(getConnection(con), con)
  }
  # Any failure to look the connection up means it is no longer valid
  tryCatch(same_connection(), error = function(cnd) FALSE)
}


# Turn the lines of a single server-sent event into a list with components
# `type`, `data`, and `id`.
#
# Each line is split into a field name (text before the first colon) and a
# value (text after the colon and one optional space). The event type defaults
# to "message". For every field other than `data` only the last occurrence is
# kept; all `data` values are kept, in order.
parse_event <- function(lines) {
  fields <- regmatches(lines, regexec("([^:]*)(: ?)?(.*)", lines))
  field_names <- vapply(fields, function(x) x[2], character(1))
  field_values <- vapply(fields, function(x) x[4], character(1))

  # Seed with the default event type; an explicit `event:` line overrides it
  # because only the last duplicate survives below.
  keys <- c("event", field_names)
  values <- c("message", field_values)

  keep <- !(duplicated(keys, fromLast = TRUE) & keys != "data")
  keys <- keys[keep]
  values <- values[keep]

  pick <- function(key) values[keys == key]

  list(
    type = pick("event"),
    data = pick("data"),
    id = pick("id")
  )
}

as_round_function <- function(round = c("byte", "line"),
error_call = caller_env()) {
if (is.function(round)) {
Expand All @@ -342,20 +100,6 @@ as_round_function <- function(round = c("byte", "line"),
}
}

# Read everything currently available from `con` as a raw vector, pulling up
# to `buffer` bytes per read. Reading stops at the first empty read. Returns
# `NULL` when no bytes were read at all.
read_con <- function(con, buffer = 32 * 1024) {
  # Collect chunks in a list and concatenate once at the end; appending to a
  # raw vector with c() on every iteration re-copies all bytes read so far.
  chunks <- list()
  repeat {
    chunk <- readBin(con, "raw", n = buffer)
    if (length(chunk) == 0) break
    chunks[[length(chunks) + 1L]] <- chunk
  }

  if (length(chunks) == 0) {
    NULL
  } else {
    unlist(chunks, use.names = FALSE)
  }
}

#' @export
#' @rdname req_perform_stream
#' @usage NULL
Expand Down
Loading

0 comments on commit acaaa9a

Please sign in to comment.