Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add explicit TransmissionTypes #10

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 79 additions & 20 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::{anyhow, Result, Context};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::io;
use std::io::Write;
use std::sync::Arc;
use std::sync::Mutex;
Expand All @@ -20,18 +21,19 @@ use crate::logger::Logger;
use crate::parse::FlattenedArguments;
use crate::util::CONSTANT_BIT_TO_BYTE;
use crate::util::CONSTANT_US_TO_MS;
use crate::util::THREAD_SLEEP_TIME_SHORT_MS;
use crate::util::calculate_statistics;
use crate::util::DynamicValue;
use crate::util::THREAD_SLEEP_FINISH_MS;
use crate::util::THREAD_SLEEP_TIME_SHORT_US;
use crate::util::get_active_cca;
use crate::util::sockopt_get_tcp_info;
use crate::util::sockopt_patch_cwnd;
use crate::util::sockopt_set_cc;
use crate::util::sockopt_prepare_transmission;


/// Array of 8192 bytes filled with 0x01
const DUMMY_DATA_SIZE: usize = 32768;// 16384; // 8192
const DUMMY_DATA_SIZE: usize = 8192;
//const DUMMY_DATA: &[u8] = &[0x01; 8192];

/* PATCH CONSTATNS */
Expand Down Expand Up @@ -312,7 +314,9 @@ pub fn handle_client(mut client_args: ClientArgs) -> Result<()> {
let logger: &mut Arc<Mutex<Logger>> = &mut client_args.logger;
let socket_file_descriptor: i32 = stream.as_raw_fd();

logger.lock().unwrap().log_stdout("[client] new client:")?;
logger.lock().unwrap().log_stdout(&format!("[client] new client ({})", chrono::Local::now().format("%Y-%m-%d %H:%M:%S")))?;
logger.lock().unwrap().log_stdout(&format!(" client_args.transmission_type: {:?}", transmission_type))?;
logger.lock().unwrap().log_stdout(&format!(" client_args.transmission_duration_ms: {:?}", transmission_duration_ms))?;
logger.lock().unwrap().log_stdout(&format!(" client_args.set_initial_cwnd: {:?}", set_initial_cwnd))?;
logger.lock().unwrap().log_stdout(&format!(" client_args.set_upper_bound_cwnd: {:?}", set_upper_bound_cwnd))?;
logger.lock().unwrap().log_stdout(&format!(" client_args.set_direct_cwnd: {:?}", set_direct_cwnd))?;
Expand All @@ -327,7 +331,7 @@ pub fn handle_client(mut client_args: ClientArgs) -> Result<()> {
println!(" fair_share_send_rate: {:?}", latest_metric.get_rate());
}

sockopt_set_cc(socket_file_descriptor, &transmission_type)?;
sockopt_prepare_transmission(socket_file_descriptor, &transmission_type)?;
println!(" active CCA:\t{}", get_active_cca(socket_file_descriptor)?);
println!();

Expand All @@ -343,7 +347,7 @@ pub fn handle_client(mut client_args: ClientArgs) -> Result<()> {
if uses_external {
wait_until_client_metric(&client_metrics.clone().unwrap(), &client_addr);
}
if transmission_type.is_pbe() {
if transmission_type.is_l2b() {
let initial_cwnd_option = if let Some(initial_cwnd_dyn) = set_initial_cwnd.clone() {
patch_initial_cwnd(initial_cwnd_dyn, socket_file_descriptor, &client_metrics, &client_addr)?
} else {
Expand All @@ -368,7 +372,7 @@ pub fn handle_client(mut client_args: ClientArgs) -> Result<()> {

let min_rtt_us: u64 = sockopt_get_tcp_info(socket_file_descriptor)?.tcpi_rtt as u64;

let start_timestamp_us = chrono::Utc::now().timestamp_micros() as u64;
let start_timestamp_us = chrono::Local::now().timestamp_micros() as u64;
let end_timestamp_us = start_timestamp_us + (transmission_duration_ms * 1000);
let mut last_logging_timestamp_us = 0;
let mut last_metric_timestamp_us = 0;
Expand All @@ -387,7 +391,7 @@ pub fn handle_client(mut client_args: ClientArgs) -> Result<()> {
break;
}

let now_us = chrono::Utc::now().timestamp_micros() as u64;
let now_us = chrono::Local::now().timestamp_micros() as u64;

if now_us - last_logging_timestamp_us >= logging_interval_us {
append_tcp_info_to_stats_log(socket_file_descriptor,
Expand All @@ -397,8 +401,8 @@ pub fn handle_client(mut client_args: ClientArgs) -> Result<()> {
None)?;
last_logging_timestamp_us = now_us;
}
if transmission_type.is_pbe() && now_us - last_set_cwnd_timestamp_us >= min_rtt_us {
last_set_cwnd_timestamp_us = chrono::Utc::now().timestamp_micros() as u64;
if transmission_type.is_l2b() && now_us - last_set_cwnd_timestamp_us >= min_rtt_us {
last_set_cwnd_timestamp_us = chrono::Local::now().timestamp_micros() as u64;
if let Some(upper_cwnd_dyn) = set_upper_bound_cwnd.clone() {
let upper_cwnd_option = patch_upper_cwnd_if_new_metric(upper_cwnd_dyn,
socket_file_descriptor,
Expand Down Expand Up @@ -429,8 +433,6 @@ pub fn handle_client(mut client_args: ClientArgs) -> Result<()> {
thread::sleep(Duration::from_micros(THREAD_SLEEP_TIME_SHORT_US));
}

thread::sleep(Duration::from_millis(THREAD_SLEEP_FINISH_MS));

finish_transmission(&mut joined_stream)?;

let (rtt_mean, cwnd_mean, rtt_median, cwnd_median) = calculate_statistics(&timedata);
Expand All @@ -448,7 +450,8 @@ pub fn handle_client(mut client_args: ClientArgs) -> Result<()> {
timedata,
};

logger.lock().unwrap().log_stdout(&format!("[client] tranmission finished:\n{}", &transmission))?;
logger.lock().unwrap().log_stdout(&format!("[client] tranmission finished ({}):\n{}",
chrono::Local::now().format("%Y-%m-%d %H:%M:%S"), &transmission))?;
let json_str = serde_json::to_string(&transmission)?;
logger.lock().unwrap().log_transmission(&json_str, client_args.path)?;

Expand All @@ -470,34 +473,90 @@ fn run_sending_thread(
mut stream: TcpStream,
finish_timestamp_us: u64,
) -> Result<TcpStream> {
let dummy_data: Box<[u8]> = init_dummy_data();
let mut dummy_data: Box<[u8]> = init_dummy_data();
let socket_file_descriptor: i32 = stream.as_raw_fd();

stream.set_nonblocking(true)?;

let mut now: u64;
loop {
now = chrono::Utc::now().timestamp_micros() as u64;
now = chrono::Local::now().timestamp_micros() as u64;
if now >= finish_timestamp_us {
break;
}
stream.write_all(&dummy_data)?;
encode_rtt_in_payload(socket_file_descriptor, &mut dummy_data);
match stream.write(&dummy_data) {
Ok(nof_bytes_written) => {
if nof_bytes_written <= (dummy_data.len() / 10) {
thread::sleep(Duration::from_micros(THREAD_SLEEP_TIME_SHORT_MS));
}
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
// Sending buffer is full, wait for a while before retrying
thread::sleep(Duration::from_micros(THREAD_SLEEP_TIME_SHORT_MS));
}
Err(e) => return Err(e.into()), // Other I/O errors
}

thread::sleep(Duration::from_micros(THREAD_SLEEP_TIME_SHORT_US));
}

// Restore the blocking mode
stream.set_nonblocking(false)?;

Ok(stream)
}

fn encode_rtt_in_payload(socket_file_descriptor: i32, payload: &mut Box<[u8]>) {
if let Ok(tcp_info) = sockopt_get_tcp_info(socket_file_descriptor) {
let rtt_us: u32 = tcp_info.tcpi_rtt;
let encoded_rtt = encode_rtt_to_byte_array(rtt_us);
repeat_slice_in_array(payload, encoded_rtt);
}
}

fn encode_rtt_to_byte_array(rtt_us: u32) -> [u8; 10] {
let mut rtt_array = [0u8; 10];

rtt_array[0] = 0xAA;
rtt_array[1] = 0xAB;
rtt_array[2] = 0xAC;

// Encode u32 into 4 bytes
rtt_array[3] = (rtt_us >> 24) as u8;
rtt_array[4] = (rtt_us >> 16) as u8;
rtt_array[5] = (rtt_us >> 8) as u8;
rtt_array[6] = rtt_us as u8;

rtt_array[7] = 0xBA;
rtt_array[8] = 0xBB;
rtt_array[9] = 0xBC;

rtt_array
}

fn repeat_slice_in_array(dst: &mut Box<[u8]>, src: [u8; 10]) {
let src_len = src.len();
let dst_len = dst.len();
let mut i = 0;

while i + src_len <= dst_len {
dst[i..i + src_len].copy_from_slice(&src);
i += src_len;
}
}

fn finish_transmission(stream: &mut TcpStream) -> Result<()> {
thread::sleep(Duration::from_millis(THREAD_SLEEP_FINISH_MS));
stream.flush()?;
thread::sleep(Duration::from_millis(THREAD_SLEEP_FINISH_MS));
stream.shutdown(std::net::Shutdown::Write)?;
thread::sleep(Duration::from_millis(THREAD_SLEEP_FINISH_MS));
Ok(())
}

/*
fn run_sending_thread() -> Result<()> {

while (start_time.elapsed()?.as_millis() as u64) < transmission_duration_ms {
let now_us = chrono::Utc::now().timestamp_micros() as u64;
let now_us = chrono::Local::now().timestamp_micros() as u64;
if now_us - last_logging_timestamp_us >= logging_interval_us {
append_tcp_info_to_stats_log(&stream, &mut timedata)?;
last_logging_timestamp_us = now_us;
Expand Down Expand Up @@ -528,7 +587,7 @@ fn append_tcp_info_to_stats_log(
) -> Result<()> {
let latest_tcp_info = sockopt_get_tcp_info(socket_file_descriptor)?;

let timestamp_us = chrono::Utc::now().timestamp_micros() as u64;
let timestamp_us = chrono::Local::now().timestamp_micros() as u64;
let rtt = latest_tcp_info.tcpi_rtt;
let cwnd = latest_tcp_info.tcpi_snd_cwnd;

Expand Down
Loading
Loading