Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manage our own pushback state #535

Merged
merged 22 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions R/req-perform-connection.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@

#' Perform a request and return a streaming connection
#'
#' @description
#' `r lifecycle::badge("experimental")`
#'
#' Use `req_perform_connection()` to perform a request if you want to stream the
#' response body. A response returned by `req_perform_connection()` includes a
#' connection as the body. You can then use [resp_stream_raw()],
#' [resp_stream_lines()], or [resp_stream_sse()] to retrieve data a chunk at a
#' time. Always finish up by closing the connection by calling
#' `close(response)`.
#'
#' This is an alternative interface to [req_perform_stream()] that returns a
#' [connection][base::connections] that you can use to pull the data, rather
#' than providing callbacks that the data is pushed to. This is useful if you
#' want to do other work in between handling inputs from the stream.
#'
#' @inheritParams req_perform_stream
#' @param blocking When retrieving data, should the connection block and wait
#' for the desired information or immediately return what it has (possibly
#' nothing)?
#' @export
#' @examples
#' req <- request(example_url()) |>
#' req_url_path("/stream-bytes/32768")
#' resp <- req_perform_connection(req)
#'
#' length(resp_stream_raw(resp, kb = 16))
#' length(resp_stream_raw(resp, kb = 16))
#' # When the stream has no more data, you'll get an empty result:
#' length(resp_stream_raw(resp, kb = 16))
#'
#' # Always close the response when you're done
#' close(resp)
req_perform_connection <- function(req, blocking = TRUE) {
# Performs `req` with retries and returns a response; on the non-error path
# the response body is the connection created by req_perform_connection1().
#
# NOTE(review): the "Check warning ...", "View check run ...", "Codecov ..."
# and "Added line(s) ..." lines interleaved below are coverage annotations
# captured from the rendered diff page; they are not R code.

# Validate the inputs before doing any work.
check_request(req)
check_bool(blocking)

Check warning on line 38 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L38

Added line #L38 was not covered by tests

# Prepare the request and build a handle from the prepared request. The
# unprepared request is recorded as the last request, and the last response
# is reset to NULL before any attempt is made.
req_prep <- req_prepare(req)
handle <- req_handle(req_prep)
the$last_request <- req
the$last_response <- NULL

Check warning on line 43 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L40-L43

Added lines #L40 - L43 were not covered by tests

# Retry state: `tries` counts attempts judged transient so far, `delay` is
# the sleep before the next attempt (0 for the first one). The loop stops
# once `max_tries` is reached or the wall-clock `deadline` has passed.
tries <- 0
delay <- 0
max_tries <- retry_max_tries(req)
deadline <- Sys.time() + retry_max_seconds(req)
resp <- NULL
while (tries < max_tries && Sys.time() < deadline) {
sys_sleep(delay, "for retry backoff")

Check warning on line 51 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L45-L51

Added lines #L45 - L51 were not covered by tests

# A non-NULL `resp` here is left over from the previous attempt; close it
# before performing the request again.
if (!is.null(resp)) {
close(resp)
}
resp <- req_perform_connection1(req, handle, blocking = blocking)

Check warning on line 56 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L53-L56

Added lines #L53 - L56 were not covered by tests

# On a transient failure: count the attempt, compute the next delay and
# signal an "httr2_retry" condition carrying both values. Otherwise stop
# retrying and keep this response.
if (retry_is_transient(req, resp)) {
tries <- tries + 1
delay <- retry_after(req, resp, tries)
signal(class = "httr2_retry", tries = tries, delay = delay)
} else {
break
}
}
req_completed(req)

Check warning on line 66 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L58-L66

Added lines #L58 - L66 were not covered by tests

# For an error response the body is replaced by the result of read_con() on
# the connection, and the connection is closed.
if (error_is_error(req, resp)) {

Check warning on line 68 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L68

Added line #L68 was not covered by tests
# Read full body if there's an error
conn <- resp$body
resp$body <- read_con(conn)
close(conn)

Check warning on line 72 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L70-L72

Added lines #L70 - L72 were not covered by tests
}
# Record the final response, then hand it to handle_resp().
# NOTE(review): presumably handle_resp() raises for error responses -- confirm
# against its definition, which is not visible here.
the$last_response <- resp
handle_resp(req, resp)

Check warning on line 75 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L74-L75

Added lines #L74 - L75 were not covered by tests

resp

Check warning on line 77 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L77

Added line #L77 was not covered by tests
}

