Skip to content

Commit

Permalink
Add json parsing methods to LuaBody and LuaResponse classes
Browse files Browse the repository at this point in the history
  • Loading branch information
khvzak committed Mar 19, 2024
1 parent f486094 commit 0981869
Show file tree
Hide file tree
Showing 6 changed files with 281 additions and 90 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions casper-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ http = "0.2" # used by opentelemetry-http
itertools = "0.12"
linked-hash-map = "0.5.4"
log = "0.4"
mime = "0.3.17"
mini-moka = "0.10"
moka = { version = "0.12", features = ["future"] }
ntex = { version = "1.1.0", features = ["tokio", "openssl"] }
Expand Down
155 changes: 115 additions & 40 deletions casper-server/src/lua/http/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::fmt;
use std::mem;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::time::Duration;
use std::time::{Duration, Instant};

use futures::{Stream, TryStreamExt};
use mlua::{
AnyUserData, ExternalError, FromLua, Lua, OwnedAnyUserData, Result as LuaResult,
String as LuaString, UserData, Value,
AnyUserData, Error as LuaError, ErrorContext as _, ExternalError, FromLua, IntoLua, Lua,
OwnedAnyUserData, Result as LuaResult, String as LuaString, UserData, Value,
};
use ntex::http::body::{self, BodySize, BoxedBodyStream, MessageBody, ResponseBody, SizedStream};
use ntex::http::Payload;
Expand All @@ -17,10 +17,13 @@ use tokio::time;
use tracing::error;

use crate::http::buffer_body;
use crate::lua::json::JsonObject;

// TODO: Limit number of fetched bytes

