From 95bcf207b995d251b92e76b68fa4e30bbf70f239 Mon Sep 17 00:00:00 2001 From: Matthew Kolopanis Date: Fri, 7 Jul 2023 14:16:54 -0700 Subject: [PATCH] makes pdu_handler a default method on transports. requires request and receive --- cfdp-core/src/pdu/error.rs | 2 +- cfdp-core/src/transaction/send.rs | 2 +- cfdp-core/src/transport.rs | 121 +++++++++++++++++------------- cfdp-core/tests/common/mod.rs | 98 +++++++++--------------- 4 files changed, 105 insertions(+), 118 deletions(-) diff --git a/cfdp-core/src/pdu/error.rs b/cfdp-core/src/pdu/error.rs index 678fc80..91616a1 100644 --- a/cfdp-core/src/pdu/error.rs +++ b/cfdp-core/src/pdu/error.rs @@ -75,7 +75,7 @@ pub enum PDUError { #[error("Invalid Segment Metadata Flag {0:}.")] InvalidSegmentMetadataFlag(u8), - #[error("CRC Failure on PDU. Expected 0x{0:X} Receieved 0x{1:X}")] + #[error("CRC Failure on PDU. Expected 0x{0:X} Received 0x{1:X}")] CRCFailure(u16, u16), #[error("Error Reading PDU Buffer. {0:}")] diff --git a/cfdp-core/src/transaction/send.rs b/cfdp-core/src/transaction/send.rs index 3e71a17..2022e33 100644 --- a/cfdp-core/src/transaction/send.rs +++ b/cfdp-core/src/transaction/send.rs @@ -387,7 +387,7 @@ impl SendTransaction { offset: Option, length: Option, ) -> TransactionResult<(u64, Vec)> { - // use the maximum size for the receiever if no length is given + // use the maximum size for the receiver if no length is given let length = length.unwrap_or(self.config.file_size_segment); let handle = self.get_handle()?; // if no offset given read from current cursor position diff --git a/cfdp-core/src/transport.rs b/cfdp-core/src/transport.rs index 17b2b8d..65bac73 100644 --- a/cfdp-core/src/transport.rs +++ b/cfdp-core/src/transport.rs @@ -23,7 +23,17 @@ use crate::pdu::{PDUEncode, VariableID, PDU}; /// inside a [Daemon](crate::daemon::Daemon) process #[async_trait] pub trait PDUTransport { + /// Look up the address of of the destination entity ID and send the PDU. + /// Errors if the destination Entity does not have an associated address. + async fn request(&mut self, destination: VariableID, pdu: PDU) -> Result<(), IoError>; + + /// Recieves a PDU from the associated communication protocol. + async fn receive(&mut self) -> Result; + /// Provides logic for listening for incoming PDUs and sending any outbound PDUs + /// A default implementeation is provided for convenience. + /// + /// This method relies on [tokio::select], as a result [request](PDUTransport::request) and [receive](PDUTransport::receive) must be cancel safe. /// /// A transport implementation will send any received messages through the /// [Sender] channel to the [Daemon](crate::daemon::Daemon). @@ -36,13 +46,45 @@ pub trait PDUTransport { signal: Arc, sender: Sender, mut recv: Receiver<(VariableID, PDU)>, - ) -> Result<(), IoError>; + ) -> Result<(), IoError> { + while !signal.load(Ordering::Relaxed) { + tokio::select! { + pdu = self.receive() => { + match pdu{ + Ok(pdu) => match sender.send(pdu).await { + Ok(()) => {} + Err(error) => { + error!("Channel to daemon severed: {}", error); + return Err(IoError::from(ErrorKind::ConnectionAborted)); + } + }, + Err(err) => { + error!("Error decoding PDU: {}", err); + + } + }; + }, + Some((entity, pdu)) = recv.recv() => { + self.request(entity, pdu).await?; + }, + else => { + log::info!("UdpSocket or Channel disconnected"); + break + } + } + // this should be at minimum made configurable + tokio::time::sleep(Duration::from_micros(100)).await; + } + Ok(()) + } } /// A wrapper struct around a [UdpSocket] and a Mapping from /// EntityIDs to [SocketAddr] instances. pub struct UdpTransport { socket: UdpSocket, + + buffer: Vec, entity_map: HashMap, } impl UdpTransport { @@ -51,20 +93,11 @@ impl UdpTransport { entity_map: HashMap, ) -> Result { let socket = UdpSocket::bind(addr).await?; - Ok(Self { socket, entity_map }) - } - - /// Look up the address of of the destination entity ID and send the PDU. - /// Errors if the destination Entity does not have an associated address. - fn request(&mut self, destination: VariableID, pdu: PDU) -> Result<(), IoError> { - self.entity_map - .get(&destination) - .ok_or_else(|| IoError::from(ErrorKind::AddrNotAvailable)) - .and_then(|addr| { - self.socket - .send_to(pdu.encode().as_slice(), addr) - .map(|_n| ()) - }) + Ok(Self { + socket, + buffer: vec![0_u8; u16::MAX as usize], + entity_map, + }) } } impl TryFrom<(UdpSocket, HashMap)> for UdpTransport { @@ -72,6 +105,8 @@ impl TryFrom<(UdpSocket, HashMap)> for UdpTransport { fn try_from(inputs: (UdpSocket, HashMap)) -> Result { let me = Self { socket: inputs.0, + // this buffer will be 511 KiB, should be sufficiently small; + buffer: vec![0_u8; u16::MAX as usize], entity_map: inputs.1, }; Ok(me) @@ -80,45 +115,25 @@ impl TryFrom<(UdpSocket, HashMap)> for UdpTransport { #[async_trait] impl PDUTransport for UdpTransport { - fn pdu_handler( - &mut self, - signal: Arc, - sender: Sender, - mut recv: Receiver<(VariableID, PDU)>, - ) -> Result<(), IoError> { - // this buffer will be 511 KiB, should be sufficiently small; - let mut buffer = vec![0_u8; u16::MAX as usize]; - while !signal.load(Ordering::Relaxed) { - tokio::select! { - Ok((_n, _addr)) = self.socket.recv_from(&mut buffer) => { - match PDU::decode(&mut buffer.as_slice()) { - Ok(pdu) => { - match sender.send(pdu).await { - Ok(()) => {} - Err(error) => { - error!("Channel to daemon severed: {}", error); - return Err(IoError::from(ErrorKind::ConnectionAborted)); - } - }; - } - Err(error) => { - error!("Error decoding PDU: {}", error); - // might need to stop depending on the error. - // some are recoverable though - } - } - }, - Some((entity, pdu)) = recv.recv() => { - self.request(entity, pdu).await?; - }, - else => { - log::info!("UdpSocket or Channel disconnected"); - break - } + async fn request(&mut self, destination: VariableID, pdu: PDU) -> Result<(), IoError> { + let addr = self + .entity_map + .get(&destination) + .ok_or_else(|| IoError::from(ErrorKind::AddrNotAvailable))?; + self.socket.send_to(pdu.encode().as_slice(), addr).await?; + Ok(()) + } + + async fn receive(&mut self) -> Result { + let (_n, _addr) = self.socket.recv_from(&mut self.buffer).await?; + + match PDU::decode(&mut self.buffer.as_slice()) { + Ok(pdu) => Ok(pdu), + Err(err) => { + // might need to stop depending on the error. + // some are recoverable though + Err(IoError::new(ErrorKind::InvalidData, err.to_string())) } - // this should be at minimum made configurable - tokio::time::sleep(Duration::from_micros(100)).await; } - Ok(()) } } diff --git a/cfdp-core/tests/common/mod.rs b/cfdp-core/tests/common/mod.rs index e888489..4afbd64 100644 --- a/cfdp-core/tests/common/mod.rs +++ b/cfdp-core/tests/common/mod.rs @@ -5,10 +5,7 @@ use std::{ marker::PhantomData, net::SocketAddr, path::Path, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, RwLock, - }, + sync::{Arc, RwLock}, }; use async_trait::async_trait; @@ -31,7 +28,7 @@ use cfdp_core::{ }; use itertools::{Either, Itertools}; -use log::{error, info}; +use log::info; use tempfile::TempDir; use rstest::fixture; @@ -988,6 +985,7 @@ pub(crate) struct LossyTransport { counter: usize, issue: TransportIssue, buffer: Vec, + bytes: Vec, } impl LossyTransport { #[allow(dead_code)] @@ -1003,11 +1001,32 @@ impl LossyTransport { counter: 1, issue, buffer: vec![], + bytes: vec![0_u8; u16::MAX as usize], }) } +} +impl TryFrom<(UdpSocket, HashMap, TransportIssue)> for LossyTransport { + type Error = IoError; - fn request(&mut self, destination: VariableID, pdu: PDU) -> Result<(), IoError> { - self.entity_map + fn try_from( + inputs: (UdpSocket, HashMap, TransportIssue), + ) -> Result { + let me = Self { + socket: inputs.0, + entity_map: inputs.1, + counter: 1, + issue: inputs.2, + buffer: vec![], + bytes: vec![0_u8; u16::MAX as usize], + }; + Ok(me) + } +} +#[async_trait] +impl PDUTransport for LossyTransport { + async fn request(&mut self, destination: VariableID, pdu: PDU) -> Result<(), IoError> { + let addr = self + .entity_map .get(&destination) .ok_or_else(|| IoError::from(ErrorKind::AddrNotAvailable))?; @@ -1151,64 +1170,17 @@ impl LossyTransport { } } } -} -impl TryFrom<(UdpSocket, HashMap, TransportIssue)> for LossyTransport { - type Error = IoError; - fn try_from( - inputs: (UdpSocket, HashMap, TransportIssue), - ) -> Result { - let me = Self { - socket: inputs.0, - entity_map: inputs.1, - counter: 1, - issue: inputs.2, - buffer: vec![], - }; - me.socket.set_read_timeout(Some(Duration::from_secs(1)))?; - me.socket.set_write_timeout(Some(Duration::from_secs(1)))?; - me.socket.set_nonblocking(true)?; - Ok(me) - } -} -impl PDUTransport for LossyTransport { - fn pdu_handler( - &mut self, - signal: Arc, - sender: Sender, - mut recv: Receiver<(VariableID, PDU)>, - ) -> Result<(), IoError> { - // this buffer will be 511 KiB, should be sufficiently small; - let mut buffer = vec![0_u8; u16::MAX as usize]; - while !signal.load(Ordering::Relaxed) { - tokio::select! { - Ok((_n, _addr)) = self.socket.recv_from(&mut buffer) => { - match PDU::decode(&mut buffer.as_slice()) { - Ok(pdu) => { - match sender.send(pdu).await { - Ok(()) => {} - Err(error) => { - error!("Channel to daemon severed: {}", error); - return Err(IoError::from(ErrorKind::ConnectionAborted)); - } - }; - } - Err(error) => { - error!("Error decoding PDU: {}", error); - // might need to stop depending on the error. - // some are recoverable though - } - } - }, - Some((entity, pdu)) = recv.recv() => { - self.request(entity, pdu).await?; - }, - else => { - log::info!("UdpSocket or Channel disconnected"); - break - } + async fn receive(&mut self) -> Result { + let (_n, _addr) = self.socket.recv_from(&mut self.bytes).await?; + + match PDU::decode(&mut self.bytes.as_slice()) { + Ok(pdu) => Ok(pdu), + Err(err) => { + // might need to stop depending on the error. + // some are recoverable though + Err(IoError::new(ErrorKind::InvalidData, err.to_string())) } } - Ok(()) } }