Skip to content

Commit

Permalink
clippy and fmt corrections
Browse files Browse the repository at this point in the history
Signed-off-by: Shwetha N <[email protected]>
  • Loading branch information
ShwethaSNayak committed Feb 23, 2024
1 parent 0a66706 commit 7399293
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 27 deletions.
20 changes: 14 additions & 6 deletions src/event/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ impl EventReader {
self.meta.last_segment_release = Instant::now();
} else {
//send an indication to the waiting rx that slice has been returned.
debug!(" slice return to rx success {:?} ", slice.meta );
debug!(" slice return to rx success {:?} ", slice.meta);
if let Some(tx) = slice.slice_return_tx.take() {
if let Err(_e) = tx.send(Some(slice.meta.clone())) {
warn!(
Expand All @@ -325,13 +325,16 @@ impl EventReader {
//Update latest reader positions if UPDATE_OFFSET_INTERVAL is elapsed
if self.meta.last_offset_update.elapsed() > UPDATE_OFFSET_INTERVAL {
let mut offset_map: HashMap<ScopedSegment, Offset> = HashMap::new();
for metadata in self.meta.slices.values(){
for metadata in self.meta.slices.values() {
offset_map.insert(
ScopedSegment::from(metadata.scoped_segment.as_str()),
Offset::new(metadata.read_offset),
);
}
debug!(" update reader position {:?} for reader {:?} ", offset_map, self.id );
debug!(
" update reader position {:?} for reader {:?} ",
offset_map, self.id
);
self.rg_state
.lock()
.await
Expand Down Expand Up @@ -552,13 +555,16 @@ impl EventReader {
//Update latest reader positions if UPDATE_OFFSET_INTERVAL is elapsed
if self.meta.last_offset_update.elapsed() > UPDATE_OFFSET_INTERVAL {
let mut offset_map: HashMap<ScopedSegment, Offset> = HashMap::new();
for metadata in self.meta.slices.values(){
for metadata in self.meta.slices.values() {
offset_map.insert(
ScopedSegment::from(metadata.scoped_segment.as_str()),
Offset::new(metadata.read_offset),
);
}
debug!(" update reader position {:?} for reader {:?} ", offset_map, self.id );
debug!(
" update reader position {:?} for reader {:?} ",
offset_map, self.id
);
self.rg_state
.lock()
.await
Expand Down Expand Up @@ -1580,7 +1586,9 @@ mod tests {
.return_once(move |_| Ok(0 as isize));
rg_mock.expect_check_online().return_const(true);
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
rg_mock.expect_update_reader_positions().return_once(move |_, _| Ok(()));
rg_mock
.expect_update_reader_positions()
.return_once(move |_, _| Ok(()));
// create a new Event Reader with the segment slice data.
let mut reader = EventReader::init_event_reader(
Arc::new(Mutex::new(rg_mock)),
Expand Down
9 changes: 5 additions & 4 deletions src/event/reader_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use snafu::Snafu;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{error};
use tracing::error;

cfg_if::cfg_if! {
if #[cfg(test)] {
Expand Down Expand Up @@ -137,7 +137,7 @@ impl ReaderGroup {
&client_factory,
init_segments,
)
.await;
.await;
ReaderGroup {
name: name.clone(),
config: rg_config.clone(),
Expand Down Expand Up @@ -229,7 +229,8 @@ impl ReaderGroup {
entry.insert(segment_id, offset.read);
acc
});
let streamcuts: Vec<StreamCut> = streamcut_map.into_iter()
let streamcuts: Vec<StreamCut> = streamcut_map
.into_iter()
.map(|(scoped_stream, segment_offset_map)| StreamCut::new(scoped_stream, segment_offset_map))
.collect();

Expand Down Expand Up @@ -651,4 +652,4 @@ mod tests {
fn test_reader_group_config_builder_invalid() {
let _rg_config = ReaderGroupConfigBuilder::default().build();
}
}
}
30 changes: 13 additions & 17 deletions src/event/reader_group_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ impl ReaderGroupState {
self.sync.fetch_updates().await.expect("should fetch updates");
debug!(
"Assaigned segments {:?} for reader {:?} ",
self.sync.get_inner_map(ASSIGNED), reader
self.sync.get_inner_map(ASSIGNED),
reader
);
ReaderGroupState::get_reader_positions_internal(reader, self.sync.get_inner_map(ASSIGNED))
}
Expand Down Expand Up @@ -445,25 +446,20 @@ impl ReaderGroupState {
let segment_offset: HashMap<ScopedSegment, Offset> =
deserialize_from(&v.data).expect("deserialize assigned segments");
segment_offset_map.extend(segment_offset)

}

let unassign_segment_offset: HashMap<ScopedSegment, Offset> =
unassigned_segments
.iter()
.map(|(k,v)|{
let segment_str = &*k.to_owned();
(
ScopedSegment::from(segment_str),
deserialize_from(&v.data).expect("deserialize offset"),
)

}).collect::<HashMap<ScopedSegment, Offset>>();
let unassign_segment_offset: HashMap<ScopedSegment, Offset> = unassigned_segments
.iter()
.map(|(k, v)| {
let segment_str = &*k.to_owned();
(
ScopedSegment::from(segment_str),
deserialize_from(&v.data).expect("deserialize offset"),
)
})
.collect::<HashMap<ScopedSegment, Offset>>();
segment_offset_map.extend(unassign_segment_offset);
debug!(
"Segment to offset map {:?} from reader group",
segment_offset_map
);
debug!("Segment to offset map {:?} from reader group", segment_offset_map);
segment_offset_map
}

Expand Down

0 comments on commit 7399293

Please sign in to comment.