#[derive(Default)]
pub enum LuaBody {
#[default]
None,
Bytes(Bytes),
Body {
Expand Down Expand Up @@ -53,35 +56,49 @@ impl LuaBody {
}
}

/// Buffers the whole body into memory and returns the buffered data.
pub async fn buffer(&mut self) -> LuaResult<Option<Bytes>> {
match self {
/// Reads the whole body into memory and returns the buffered data.
/// The body is consumed and cannot be read again.
pub async fn read(&mut self) -> LuaResult<Option<Bytes>> {
let timeout = self.timeout();
match mem::take(self) {
LuaBody::None => Ok(None),
LuaBody::Bytes(bytes) => Ok(Some(bytes.clone())),
LuaBody::Bytes(bytes) => Ok(Some(bytes)),
body => {
let tmp_body = mem::replace(body, LuaBody::None);
let buffer_fut = buffer_body(tmp_body);
let res = match body.timeout() {
let buffer_fut = buffer_body(body);
let res = match timeout {
Some(timeout) => time::timeout(timeout, buffer_fut).await,
None => Ok(buffer_fut.await),
};
match res {
Ok(Ok(bytes)) => {
*body = LuaBody::Bytes(bytes.clone());
Ok(Some(bytes))
}
Ok(Err(err)) => {
*body = LuaBody::None;
Err(err.to_string().into_lua_err())
}
Err(err) => {
*body = LuaBody::None;
Err(err.into_lua_err())
}
}
res.map_err(|_| LuaError::external("timeout reading body"))?
.map(Some)
.map_err(|err| LuaError::external(err.to_string()))
}
}
}

/// Buffers the whole body into memory and returns the buffered data.
/// The data is not consumed and can be read again.
pub async fn buffer(&mut self) -> LuaResult<Option<Bytes>> {
match self {
LuaBody::None => Ok(None),
LuaBody::Bytes(bytes) => Ok(Some(bytes.clone())),
_ => Ok(self.read().await?.map(|b| {
*self = LuaBody::Bytes(b.clone());
b
})),
}
}

/// Buffers the whole body and parses it as JSON.
pub async fn json(&mut self) -> LuaResult<serde_json::Value> {
let bytes = self
.buffer()
.await?
.ok_or_else(|| LuaError::external("body is empty"))?;
serde_json::from_slice(&bytes)
.map_err(LuaError::external)
.context("failed to parse JSON body")
}
}

pub enum EitherBody {
Expand All @@ -107,13 +124,20 @@ impl Default for EitherBody {
}
}

macro_rules! borrow_body {
($ud:expr) => {
$ud.borrow_mut::<LuaBody>()
.expect("Failed to borrow body from Lua UserData")
};
}

impl EitherBody {
pub(crate) fn as_userdata<'lua>(&mut self, lua: &'lua Lua) -> LuaResult<AnyUserData<'lua>> {
#[allow(clippy::wrong_self_convention)]
pub(crate) fn to_userdata<'lua>(&mut self, lua: &'lua Lua) -> LuaResult<AnyUserData<'lua>> {
match self {
EitherBody::Body(tmp_body) => {
let body = mem::replace(tmp_body, LuaBody::None);
// Move body to Lua registry
let lua_body = lua.create_userdata(body)?;
let lua_body = lua.create_userdata(mem::take(tmp_body))?;
*self = EitherBody::UserData(lua_body.clone().into_owned());
Ok(lua_body)
}
Expand All @@ -124,16 +148,26 @@ impl EitherBody {
}
}

pub(crate) fn set_timeout(&mut self, dur: Option<Duration>) {
match self {
EitherBody::Body(body) => body.set_timeout(dur),
EitherBody::UserData(ud) => borrow_body!(ud).set_timeout(dur),
}
}

#[allow(clippy::await_holding_refcell_ref)]
pub(crate) async fn buffer(&mut self) -> LuaResult<Option<Bytes>> {
match self {
EitherBody::Body(body) => body.buffer().await,
EitherBody::UserData(ud) => {
let mut body = ud
.borrow_mut::<LuaBody>()
.expect("Failed to borrow body from Lua UserData");
body.buffer().await
}
EitherBody::UserData(ud) => borrow_body!(ud).buffer().await,
}
}

#[allow(clippy::await_holding_refcell_ref)]
pub(crate) async fn json(&mut self) -> LuaResult<serde_json::Value> {
match self {
EitherBody::Body(body) => body.json().await,
EitherBody::UserData(ud) => borrow_body!(ud).json().await,
}
}
}
Expand Down Expand Up @@ -354,9 +388,8 @@ impl UserData for LuaBody {
// Reads the body
// Returns `bytes` (userdata) or `nil, error`
methods.add_async_method_mut("read", |lua, this, ()| async move {
let bytes = lua_try!(this.buffer().await);
let bytes = lua_try!(this.read().await);
let data = bytes.map(|b| lua.create_any_userdata(b)).transpose()?;
*this = LuaBody::None; // Drop saved data
Ok(Ok(data))
});

Expand All @@ -372,12 +405,12 @@ impl UserData for LuaBody {
let next_chunk = futures::future::poll_fn(|cx| this.poll_next_chunk(cx));
let bytes = match timeout {
Some(timeout) => {
let start = time::Instant::now();
let start = Instant::now();
let bytes = match time::timeout(timeout, next_chunk).await {
Ok(res) => res,
Err(err) => {
Err(_) => {
this.set_timeout(Some(Duration::new(0, 0)));
return Ok(Err(err.to_string()));
return Ok(Err("timeout reading body".to_string()));
}
};
this.set_timeout(Some(timeout.saturating_sub(start.elapsed())));
Expand All @@ -400,6 +433,12 @@ impl UserData for LuaBody {
Ok(Ok(data))
});

// Buffers the body into memory (if not already) and parses it as JSON
methods.add_async_method_mut("json", |lua, this, ()| async move {
let json = lua_try!(this.json().await);
Ok(Ok(JsonObject::from(json).into_lua(lua)?))
});

methods.add_async_method_mut("to_string", |lua, this, ()| async move {
let bytes = lua_try!(this.buffer().await);
let data = bytes.map(|b| lua.create_string(&b)).transpose()?;
Expand Down Expand Up @@ -449,7 +488,7 @@ mod tests {
assert($body:to_string() == "hello, world")
// Read must consume body
assert($body:read():to_string() == "hello, world")
assert($body:read() == nil)
assert($body:read() == nil, "read must consume body")
assert($body:data() == nil)
})
.exec_async()
Expand Down Expand Up @@ -644,7 +683,7 @@ mod tests {
local reader = $body:reader()
assert(reader():to_string() == "hello")
local _, err = reader()
assert(err:find("deadline") ~= nil)
assert(err:find("timeout") ~= nil)
// Reset timeout and try again
$body:set_timeout(0.010)
assert(reader():to_string() == ", ")
Expand All @@ -655,4 +694,40 @@ mod tests {

Ok(())
}

#[ntex::test]
async fn test_body_json() -> LuaResult<()> {
let lua = Lua::new();
super::super::super::bytes::register_types(&lua)?;

let body = LuaBody::from(r#"{"hello": "world"}"#);
lua.load(chunk! {
local json = $body:json()
assert(typeof(json) == "JsonObject", "variable is not JsonObject")
assert(json.hello == "world", "`json.hello` is not 'world'")
assert($body:json() ~= nil, "`json()` method must not consume body")
})
.exec_async()
.await
.unwrap();

// Test timeout while reading json
let chunks: Vec<Result<_, Box<dyn StdError>>> = vec![
Ok("{\"hello\"".into()),
Ok(":".into()),
Ok("\"world\"}".into()),
];
let stream = stream::iter(chunks).throttle(Duration::from_millis(15));
let mut body = LuaBody::from(BoxedBodyStream::new(Box::pin(stream)));
body.set_timeout(Some(Duration::from_millis(20)));
lua.load(chunk! {
local json, err = $body:json()
assert(json == nil and err == "timeout reading body", "json must fail with timeout")
})
.exec_async()
.await
.unwrap();

Ok(())
}
}
2 changes: 1 addition & 1 deletion casper-server/src/lua/http/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ impl UserData for LuaRequest {

fields.add_field_function_get("body", |lua, this| {
let mut this = this.borrow_mut::<Self>()?;
this.body_mut().as_userdata(lua)
this.body_mut().to_userdata(lua)
});
}

Expand Down
Loading

0 comments on commit 0981869

Please sign in to comment.