req_perform_connection1 <- function(req, handle, blocking = TRUE) {
# Performs a single attempt: creates a curl connection for `req$url` using
# `handle`, opens it, and wraps it in a response whose body is that
# connection.
#
# NOTE(review): the "Check warning ...", "View check run ...", "Codecov ..."
# and "Added line(s) ..." lines interleaved below are coverage annotations
# captured from the rendered diff page; they are not R code.
stream <- curl::curl(req$url, handle = handle)

Check warning on line 81 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L81

Added line #L81 was not covered by tests

# Must open the stream in order to initiate the connection
# The open mode is fixed to "rbf" here; only `blocking` is configurable.
open(stream, "rbf", blocking = blocking)
# URL, status code and headers are read from the handle after opening.
curl_data <- curl::handle_data(handle)

Check warning on line 85 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L84-L85

Added lines #L84 - L85 were not covered by tests

# The still-open connection is stored as the response body.
new_response(
method = req_method_get(req),
url = curl_data$url,
status_code = curl_data$status_code,
headers = as_headers(curl_data$headers),
body = stream,
request = req
)

Check warning on line 94 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L87-L94

Added lines #L87 - L94 were not covered by tests
}
256 changes: 0 additions & 256 deletions R/req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -79,250 +79,8 @@ req_perform_stream <- function(req,
resp
}

#' Perform a request and return a streaming connection
#'
#' @description
#' `r lifecycle::badge("experimental")`
#'
#' Use `req_perform_connection()` to perform a request if you want to stream the
#' response body. A response returned by `req_perform_connection()` includes a
#' connection as the body. You can then use [resp_stream_raw()],
#' [resp_stream_lines()], or [resp_stream_sse()] to retrieve data a chunk at a
#' time. Always finish up by closing the connection by calling
#' `close(response)`.
#'
#' This is an alternative interface to [req_perform_stream()] that returns a
#' [connection][base::connections] that you can use to pull the data, rather
#' than providing callbacks that the data is pushed to. This is useful if you
#' want to do other work in between handling inputs from the stream.
#'
#' @inheritParams req_perform_stream
#' @param mode The mode that should be used for opening the connection.
#' @param blocking When retrieving data, should the connection block and wait
#' for the desired information or immediately return what it has (possibly
#' nothing)?
#' @export
#' @examples
#' req <- request(example_url()) |>
#' req_url_path("/stream-bytes/32768")
#' resp <- req_perform_connection(req)
#'
#' length(resp_stream_raw(resp, kb = 16))
#' length(resp_stream_raw(resp, kb = 16))
#' # When the stream has no more data, you'll get an empty result:
#' length(resp_stream_raw(resp, kb = 16))
#'
#' # Always close the response when you're done
#' close(resp)
req_perform_connection <- function(req,
                                   mode = c("binary", "text"),
                                   blocking = TRUE) {
  # Validate inputs and translate the user-facing mode into the mode string
  # passed to open(): "rf" for text, "rbf" for binary.
  check_request(req)
  check_bool(blocking)
  mode <- arg_match(mode)
  open_mode <- switch(mode, text = "rf", "rbf")

  prepared <- req_prepare(req)
  handle <- req_handle(prepared)
  the$last_request <- req

  # Retry bookkeeping: `attempt` counts attempts judged transient, `wait` is
  # the sleep before the next attempt (zero before the first one).
  attempt <- 0
  wait <- 0
  attempt_limit <- retry_max_tries(req)
  stop_at <- Sys.time() + retry_max_seconds(req)
  resp <- NULL
  while (attempt < attempt_limit && Sys.time() < stop_at) {
    sys_sleep(wait, "for retry backoff")

    # Close the connection left over from the previous attempt, if any.
    if (!is.null(resp)) {
      close(resp$body)
    }
    resp <- req_perform_connection1(
      req,
      handle,
      open_mode,
      blocking = blocking
    )

    # Anything that is not a transient failure ends the retry loop.
    if (!retry_is_transient(req, resp)) {
      break
    }
    attempt <- attempt + 1
    wait <- retry_after(req, resp, attempt)
  }

  req_completed(prepared)

  if (error_is_error(req, resp)) {
    # For an error response, replace the connection with the fully read body
    # and close the connection.
    stream <- resp$body
    resp$body <- read_con(stream)
    close(stream)
  }
  the$last_response <- resp
  handle_resp(req, resp)

  resp
}

