diff --git a/src/rpc/base_stream.rs b/src/rpc/base_stream.rs index ddbaaf5..a2dfadb 100644 --- a/src/rpc/base_stream.rs +++ b/src/rpc/base_stream.rs @@ -24,9 +24,9 @@ impl WebRTCBaseStream { self.closed_reason.store(&mut err, Ordering::Release); } - pub fn process_message(&mut self, message: PacketMessage) -> Result> { + pub fn process_message(&mut self, message: PacketMessage) -> Result>> { if message.data.is_empty() && message.eom { - return Ok(Vec::new()); + return Ok(Some(Vec::new())); } if message.data.len() + self.packet_buffer.len() > MAX_MESSAGE_SIZE { let e = Err(anyhow::anyhow!( @@ -41,9 +41,9 @@ impl WebRTCBaseStream { if message.eom { let ret = self.packet_buffer.clone(); self.reset_packet_buffer(); - return Ok(ret); + return Ok(Some(ret)); } - Ok(Vec::new()) + Ok(None) } fn reset_packet_buffer(&mut self) { diff --git a/src/rpc/client_stream.rs b/src/rpc/client_stream.rs index 68a4676..59cf81d 100644 --- a/src/rpc/client_stream.rs +++ b/src/rpc/client_stream.rs @@ -25,8 +25,9 @@ impl WebRTCClientStream { let mut message_processed = false; if let Some(message) = response.packet_message { match self.base_stream.process_message(message) { - Ok(mut data) => { - if !data.is_empty() { + Ok(data) => { + if data.is_some() { + let mut data = data.unwrap(); message_processed = true; let mut message_buf = vec![0u8]; let len: u32 = data.len().try_into()?; @@ -39,7 +40,6 @@ impl WebRTCClientStream { .await?; } } - Err(e) => { log::error!("Error processing message: {e}"); }