diff --git a/R/req-perform-connection.R b/R/req-perform-connection.R new file mode 100644 index 00000000..28cc05d2 --- /dev/null +++ b/R/req-perform-connection.R @@ -0,0 +1,95 @@ + +#' Perform a request and return a streaming connection +#' +#' @description +#' `r lifecycle::badge("experimental")` +#' +#' Use `req_perform_connection()` to perform a request if you want to stream the +#' response body. A response returned by `req_perform_connection()` includes a +#' connection as the body. You can then use [resp_stream_raw()], +#' [resp_stream_lines()], or [resp_stream_sse()] to retrieve data a chunk at a +#' time. Always finish up by closing the connection by calling +#' `close(response)`. +#' +#' This is an alternative interface to [req_perform_stream()] that returns a +#' [connection][base::connections] that you can use to pull the data, rather +#' than providing callbacks that the data is pushed to. This is useful if you +#' want to do other work in between handling inputs from the stream. +#' +#' @inheritParams req_perform_stream +#' @param blocking When retrieving data, should the connection block and wait +#' for the desired information or immediately return what it has (possibly +#' nothing)? +#' @export +#' @examples +#' req <- request(example_url()) |> +#' req_url_path("/stream-bytes/32768") +#' resp <- req_perform_connection(req) +#' +#' length(resp_stream_raw(resp, kb = 16)) +#' length(resp_stream_raw(resp, kb = 16)) +#' # When the stream has no more data, you'll get an empty result: +#' length(resp_stream_raw(resp, kb = 16)) +#' +#' # Always close the response when you're done +#' close(resp) +req_perform_connection <- function(req, blocking = TRUE) { + check_request(req) + check_bool(blocking) + + req_prep <- req_prepare(req) + handle <- req_handle(req_prep) + the$last_request <- req + the$last_response <- NULL + + tries <- 0 + delay <- 0 + max_tries <- retry_max_tries(req) + deadline <- Sys.time() + retry_max_seconds(req) + resp <- NULL + while (tries < max_tries && Sys.time() < deadline) { + sys_sleep(delay, "for retry backoff") + + if (!is.null(resp)) { + close(resp) + } + resp <- req_perform_connection1(req, handle, blocking = blocking) + + if (retry_is_transient(req, resp)) { + tries <- tries + 1 + delay <- retry_after(req, resp, tries) + signal(class = "httr2_retry", tries = tries, delay = delay) + } else { + break + } + } + req_completed(req) + + if (error_is_error(req, resp)) { + # Read full body if there's an error + conn <- resp$body + resp$body <- read_con(conn) + close(conn) + } + the$last_response <- resp + handle_resp(req, resp) + + resp +} + +req_perform_connection1 <- function(req, handle, blocking = TRUE) { + stream <- curl::curl(req$url, handle = handle) + + # Must open the stream in order to initiate the connection + open(stream, "rbf", blocking = blocking) + curl_data <- curl::handle_data(handle) + + new_response( + method = req_method_get(req), + url = curl_data$url, + status_code = curl_data$status_code, + headers = as_headers(curl_data$headers), + body = stream, + request = req + ) +} diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 3f84585a..0f9cd42e 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -79,250 +79,8 @@ req_perform_stream <- function(req, resp } -#' Perform a request and return a streaming connection -#' -#' @description -#' `r lifecycle::badge("experimental")` -#' -#' Use `req_perform_connection()` to perform a request if you want to stream the -#' response body. A response returned by `req_perform_connection()` includes a -#' connection as the body. You can then use [resp_stream_raw()], -#' [resp_stream_lines()], or [resp_stream_sse()] to retrieve data a chunk at a -#' time. Always finish up by closing the connection by calling -#' `close(response)`. -#' -#' This is an alternative interface to [req_perform_stream()] that returns a -#' [connection][base::connections] that you can use to pull the data, rather -#' than providing callbacks that the data is pushed to. This is useful if you -#' want to do other work in between handling inputs from the stream. -#' -#' @inheritParams req_perform_stream -#' @param mode The mode that should be used for opening the connection. -#' @param blocking When retrieving data, should the connection block and wait -#' for the desired information or immediately return what it has (possibly -#' nothing)? -#' @export -#' @examples -#' req <- request(example_url()) |> -#' req_url_path("/stream-bytes/32768") -#' resp <- req_perform_connection(req) -#' -#' length(resp_stream_raw(resp, kb = 16)) -#' length(resp_stream_raw(resp, kb = 16)) -#' # When the stream has no more data, you'll get an empty result: -#' length(resp_stream_raw(resp, kb = 16)) -#' -#' # Always close the response when you're done -#' close(resp) -req_perform_connection <- function(req, - mode = c("binary", "text"), - blocking = TRUE) { - check_request(req) - check_bool(blocking) - mode <- arg_match(mode) - con_mode <- if (mode == "text") "rf" else "rbf" - - req_prep <- req_prepare(req) - handle <- req_handle(req_prep) - the$last_request <- req - - tries <- 0 - delay <- 0 - max_tries <- retry_max_tries(req) - deadline <- Sys.time() + retry_max_seconds(req) - resp <- NULL - while (tries < max_tries && Sys.time() < deadline) { - sys_sleep(delay, "for retry backoff") - - if (!is.null(resp)) { - close(resp$body) - } - resp <- req_perform_connection1(req, handle, con_mode, blocking = blocking) - - if (retry_is_transient(req, resp)) { - tries <- tries + 1 - delay <- retry_after(req, resp, tries) - } else { - break - } - } - - req_completed(req_prep) - - if (error_is_error(req, resp)) { - # Read full body if there's an error - conn <- resp$body - resp$body <- read_con(conn) - close(conn) - } - the$last_response <- resp - handle_resp(req, resp) - - resp -} - -req_perform_connection1 <- function(req, handle, con_mode = "rbf", blocking = TRUE) { - stream <- curl::curl(req$url, handle = handle) - - # Must open the stream in order to initiate the connection - open(stream, con_mode, blocking = blocking) - curl_data <- curl::handle_data(handle) - - new_response( - method = req_method_get(req), - url = curl_data$url, - status_code = curl_data$status_code, - headers = as_headers(curl_data$headers), - body = stream, - request = req - ) -} - -#' Read a streaming body a chunk at a time -#' -#' @description -#' `r lifecycle::badge("experimental")` -#' -#' * `resp_stream_raw()` retrieves bytes (`raw` vectors). -#' * `resp_stream_lines()` retrieves lines of text (`character` vectors). -#' * `resp_stream_sse()` retrieves [server-sent -#' events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) -#' from the stream. It currently only works with text mode connections so when calling -#' `req_perform_connection()` you must use `mode = "text"`. -#' -#' @returns -#' * `resp_stream_raw()`: a raw vector. -#' * `resp_stream_lines()`: a character vector. -#' * `resp_stream_sse()`: a list with components `type`, `data`, and `id`; or -#' `NULL`, signifying that the end of the stream has been reached or--if in -#' nonblocking mode--that no event is currently available. -#' @export -#' @param resp,con A streaming [response] created by [req_perform_connection()]. -#' @param kb How many kilobytes (1024 bytes) of data to read. -resp_stream_raw <- function(resp, kb = 32) { - check_streaming_response(resp) - conn <- resp$body - - readBin(conn, raw(), kb * 1024) -} - -#' @export -#' @rdname resp_stream_raw -#' @param lines How many lines to read -resp_stream_lines <- function(resp, lines = 1) { - check_streaming_response(resp) - conn <- resp$body - - readLines(conn, n = lines) -} - -#' @export -#' @rdname resp_stream_raw -# TODO: max_size -resp_stream_sse <- function(resp) { - check_streaming_response(resp) - conn <- resp$body - if (!identical(summary(conn)$text, "text")) { - cli::cli_abort(c( - "{.arg resp} must have a text mode connection.", - i = 'Use {.code mode = "text"} when calling {.fn req_perform_connection}.' - )) - } - - lines <- character(0) - while (TRUE) { - line <- readLines(conn, n = 1) - if (length(line) == 0) { - break - } - if (line == "") { - # \n\n detected, end of event - return(parse_event(lines)) - } - lines <- c(lines, line) - } - - if (length(lines) > 0) { - # We have a partial event, put it back while we wait - # for more - pushBack(lines, conn) - } - - return(NULL) -} - -#' @export -#' @param ... Not used; included for compatibility with generic. -#' @rdname resp_stream_raw -close.httr2_response <- function(con, ...) { - check_response(con) - - if (inherits(con$body, "connection") && isValid(con$body)) { - close(con$body) - } - - invisible() -} - # Helpers ---------------------------------------------------------------------- -check_streaming_response <- function(resp, - arg = caller_arg(resp), - call = caller_env()) { - - check_response(resp, arg = arg, call = call) - - if (resp_body_type(resp) != "stream") { - stop_input_type( - resp, - "a streaming HTTP response object", - allow_null = FALSE, - arg = arg, - call = call - ) - } - - if (!isValid(resp$body)) { - cli::cli_abort("{.arg {arg}} has already been closed.", call = call) - } -} - -# isOpen doesn't work for two reasons: -# 1. It errors if con has been closed, rather than returning FALSE -# 2. If returns TRUE if con has been closed and a new connection opened -# -# So instead we retrieve the connection from its number and compare to the -# original connection. This works because connections have an undocumented -# external pointer. -isValid <- function(con) { - tryCatch( - identical(getConnection(con), con), - error = function(cnd) FALSE - ) -} - - -parse_event <- function(lines) { - m <- regexec("([^:]*)(: ?)?(.*)", lines) - matches <- regmatches(lines, m) - keys <- c("event", vapply(matches, function(x) x[2], character(1))) - values <- c("message", vapply(matches, function(x) x[4], character(1))) - - remove_dupes <- duplicated(keys, fromLast = TRUE) & keys != "data" - keys <- keys[!remove_dupes] - values <- values[!remove_dupes] - - event_type <- values[keys == "event"] - data <- values[keys == "data"] - id <- values[keys == "id"] - - list( - type = event_type, - data = data, - id = id - ) -} - as_round_function <- function(round = c("byte", "line"), error_call = caller_env()) { if (is.function(round)) { @@ -342,20 +100,6 @@ as_round_function <- function(round = c("byte", "line"), } } -read_con <- function(con, buffer = 32 * 1024) { - bytes <- raw() - repeat { - new <- readBin(con, "raw", n = buffer) - if (length(new) == 0) break - bytes <- c(bytes, new) - } - if (length(bytes) == 0) { - NULL - } else { - bytes - } -} - #' @export #' @rdname req_perform_stream #' @usage NULL diff --git a/R/resp-stream.R b/R/resp-stream.R new file mode 100644 index 00000000..12aa650e --- /dev/null +++ b/R/resp-stream.R @@ -0,0 +1,337 @@ +#' Read a streaming body a chunk at a time +#' +#' @description +#' `r lifecycle::badge("experimental")` +#' +#' * `resp_stream_raw()` retrieves bytes (`raw` vectors). +#' * `resp_stream_lines()` retrieves lines of text (`character` vectors). +#' * `resp_stream_sse()` retrieves [server-sent +#' events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) +#' from the stream. It currently only works with text mode connections so when calling +#' `req_perform_connection()` you must use `mode = "text"`. +#' +#' @returns +#' * `resp_stream_raw()`: a raw vector. +#' * `resp_stream_lines()`: a character vector. +#' * `resp_stream_sse()`: a list with components `type`, `data`, and `id`; or +#' `NULL`, signifying that the end of the stream has been reached or--if in +#' nonblocking mode--that no event is currently available. +#' @export +#' @param resp,con A streaming [response] created by [req_perform_connection()]. +#' @param kb How many kilobytes (1024 bytes) of data to read. +resp_stream_raw <- function(resp, kb = 32) { + check_streaming_response(resp) + conn <- resp$body + + readBin(conn, raw(), kb * 1024) +} + +#' @export +#' @rdname resp_stream_raw +#' @param lines The maximum number of lines to return at once. +#' @param warn Like [readLines()]: warn if the connection ends without a final +#' EOL. +resp_stream_lines <- function(resp, lines = 1, max_size = Inf, warn = TRUE) { + check_streaming_response(resp) + check_number_whole(lines, min = 0, allow_infinite = TRUE) + check_number_whole(max_size, min = 1, allow_infinite = TRUE) + check_logical(warn) + + if (lines == 0) { + # If you want to do that, who am I to judge? + return(character()) + } + + encoding <- resp_encoding(resp) + + lines_read <- character(0) + while (lines > 0) { + line <- resp_stream_oneline(resp, max_size, warn, encoding) + if (length(line) == 0) { + # No more data, either because EOF or req_perform_connection(blocking=FALSE). + # Either way, return what we have + return(lines_read) + } + lines_read <- c(lines_read, line) + lines <- lines - 1 + } + lines_read +} + + +#' @param max_size The maximum number of bytes to buffer; once this number of +#' bytes has been exceeded without a line/event boundary, an error is thrown. +#' @export +#' @rdname resp_stream_raw +resp_stream_sse <- function(resp, max_size = Inf) { + event_bytes <- resp_boundary_pushback(resp, max_size, find_event_boundary, include_trailer = FALSE) + if (!is.null(event_bytes)) { + parse_event(event_bytes) + } else { + return(NULL) + } +} + +#' @export +#' @param ... Not used; included for compatibility with generic. +#' @rdname resp_stream_raw +close.httr2_response <- function(con, ...) { + check_response(con) + + if (inherits(con$body, "connection") && isValid(con$body)) { + close(con$body) + } + + invisible() +} + +resp_stream_oneline <- function(resp, max_size, warn, encoding) { + repeat { + line_bytes <- resp_boundary_pushback(resp, max_size, find_line_boundary, include_trailer = TRUE) + if (is.null(line_bytes)) { + return(character()) + } + + eat_next_lf <- resp$cache$resp_stream_oneline_eat_next_lf + resp$cache$resp_stream_oneline_eat_next_lf <- FALSE + + if (identical(line_bytes, as.raw(0x0A)) && isTRUE(eat_next_lf)) { + # We hit that special edge case, see below + next + } + + # If ending on \r, there's a special edge case here where if the + # next line begins with \n, that byte should be eaten. + if (tail(line_bytes, 1) == 0x0D) { + resp$cache$resp_stream_oneline_eat_next_lf <- TRUE + } + + # Use `resp$body` as the variable name so that if warn=TRUE, you get + # "incomplete final line found on 'resp$body'" as the warning message + `resp$body` <- line_bytes + line_con <- rawConnection(`resp$body`) + on.exit(close(line_con)) + + # readLines chomps the trailing newline. I assume this is desirable. + raw_text <- readLines(line_con, n = 1, warn = warn) + + # Use iconv to convert from whatever encoding is specified in the + # response header, to UTF-8 + return(iconv(raw_text, encoding, "UTF-8")) + } +} + +find_line_boundary <- function(buffer) { + if (length(buffer) == 0) { + return(NULL) + } + + # Look left 1 byte + right1 <- c(tail(buffer, -1), 0x00) + + crlf <- buffer == 0x0D & right1 == 0x0A + cr <- buffer == 0x0D + lf <- buffer == 0x0A + + all <- which(crlf | cr | lf) + if (length(all) == 0) { + return(NULL) + } + + first <- all[[1]] + if (crlf[first]) { + return(first + 2) + } else { + return(first + 1) + } +} + +# Function to find the first double line ending in a buffer, or NULL if no +# double line ending is found +# +# Example: +# find_event_boundary(charToRaw("data: 1\n\nid: 12345")) +# Returns: +# list( +# matched = charToRaw("data: 1\n\n"), +# remaining = charToRaw("id: 12345") +# ) +find_event_boundary <- function(buffer) { + if (length(buffer) < 2) { + return(NULL) + } + + # leftX means look behind by X bytes. For example, left1[2] equals buffer[1]. + # Any attempt to read past the beginning of the buffer results in 0x00. + left1 <- c(0x00, head(buffer, -1)) + left2 <- c(0x00, head(left1, -1)) + left3 <- c(0x00, head(left2, -1)) + + boundary_end <- which( + (left1 == 0x0A & buffer == 0x0A) | # \n\n + (left1 == 0x0D & buffer == 0x0D) | # \r\r + (left3 == 0x0D & left2 == 0x0A & left1 == 0x0D & buffer == 0x0A) # \r\n\r\n + ) + + if (length(boundary_end) == 0) { + return(NULL) # No event boundary found + } + + boundary_end <- boundary_end[1] # Take the first occurrence + split_at <- boundary_end + 1 # Split at one after the boundary + split_at +} + +# Splits a buffer into the part before `split_at`, and the part starting at +# `split_at`. It's possible for either of the returned parts to be zero-length +# (i.e. if `split_at` is 1 or length(buffer)+1). +split_buffer <- function(buffer, split_at) { + # Return a list with the event data and the remaining buffer + list( + matched = slice(buffer, end = split_at), + remaining = slice(buffer, start = split_at) + ) +} + +# @param max_size Maximum number of bytes to look for a boundary before throwing an error +# @param boundary_func A function that takes a raw vector and returns NULL if no +# boundary was detected, or one position PAST the end of the first boundary in +# the vector +# @param include_trailer If TRUE, at the end of the response, if there are +# bytes after the last boundary, then return those bytes; if FALSE, then those +# bytes are silently discarded. +resp_boundary_pushback <- function(resp, max_size, boundary_func, include_trailer) { + check_streaming_response(resp) + check_number_whole(max_size, min = 1, allow_infinite = TRUE) + + chunk_size <- min(max_size + 1, 1024) + + # Grab data left over from last resp_stream_sse() call (if any) + buffer <- resp$cache$push_back %||% raw() + resp$cache$push_back <- raw() + + print_buffer <- function(buf, label) { + # cat(label, ":", paste(sprintf("%02X", as.integer(buf)), collapse = " "), "\n", file = stderr()) + } + + # Read chunks until we find an event or reach the end of input + repeat { + # Try to find an event boundary using the data we have + print_buffer(buffer, "Buffer to parse") + split_at <- boundary_func(buffer) + + if (!is.null(split_at)) { + result <- split_buffer(buffer, split_at) + # We found a complete event + print_buffer(result$matched, "Matched data") + print_buffer(result$remaining, "Remaining buffer") + resp$cache$push_back <- result$remaining + return(result$matched) + } + + if (length(buffer) > max_size) { + # Keep the buffer in place, so that if the user tries resp_stream_sse + # again, they'll get the same error rather than reading the stream + # having missed a bunch of bytes. + resp$cache$push_back <- buffer + cli::cli_abort("Streaming read exceeded size limit of {max_size}") + } + + # We didn't have enough data. Attempt to read more + chunk <- readBin(resp$body, raw(), + # Don't let us exceed the max size by more than one byte; we do allow the + # one extra byte so we know to error. + n = min(chunk_size, max_size - length(buffer) + 1) + ) + + print_buffer(chunk, "Received chunk") + + # If we've reached the end of input, store the buffer and return NULL + if (length(chunk) == 0) { + if (!isIncomplete(resp$body)) { + # We've truly reached the end of the connection; no more data is coming + if (include_trailer && length(buffer) > 0) { + return(buffer) + } else { + return(NULL) + } + } + + # More data might come later + print_buffer(buffer, "Storing incomplete buffer") + resp$cache$push_back <- buffer + return(NULL) + } + + # More data was received; combine it with existing buffer and continue the + # loop to try parsing again + buffer <- c(buffer, chunk) + print_buffer(buffer, "Combined buffer") + } +} + +parse_event <- function(event_data) { + # always treat event_data as UTF-8, it's in the spec + str_data <- rawToChar(event_data) + Encoding(str_data) <- "UTF-8" + + # The spec says \r\n, \r, and \n are all valid separators + lines <- strsplit(str_data, "\r\n|\r|\n")[[1]] + + m <- regexec("([^:]*)(: ?)?(.*)", lines) + matches <- regmatches(lines, m) + keys <- c("event", vapply(matches, function(x) x[2], character(1))) + values <- c("message", vapply(matches, function(x) x[4], character(1))) + + remove_dupes <- duplicated(keys, fromLast = TRUE) & keys != "data" + keys <- keys[!remove_dupes] + values <- values[!remove_dupes] + + event_type <- values[keys == "event"] + data <- values[keys == "data"] + id <- values[keys == "id"] + + list( + type = event_type, + data = data, + id = id + ) +} + +# Helpers ---------------------------------------------------- + + +check_streaming_response <- function(resp, + arg = caller_arg(resp), + call = caller_env()) { + + check_response(resp, arg = arg, call = call) + + if (resp_body_type(resp) != "stream") { + stop_input_type( + resp, + "a streaming HTTP response object", + allow_null = FALSE, + arg = arg, + call = call + ) + } + + if (!isValid(resp$body)) { + cli::cli_abort("{.arg {arg}} has already been closed.", call = call) + } +} + +# isOpen doesn't work for two reasons: +# 1. It errors if con has been closed, rather than returning FALSE +# 2. If returns TRUE if con has been closed and a new connection opened +# +# So instead we retrieve the connection from its number and compare to the +# original connection. This works because connections have an undocumented +# external pointer. +isValid <- function(con) { + tryCatch( + identical(getConnection(con), con), + error = function(cnd) FALSE + ) +} diff --git a/R/test.R b/R/test.R index 22c195b9..9f973db1 100644 --- a/R/test.R +++ b/R/test.R @@ -20,7 +20,7 @@ request_test <- function(template = "/get", ...) { #' @export example_url <- function() { check_installed("webfakes") - if (is_testing()) { + if (is_testing() && !is_interactive()) { testthat::skip_on_covr() } diff --git a/R/utils.R b/R/utils.R index 8626a97a..91e25166 100644 --- a/R/utils.R +++ b/R/utils.R @@ -286,3 +286,41 @@ create_progress_bar <- function(total, imap <- function(.x, .f, ...) { map2(.x, names(.x), .f, ...) } + +read_con <- function(con, buffer = 32 * 1024) { + bytes <- raw() + repeat { + new <- readBin(con, "raw", n = buffer) + if (length(new) == 0) break + bytes <- c(bytes, new) + } + if (length(bytes) == 0) { + NULL + } else { + bytes + } +} + + +# Slices the vector using the only sane semantics: start inclusive, end +# exclusive. +# +# * Allows start == end, which means return no elements. +# * Allows start == length(vector) + 1, which means return no elements. +# * Allows zero-length vectors. +# +# Otherwise, slice() is quite strict about what it allows start/end to be: no +# negatives, no reversed order. +slice <- function(vector, start = 1, end = length(vector) + 1) { + stopifnot(start > 0) + stopifnot(start <= length(vector) + 1) + stopifnot(end > 0) + stopifnot(end <= length(vector) + 1) + stopifnot(end >= start) + + if (start == end) { + vector[FALSE] # Return an empty vector of the same type + } else { + vector[start:(end - 1)] + } +} diff --git a/man/req_perform_connection.Rd b/man/req_perform_connection.Rd index 541ab038..a6126f37 100644 --- a/man/req_perform_connection.Rd +++ b/man/req_perform_connection.Rd @@ -1,16 +1,14 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/req-perform-stream.R +% Please edit documentation in R/req-perform-connection.R \name{req_perform_connection} \alias{req_perform_connection} \title{Perform a request and return a streaming connection} \usage{ -req_perform_connection(req, mode = c("binary", "text"), blocking = TRUE) +req_perform_connection(req, blocking = TRUE) } \arguments{ \item{req}{A httr2 \link{request} object.} -\item{mode}{The mode that should be used for opening the connection.} - \item{blocking}{When retrieving data, should the connection block and wait for the desired information or immediately return what it has (possibly nothing)?} diff --git a/man/resp_stream_raw.Rd b/man/resp_stream_raw.Rd index 152ad7eb..c0b51a25 100644 --- a/man/resp_stream_raw.Rd +++ b/man/resp_stream_raw.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/req-perform-stream.R +% Please edit documentation in R/resp-stream.R \name{resp_stream_raw} \alias{resp_stream_raw} \alias{resp_stream_lines} @@ -9,9 +9,9 @@ \usage{ resp_stream_raw(resp, kb = 32) -resp_stream_lines(resp, lines = 1) +resp_stream_lines(resp, lines = 1, max_size = Inf, warn = TRUE) -resp_stream_sse(resp) +resp_stream_sse(resp, max_size = Inf) \method{close}{httr2_response}(con, ...) } @@ -20,7 +20,13 @@ resp_stream_sse(resp) \item{kb}{How many kilobytes (1024 bytes) of data to read.} -\item{lines}{How many lines to read} +\item{lines}{The maximum number of lines to return at once.} + +\item{max_size}{The maximum number of bytes to buffer; once this number of +bytes has been exceeded without a line/event boundary, an error is thrown.} + +\item{warn}{Like \code{\link[=readLines]{readLines()}}: warn if the connection ends without a final +EOL.} \item{...}{Not used; included for compatibility with generic.} } diff --git a/tests/testthat/_snaps/req-perform-connection.md b/tests/testthat/_snaps/req-perform-connection.md new file mode 100644 index 00000000..21eac174 --- /dev/null +++ b/tests/testthat/_snaps/req-perform-connection.md @@ -0,0 +1,13 @@ +# validates inputs + + Code + req_perform_connection(1) + Condition + Error in `req_perform_connection()`: + ! `req` must be an HTTP request object, not the number 1. + Code + req_perform_connection(request_test(), 1) + Condition + Error in `req_perform_connection()`: + ! `blocking` must be `TRUE` or `FALSE`, not the number 1. + diff --git a/tests/testthat/_snaps/req-perform-stream.md b/tests/testthat/_snaps/req-perform-stream.md index 2a816771..4bd75193 100644 --- a/tests/testthat/_snaps/req-perform-stream.md +++ b/tests/testthat/_snaps/req-perform-stream.md @@ -7,23 +7,6 @@ `req_stream()` was deprecated in httr2 1.0.0. i Please use `req_perform_stream()` instead. -# can't read from a closed connection - - Code - resp_stream_raw(resp, 1) - Condition - Error in `resp_stream_raw()`: - ! `resp` has already been closed. - -# resp_stream_sse() requires a text connection - - Code - resp_stream_sse(resp) - Condition - Error in `resp_stream_sse()`: - ! `resp` must have a text mode connection. - i Use `mode = "text"` when calling `req_perform_connection()`. - # req_perform_stream checks its inputs Code diff --git a/tests/testthat/_snaps/resp-stream.md b/tests/testthat/_snaps/resp-stream.md new file mode 100644 index 00000000..9b3cdf27 --- /dev/null +++ b/tests/testthat/_snaps/resp-stream.md @@ -0,0 +1,8 @@ +# can't read from a closed connection + + Code + resp_stream_raw(resp, 1) + Condition + Error in `resp_stream_raw()`: + ! `resp` has already been closed. + diff --git a/tests/testthat/helper-webfakes.R b/tests/testthat/helper-webfakes.R new file mode 100644 index 00000000..849db50d --- /dev/null +++ b/tests/testthat/helper-webfakes.R @@ -0,0 +1,15 @@ +local_app_request <- function(get, frame = parent.frame()) { + # Works interactively (useful for manaul coverage checking) + # but not in separate process + if (!is_interactive()) { + skip_on_covr() + } + + app <- webfakes::new_app() + app$get("/test", get) + server <- webfakes::local_app_process(app, .local_envir = frame) + + req <- request(server$url("/test")) + req <- req_error(req, body = function(resp) resp_body_string(resp)) + req +} diff --git a/tests/testthat/test-oauth-flow-auth-code.R b/tests/testthat/test-oauth-flow-auth-code.R index 887e83f6..72cd8e6f 100644 --- a/tests/testthat/test-oauth-flow-auth-code.R +++ b/tests/testthat/test-oauth-flow-auth-code.R @@ -125,15 +125,12 @@ test_that("external auth code sources are detected correctly", { test_that("auth codes can be retrieved from an external source", { skip_on_cran() - skip_on_covr() - app <- webfakes::new_app() - authorized <- FALSE - - # Error on first, and then respond on second - app$get("/code", function(req, res) { + req <- local_app_request(function(req, res) { + # Error on first, and then respond on second + authorized <- res$app$locals$authorized %||% FALSE if (!authorized) { - authorized <<- TRUE + res$app$locals$authorized <- TRUE res$ set_status(404L)$ set_type("text/plain")$ @@ -144,8 +141,7 @@ test_that("auth codes can be retrieved from an external source", { send_json(text = '{"code":"abc123"}') } }) - server <- webfakes::local_app_process(app) - withr::local_envvar("HTTR2_OAUTH_CODE_SOURCE_URL" = server$url("/code")) + withr::local_envvar("HTTR2_OAUTH_CODE_SOURCE_URL" = req$url) expect_equal(oauth_flow_auth_code_fetch("ignored"), "abc123") }) diff --git a/tests/testthat/test-req-perform-connection.R b/tests/testthat/test-req-perform-connection.R new file mode 100644 index 00000000..7f76f928 --- /dev/null +++ b/tests/testthat/test-req-perform-connection.R @@ -0,0 +1,52 @@ +test_that("validates inputs", { + expect_snapshot(error = TRUE, { + req_perform_connection(1) + req_perform_connection(request_test(), 1) + }) +}) + +test_that("can read all data from a connection", { + resp <- request_test("/stream-bytes/2048") %>% req_perform_connection() + withr::defer(close(resp)) + + out <- resp_body_raw(resp) + expect_length(out, 2048) + expect_false(resp_has_body(resp)) +}) + +test_that("reads body on error", { + req <- local_app_request(function(req, res) { + res$set_status(404L)$send_json(list(status = 404), auto_unbox = TRUE) + }) + + expect_error(req_perform_connection(req), class = "httr2_http_404") + resp <- last_response() + expect_equal(resp_body_json(resp), list(status = 404)) +}) + +test_that("can retry a transient error", { + req <- local_app_request(function(req, res) { + i <- res$app$locals$i %||% 1 + if (i == 1) { + res$app$locals$i <- 2 + res$ + set_status(429)$ + set_header("retry-after", 0)$ + send_json(list(status = "waiting"), auto_unbox = TRUE) + } else { + res$send_json(list(status = "done"), auto_unbox = TRUE) + } + }) + req <- req_retry(req, max_tries = 2) + + cnd <- expect_condition( + resp <- req_perform_connection(req), + class = "httr2_retry" + ) + expect_s3_class(cnd, "httr2_retry") + expect_equal(cnd$tries, 1) + expect_equal(cnd$delay, 0) + + expect_equal(last_response(), resp) + expect_equal(resp_body_json(resp), list(status = "done")) +}) diff --git a/tests/testthat/test-req-perform-stream.R b/tests/testthat/test-req-perform-stream.R index bdf6a28c..788b52d8 100644 --- a/tests/testthat/test-req-perform-stream.R +++ b/tests/testthat/test-req-perform-stream.R @@ -5,79 +5,6 @@ test_that("req_stream() is deprecated", { ) }) -# req_perform_connection() ---------------------------------------------------- - -test_that("can stream bytes from a connection", { - resp <- request_test("/stream-bytes/2048") %>% req_perform_connection() - on.exit(close(resp)) - - expect_s3_class(resp, "httr2_response") - expect_true(resp_has_body(resp)) - - out <- resp_stream_raw(resp, 1) - expect_length(out, 1024) - - out <- resp_stream_raw(resp, 1) - expect_length(out, 1024) - - out <- resp_stream_raw(resp, 1) - expect_length(out, 0) -}) - -test_that("can read all data from a connection", { - resp <- request_test("/stream-bytes/2048") %>% req_perform_connection() - - out <- resp_body_raw(resp) - expect_length(out, 2048) - expect_false(resp_has_body(resp)) -}) - -test_that("can't read from a closed connection", { - resp <- request_test("/stream-bytes/1024") %>% req_perform_connection() - close(resp) - - expect_false(resp_has_body(resp)) - expect_snapshot(resp_stream_raw(resp, 1), error = TRUE) - - # and no error if we try to close it again - expect_no_error(close(resp)) -}) - -test_that("can feed sse events one at a time", { - skip_on_covr() - app <- webfakes::new_app() - - app$get("/events", function(req, res) { - for(i in 1:3) { - res$send_chunk(sprintf("data: %s\n\n", i)) - } - }) - - server <- webfakes::local_app_process(app) - req <- request(server$url("/events")) - resp <- req_perform_connection(req, mode = "text") - on.exit(close(resp)) - - expect_equal( - resp_stream_sse(resp), - list(type = "message", data = "1", id = character()) - ) - expect_equal( - resp_stream_sse(resp), - list(type = "message", data = "2", id = character()) - ) - resp_stream_sse(resp) - - expect_equal(resp_stream_sse(resp), NULL) -}) - -test_that("resp_stream_sse() requires a text connection", { - resp <- request_test("/stream-bytes/1024") %>% req_perform_connection() - on.exit(close(resp)) - - expect_snapshot(resp_stream_sse(resp), error = TRUE) -}) - # req_perform_stream() -------------------------------------------------------- test_that("returns stream body; sets last request & response", { diff --git a/tests/testthat/test-req-perform.R b/tests/testthat/test-req-perform.R index 97c3b538..75060208 100644 --- a/tests/testthat/test-req-perform.R +++ b/tests/testthat/test-req-perform.R @@ -55,10 +55,7 @@ test_that("persistent HTTP errors only get single attempt", { }) test_that("can retry a transient error", { - skip_on_covr() - app <- webfakes::new_app() - - app$get("/retry", function(req, res) { + req <- local_app_request(function(req, res) { i <- res$app$locals$i %||% 1 if (i == 1) { res$app$locals$i <- 2 @@ -70,10 +67,7 @@ test_that("can retry a transient error", { res$send_json(list(status = "done")) } }) - - server <- webfakes::local_app_process(app) - req <- request(server$url("/retry")) %>% - req_retry(max_tries = 2) + req <- req_retry(req, max_tries = 2) cnd <- catch_cnd(resp <- req_perform(req), "httr2_retry") expect_s3_class(cnd, "httr2_retry") diff --git a/tests/testthat/test-resp-stream.R b/tests/testthat/test-resp-stream.R new file mode 100644 index 00000000..718b824a --- /dev/null +++ b/tests/testthat/test-resp-stream.R @@ -0,0 +1,297 @@ + +test_that("can stream bytes from a connection", { + resp <- request_test("/stream-bytes/2048") %>% req_perform_connection() + withr::defer(close(resp)) + + expect_s3_class(resp, "httr2_response") + expect_true(resp_has_body(resp)) + + out <- resp_stream_raw(resp, 1) + expect_length(out, 1024) + + out <- resp_stream_raw(resp, 1) + expect_length(out, 1024) + + out <- resp_stream_raw(resp, 1) + expect_length(out, 0) +}) + +test_that("can't read from a closed connection", { + resp <- request_test("/stream-bytes/1024") %>% req_perform_connection() + close(resp) + + expect_false(resp_has_body(resp)) + expect_snapshot(resp_stream_raw(resp, 1), error = TRUE) + + # and no error if we try to close it again + expect_no_error(close(resp)) +}) + +test_that("can join lines across multiple reads", { + req <- local_app_request(function(req, res) { + res$send_chunk("This is a ") + Sys.sleep(0.2) + res$send_chunk("complete sentence.\n") + }) + + # Non-blocking returns NULL until data is ready + resp1 <- req_perform_connection(req, blocking = FALSE) + withr::defer(close(resp1)) + + out <- resp_stream_lines(resp1) + expect_equal(out, character()) + expect_equal(resp1$cache$push_back, charToRaw("This is a ")) + + while(length(out) == 0) { + Sys.sleep(0.1) + out <- resp_stream_lines(resp1) + } + expect_equal(out, "This is a complete sentence.") +}) + +test_that("handles line endings of multiple kinds", { + req <- local_app_request(function(req, res) { + res$set_header("Content-Type", "text/plain; charset=Shift_JIS") + res$send_chunk(as.raw(c(0x82, 0xA0, 0x0A))) + Sys.sleep(0.1) + res$send_chunk("crlf\r\n") + Sys.sleep(0.1) + res$send_chunk("lf\n") + Sys.sleep(0.1) + res$send_chunk("cr\r") + Sys.sleep(0.1) + res$send_chunk("half line/") + Sys.sleep(0.1) + res$send_chunk("other half\n") + Sys.sleep(0.1) + res$send_chunk("broken crlf\r") + Sys.sleep(0.1) + res$send_chunk("\nanother line\n") + Sys.sleep(0.1) + res$send_chunk("eof without line ending") + }) + + resp1 <- req_perform_connection(req, blocking = TRUE) + withr::defer(close(resp1)) + + for (expected in c("\u3042", "crlf", "lf", "cr", "half line/other half", "broken crlf", "another line")) { + rlang::inject(expect_equal(resp_stream_lines(resp1), !!expected)) + } + expect_warning( + expect_equal(resp_stream_lines(resp1), "eof without line ending"), + "incomplete final line" + ) + expect_identical(resp_stream_lines(resp1), character(0)) + + # Same test, but now, non-blocking + resp2 <- req_perform_connection(req, blocking = FALSE) + withr::defer(close(resp2)) + + for (expected in c("\u3042", "crlf", "lf", "cr", "half line/other half", "broken crlf", "another line")) { + repeat { + out <- resp_stream_lines(resp2) + if (length(out) > 0) { + rlang::inject(expect_equal(out, !!expected)) + break + } + } + } + expect_warning( + repeat { + out <- resp_stream_lines(resp2) + if (length(out) > 0) { + expect_equal(out, "eof without line ending") + break + } + }, + "incomplete final line" + ) +}) + +test_that("streams the specified number of lines", { + req <- local_app_request(function(req, res) { + res$send_chunk(paste0(letters[1:5], "\n", collapse = "")) + }) + + resp1 <- req_perform_connection(req, blocking = TRUE) + withr::defer(close(resp1)) + expect_equal( + resp_stream_lines(resp1, 3), + c("a", "b", "c") + ) + expect_equal( + resp_stream_lines(resp1, 3), + c("d", "e") + ) + expect_equal( + resp_stream_lines(resp1, 3), + character() + ) + + resp2 <- req_perform_connection(req, blocking = FALSE) + withr::defer(close(resp2)) + Sys.sleep(0.2) + expect_equal( + resp_stream_lines(resp2, 3), + c("a", "b", "c") + ) + expect_equal( + resp_stream_lines(resp2, 3), + c("d", "e") + ) + expect_equal( + resp_stream_lines(resp2, 3), + character() + ) +}) + +test_that("can feed sse events one at a time", { + req <- local_app_request(function(req, res) { + for(i in 1:3) { + res$send_chunk(sprintf("data: %s\n\n", i)) + } + }) + resp <- req_perform_connection(req) + withr::defer(close(resp)) + + expect_equal( + resp_stream_sse(resp), + list(type = "message", data = "1", id = character()) + ) + expect_equal( + resp_stream_sse(resp), + list(type = "message", data = "2", id = character()) + ) + resp_stream_sse(resp) + + expect_equal(resp_stream_sse(resp), NULL) +}) + +test_that("can join sse events across multiple reads", { + req <- local_app_request(function(req, res) { + res$send_chunk("data: 1\n") + Sys.sleep(0.2) + res$send_chunk("data") + Sys.sleep(0.2) + res$send_chunk(": 2\n") + res$send_chunk("\ndata: 3\n\n") + }) + + # Non-blocking returns NULL until data is ready + resp1 <- req_perform_connection(req, blocking = FALSE) + withr::defer(close(resp1)) + + out <- resp_stream_sse(resp1) + expect_equal(out, NULL) + expect_equal(resp1$cache$push_back, charToRaw("data: 1\n")) + + while(is.null(out)) { + Sys.sleep(0.1) + out <- resp_stream_sse(resp1) + } + expect_equal(out, list(type = "message", data = c("1", "2"), id = character())) + expect_equal(resp1$cache$push_back, charToRaw("data: 3\n\n")) + out <- resp_stream_sse(resp1) + expect_equal(out, list(type = "message", data = "3", id = character())) + + # Blocking waits for a complete event + resp2 <- req_perform_connection(req) + withr::defer(close(resp2)) + + out <- resp_stream_sse(resp2) + expect_equal(out, list(type = "message", data = c("1", "2"), id = character())) +}) + +test_that("sse always interprets data as UTF-8", { + req <- local_app_request(function(req, res) { + res$send_chunk("data: \xE3\x81\x82\r\n\r\n") + }) + + withr::with_locale(c(LC_CTYPE = "C"), { + # Non-blocking returns NULL until data is ready + resp1 <- req_perform_connection(req, blocking = FALSE) + withr::defer(close(resp1)) + + out <- NULL + while(is.null(out)) { + Sys.sleep(0.1) + out <- resp_stream_sse(resp1) + } + + s <- "\xE3\x81\x82" + Encoding(s) <- "UTF-8" + expect_equal(out, list(type = "message", data = s, id = character())) + expect_equal(Encoding(out$data), "UTF-8") + expect_equal(resp1$cache$push_back, raw()) + }) +}) + +test_that("streaming size limits enforced", { + req <- local_app_request(function(req, res) { + data_size <- 1000 + data <- paste(rep_len("0", data_size), collapse = "") + res$send_chunk(data) + }) + + resp1 <- req_perform_connection(req, blocking = FALSE) + withr::defer(close(resp1)) + expect_error( + while(is.null(out)) { + Sys.sleep(0.1) + out <- resp_stream_sse(resp1, max_size = 999) + } + ) + + resp2 <- req_perform_connection(req, blocking = TRUE) + withr::defer(close(resp2)) + expect_error( + out <- resp_stream_sse(resp2, max_size = 999) + ) + + resp3 <- req_perform_connection(req, blocking = TRUE) + withr::defer(close(resp3)) + expect_error( + out <- resp_stream_lines(resp3, max_size = 999) + ) +}) + +test_that("has a working find_event_boundary", { + boundary_test <- function(x, matched, remaining) { + buffer <- charToRaw(x) + split_at <- find_event_boundary(buffer) + result <- if (is.null(split_at)) { + NULL + } else { + split_buffer(buffer, split_at) + } + expect_identical( + result, + list(matched=charToRaw(matched), remaining = charToRaw(remaining)) + ) + } + + # Basic matches + boundary_test("\r\r", matched = "\r\r", remaining = "") + boundary_test("\n\n", matched = "\n\n", remaining = "") + boundary_test("\r\n\r\n", matched = "\r\n\r\n", remaining = "") + boundary_test("a\r\r", matched = "a\r\r", remaining = "") + boundary_test("a\n\n", matched = "a\n\n", remaining = "") + boundary_test("a\r\n\r\n", matched = "a\r\n\r\n", remaining = "") + boundary_test("\r\ra", matched = "\r\r", remaining = "a") + boundary_test("\n\na", matched = "\n\n", remaining = "a") + boundary_test("\r\n\r\na", matched = "\r\n\r\n", remaining = "a") + + # Matches the first boundary found + boundary_test("\r\r\r", matched = "\r\r", remaining = "\r") + boundary_test("\r\r\r\r", matched = "\r\r", remaining = "\r\r") + boundary_test("\n\n\r\r", matched = "\n\n", remaining = "\r\r") + boundary_test("\r\r\n\n", matched = "\r\r", remaining = "\n\n") + + # Non-matches + expect_null(find_event_boundary(charToRaw("\n\r\n\r"))) + expect_null(find_event_boundary(charToRaw("hello\ngoodbye\n"))) + expect_null(find_event_boundary(charToRaw(""))) + expect_null(find_event_boundary(charToRaw("1"))) + expect_null(find_event_boundary(charToRaw("12"))) + expect_null(find_event_boundary(charToRaw("\r\n\r"))) +}) diff --git a/tests/testthat/test-utils.R b/tests/testthat/test-utils.R index 20e3e19c..d13d8214 100644 --- a/tests/testthat/test-utils.R +++ b/tests/testthat/test-utils.R @@ -32,3 +32,27 @@ test_that("respects httr verbose config", { test_that("progress bar suppressed in tests", { expect_snapshot(sys_sleep(0.1, "in test")) }) + + +test_that("has a working slice", { + x <- letters[1:5] + expect_identical(slice(x), x) + expect_identical(slice(x, 1, length(x) + 1), x) + + # start is inclusive, end is exclusive + expect_identical(slice(x, 1, length(x)), head(x, -1)) + # zero-length slices are fine + expect_identical(slice(x, 1, 1), character()) + # starting off the end is fine + expect_identical(slice(x, length(x) + 1), character()) + expect_identical(slice(x, length(x) + 1, length(x) + 1), character()) + # slicing zero-length is fine + expect_identical(slice(character()), character()) + + # out of bounds + expect_error(slice(x, 0, 1)) + expect_error(slice(x, length(x) + 2)) + expect_error(slice(x, end = length(x) + 2)) + # end too small relative to start + expect_error(slice(x, 2, 1)) +})