From 13237c3f5a4740b81ff98094fbdf663253122c3a Mon Sep 17 00:00:00 2001 From: Lars Eggert Date: Wed, 21 Aug 2024 14:22:07 +0300 Subject: [PATCH] fix: Resend stream data on `ZeroRttRejected` (#2062) * test: Check for RTX of lost 0-RTT data on reject This is currently failing. * Fix merge * Attempt to RTX requests when 0-RTT was rejected Look OK, @mxinden? Let's see what the CI says. * Reinit on `ZeroRttRejected` * Remove test * Fix * Reinit files * Suggestions from @mxinden --------- Signed-off-by: Lars Eggert --- neqo-bin/src/client/http09.rs | 19 +++++++++++++++++++ neqo-bin/src/client/http3.rs | 20 ++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index a997bacabd..919d28f658 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -31,6 +31,7 @@ use crate::STREAM_IO_BUFFER_SIZE; pub struct Handler<'a> { streams: HashMap>>, url_queue: VecDeque, + handled_urls: Vec, all_paths: Vec, args: &'a Args, token: Option, @@ -38,6 +39,16 @@ pub struct Handler<'a> { read_buffer: Vec, } +impl<'a> Handler<'a> { + fn reinit(&mut self) { + for url in self.handled_urls.drain(..) { + self.url_queue.push_front(url); + } + self.streams.clear(); + self.all_paths.clear(); + } +} + impl<'a> super::Handler for Handler<'a> { type Client = Connection; @@ -80,6 +91,12 @@ impl<'a> super::Handler for Handler<'a> { qdebug!("{event:?}"); self.download_urls(client); } + ConnectionEvent::ZeroRttRejected => { + qdebug!("{event:?}"); + // All 0-RTT data was rejected. We need to retransmit it. + self.reinit(); + self.download_urls(client); + } ConnectionEvent::ResumptionToken(token) => { self.token = Some(token); } @@ -201,6 +218,7 @@ impl<'b> Handler<'b> { Self { streams: HashMap::new(), url_queue, + handled_urls: Vec::new(), all_paths: Vec::new(), args, token: None, @@ -242,6 +260,7 @@ impl<'b> Handler<'b> { client.stream_close_send(client_stream_id).unwrap(); let out_file = get_output_file(&url, &self.args.output_dir, &mut self.all_paths); self.streams.insert(client_stream_id, out_file); + self.handled_urls.push(url); true } Err(e @ (Error::StreamLimitError | Error::ConnectionState)) => { diff --git a/neqo-bin/src/client/http3.rs b/neqo-bin/src/client/http3.rs index 6e7255f51b..6031742c02 100644 --- a/neqo-bin/src/client/http3.rs +++ b/neqo-bin/src/client/http3.rs @@ -42,6 +42,7 @@ impl<'a> Handler<'a> { pub(crate) fn new(url_queue: VecDeque, args: &'a Args) -> Self { let url_handler = UrlHandler { url_queue, + handled_urls: Vec::new(), stream_handlers: HashMap::new(), all_paths: Vec::new(), handler_type: if args.test.is_some() { @@ -154,6 +155,16 @@ impl super::Client for Http3Client { } } +impl<'a> Handler<'a> { + fn reinit(&mut self) { + for url in self.url_handler.handled_urls.drain(..) { + self.url_handler.url_queue.push_front(url); + } + self.url_handler.stream_handlers.clear(); + self.url_handler.all_paths.clear(); + } +} + impl<'a> super::Handler for Handler<'a> { type Client = Http3Client; @@ -223,6 +234,13 @@ impl<'a> super::Handler for Handler<'a> { } Http3ClientEvent::StateChange(Http3State::Connected) | Http3ClientEvent::RequestsCreatable => { + qinfo!("{event:?}"); + self.url_handler.process_urls(client); + } + Http3ClientEvent::ZeroRttRejected => { + qinfo!("{event:?}"); + // All 0-RTT data was rejected. We need to retransmit it. + self.reinit(); self.url_handler.process_urls(client); } Http3ClientEvent::ResumptionToken(t) => self.token = Some(t), @@ -381,6 +399,7 @@ impl StreamHandler for UploadStreamHandler { struct UrlHandler<'a> { url_queue: VecDeque, + handled_urls: Vec, stream_handlers: HashMap>, all_paths: Vec, handler_type: StreamHandlerType, @@ -430,6 +449,7 @@ impl<'a> UrlHandler<'a> { client_stream_id, ); self.stream_handlers.insert(client_stream_id, handler); + self.handled_urls.push(url); true } Err(