Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manage our own pushback state #535

Merged
merged 22 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 111 additions & 32 deletions R/req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
#' want to do other work in between handling inputs from the stream.
#'
#' @inheritParams req_perform_stream
#' @param mode The mode that should be used for opening the connection.
#' @param blocking When retrieving data, should the connection block and wait
#' for the desired information or immediately return what it has (possibly
#' nothing)?
Expand All @@ -115,13 +114,9 @@
#'
#' # Always close the response when you're done
#' close(resp)
req_perform_connection <- function(req,
mode = c("binary", "text"),
blocking = TRUE) {
req_perform_connection <- function(req, blocking = TRUE) {
check_request(req)
check_bool(blocking)
mode <- arg_match(mode)
con_mode <- if (mode == "text") "rf" else "rbf"

handle <- req_handle(req)
the$last_request <- req
Expand All @@ -137,7 +132,7 @@
if (!is.null(resp)) {
close(resp$body)
}
resp <- req_perform_connection1(req, handle, con_mode, blocking = blocking)
resp <- req_perform_connection1(req, handle, blocking = blocking)

Check warning on line 135 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L135

Added line #L135 was not covered by tests

if (retry_is_transient(req, resp)) {
tries <- tries + 1
Expand All @@ -159,11 +154,11 @@
resp
}

req_perform_connection1 <- function(req, handle, con_mode = "rbf", blocking = TRUE) {
req_perform_connection1 <- function(req, handle, blocking = TRUE) {
stream <- curl::curl(req$url, handle = handle)

# Must open the stream in order to initiate the connection
open(stream, con_mode, blocking = blocking)
open(stream, "rbf", blocking = blocking)

Check warning on line 161 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L161

Added line #L161 was not covered by tests
curl_data <- curl::handle_data(handle)

new_response(
Expand Down Expand Up @@ -214,39 +209,116 @@
readLines(conn, n = lines)
}

# Slices the vector using the only sane semantics: start inclusive, end
# exclusive.
#
# * Allows start == end, which means return no elements.
# * Allows start == length(vector) + 1, which means return no elements.
# * Allows zero-length vectors.
#
# Otherwise, slice() is quite strict about what it allows start/end to be: no
# negatives, no reversed order.
slice <- function(vector, start = 1, end = length(vector) + 1) {
stopifnot(start > 0)
stopifnot(start <= length(vector) + 1)
stopifnot(end > 0)
stopifnot(end <= length(vector) + 1)
stopifnot(end >= start)

if (start == end) {
vector[FALSE] # Return an empty vector of the same type
} else {
vector[start:(end - 1)]
}
}

# Function to find the first double line ending in a buffer, or NULL if no
# double line ending is found
#
# Example:
# find_event_boundary(charToRaw("data: 1\n\nid: 12345"))
# Returns:
# list(
# matched = charToRaw("data: 1\n\n"),
# remaining = charToRaw("id: 12345")
# )
find_event_boundary <- function(buffer) {
if (length(buffer) < 2) {
return(NULL)
}

# leftX means look behind by X bytes. For example, left1[2] equals buffer[1].
# Any attempt to read past the beginning of the buffer results in 0x00.
left1 <- c(0x00, head(buffer, -1))
jcheng5 marked this conversation as resolved.
Show resolved Hide resolved
left2 <- c(0x00, head(left1, -1))
left3 <- c(0x00, head(left2, -1))

boundary_end <- which(
(left1 == 0x0A & buffer == 0x0A) | # \n\n
(left1 == 0x0D & buffer == 0x0D) | # \r\r
(left3 == 0x0D & left2 == 0x0A & left1 == 0x0D & buffer == 0x0A) # \r\n\r\n
)

if (length(boundary_end) == 0) {
return(NULL) # No event boundary found
}

boundary_end <- boundary_end[1] # Take the first occurrence
split_at <- boundary_end + 1 # Split at one after the boundary

# Return a list with the event data and the remaining buffer
list(
matched = slice(buffer, end = split_at),
remaining = slice(buffer, start = split_at)
)
}

#' @export
#' @rdname resp_stream_raw
# TODO: max_size
jcheng5 marked this conversation as resolved.
Show resolved Hide resolved
resp_stream_sse <- function(resp) {
check_streaming_response(resp)
conn <- resp$body
if (!identical(summary(conn)$text, "text")) {
cli::cli_abort(c(
"{.arg resp} must have a text mode connection.",
i = 'Use {.code mode = "text"} when calling {.fn req_perform_connection}.'
))

# Grab data left over from last resp_stream_sse() call (if any)
buffer <- resp$cache$push_back %||% raw()
resp$cache$push_back <- raw()

Check warning on line 284 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L283-L284

Added lines #L283 - L284 were not covered by tests

print_buffer <- function(buf, label) {

Check warning on line 286 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L286

Added line #L286 was not covered by tests
# cat(label, ":", paste(sprintf("%02X", as.integer(buf)), collapse = " "), "\n", file = stderr())
}

lines <- character(0)
# Read chunks until we find an event or reach the end of input
while (TRUE) {
jcheng5 marked this conversation as resolved.
Show resolved Hide resolved
line <- readLines(conn, n = 1)
if (length(line) == 0) {
break
# Try to find an event boundary using the data we have
print_buffer(buffer, "Buffer to parse")
result <- find_event_boundary(buffer)

Check warning on line 294 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L293-L294

Added lines #L293 - L294 were not covered by tests

if (!is.null(result)) {

Check warning on line 296 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L296

Added line #L296 was not covered by tests
# We found a complete event
print_buffer(result$matched, "Event data")
print_buffer(result$remaining, "Remaining buffer")
resp$cache$push_back <- result$remaining
return(parse_event(result$matched))

Check warning on line 301 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L298-L301

Added lines #L298 - L301 were not covered by tests
}
if (line == "") {
# \n\n detected, end of event
return(parse_event(lines))

# We didn't have enough data. Attempt to read more
chunk_size <- 1024
chunk <- readBin(resp$body, raw(), n = chunk_size)

Check warning on line 306 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L305-L306

Added lines #L305 - L306 were not covered by tests

print_buffer(chunk, "Received chunk")

Check warning on line 308 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L308

Added line #L308 was not covered by tests

# If we've reached the end of input, store the buffer and return NULL
if (length(chunk) == 0) {
jcheng5 marked this conversation as resolved.
Show resolved Hide resolved
print_buffer(buffer, "Storing incomplete buffer")
resp$cache$push_back <- buffer
return(NULL)

Check warning on line 314 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L311-L314

Added lines #L311 - L314 were not covered by tests
}
lines <- c(lines, line)
}

if (length(lines) > 0) {
# We have a partial event, put it back while we wait
# for more
pushBack(lines, conn)

# More data was received; combine it with existing buffer and continue the
# loop to try parsing again
buffer <- c(buffer, chunk)
print_buffer(buffer, "Combined buffer")

Check warning on line 320 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L319-L320

Added lines #L319 - L320 were not covered by tests
}

return(NULL)
}

#' @export
Expand Down Expand Up @@ -300,7 +372,14 @@
}


parse_event <- function(lines) {
parse_event <- function(event_data) {
# always treat event_data as UTF-8, it's in the spec
str_data <- rawToChar(event_data)
Encoding(str_data) <- "UTF-8"

Check warning on line 378 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L377-L378

Added lines #L377 - L378 were not covered by tests

# The spec says \r\n, \r, and \n are all valid separators
lines <- strsplit(str_data, "\r\n|\r|\n")[[1]]

Check warning on line 381 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L381

Added line #L381 was not covered by tests

m <- regexec("([^:]*)(: ?)?(.*)", lines)
matches <- regmatches(lines, m)
keys <- c("event", vapply(matches, function(x) x[2], character(1)))
Expand Down
4 changes: 1 addition & 3 deletions man/req_perform_connection.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 0 additions & 9 deletions tests/testthat/_snaps/req-perform-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@
Error in `resp_stream_raw()`:
! `resp` has already been closed.

# resp_stream_sse() requires a text connection

Code
resp_stream_sse(resp)
Condition
Error in `resp_stream_sse()`:
! `resp` must have a text mode connection.
i Use `mode = "text"` when calling `req_perform_connection()`.

# req_perform_stream checks its inputs

Code
Expand Down
134 changes: 127 additions & 7 deletions tests/testthat/test-req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ test_that("req_stream() is deprecated", {

test_that("can stream bytes from a connection", {
resp <- request_test("/stream-bytes/2048") %>% req_perform_connection()
on.exit(close(resp))
withr::defer(close(resp))

expect_s3_class(resp, "httr2_response")
expect_true(resp_has_body(resp))
Expand All @@ -26,6 +26,7 @@ test_that("can stream bytes from a connection", {

test_that("can read all data from a connection", {
resp <- request_test("/stream-bytes/2048") %>% req_perform_connection()
withr::defer(close(resp))

out <- resp_body_raw(resp)
expect_length(out, 2048)
Expand Down Expand Up @@ -55,8 +56,8 @@ test_that("can feed sse events one at a time", {

server <- webfakes::local_app_process(app)
req <- request(server$url("/events"))
resp <- req_perform_connection(req, mode = "text")
on.exit(close(resp))
resp <- req_perform_connection(req)
withr::defer(close(resp))

expect_equal(
resp_stream_sse(resp),
Expand All @@ -71,11 +72,130 @@ test_that("can feed sse events one at a time", {
expect_equal(resp_stream_sse(resp), NULL)
})

test_that("resp_stream_sse() requires a text connection", {
resp <- request_test("/stream-bytes/1024") %>% req_perform_connection()
on.exit(close(resp))
test_that("can join sse events across multiple reads", {
skip_on_covr()
app <- webfakes::new_app()

app$get("/events", function(req, res) {
res$send_chunk("data: 1\n")
Sys.sleep(0.2)
res$send_chunk("data")
Sys.sleep(0.2)
res$send_chunk(": 2\n")
res$send_chunk("\ndata: 3\n\n")
})
server <- webfakes::local_app_process(app)
req <- request(server$url("/events"))

# Non-blocking returns NULL until data is ready
resp1 <- req_perform_connection(req, blocking = FALSE)
withr::defer(close(resp1))

out <- resp_stream_sse(resp1)
expect_equal(out, NULL)
expect_equal(resp1$cache$push_back, charToRaw("data: 1\n"))

while(is.null(out)) {
Sys.sleep(0.1)
out <- resp_stream_sse(resp1)
}
expect_equal(out, list(type = "message", data = c("1", "2"), id = character()))
expect_equal(resp1$cache$push_back, charToRaw("data: 3\n\n"))
out <- resp_stream_sse(resp1)
expect_equal(out, list(type = "message", data = "3", id = character()))

# Blocking waits for a complete event
resp2 <- req_perform_connection(req)
withr::defer(close(resp2))

out <- resp_stream_sse(resp2)
expect_equal(out, list(type = "message", data = c("1", "2"), id = character()))
})

test_that("always interprets data as UTF-8", {
skip_on_covr()
app <- webfakes::new_app()

app$get("/events", function(req, res) {
res$send_chunk("data: \xE3\x81\x82\r\n\r\n")
})
server <- webfakes::local_app_process(app)
req <- request(server$url("/events"))

withr::with_locale(c(LC_CTYPE = "C"), {
# Non-blocking returns NULL until data is ready
resp1 <- req_perform_connection(req, blocking = FALSE)
withr::defer(close(resp1))

out <- NULL
while(is.null(out)) {
Sys.sleep(0.1)
out <- resp_stream_sse(resp1)
}

s <- "\xE3\x81\x82"
Encoding(s) <- "UTF-8"
expect_equal(out, list(type = "message", data = s, id = character()))
expect_equal(Encoding(out$data), "UTF-8")
expect_equal(resp1$cache$push_back, raw())
})
})

test_that("has a working find_event_boundary", {
boundary_test <- function(x, matched, remaining) {
expect_identical(
find_event_boundary(charToRaw(x)),
list(matched=charToRaw(matched), remaining = charToRaw(remaining))
)
}

# Basic matches
boundary_test("\r\r", matched = "\r\r", remaining = "")
boundary_test("\n\n", matched = "\n\n", remaining = "")
boundary_test("\r\n\r\n", matched = "\r\n\r\n", remaining = "")
boundary_test("a\r\r", matched = "a\r\r", remaining = "")
boundary_test("a\n\n", matched = "a\n\n", remaining = "")
boundary_test("a\r\n\r\n", matched = "a\r\n\r\n", remaining = "")
boundary_test("\r\ra", matched = "\r\r", remaining = "a")
boundary_test("\n\na", matched = "\n\n", remaining = "a")
boundary_test("\r\n\r\na", matched = "\r\n\r\n", remaining = "a")

# Matches the first boundary found
boundary_test("\r\r\r", matched = "\r\r", remaining = "\r")
boundary_test("\r\r\r\r", matched = "\r\r", remaining = "\r\r")
boundary_test("\n\n\r\r", matched = "\n\n", remaining = "\r\r")
boundary_test("\r\r\n\n", matched = "\r\r", remaining = "\n\n")

# Non-matches
expect_null(find_event_boundary(charToRaw("\n\r\n\r")))
expect_null(find_event_boundary(charToRaw("hello\ngoodbye\n")))
expect_null(find_event_boundary(charToRaw("")))
expect_null(find_event_boundary(charToRaw("1")))
expect_null(find_event_boundary(charToRaw("12")))
expect_null(find_event_boundary(charToRaw("\r\n\r")))
})

expect_snapshot(resp_stream_sse(resp), error = TRUE)
test_that("has a working slice", {
x <- letters[1:5]
expect_identical(slice(x), x)
expect_identical(slice(x, 1, length(x) + 1), x)

# start is inclusive, end is exclusive
expect_identical(slice(x, 1, length(x)), head(x, -1))
# zero-length slices are fine
expect_identical(slice(x, 1, 1), character())
# starting off the end is fine
expect_identical(slice(x, length(x) + 1), character())
expect_identical(slice(x, length(x) + 1, length(x) + 1), character())
# slicing zero-length is fine
expect_identical(slice(character()), character())

# out of bounds
expect_error(slice(x, 0, 1))
expect_error(slice(x, length(x) + 2))
expect_error(slice(x, end = length(x) + 2))
# end too small relative to start
expect_error(slice(x, 2, 1))
})

# req_perform_stream() --------------------------------------------------------
Expand Down
Loading