chunk upload requests in data management (#218)
gvaradarajan committed Jun 13, 2024
1 parent 570528e commit 454cdbf
Showing 3 changed files with 169 additions and 56 deletions.
3 changes: 3 additions & 0 deletions micro-rdk/src/common/app_client.rs
@@ -61,6 +61,9 @@ pub enum AppClientError {
AppConfigHeaderDateMissingError,
#[error(transparent)]
AppGrpcClientError(#[from] GrpcClientError),
#[cfg(feature = "data")]
#[error("request timeout")]
AppClientRequestTimeout,
}

#[derive(Debug, Clone)]
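The new `AppClientRequestTimeout` variant exists so the data sync task (below) can race an upload against a timer and report a hung request as an ordinary error. A minimal sketch of that pattern, assuming a stub `upload` future and an illustrative one-second budget in place of the real `app_client.upload_data` call:

```rust
use std::time::Duration;

use futures_lite::{future, FutureExt};

#[derive(Debug)]
enum AppClientError {
    AppClientRequestTimeout,
}

// Stand-in for `app_client.upload_data(...)`; the real client and request
// types are omitted here.
async fn upload() -> Result<(), AppClientError> {
    Ok(())
}

fn main() {
    // `or` polls both futures and resolves with whichever finishes first,
    // so a request that never responds surfaces as a timeout error.
    let result = future::block_on(upload().or(async {
        async_io::Timer::after(Duration::from_secs(1)).await;
        Err(AppClientError::AppClientRequestTimeout)
    }));
    println!("{result:?}");
}
```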
207 changes: 158 additions & 49 deletions micro-rdk/src/common/data_manager.rs
@@ -18,10 +18,16 @@ use super::robot::{LocalRobot, RobotError};
use async_io::Timer;
use bytes::BytesMut;
use futures_lite::prelude::Future;
use futures_lite::FutureExt;
use futures_util::lock::Mutex as AsyncMutex;
use prost::Message;
use thiserror::Error;

// The maximum size in bytes of readings that should be sent in a single request,
// as recommended by Viam's data management team, is 64K. To accommodate the
// smaller amount of available RAM, we've halved it.
static MAX_SENSOR_CONTENTS_SIZE: usize = 32000;

#[derive(Debug, Error)]
pub enum DataManagerError {
#[error("no data collectors in manager")]
@@ -234,22 +240,6 @@ pub struct DataSyncTask<StoreType> {
part_id: String,
}

impl<StoreType> DataSyncTask<StoreType>
where
StoreType: DataStore,
@@ -262,43 +252,146 @@ where
async fn run<'b>(&mut self, app_client: &'b AppClient) -> Result<(), AppClientError> {
for collector_key in self.resource_method_keys.iter() {
let store_lock = self.store.lock().await;
let mut current_chunk: Vec<BytesMut> = vec![];
let mut current_chunk_size: usize = 0;
// we process the data for this region of the store in chunks; each iteration
// of this loop represents the processing and uploading of a single chunk of data
loop {
let mut reader = match store_lock.get_reader(collector_key) {
Ok(reader) => reader,
Err(err) => {
log::error!(
"error acquiring reader for collector key ({:?}): {:?}",
collector_key,
err
);
break;
}
};
let next_message = match reader.read_next_message() {
Ok(msg) => msg,
Err(err) => {
log::error!(
"error reading message from store for collector key ({:?}): {:?}",
collector_key,
err
);

// we don't want to panic, and creating an AppClientError variant for this case
// feels too specific, so we'll move on to the next collector
break;
}
};

// if the first message is empty, we've reached the end of the store region
// and it's time to move on to the next collector
if next_message.is_empty() {
break;
} else if next_message.len() > MAX_SENSOR_CONTENTS_SIZE {
log::error!(
"error decoding readings for collector key ({:?}): {:?}",
collector_key,
err
"message encountered that was too large (>32K bytes) for collector {:?}",
collector_key
);
continue;
} else {
current_chunk_size = next_message.len();
current_chunk.push(next_message);
}

// We want to fill current_chunk until its size reaches just under
// MAX_SENSOR_CONTENTS_SIZE and then upload the data.
let should_flush = loop {
let next_message = match reader.read_next_message() {
Ok(msg) => msg,
Err(err) => {
log::error!(
"error reading message from store for collector key ({:?}): {:?}",
collector_key,
err
);

// we don't want to panic, and creating an AppClientError variant for this case
// feels too specific, so we'll move on to the next collector without flushing
// this region of the store
break false;
}
};

// skip this message if it's too big
if next_message.len() > MAX_SENSOR_CONTENTS_SIZE {
log::error!(
"message encountered that was too large (>32K bytes) for collector {:?}",
collector_key
);
continue;
}
if next_message.is_empty()
|| ((next_message.len() + current_chunk_size) > MAX_SENSOR_CONTENTS_SIZE)
{
let data: Result<Vec<SensorData>, prost::DecodeError> =
current_chunk.drain(..).map(SensorData::decode).collect();
let data = match data {
Ok(data) => data,
Err(err) => {
log::error!(
"error decoding readings for collector key ({:?}): {:?}",
collector_key,
err
);
break false;
}
};
let upload_request = DataCaptureUploadRequest {
metadata: Some(UploadMetadata {
part_id: self.part_id.clone(),
component_type: collector_key.component_type.clone(),
r#type: DataType::TabularSensor.into(),
component_name: collector_key.r_name.clone(),
method_name: collector_key.method.to_string(),
..Default::default()
}),
sensor_contents: data,
};
// Note: we are intentionally holding the lock on the store across this upload
// attempt to protect the potential subsequent flush operation for this chunk
// of the store. The one-second timeout below should ensure that we're not
// holding on to the lock for too long.
match app_client
.upload_data(upload_request)
.or(async {
async_io::Timer::after(Duration::from_millis(1000)).await;
Err(AppClientError::AppClientRequestTimeout)
})
.await
{
Ok(_) => {
current_chunk_size = next_message.len();
current_chunk.push(next_message);
break true;
}

// If the request takes too long to elicit a response, we don't know whether
// the upload was successful on app's side. We've decided that we'd rather
// potentially lose some data than upload duplicate data, and so we opt to
// assume the best and consume the messages
Err(AppClientError::AppClientRequestTimeout) => {
current_chunk_size = next_message.len();
current_chunk.push(next_message);
break true;
}
Err(err) => return Err(err),
};
} else {
current_chunk_size += next_message.len();
current_chunk.push(next_message);
}
};

// all of the data in the current chunk has been successfully uploaded, so we
// flush the messages from the store before moving on to the next chunk of data
if should_flush {
reader.flush();
}
}
}
Ok(())
}
@@ -335,7 +428,7 @@ mod tests {
use prost::Message;
use ringbuf::{LocalRb, Rb};

use super::DataManager;
use crate::common::data_store::{DataStoreReader, WriteMode};
use crate::common::encoder::EncoderError;
use crate::common::{
@@ -762,6 +855,22 @@ mod tests {
read_data
}

fn read_messages_for_collector(
reader: &mut impl DataStoreReader,
) -> Result<Vec<SensorData>, DataStoreError> {
let mut raw_messages: Vec<BytesMut> = vec![];
loop {
let next_message = reader.read_next_message()?;
if next_message.is_empty() {
break;
}
raw_messages.push(next_message);
}
let data: Result<Vec<SensorData>, prost::DecodeError> =
raw_messages.into_iter().map(SensorData::decode).collect();
Ok(data?)
}

#[test_log::test]
fn test_reader() {
let resource_1 = ResourceType::Sensor(Arc::new(Mutex::new(TestSensor {})));
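The heart of the change above is a greedy chunking rule: keep appending encoded readings to `current_chunk` until the next message would push it past `MAX_SENSOR_CONTENTS_SIZE`, then upload and flush. A self-contained sketch of that rule operating on message sizes alone (the helper name and standalone `main` are illustrative; the real task accumulates `BytesMut` payloads and uploads each chunk):

```rust
const MAX_SENSOR_CONTENTS_SIZE: usize = 32000;

// Greedily packs message lengths into chunks whose totals stay within the
// limit; oversized messages are skipped, mirroring the sync task's behavior.
fn chunk_sizes(message_lens: &[usize]) -> Vec<Vec<usize>> {
    let mut chunks: Vec<Vec<usize>> = vec![];
    let mut current: Vec<usize> = vec![];
    let mut current_size = 0;
    for &len in message_lens {
        if len > MAX_SENSOR_CONTENTS_SIZE {
            // "message encountered that was too large"
            continue;
        }
        if !current.is_empty() && current_size + len > MAX_SENSOR_CONTENTS_SIZE {
            // In the real task, the upload request (and, on success, the
            // store flush) happens at this chunk boundary.
            chunks.push(std::mem::take(&mut current));
            current_size = 0;
        }
        current.push(len);
        current_size += len;
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}

fn main() {
    // Three 15K readings split into a 30K chunk and a 15K chunk.
    assert_eq!(
        chunk_sizes(&[15_000, 15_000, 15_000]),
        vec![vec![15_000, 15_000], vec![15_000]]
    );
}
```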
15 changes: 8 additions & 7 deletions micro-rdk/src/common/data_store.rs
@@ -317,6 +317,14 @@ mod tests {

#[test_log::test]
fn test_data_store() {
// test failure on attempt to initialize with no collectors
let collector_keys: Vec<ResourceMethodKey> = vec![];
let store_create_attempt = super::StaticMemoryDataStore::new(collector_keys);
assert!(matches!(
store_create_attempt,
Err(DataStoreError::NoCollectors)
));

let reading_requested_dt = chrono::offset::Local::now().fixed_offset();

let empty_message = SensorData {
@@ -387,13 +395,6 @@
]),
})),
};
let collector_keys = vec![thing_key.clone(), thing_2_key.clone()];
let store = super::StaticMemoryDataStore::new(collector_keys);
assert!(store.is_ok());
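The relocated assertion exercises a constructor guard: a store with no collector keys has nothing to allocate regions for, so construction fails up front. A simplified sketch of that guard, assuming a `String` key and a trivial field in place of the real `ResourceMethodKey`s and pre-allocated buffers:

```rust
#[derive(Debug)]
enum DataStoreError {
    NoCollectors,
}

// Simplified stand-in for StaticMemoryDataStore; the real store allocates
// a ring-buffer region per collector key.
struct StaticMemoryDataStore {
    collector_keys: Vec<String>,
}

impl StaticMemoryDataStore {
    fn new(collector_keys: Vec<String>) -> Result<Self, DataStoreError> {
        if collector_keys.is_empty() {
            return Err(DataStoreError::NoCollectors);
        }
        Ok(Self { collector_keys })
    }
}

fn main() {
    // Rejected: no collectors to reserve space for.
    assert!(matches!(
        StaticMemoryDataStore::new(vec![]),
        Err(DataStoreError::NoCollectors)
    ));
    // Accepted: one collector key.
    let store = StaticMemoryDataStore::new(vec!["my-sensor-Readings".into()]).unwrap();
    assert_eq!(store.collector_keys.len(), 1);
}
```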
