Skip to content

Commit

Permalink
fix: Resend stream data on ZeroRttRejected (#2062)
Browse files Browse the repository at this point in the history
* test: Check for RTX of lost 0-RTT data on reject

This is currently failing.

* Fix merge

* Attempt to RTX requests when 0-RTT was rejected

Look OK, @mxinden?

Let's see what the CI says.

* Reinit on `ZeroRttRejected`

* Remove test

* Fix

* Reinit files

* Suggestions from @mxinden

---------

Signed-off-by: Lars Eggert <[email protected]>
  • Loading branch information
larseggert committed Aug 21, 2024
1 parent 90d1585 commit 13237c3
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 0 deletions.
19 changes: 19 additions & 0 deletions neqo-bin/src/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,24 @@ use crate::STREAM_IO_BUFFER_SIZE;
pub struct Handler<'a> {
streams: HashMap<StreamId, Option<BufWriter<File>>>,
url_queue: VecDeque<Url>,
handled_urls: Vec<Url>,
all_paths: Vec<PathBuf>,
args: &'a Args,
token: Option<ResumptionToken>,
needs_key_update: bool,
read_buffer: Vec<u8>,
}

impl<'a> Handler<'a> {
fn reinit(&mut self) {
for url in self.handled_urls.drain(..) {
self.url_queue.push_front(url);
}
self.streams.clear();
self.all_paths.clear();
}
}

impl<'a> super::Handler for Handler<'a> {
type Client = Connection;

Expand Down Expand Up @@ -80,6 +91,12 @@ impl<'a> super::Handler for Handler<'a> {
qdebug!("{event:?}");
self.download_urls(client);
}
ConnectionEvent::ZeroRttRejected => {
qdebug!("{event:?}");
// All 0-RTT data was rejected. We need to retransmit it.
self.reinit();
self.download_urls(client);
}
ConnectionEvent::ResumptionToken(token) => {
self.token = Some(token);
}
Expand Down Expand Up @@ -201,6 +218,7 @@ impl<'b> Handler<'b> {
Self {
streams: HashMap::new(),
url_queue,
handled_urls: Vec::new(),
all_paths: Vec::new(),
args,
token: None,
Expand Down Expand Up @@ -242,6 +260,7 @@ impl<'b> Handler<'b> {
client.stream_close_send(client_stream_id).unwrap();
let out_file = get_output_file(&url, &self.args.output_dir, &mut self.all_paths);
self.streams.insert(client_stream_id, out_file);
self.handled_urls.push(url);
true
}
Err(e @ (Error::StreamLimitError | Error::ConnectionState)) => {
Expand Down
20 changes: 20 additions & 0 deletions neqo-bin/src/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl<'a> Handler<'a> {
pub(crate) fn new(url_queue: VecDeque<Url>, args: &'a Args) -> Self {
let url_handler = UrlHandler {
url_queue,
handled_urls: Vec::new(),
stream_handlers: HashMap::new(),
all_paths: Vec::new(),
handler_type: if args.test.is_some() {
Expand Down Expand Up @@ -154,6 +155,16 @@ impl super::Client for Http3Client {
}
}

impl<'a> Handler<'a> {
fn reinit(&mut self) {
for url in self.url_handler.handled_urls.drain(..) {
self.url_handler.url_queue.push_front(url);
}
self.url_handler.stream_handlers.clear();
self.url_handler.all_paths.clear();
}
}

impl<'a> super::Handler for Handler<'a> {
type Client = Http3Client;

Expand Down Expand Up @@ -223,6 +234,13 @@ impl<'a> super::Handler for Handler<'a> {
}
Http3ClientEvent::StateChange(Http3State::Connected)
| Http3ClientEvent::RequestsCreatable => {
qinfo!("{event:?}");
self.url_handler.process_urls(client);
}
Http3ClientEvent::ZeroRttRejected => {
qinfo!("{event:?}");
// All 0-RTT data was rejected. We need to retransmit it.
self.reinit();
self.url_handler.process_urls(client);
}
Http3ClientEvent::ResumptionToken(t) => self.token = Some(t),
Expand Down Expand Up @@ -381,6 +399,7 @@ impl StreamHandler for UploadStreamHandler {

struct UrlHandler<'a> {
url_queue: VecDeque<Url>,
handled_urls: Vec<Url>,
stream_handlers: HashMap<StreamId, Box<dyn StreamHandler>>,
all_paths: Vec<PathBuf>,
handler_type: StreamHandlerType,
Expand Down Expand Up @@ -430,6 +449,7 @@ impl<'a> UrlHandler<'a> {
client_stream_id,
);
self.stream_handlers.insert(client_stream_id, handler);
self.handled_urls.push(url);
true
}
Err(
Expand Down

0 comments on commit 13237c3

Please sign in to comment.