Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtimes/js: don't destroy stream after done #1376

Merged
merged 1 commit into from
Sep 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions runtimes/js/src/raw_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ impl ResponseWriterState {

let resp = match resp.body(Body::from_stream(read)) {
Ok(resp) => resp,
Err(err) => return Err((Self::Done, err.into())),
Err(err) => {
::log::error!(err = err.to_string(); "failed to set raw response body to flush header");
return Err((Self::Done, err.into()));
}
};

let _ = sender.send(resp);
Expand All @@ -117,7 +120,10 @@ impl ResponseWriterState {
};
let resp = match resp.body(body) {
Ok(resp) => resp,
Err(err) => return Err((Self::Done, err.into())),
Err(err) => {
::log::error!(err = err.to_string(); "failed to close raw response body");
return Err((Self::Done, err.into()));
}
};
let _ = sender.send(resp);
Ok(Self::Done)
Expand Down Expand Up @@ -367,8 +373,8 @@ impl api::BoxedHandler for JSRawHandler {
match resp {
Ok(resp) => resp,
Err(_) => {
let err = api::Error::internal(anyhow::anyhow!("handler did not respond"));
err.into_response()
let err_resp = api::Error::internal(anyhow::anyhow!("handler did not respond"));
err_resp.into_response()
}
}
}
Expand All @@ -380,8 +386,8 @@ impl api::BoxedHandler for JSRawHandler {
match body_rx.await {
Ok(resp) => resp,
Err(_) => {
let err = api::Error::internal(anyhow::anyhow!("handler did not respond"));
err.into_response()
let err_resp = api::Error::internal(anyhow::anyhow!("handler did not respond"));
err_resp.into_response()
}
}
}
Expand Down
20 changes: 14 additions & 6 deletions runtimes/js/src/stream/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,23 @@ where
match self.stream.next().await.transpose() {
Ok(data) => {
let is_none = data.is_none();
let push_result = self.push(data).await;
if is_none {
self.notify_close();
return;
}
match self.push(data).await {
// The stream successfully ended.
Ok(_) if is_none => {
// Note: don't notify_close; the node:stream API will handle
// automatically closing the stream when the data has actually been read.
// If we close here we end up destroying the stream too early, preventing
// the last data from being read.
return;
}

match push_result {
// We haven't reached the high water mark yet; continue pushing data.
Ok(true) => continue 'PushLoop,

// We've reached the high water mark; wait for the next read request.
Ok(false) => continue 'ReadRequestLoop,

// Something went wrong communicating with node. Close the stream with an error.
Err(err) => {
self.notify_err(err);
return;
Expand Down
Loading