Skip to content

Commit

Permalink
fix(cluster): try fix cluster hang
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed Oct 13, 2023
1 parent 024beb3 commit f572161
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/query/pipeline/core/src/processors/port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl SharedStatus {

#[inline(always)]
pub fn get_flags(&self) -> usize {
self.data.load(Ordering::Relaxed) as usize & FLAGS_MASK
self.data.load(Ordering::SeqCst) as usize & FLAGS_MASK
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ impl Processor for ExchangeShuffleTransform {
self.finished_outputs += 1;
output.status = PortStatus::Finished;
}

self.buffer.clear(*output_index);

self.wakeup_inputs();
self.wakeup_outputs();
} else if output.port.can_push() {
if !self.buffer.is_empty(*output_index) {
let data_block = self.buffer.pop(*output_index).unwrap();
Expand Down Expand Up @@ -256,9 +261,12 @@ impl ExchangeShuffleTransform {
let output = &mut self.outputs[*waiting_output];

if output.port.is_finished() {
self.finished_outputs += 1;
if output.status != PortStatus::Finished {
self.finished_outputs += 1;
output.status = PortStatus::Finished;
}

self.buffer.clear(*waiting_output);
output.status = PortStatus::Finished;
continue;
}

Expand Down

0 comments on commit f572161

Please sign in to comment.