Skip to content

Commit

Permalink
makes pdu_handler a default method on transports. requires request an…
Browse files Browse the repository at this point in the history
…d receive
  • Loading branch information
mkolopanis committed Jul 7, 2023
1 parent 913c645 commit 95bcf20
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 118 deletions.
2 changes: 1 addition & 1 deletion cfdp-core/src/pdu/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub enum PDUError {
#[error("Invalid Segment Metadata Flag {0:}.")]
InvalidSegmentMetadataFlag(u8),

#[error("CRC Failure on PDU. Expected 0x{0:X} Receieved 0x{1:X}")]
#[error("CRC Failure on PDU. Expected 0x{0:X} Received 0x{1:X}")]
CRCFailure(u16, u16),

#[error("Error Reading PDU Buffer. {0:}")]
Expand Down
2 changes: 1 addition & 1 deletion cfdp-core/src/transaction/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ impl<T: FileStore> SendTransaction<T> {
offset: Option<u64>,
length: Option<u16>,
) -> TransactionResult<(u64, Vec<u8>)> {
// use the maximum size for the receiever if no length is given
// use the maximum size for the receiver if no length is given
let length = length.unwrap_or(self.config.file_size_segment);
let handle = self.get_handle()?;
// if no offset given read from current cursor position
Expand Down
121 changes: 68 additions & 53 deletions cfdp-core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,17 @@ use crate::pdu::{PDUEncode, VariableID, PDU};
/// inside a [Daemon](crate::daemon::Daemon) process
#[async_trait]
pub trait PDUTransport {
/// Look up the address of of the destination entity ID and send the PDU.
/// Errors if the destination Entity does not have an associated address.
async fn request(&mut self, destination: VariableID, pdu: PDU) -> Result<(), IoError>;

/// Recieves a PDU from the associated communication protocol.
async fn receive(&mut self) -> Result<PDU, IoError>;

/// Provides logic for listening for incoming PDUs and sending any outbound PDUs
/// A default implementeation is provided for convenience.
///
/// This method relies on [tokio::select], as a result [request](PDUTransport::request) and [receive](PDUTransport::receive) must be cancel safe.
///
/// A transport implementation will send any received messages through the
/// [Sender] channel to the [Daemon](crate::daemon::Daemon).
Expand All @@ -36,13 +46,45 @@ pub trait PDUTransport {
signal: Arc<AtomicBool>,
sender: Sender<PDU>,
mut recv: Receiver<(VariableID, PDU)>,
) -> Result<(), IoError>;
) -> Result<(), IoError> {
while !signal.load(Ordering::Relaxed) {
tokio::select! {
pdu = self.receive() => {
match pdu{
Ok(pdu) => match sender.send(pdu).await {
Ok(()) => {}
Err(error) => {
error!("Channel to daemon severed: {}", error);
return Err(IoError::from(ErrorKind::ConnectionAborted));
}
},
Err(err) => {
error!("Error decoding PDU: {}", err);

}
};
},
Some((entity, pdu)) = recv.recv() => {
self.request(entity, pdu).await?;
},
else => {
log::info!("UdpSocket or Channel disconnected");
break
}
}
// this should be at minimum made configurable
tokio::time::sleep(Duration::from_micros(100)).await;
}
Ok(())
}
}

/// A wrapper struct around a [UdpSocket] and a Mapping from
/// EntityIDs to [SocketAddr] instances.
pub struct UdpTransport {
socket: UdpSocket,

buffer: Vec<u8>,
entity_map: HashMap<VariableID, SocketAddr>,
}
impl UdpTransport {
Expand All @@ -51,27 +93,20 @@ impl UdpTransport {
entity_map: HashMap<VariableID, SocketAddr>,
) -> Result<Self, IoError> {
let socket = UdpSocket::bind(addr).await?;
Ok(Self { socket, entity_map })
}

/// Look up the address of of the destination entity ID and send the PDU.
/// Errors if the destination Entity does not have an associated address.
fn request(&mut self, destination: VariableID, pdu: PDU) -> Result<(), IoError> {
self.entity_map
.get(&destination)
.ok_or_else(|| IoError::from(ErrorKind::AddrNotAvailable))
.and_then(|addr| {
self.socket
.send_to(pdu.encode().as_slice(), addr)
.map(|_n| ())
})
Ok(Self {
socket,
buffer: vec![0_u8; u16::MAX as usize],
entity_map,
})
}
}
impl TryFrom<(UdpSocket, HashMap<VariableID, SocketAddr>)> for UdpTransport {
type Error = IoError;
fn try_from(inputs: (UdpSocket, HashMap<VariableID, SocketAddr>)) -> Result<Self, Self::Error> {
let me = Self {
socket: inputs.0,
// this buffer will be 511 KiB, should be sufficiently small;
buffer: vec![0_u8; u16::MAX as usize],
entity_map: inputs.1,
};
Ok(me)
Expand All @@ -80,45 +115,25 @@ impl TryFrom<(UdpSocket, HashMap<VariableID, SocketAddr>)> for UdpTransport {

#[async_trait]
impl PDUTransport for UdpTransport {
fn pdu_handler(
&mut self,
signal: Arc<AtomicBool>,
sender: Sender<PDU>,
mut recv: Receiver<(VariableID, PDU)>,
) -> Result<(), IoError> {
// this buffer will be 511 KiB, should be sufficiently small;
let mut buffer = vec![0_u8; u16::MAX as usize];
while !signal.load(Ordering::Relaxed) {
tokio::select! {
Ok((_n, _addr)) = self.socket.recv_from(&mut buffer) => {
match PDU::decode(&mut buffer.as_slice()) {
Ok(pdu) => {
match sender.send(pdu).await {
Ok(()) => {}
Err(error) => {
error!("Channel to daemon severed: {}", error);
return Err(IoError::from(ErrorKind::ConnectionAborted));
}
};
}
Err(error) => {
error!("Error decoding PDU: {}", error);
// might need to stop depending on the error.
// some are recoverable though
}
}
},
Some((entity, pdu)) = recv.recv() => {
self.request(entity, pdu).await?;
},
else => {
log::info!("UdpSocket or Channel disconnected");
break
}
async fn request(&mut self, destination: VariableID, pdu: PDU) -> Result<(), IoError> {
let addr = self
.entity_map
.get(&destination)
.ok_or_else(|| IoError::from(ErrorKind::AddrNotAvailable))?;
self.socket.send_to(pdu.encode().as_slice(), addr).await?;
Ok(())
}

async fn receive(&mut self) -> Result<PDU, IoError> {
let (_n, _addr) = self.socket.recv_from(&mut self.buffer).await?;

match PDU::decode(&mut self.buffer.as_slice()) {
Ok(pdu) => Ok(pdu),
Err(err) => {
// might need to stop depending on the error.
// some are recoverable though
Err(IoError::new(ErrorKind::InvalidData, err.to_string()))
}
// this should be at minimum made configurable
tokio::time::sleep(Duration::from_micros(100)).await;
}
Ok(())
}
}
98 changes: 35 additions & 63 deletions cfdp-core/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use std::{
marker::PhantomData,
net::SocketAddr,
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
sync::{Arc, RwLock},
};

use async_trait::async_trait;
Expand All @@ -31,7 +28,7 @@ use cfdp_core::{
};

use itertools::{Either, Itertools};
use log::{error, info};
use log::info;
use tempfile::TempDir;

use rstest::fixture;
Expand Down Expand Up @@ -988,6 +985,7 @@ pub(crate) struct LossyTransport {
counter: usize,
issue: TransportIssue,
buffer: Vec<PDU>,
bytes: Vec<u8>,
}
impl LossyTransport {
#[allow(dead_code)]
Expand All @@ -1003,11 +1001,32 @@ impl LossyTransport {
counter: 1,
issue,
buffer: vec![],
bytes: vec![0_u8; u16::MAX as usize],
})
}
}
impl TryFrom<(UdpSocket, HashMap<VariableID, SocketAddr>, TransportIssue)> for LossyTransport {
type Error = IoError;

fn request(&mut self, destination: VariableID, pdu: PDU) -> Result<(), IoError> {
self.entity_map
fn try_from(
inputs: (UdpSocket, HashMap<VariableID, SocketAddr>, TransportIssue),
) -> Result<Self, Self::Error> {
let me = Self {
socket: inputs.0,
entity_map: inputs.1,
counter: 1,
issue: inputs.2,
buffer: vec![],
bytes: vec![0_u8; u16::MAX as usize],
};
Ok(me)
}
}
#[async_trait]
impl PDUTransport for LossyTransport {
async fn request(&mut self, destination: VariableID, pdu: PDU) -> Result<(), IoError> {
let addr = self
.entity_map
.get(&destination)
.ok_or_else(|| IoError::from(ErrorKind::AddrNotAvailable))?;

Expand Down Expand Up @@ -1151,64 +1170,17 @@ impl LossyTransport {
}
}
}
}
impl TryFrom<(UdpSocket, HashMap<VariableID, SocketAddr>, TransportIssue)> for LossyTransport {
type Error = IoError;

fn try_from(
inputs: (UdpSocket, HashMap<VariableID, SocketAddr>, TransportIssue),
) -> Result<Self, Self::Error> {
let me = Self {
socket: inputs.0,
entity_map: inputs.1,
counter: 1,
issue: inputs.2,
buffer: vec![],
};
me.socket.set_read_timeout(Some(Duration::from_secs(1)))?;
me.socket.set_write_timeout(Some(Duration::from_secs(1)))?;
me.socket.set_nonblocking(true)?;
Ok(me)
}
}
impl PDUTransport for LossyTransport {
fn pdu_handler(
&mut self,
signal: Arc<AtomicBool>,
sender: Sender<PDU>,
mut recv: Receiver<(VariableID, PDU)>,
) -> Result<(), IoError> {
// this buffer will be 511 KiB, should be sufficiently small;
let mut buffer = vec![0_u8; u16::MAX as usize];
while !signal.load(Ordering::Relaxed) {
tokio::select! {
Ok((_n, _addr)) = self.socket.recv_from(&mut buffer) => {
match PDU::decode(&mut buffer.as_slice()) {
Ok(pdu) => {
match sender.send(pdu).await {
Ok(()) => {}
Err(error) => {
error!("Channel to daemon severed: {}", error);
return Err(IoError::from(ErrorKind::ConnectionAborted));
}
};
}
Err(error) => {
error!("Error decoding PDU: {}", error);
// might need to stop depending on the error.
// some are recoverable though
}
}
},
Some((entity, pdu)) = recv.recv() => {
self.request(entity, pdu).await?;
},
else => {
log::info!("UdpSocket or Channel disconnected");
break
}
async fn receive(&mut self) -> Result<PDU, IoError> {
let (_n, _addr) = self.socket.recv_from(&mut self.bytes).await?;

match PDU::decode(&mut self.bytes.as_slice()) {
Ok(pdu) => Ok(pdu),
Err(err) => {
// might need to stop depending on the error.
// some are recoverable though
Err(IoError::new(ErrorKind::InvalidData, err.to_string()))
}
}
Ok(())
}
}

0 comments on commit 95bcf20

Please sign in to comment.