# Perform a single attempt: create a curl connection for `req$url` on
# `handle`, open it with `con_mode`, and return a response whose body is the
# open connection.
req_perform_connection1 <- function(req,
                                    handle,
                                    con_mode = "rbf",
                                    blocking = TRUE) {
  body <- curl::curl(req$url, handle = handle)

  # Opening the connection is what initiates the request.
  open(body, con_mode, blocking = blocking)

  # URL, status code and headers are read from the handle after opening.
  info <- curl::handle_data(handle)

  new_response(
    method = req_method_get(req),
    url = info$url,
    status_code = info$status_code,
    headers = as_headers(info$headers),
    body = body,
    request = req
  )
}

#' Read a streaming body a chunk at a time
#'
#' @description
#' `r lifecycle::badge("experimental")`
#'
#' * `resp_stream_raw()` retrieves bytes (`raw` vectors).
#' * `resp_stream_lines()` retrieves lines of text (`character` vectors).
#' * `resp_stream_sse()` retrieves [server-sent
#' events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
#' from the stream. It currently only works with text mode connections so when calling
#' `req_perform_connection()` you must use `mode = "text"`.
#'
#' @returns
#' * `resp_stream_raw()`: a raw vector.
#' * `resp_stream_lines()`: a character vector.
#' * `resp_stream_sse()`: a list with components `type`, `data`, and `id`; or
#' `NULL`, signifying that the end of the stream has been reached or--if in
#' nonblocking mode--that no event is currently available.
#' @export
#' @param resp,con A streaming [response] created by [req_perform_connection()].
#' @param kb How many kilobytes (1024 bytes) of data to read.
resp_stream_raw <- function(resp, kb = 32) {
  check_streaming_response(resp)

  # `kb` is expressed in kilobytes of 1024 bytes; readBin() wants a byte count.
  n_bytes <- kb * 1024
  readBin(resp$body, what = raw(), n = n_bytes)
}

#' @export
#' @rdname resp_stream_raw
#' @param lines How many lines to read
resp_stream_lines <- function(resp, lines = 1) {
  check_streaming_response(resp)

  # Read at most `lines` lines from the response's connection.
  readLines(con = resp$body, n = lines)
}

#' @export
#' @rdname resp_stream_raw
# TODO: max_size
resp_stream_sse <- function(resp) {
  check_streaming_response(resp)
  stream <- resp$body

  # Only text-mode connections are accepted.
  is_text <- identical(summary(stream)$text, "text")
  if (!is_text) {
    cli::cli_abort(c(
      "{.arg resp} must have a text mode connection.",
      i = 'Use {.code mode = "text"} when calling {.fn req_perform_connection}.'
    ))
  }

  # Accumulate lines until a blank line, which terminates an event.
  pending <- character(0)
  repeat {
    line <- readLines(stream, n = 1)
    if (length(line) == 0) {
      # Nothing more to read right now.
      break
    }
    if (line == "") {
      return(parse_event(pending))
    }
    pending <- c(pending, line)
  }

  # A partial event was read: push its lines back onto the connection so a
  # later call can see them again.
  if (length(pending) > 0) {
    pushBack(pending, stream)
  }

  NULL
}

