Skip to content

Commit

Permalink
refactor(bin): use 32k stream IO buffer (#2008)
Browse files Browse the repository at this point in the history
* refactor(bin): use 32k stream IO buffer

Firefox by default uses a 32k IO buffer for streams.

https://searchfox.org/mozilla-central/rev/f6e3b81aac49e602f06c204f9278da30993cdc8a/modules/libpref/init/all.js#3212

This commit makes `neqo-bin` use the same buffer size across http09/3 and
client/server.

Along the way it consolidates various buffer logic and reuses buffers whereever
feasible.

* Trigger benchmarks
  • Loading branch information
mxinden committed Aug 12, 2024
1 parent fe2f0d0 commit 4a5a041
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 155 deletions.
15 changes: 10 additions & 5 deletions neqo-bin/src/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use neqo_transport::{
use url::Url;

use super::{get_output_file, qlog_new, Args, CloseState, Res};
use crate::STREAM_IO_BUFFER_SIZE;

pub struct Handler<'a> {
streams: HashMap<StreamId, Option<BufWriter<File>>>,
Expand All @@ -34,6 +35,7 @@ pub struct Handler<'a> {
args: &'a Args,
token: Option<ResumptionToken>,
needs_key_update: bool,
read_buffer: Vec<u8>,
}

impl<'a> super::Handler for Handler<'a> {
Expand Down Expand Up @@ -203,6 +205,7 @@ impl<'b> Handler<'b> {
args,
token: None,
needs_key_update: args.key_update,
read_buffer: vec![0; STREAM_IO_BUFFER_SIZE],
}
}

Expand Down Expand Up @@ -257,25 +260,26 @@ impl<'b> Handler<'b> {
fn read_from_stream(
client: &mut Connection,
stream_id: StreamId,
read_buffer: &mut [u8],
output_read_data: bool,
maybe_out_file: &mut Option<BufWriter<File>>,
) -> Res<bool> {
let mut data = vec![0; 4096];
loop {
let (sz, fin) = client.stream_recv(stream_id, &mut data)?;
let (sz, fin) = client.stream_recv(stream_id, read_buffer)?;
if sz == 0 {
return Ok(fin);
}
let read_buffer = &read_buffer[0..sz];

if let Some(out_file) = maybe_out_file {
out_file.write_all(&data[..sz])?;
out_file.write_all(read_buffer)?;
} else if !output_read_data {
qdebug!("READ[{stream_id}]: {sz} bytes");
qdebug!("READ[{stream_id}]: {} bytes", read_buffer.len());
} else {
qdebug!(
"READ[{}]: {}",
stream_id,
String::from_utf8(data.clone()).unwrap()
std::str::from_utf8(read_buffer).unwrap()
);
}
if fin {
Expand All @@ -294,6 +298,7 @@ impl<'b> Handler<'b> {
let fin_recvd = Self::read_from_stream(
client,
stream_id,
&mut self.read_buffer,
self.args.output_read_data,
maybe_out_file,
)?;
Expand Down
34 changes: 16 additions & 18 deletions neqo-bin/src/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ use neqo_transport::{
use url::Url;

use super::{get_output_file, qlog_new, Args, CloseState, Res};
use crate::STREAM_IO_BUFFER_SIZE;

pub struct Handler<'a> {
#[allow(clippy::struct_field_names)]
url_handler: UrlHandler<'a>,
token: Option<ResumptionToken>,
output_read_data: bool,
read_buffer: Vec<u8>,
}

impl<'a> Handler<'a> {
Expand All @@ -54,6 +56,7 @@ impl<'a> Handler<'a> {
url_handler,
token: None,
output_read_data: args.output_read_data,
read_buffer: vec![0; STREAM_IO_BUFFER_SIZE],
}
}
}
Expand Down Expand Up @@ -182,16 +185,14 @@ impl<'a> super::Handler for Handler<'a> {
qwarn!("Data on unexpected stream: {stream_id}");
}
Some(handler) => loop {
let mut data = vec![0; 4096];
let (sz, fin) = client
.read_data(Instant::now(), stream_id, &mut data)
.read_data(Instant::now(), stream_id, &mut self.read_buffer)
.expect("Read should succeed");

handler.process_data_readable(
stream_id,
fin,
data,
sz,
&self.read_buffer[..sz],
self.output_read_data,
)?;

Expand Down Expand Up @@ -245,8 +246,7 @@ trait StreamHandler {
&mut self,
stream_id: StreamId,
fin: bool,
data: Vec<u8>,
sz: usize,
data: &[u8],
output_read_data: bool,
) -> Res<bool>;
fn process_data_writable(&mut self, client: &mut Http3Client, stream_id: StreamId);
Expand Down Expand Up @@ -275,7 +275,7 @@ impl StreamHandlerType {
Self::Upload => Box::new(UploadStreamHandler {
data: vec![42; args.upload_size],
offset: 0,
chunk_size: 32768,
chunk_size: STREAM_IO_BUFFER_SIZE,
start: Instant::now(),
}),
}
Expand All @@ -297,21 +297,20 @@ impl StreamHandler for DownloadStreamHandler {
&mut self,
stream_id: StreamId,
fin: bool,
data: Vec<u8>,
sz: usize,
data: &[u8],
output_read_data: bool,
) -> Res<bool> {
if let Some(out_file) = &mut self.out_file {
if sz > 0 {
out_file.write_all(&data[..sz])?;
if !data.is_empty() {
out_file.write_all(data)?;
}
return Ok(true);
} else if !output_read_data {
qdebug!("READ[{stream_id}]: {sz} bytes");
} else if let Ok(txt) = String::from_utf8(data.clone()) {
qdebug!("READ[{stream_id}]: {} bytes", data.len());
} else if let Ok(txt) = std::str::from_utf8(data) {
qdebug!("READ[{stream_id}]: {txt}");
} else {
qdebug!("READ[{}]: 0x{}", stream_id, hex(&data));
qdebug!("READ[{}]: 0x{}", stream_id, hex(data));
}

if fin {
Expand Down Expand Up @@ -344,19 +343,18 @@ impl StreamHandler for UploadStreamHandler {
&mut self,
stream_id: StreamId,
_fin: bool,
data: Vec<u8>,
_sz: usize,
data: &[u8],
_output_read_data: bool,
) -> Res<bool> {
if let Ok(txt) = String::from_utf8(data.clone()) {
if let Ok(txt) = std::str::from_utf8(data) {
let trimmed_txt = txt.trim_end_matches(char::from(0));
let parsed: usize = trimmed_txt.parse().unwrap();
if parsed == self.data.len() {
let upload_time = Instant::now().duration_since(self.start);
qinfo!("Stream ID: {stream_id:?}, Upload time: {upload_time:?}");
}
} else {
panic!("Unexpected data [{}]: 0x{}", stream_id, hex(&data));
panic!("Unexpected data [{}]: 0x{}", stream_id, hex(data));
}
Ok(true)
}
Expand Down
5 changes: 5 additions & 0 deletions neqo-bin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ pub mod client;
pub mod server;
pub mod udp;

/// Firefox default value
///
/// See `network.buffer.cache.size` pref <https://searchfox.org/mozilla-central/rev/f6e3b81aac49e602f06c204f9278da30993cdc8a/modules/libpref/init/all.js#3212>
const STREAM_IO_BUFFER_SIZE: usize = 32 * 1024;

#[derive(Debug, Parser)]
pub struct SharedArgs {
#[command(flatten)]
Expand Down
117 changes: 54 additions & 63 deletions neqo-bin/src/server/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::{cell::RefCell, collections::HashMap, fmt::Display, rc::Rc, time::Instant};
use std::{borrow::Cow, cell::RefCell, collections::HashMap, fmt::Display, rc::Rc, time::Instant};

use neqo_common::{event::Provider, hex, qdebug, qerror, qinfo, qwarn, Datagram};
use neqo_crypto::{generate_ech_keys, random, AllowZeroRtt, AntiReplay};
Expand All @@ -15,12 +15,13 @@ use neqo_transport::{
};
use regex::Regex;

use super::{qns_read_response, Args};
use super::{qns_read_response, Args, ResponseData};
use crate::STREAM_IO_BUFFER_SIZE;

#[derive(Default)]
struct HttpStreamState {
writable: bool,
data_to_send: Option<(Vec<u8>, usize)>,
data_to_send: Option<ResponseData>,
}

pub struct HttpServer {
Expand All @@ -29,6 +30,7 @@ pub struct HttpServer {
read_state: HashMap<StreamId, Vec<u8>>,
is_qns_test: bool,
regex: Regex,
read_buffer: Vec<u8>,
}

impl HttpServer {
Expand Down Expand Up @@ -72,6 +74,7 @@ impl HttpServer {
} else {
Regex::new(r"GET +/(\d+)(?:\r)?\n").unwrap()
},
read_buffer: vec![0; STREAM_IO_BUFFER_SIZE],
})
}

Expand All @@ -87,38 +90,14 @@ impl HttpServer {
}
}

fn write(&mut self, stream_id: StreamId, data: Option<Vec<u8>>, conn: &ConnectionRef) {
let resp = data.unwrap_or_else(|| Vec::from(&b"404 That request was nonsense\r\n"[..]));
if let Some(stream_state) = self.write_state.get_mut(&stream_id) {
match stream_state.data_to_send {
None => stream_state.data_to_send = Some((resp, 0)),
Some(_) => {
qdebug!("Data already set, doing nothing");
}
}
if stream_state.writable {
self.stream_writable(stream_id, conn);
}
} else {
self.write_state.insert(
stream_id,
HttpStreamState {
writable: false,
data_to_send: Some((resp, 0)),
},
);
}
}

fn stream_readable(&mut self, stream_id: StreamId, conn: &ConnectionRef) {
if !stream_id.is_client_initiated() || !stream_id.is_bidi() {
qdebug!("Stream {} not client-initiated bidi, ignoring", stream_id);
return;
}
let mut data = vec![0; 4000];
let (sz, fin) = conn
.borrow_mut()
.stream_recv(stream_id, &mut data)
.stream_recv(stream_id, &mut self.read_buffer)
.expect("Read should succeed");

if sz == 0 {
Expand All @@ -127,67 +106,79 @@ impl HttpServer {
}
return;
}
let read_buffer = &self.read_buffer[..sz];

data.truncate(sz);
let buf = if let Some(mut existing) = self.read_state.remove(&stream_id) {
existing.append(&mut data);
existing
} else {
data
};
let buf = self.read_state.remove(&stream_id).map_or(
Cow::Borrowed(read_buffer),
|mut existing| {
existing.extend_from_slice(read_buffer);
Cow::Owned(existing)
},
);

let Ok(msg) = std::str::from_utf8(&buf[..]) else {
self.save_partial(stream_id, buf, conn);
self.save_partial(stream_id, buf.to_vec(), conn);
return;
};

let m = self.regex.captures(msg);
let Some(path) = m.and_then(|m| m.get(1)) else {
self.save_partial(stream_id, buf, conn);
self.save_partial(stream_id, buf.to_vec(), conn);
return;
};

let resp = {
let resp: ResponseData = {
let path = path.as_str();
qdebug!("Path = '{path}'");
if self.is_qns_test {
match qns_read_response(path) {
Ok(data) => Some(data),
Ok(data) => data.into(),
Err(e) => {
qerror!("Failed to read {path}: {e}");
Some(b"404".to_vec())
b"404".to_vec().into()
}
}
} else {
let count = path.parse().unwrap();
Some(vec![b'a'; count])
ResponseData::zeroes(count)
}
};
self.write(stream_id, resp, conn);

if let Some(stream_state) = self.write_state.get_mut(&stream_id) {
match stream_state.data_to_send {
None => stream_state.data_to_send = Some(resp),
Some(_) => {
qdebug!("Data already set, doing nothing");
}
}
if stream_state.writable {
self.stream_writable(stream_id, conn);
}
} else {
self.write_state.insert(
stream_id,
HttpStreamState {
writable: false,
data_to_send: Some(resp),
},
);
}
}

fn stream_writable(&mut self, stream_id: StreamId, conn: &ConnectionRef) {
match self.write_state.get_mut(&stream_id) {
None => {
qwarn!("Unknown stream {stream_id}, ignoring event");
}
Some(stream_state) => {
stream_state.writable = true;
if let Some((data, ref mut offset)) = &mut stream_state.data_to_send {
let sent = conn
.borrow_mut()
.stream_send(stream_id, &data[*offset..])
.unwrap();
qdebug!("Wrote {}", sent);
*offset += sent;
if *offset == data.len() {
qinfo!("Sent {sent} on {stream_id}, closing");
conn.borrow_mut().stream_close_send(stream_id).unwrap();
self.write_state.remove(&stream_id);
} else {
stream_state.writable = false;
}
}
let Some(stream_state) = self.write_state.get_mut(&stream_id) else {
qwarn!("Unknown stream {stream_id}, ignoring event");
return;
};

stream_state.writable = true;
if let Some(resp) = &mut stream_state.data_to_send {
resp.send_h09(stream_id, conn);
if resp.done() {
conn.borrow_mut().stream_close_send(stream_id).unwrap();
self.write_state.remove(&stream_id);
} else {
stream_state.writable = false;
}
}
}
Expand Down
Loading

0 comments on commit 4a5a041

Please sign in to comment.