Skip to content

Commit

Permalink
return streamcut instead of position
Browse files Browse the repository at this point in the history
Signed-off-by: Shwetha N <[email protected]>
  • Loading branch information
ShwethaSNayak committed Feb 15, 2024
1 parent b762204 commit 9757f2d
Showing 1 changed file with 26 additions and 5 deletions.
31 changes: 26 additions & 5 deletions src/event/reader_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use snafu::Snafu;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::sync::synchronizer::SynchronizerError;

cfg_if::cfg_if! {
if #[cfg(test)] {
Expand Down Expand Up @@ -136,7 +137,7 @@ impl ReaderGroup {
&client_factory,
init_segments,
)
.await;
.await;
ReaderGroup {
name: name.clone(),
config: rg_config.clone(),
Expand Down Expand Up @@ -218,15 +219,35 @@ impl ReaderGroup {
/// Return the latest positions for the given reader.
/// These positions to be used to construct StreamCutV1
///
pub async fn get_reader_positions(

pub async fn get_reader_streamcut(
&self,
reader_id: String,
) -> Result<HashMap<ScopedSegment, Offset>, ReaderGroupStateError> {
) -> Result<StreamCutV1, ReaderGroupStateError> {
let r: Reader = reader_id.into();
self.state.lock().await.get_reader_positions(&r).await
let positions = self.state.lock().await.get_reader_positions(&r).await;
if let Some((seg, offset)) = positions.unwrap().iter().next() {
let scoped_stream = seg.get_scoped_stream();
let mut scoped_segment_map: HashMap<ScopedSegment, i64> = HashMap::new();
scoped_segment_map.insert(seg.clone(), offset.read);
// Return the StreamCutV1 object
Ok(StreamCutV1::new(scoped_stream, scoped_segment_map))
}else {
//Here only possible error thrown will be readr_offline
// Other error like deserialize position are not thrown back
Err(ReaderGroupStateError::ReaderAlreadyOfflineError {
error_msg: format!("Reader already marked offline {:?}", r),
source: SynchronizerError::SyncPreconditionError {
error_msg: String::from("Precondition failure"),
},
})
}


}
}


/// Specifies the ReaderGroupConfig.
/// ReaderGroupConfig::default() ensures the group refresh interval is set to 3 seconds.
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
Expand Down Expand Up @@ -635,4 +656,4 @@ mod tests {
fn test_reader_group_config_builder_invalid() {
let _rg_config = ReaderGroupConfigBuilder::default().build();
}
}
}

0 comments on commit 9757f2d

Please sign in to comment.