From f5721611bb4a6b212016ba4e9b507b6383051771 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Fri, 13 Oct 2023 16:30:25 +0800 Subject: [PATCH] fix(cluster): try fix cluster hang --- src/query/pipeline/core/src/processors/port.rs | 2 +- .../api/rpc/exchange/exchange_transform_shuffle.rs | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/query/pipeline/core/src/processors/port.rs b/src/query/pipeline/core/src/processors/port.rs index dd7ff5d6f92a..0173a0288c73 100644 --- a/src/query/pipeline/core/src/processors/port.rs +++ b/src/query/pipeline/core/src/processors/port.rs @@ -116,7 +116,7 @@ impl SharedStatus { #[inline(always)] pub fn get_flags(&self) -> usize { - self.data.load(Ordering::Relaxed) as usize & FLAGS_MASK + self.data.load(Ordering::SeqCst) as usize & FLAGS_MASK } } diff --git a/src/query/service/src/api/rpc/exchange/exchange_transform_shuffle.rs b/src/query/service/src/api/rpc/exchange/exchange_transform_shuffle.rs index 25681a4c438e..0fb7247481b4 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_transform_shuffle.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_transform_shuffle.rs @@ -166,6 +166,11 @@ impl Processor for ExchangeShuffleTransform { self.finished_outputs += 1; output.status = PortStatus::Finished; } + + self.buffer.clear(*output_index); + + self.wakeup_inputs(); + self.wakeup_outputs(); } else if output.port.can_push() { if !self.buffer.is_empty(*output_index) { let data_block = self.buffer.pop(*output_index).unwrap(); @@ -256,9 +261,12 @@ impl ExchangeShuffleTransform { let output = &mut self.outputs[*waiting_output]; if output.port.is_finished() { - self.finished_outputs += 1; + if output.status != PortStatus::Finished { + self.finished_outputs += 1; + output.status = PortStatus::Finished; + } + self.buffer.clear(*waiting_output); - output.status = PortStatus::Finished; continue; }