#' @export
#' @param ... Not used; included for compatibility with generic.
#' @rdname resp_stream_raw
close.httr2_response <- function(con, ...) {
  check_response(con)

  # Only close a body that is a connection and is still valid; any other body
  # is left untouched.
  body <- con$body
  has_open_stream <- inherits(body, "connection") && isValid(body)
  if (has_open_stream) {
    close(body)
  }

  invisible()
}

# Helpers ----------------------------------------------------------------------

# Abort unless `resp` is a response whose body type is "stream" and whose
# body connection is still valid. `arg` and `call` are forwarded to the
# error helpers.
check_streaming_response <- function(resp,
                                     arg = caller_arg(resp),
                                     call = caller_env()) {
  check_response(resp, arg = arg, call = call)

  is_stream <- resp_body_type(resp) == "stream"
  if (!is_stream) {
    stop_input_type(
      resp,
      "a streaming HTTP response object",
      allow_null = FALSE,
      arg = arg,
      call = call
    )
  }

  # The body is a stream, but it may already have been closed.
  if (!isValid(resp$body)) {
    cli::cli_abort("{.arg {arg}} has already been closed.", call = call)
  }
}

# isOpen() is not usable here for two reasons:
# 1. It errors if `con` has been closed, rather than returning FALSE.
# 2. It returns TRUE if `con` has been closed and a new connection has since
#    been opened.
#
# Instead, look the connection up again from its number and compare the
# result with the original object. This works because connections carry an
# undocumented external pointer.
isValid <- function(con) {
  same_connection <- function() {
    identical(getConnection(con), con)
  }

  # Any error during the lookup is treated as "not valid".
  tryCatch(same_connection(), error = function(err) FALSE)
}


# Parse the lines of one server-sent event into a list with components
# `type`, `data` and `id`. Each line is split into a field name (the text
# before the first ":") and a value (the text after ":" and one optional
# space).
parse_event <- function(lines) {
  parts <- regmatches(lines, regexec("([^:]*)(: ?)?(.*)", lines))
  field <- function(i) vapply(parts, "[", character(1), i)

  # Seed with the default event type so `type` is "message" unless the event
  # supplies its own "event" field.
  keys <- c("event", field(2))
  values <- c("message", field(4))

  # For repeated fields keep only the last occurrence, except "data", where
  # every occurrence is kept.
  keep <- keys == "data" | !duplicated(keys, fromLast = TRUE)
  keys <- keys[keep]
  values <- values[keep]

  list(
    type = values[keys == "event"],
    data = values[keys == "data"],
    id = values[keys == "id"]
  )
}

as_round_function <- function(round = c("byte", "line"),
error_call = caller_env()) {
if (is.function(round)) {
Expand All @@ -342,20 +100,6 @@ as_round_function <- function(round = c("byte", "line"),
}
}

# Read everything currently available from `con` as raw bytes.
#
# Reads `buffer` bytes at a time until readBin() returns zero bytes. Returns
# a raw vector, or NULL if no bytes were read at all.
#
# Chunks are collected in a list and concatenated once at the end, which
# avoids re-copying the whole accumulated vector on every read.
read_con <- function(con, buffer = 32 * 1024) {
  chunks <- list()
  repeat {
    chunk <- readBin(con, "raw", n = buffer)
    if (length(chunk) == 0) {
      break
    }
    chunks[[length(chunks) + 1L]] <- chunk
  }

  if (length(chunks) == 0) {
    return(NULL)
  }
  unlist(chunks, use.names = FALSE)
}

#' @export
#' @rdname req_perform_stream
#' @usage NULL
Expand Down
Loading
Loading