diff --git a/Cargo.lock b/Cargo.lock index cf6cb30b2299c..35d68a9fdd30b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3348,6 +3348,7 @@ dependencies = [ "geos", "geozero 0.13.0", "http 1.1.0", + "hyper 0.14.30", "opendal", "parquet", "paste", diff --git a/Cargo.toml b/Cargo.toml index 01c9cbd581e69..c1550bc1df2ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -252,6 +252,7 @@ http = "1" itertools = "0.10.5" jsonb = "0.4.1" jwt-simple = "0.11.0" +hyper = "0.14.20" match-template = "0.0.1" mysql_async = { version = "0.34", default-features = false, features = ["native-tls-tls"] } object_store_opendal = "0.46" diff --git a/src/common/exception/Cargo.toml b/src/common/exception/Cargo.toml index 033999b74d0d2..e9aa1119deb79 100644 --- a/src/common/exception/Cargo.toml +++ b/src/common/exception/Cargo.toml @@ -31,6 +31,7 @@ serde_json = { workspace = true } tantivy = { workspace = true } thiserror = { workspace = true } tonic = { workspace = true } +hyper = {workspace = true} [package.metadata.cargo-machete] ignored = ["geos"] diff --git a/src/common/exception/src/exception_into.rs b/src/common/exception/src/exception_into.rs index 4f1a2ff90d5ee..82d63ebad1235 100644 --- a/src/common/exception/src/exception_into.rs +++ b/src/common/exception/src/exception_into.rs @@ -325,6 +325,13 @@ impl From for ErrorCode { tonic::Code::Unknown => { let details = status.details(); if details.is_empty() { + if status.source().map_or(false, |e| e.is::()) { + return ErrorCode::CannotConnectNode(format!( + "{}, source: {:?}", + status.message(), + status.source() + )); + } return ErrorCode::UnknownException(format!( "{}, source: {:?}", status.message(), diff --git a/src/query/service/src/servers/flight/flight_client.rs b/src/query/service/src/servers/flight/flight_client.rs index 06693b115f155..eabd6ff407157 100644 --- a/src/query/service/src/servers/flight/flight_client.rs +++ b/src/query/service/src/servers/flight/flight_client.rs @@ -12,8 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::VecDeque; +use std::pin::Pin; use std::str::FromStr; +use std::sync::atomic::AtomicPtr; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::sync::Arc; +use std::task::Context; +use std::task::Poll; use async_channel::Receiver; use async_channel::Sender; @@ -22,16 +29,21 @@ use databend_common_arrow::arrow_format::flight::data::FlightData; use databend_common_arrow::arrow_format::flight::data::Ticket; use databend_common_arrow::arrow_format::flight::service::flight_service_client::FlightServiceClient; use databend_common_base::base::tokio::time::Duration; -use databend_common_base::runtime::drop_guard; +use databend_common_base::runtime::GlobalIORuntime; +use databend_common_base::runtime::TrySpawn; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use fastrace::full_name; use fastrace::future::FutureExt; use fastrace::Span; +use futures::Stream; use futures::StreamExt; use futures_util::future::Either; +use log::info; +use parking_lot::Mutex; use serde::Deserialize; use serde::Serialize; +use tokio::time::sleep; use tonic::metadata::AsciiMetadataKey; use tonic::metadata::AsciiMetadataValue; use tonic::transport::channel::Channel; @@ -41,6 +53,7 @@ use tonic::Streaming; use crate::pipelines::executor::WatchNotify; use crate::servers::flight::request_builder::RequestBuilder; +use crate::servers::flight::v1::exchange::DataExchangeManager; use crate::servers::flight::v1::packets::DataPacket; pub struct FlightClient { @@ -119,19 +132,31 @@ impl FlightClient { &mut self, query_id: &str, target: &str, + source_address: &str, + retry_times: usize, + retry_interval: usize, ) -> Result { - let streaming = self - .get_streaming( - RequestBuilder::create(Ticket::default()) - .with_metadata("x-type", "request_server_exchange")? - .with_metadata("x-target", target)? - .with_metadata("x-query-id", query_id)? - .build(), - ) - .await?; + let req = RequestBuilder::create(Ticket::default()) + .with_metadata("x-type", "request_server_exchange")? + .with_metadata("x-target", target)? + .with_metadata("x-query-id", query_id)? + .with_metadata("x-continue-from", "0")? + .build(); + let streaming = self.get_streaming(req).await?; let (notify, rx) = Self::streaming_receiver(streaming); - Ok(FlightExchange::create_receiver(notify, rx)) + Ok(FlightExchange::create_receiver( + notify, + rx, + Some(ConnectionInfo { + query_id: query_id.to_string(), + target: target.to_string(), + fragment: None, + source_address: source_address.to_string(), + retry_times, + retry_interval: Duration::from_secs(retry_interval as u64), + }), + )) } #[async_backtrace::framed] @@ -141,19 +166,34 @@ impl FlightClient { query_id: &str, target: &str, fragment: usize, + source_address: &str, + retry_times: usize, + retry_interval: usize, ) -> Result { let request = RequestBuilder::create(Ticket::default()) .with_metadata("x-type", "exchange_fragment")? .with_metadata("x-target", target)? .with_metadata("x-query-id", query_id)? .with_metadata("x-fragment-id", &fragment.to_string())? + .with_metadata("x-continue-from", "0")? .build(); let request = databend_common_tracing::inject_span_to_tonic_request(request); let streaming = self.get_streaming(request).await?; let (notify, rx) = Self::streaming_receiver(streaming); - Ok(FlightExchange::create_receiver(notify, rx)) + Ok(FlightExchange::create_receiver( + notify, + rx, + Some(ConnectionInfo { + query_id: query_id.to_string(), + target: target.to_string(), + fragment: Some(fragment), + source_address: source_address.to_string(), + retry_times, + retry_interval: Duration::from_secs(retry_interval as u64), + }), + )) } fn streaming_receiver( @@ -209,27 +249,51 @@ impl FlightClient { Err(status) => Err(ErrorCode::from(status).add_message_back("(while in query flight)")), } } + + #[async_backtrace::framed] + async fn reconnect(&mut self, info: &ConnectionInfo, seq: usize) -> Result { + let request = match info.fragment { + Some(fragment_id) => RequestBuilder::create(Ticket::default()) + .with_metadata("x-type", "exchange_fragment")? + .with_metadata("x-target", &info.target)? + .with_metadata("x-query-id", &info.query_id)? + .with_metadata("x-fragment-id", &fragment_id.to_string())? + .with_metadata("x-continue-from", &seq.to_string())? + .build(), + None => RequestBuilder::create(Ticket::default()) + .with_metadata("x-type", "request_server_exchange")? + .with_metadata("x-target", &info.target)? + .with_metadata("x-query-id", &info.query_id)? + .with_metadata("x-continue-from", &seq.to_string())? + .build(), + }; + let request = databend_common_tracing::inject_span_to_tonic_request(request); + + let streaming = self.get_streaming(request).await?; + + let (network_notify, recv) = Self::streaming_receiver(streaming); + Ok(FlightRxInner::create(network_notify, recv)) + } } -pub struct FlightReceiver { - notify: Arc, - rx: Receiver>, +#[derive(Clone)] +pub struct ConnectionInfo { + pub query_id: String, + pub target: String, + pub fragment: Option, + pub source_address: String, + pub retry_times: usize, + pub retry_interval: Duration, } -impl Drop for FlightReceiver { - fn drop(&mut self) { - drop_guard(move || { - self.close(); - }) - } +pub struct FlightRxInner { + notify: Arc, + rx: Receiver>, } -impl FlightReceiver { - pub fn create(rx: Receiver>) -> FlightReceiver { - FlightReceiver { - rx, - notify: Arc::new(WatchNotify::new()), - } +impl FlightRxInner { + pub fn create(notify: Arc, rx: Receiver>) -> FlightRxInner { + FlightRxInner { rx, notify } } #[async_backtrace::framed] @@ -247,12 +311,107 @@ impl FlightReceiver { } } +pub struct RetryableFlightReceiver { + seq: Arc, + info: Option, + inner: Arc>, +} + +impl Drop for RetryableFlightReceiver { + fn drop(&mut self) { + self.close(); + } +} + +impl RetryableFlightReceiver { + pub fn dummy() -> RetryableFlightReceiver { + RetryableFlightReceiver { + seq: Arc::new(AtomicUsize::new(0)), + info: None, + inner: Arc::new(Default::default()), + } + } + + #[async_backtrace::framed] + pub async fn recv(&self) -> Result> { + // Non thread safe, we only use atomic to implement mutable. + loop { + let inner = unsafe { &*self.inner.load(Ordering::SeqCst) }; + return match inner.recv().await { + Ok(message) => { + self.seq.fetch_add(1, Ordering::SeqCst); + Ok(message) + } + Err(cause) => { + info!("Error while receiving data from flight : {:?}", cause); + if cause.code() == ErrorCode::CANNOT_CONNECT_NODE { + // only retry when error is network problem + let Err(cause) = self.retry().await else { + info!("Retry flight connection successfully!"); + continue; + }; + + info!("Retry flight connection failure, cause: {:?}", cause); + } + + Err(cause) + } + }; + } + } + + #[async_backtrace::framed] + async fn retry(&self) -> Result<()> { + if let Some(connection_info) = &self.info { + let mut flight_client = + DataExchangeManager::create_client(&connection_info.source_address, true).await?; + + for attempts in 0..connection_info.retry_times { + let Ok(recv) = flight_client + .reconnect(connection_info, self.seq.load(Ordering::Acquire)) + .await + else { + info!("Reconnect attempt {} failed", attempts); + sleep(connection_info.retry_interval).await; + continue; + }; + + let ptr = self + .inner + .swap(Box::into_raw(Box::new(recv)), Ordering::SeqCst); + + unsafe { + // We cannot determine the number of strong ref. so close it. + let broken_connection = Box::from_raw(ptr); + broken_connection.close(); + } + + return Ok(()); + } + + return Err(ErrorCode::Timeout("Exceed max retries time")); + } + + Ok(()) + } + + pub fn close(&self) { + unsafe { + let inner = self.inner.load(Ordering::SeqCst); + + if !inner.is_null() { + (*inner).close(); + } + } + } +} + pub struct FlightSender { - tx: Sender, Status>>, + tx: Sender>, } impl FlightSender { - pub fn create(tx: Sender, Status>>) -> FlightSender { + pub fn create(tx: Sender>) -> FlightSender { FlightSender { tx } } @@ -262,11 +421,7 @@ impl FlightSender { #[async_backtrace::framed] pub async fn send(&self, data: DataPacket) -> Result<()> { - if let Err(_cause) = self - .tx - .send(Ok(Arc::new(FlightData::try_from(data)?))) - .await - { + if let Err(_cause) = self.tx.send(Ok(FlightData::try_from(data)?)).await { return Err(ErrorCode::AbortedQuery( "Aborted query, because the remote flight channel is closed.", )); @@ -280,41 +435,237 @@ impl FlightSender { } } +pub struct SenderPayload { + pub state: Arc>, + pub sender: Option>>, +} + +pub struct ReceiverPayload { + seq: Arc, + info: Option, + inner: Arc>, +} + pub enum FlightExchange { Dummy, - Receiver { - notify: Arc, - receiver: Receiver>, - }, - Sender(Sender, Status>>), + + Sender(SenderPayload), + Receiver(ReceiverPayload), + + MovedSender(SenderPayload), + MovedReceiver(ReceiverPayload), } impl FlightExchange { - pub fn create_sender(sender: Sender, Status>>) -> FlightExchange { - FlightExchange::Sender(sender) + pub fn create_sender( + state: Arc>, + sender: Sender>, + ) -> FlightExchange { + FlightExchange::Sender(SenderPayload { + state, + sender: Some(sender), + }) } pub fn create_receiver( notify: Arc, receiver: Receiver>, + connection_info: Option, ) -> FlightExchange { - FlightExchange::Receiver { notify, receiver } + FlightExchange::Receiver(ReceiverPayload { + seq: Arc::new(AtomicUsize::new(0)), + info: connection_info, + inner: Arc::new(AtomicPtr::new(Box::into_raw(Box::new( + FlightRxInner::create(notify, receiver), + )))), + }) + } + pub fn take_as_sender(&mut self) -> FlightSender { + let mut flight_sender = FlightExchange::Dummy; + std::mem::swap(self, &mut flight_sender); + + if let FlightExchange::Sender(mut v) = flight_sender { + let flight_sender = FlightSender::create(v.sender.take().unwrap()); + *self = FlightExchange::MovedSender(v); + return flight_sender; + } + + unreachable!("take as sender miss match exchange type") + } + + pub fn take_as_receiver(&mut self) -> RetryableFlightReceiver { + let mut flight_receiver = FlightExchange::Dummy; + std::mem::swap(self, &mut flight_receiver); + + if let FlightExchange::Receiver(v) = flight_receiver { + let flight_receiver = RetryableFlightReceiver { + seq: v.seq.clone(), + info: v.info.clone(), + inner: v.inner.clone(), + }; + + *self = FlightExchange::MovedReceiver(v); + + return flight_receiver; + } + + unreachable!("take as receiver miss match exchange type") + } +} + +pub struct FlightDataAckState { + seq: AtomicUsize, + auto_ack_window_size: usize, + + may_retry: bool, + receiver: Receiver>, + confirmation_queue: VecDeque<(usize, Result, Status>)>, +} + +impl FlightDataAckState { + pub fn create( + window_size: usize, + receiver: Receiver>, + ) -> Arc> { + Arc::new(Mutex::new(FlightDataAckState { + receiver, + may_retry: true, + seq: AtomicUsize::new(0), + auto_ack_window_size: window_size, + confirmation_queue: VecDeque::with_capacity(window_size), + })) + } + + fn ack_message(&mut self, seq: usize) { + while let Some((id, _)) = self.confirmation_queue.front() { + if *id <= seq { + self.confirmation_queue.pop_front(); + } + } } - pub fn convert_to_sender(self) -> FlightSender { - match self { - FlightExchange::Sender(tx) => FlightSender { tx }, - _ => unreachable!(), + fn end_of_stream(&mut self) -> Poll, Status>>> { + let message_seq = self.seq.fetch_add(1, Ordering::SeqCst); + self.ack_message(message_seq); + + self.may_retry = false; + Poll::Ready(None) + } + + fn error_of_stream(&mut self, cause: Status) -> Poll, Status>>> { + let message_seq = self.seq.fetch_add(1, Ordering::SeqCst); + + // Automatically acknowledge messages outside the ACK window. + // A better approach is for the client to send back an ACK. + if message_seq >= self.auto_ack_window_size { + self.ack_message(message_seq - self.auto_ack_window_size); } + + self.confirmation_queue + .push_back((message_seq, Err(cause.clone()))); + Poll::Ready(Some(Err(cause))) } - pub fn convert_to_receiver(self) -> FlightReceiver { - match self { - FlightExchange::Receiver { notify, receiver } => FlightReceiver { - notify, - rx: receiver, - }, - _ => unreachable!(), + fn message(&mut self, data: FlightData) -> Poll, Status>>> { + let message_seq = self.seq.fetch_add(1, Ordering::SeqCst); + let data = Arc::new(data); + let duplicate = data.clone(); + + // Automatically acknowledge messages outside the ACK window. + // A better approach is for the client to send back an ACK. + if message_seq >= self.auto_ack_window_size { + self.ack_message(message_seq - self.auto_ack_window_size); } + + self.confirmation_queue.push_back((message_seq, Ok(data))); + Poll::Ready(Some(Ok(duplicate))) + } + + fn check_resend(&mut self) -> Option, Status>> { + let current_seq = self.seq.load(Ordering::SeqCst); + + // normal case, no resend + if let Some((id, _)) = self.confirmation_queue.back() { + if *id == current_seq - 1 { + return None; + } + } + + // message is ack + if let Some((id, _)) = self.confirmation_queue.front() { + if *id > current_seq { + return Some(Err(Status::aborted( + "Aborted query, because the remote flight channel is closed.", + ))); + } + } + + // resend case, iterate the queue to find the message to resend + for (id, res) in self.confirmation_queue.iter() { + if *id == current_seq { + self.seq.fetch_add(1, Ordering::SeqCst); + return Some(res.clone()); + } + } + + None + } + + pub fn poll_next( + &mut self, + cx: &mut Context<'_>, + ) -> Poll, Status>>> { + if let Some(res) = self.check_resend() { + return Poll::Ready(Some(res)); + } + match Pin::new(&mut self.receiver).poll_next(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => self.end_of_stream(), + Poll::Ready(Some(Err(status))) => self.error_of_stream(status), + Poll::Ready(Some(Ok(flight_data))) => self.message(flight_data), + } + } +} + +pub struct FlightDataAckStream { + state: Arc>, +} + +impl FlightDataAckStream { + pub fn create( + state: Arc>, + begin: usize, + ) -> Result { + // reset begin + state.lock().seq.store(begin, Ordering::SeqCst); + Ok(FlightDataAckStream { state }) + } +} + +impl Drop for FlightDataAckStream { + fn drop(&mut self) { + let state = self.state.lock(); + + state.receiver.close(); + + if state.may_retry { + drop(state); + let weak = Arc::downgrade(&self.state); + GlobalIORuntime::instance().spawn(async move { + tokio::time::sleep(Duration::from_secs(60)).await; + if let Some(ss) = weak.upgrade() { + let ss = ss.lock(); + ss.receiver.close(); + } + }); + } + } +} + +impl Stream for FlightDataAckStream { + type Item = Result, Status>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.state.lock().poll_next(cx) } } diff --git a/src/query/service/src/servers/flight/mod.rs b/src/query/service/src/servers/flight/mod.rs index ea435893df189..c057bfc09c55b 100644 --- a/src/query/service/src/servers/flight/mod.rs +++ b/src/query/service/src/servers/flight/mod.rs @@ -22,7 +22,7 @@ pub mod v1; pub use codec::MessageCodec; pub use flight_client::FlightClient; pub use flight_client::FlightExchange; -pub use flight_client::FlightReceiver; pub use flight_client::FlightSender; +pub use flight_client::RetryableFlightReceiver; pub use flight_server::FlightService; pub use request_builder::RequestBuilder; diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index 7e5b2f9e43699..f2c219956536e 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -21,8 +21,6 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; -use async_channel::Receiver; -use databend_common_arrow::arrow_format::flight::data::FlightData; use databend_common_arrow::arrow_format::flight::service::flight_service_client::FlightServiceClient; use databend_common_base::base::GlobalInstance; use databend_common_base::runtime::GlobalIORuntime; @@ -41,7 +39,6 @@ use parking_lot::Mutex; use parking_lot::ReentrantMutex; use petgraph::prelude::EdgeRef; use petgraph::Direction; -use tonic::Status; use super::exchange_params::ExchangeParams; use super::exchange_params::MergeExchangeParams; @@ -57,6 +54,9 @@ use crate::pipelines::PipelineBuildResult; use crate::pipelines::PipelineBuilder; use crate::schedulers::QueryFragmentActions; use crate::schedulers::QueryFragmentsActions; +use crate::servers::flight::flight_client::FlightDataAckState; +use crate::servers::flight::flight_client::FlightDataAckStream; +use crate::servers::flight::flight_client::RetryableFlightReceiver; use crate::servers::flight::v1::actions::init_query_fragments; use crate::servers::flight::v1::actions::INIT_QUERY_FRAGMENTS; use crate::servers::flight::v1::actions::START_PREPARED_QUERY; @@ -69,7 +69,6 @@ use crate::servers::flight::v1::packets::QueryFragment; use crate::servers::flight::v1::packets::QueryFragments; use crate::servers::flight::FlightClient; use crate::servers::flight::FlightExchange; -use crate::servers::flight::FlightReceiver; use crate::servers::flight::FlightSender; use crate::sessions::QueryContext; use crate::sessions::TableContext; @@ -129,6 +128,9 @@ impl DataExchangeManager { let config = GlobalConfig::instance(); let with_cur_rt = env.create_rpc_clint_with_current_rt; + let flight_retry_times = env.settings.get_max_flight_retry_times()?; + let flight_retry_interval = env.settings.get_flight_retry_interval()?; + let mut request_exchanges = HashMap::new(); let mut targets_exchanges = HashMap::new(); @@ -154,12 +156,27 @@ impl DataExchangeManager { Edge::Fragment(v) => QueryExchange::Fragment { source: source.id.clone(), fragment: v, - exchange: flight_client.do_get(&query_id, &target.id, v).await?, + exchange: flight_client + .do_get( + &query_id, + &target.id, + v, + &address, + flight_retry_times, + flight_retry_interval, + ) + .await?, }, Edge::Statistics => QueryExchange::Statistics { source: source.id.clone(), exchange: flight_client - .request_server_exchange(&query_id, &target.id) + .request_server_exchange( + &query_id, + &target.id, + &address, + flight_retry_times, + flight_retry_interval, + ) .await?, }, }) @@ -348,15 +365,21 @@ impl DataExchangeManager { &self, id: String, target: String, - ) -> Result, Status>>> { + continue_from: usize, + ) -> Result { let queries_coordinator_guard = self.queries_coordinator.lock(); let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() }; match queries_coordinator.entry(id) { - Entry::Occupied(mut v) => v.get_mut().add_statistics_exchange(target), - Entry::Vacant(v) => v - .insert(QueryCoordinator::create()) - .add_statistics_exchange(target), + Entry::Occupied(mut v) => v.get_mut().add_statistics_exchange(target, continue_from), + Entry::Vacant(v) => match continue_from == 0 { + true => v + .insert(QueryCoordinator::create()) + .add_statistics_exchange(target, continue_from), + false => Err(ErrorCode::Timeout( + "Reconnection timeout, the state has been cleared", + )), + }, } } @@ -366,15 +389,26 @@ impl DataExchangeManager { query: String, target: String, fragment: usize, - ) -> Result, Status>>> { + continue_from: usize, + ) -> Result { let queries_coordinator_guard = self.queries_coordinator.lock(); let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() }; match queries_coordinator.entry(query) { - Entry::Occupied(mut v) => v.get_mut().add_fragment_exchange(target, fragment), - Entry::Vacant(v) => v - .insert(QueryCoordinator::create()) - .add_fragment_exchange(target, fragment), + Entry::Occupied(mut v) => { + v.get_mut() + .add_fragment_exchange(target, fragment, continue_from) + } + Entry::Vacant(v) => match continue_from == 0 { + true => v.insert(QueryCoordinator::create()).add_fragment_exchange( + target, + fragment, + continue_from, + ), + false => Err(ErrorCode::Timeout( + "Reconnection timeout, the state has been cleared", + )), + }, } } @@ -457,12 +491,12 @@ impl DataExchangeManager { match queries_coordinator.get_mut(&query_id) { None => Err(ErrorCode::Internal("Query not exists.")), Some(query_coordinator) => { - assert!(query_coordinator.fragment_exchanges.is_empty()); + query_coordinator.assert_leak_fragment_exchanges(); let injector = DefaultExchangeInjector::create(); let mut build_res = query_coordinator.subscribe_fragment(&ctx, fragment_id, injector)?; - let exchanges = std::mem::take(&mut query_coordinator.statistics_exchanges); + let exchanges = query_coordinator.take_statistics_receivers(); let statistics_receiver = StatisticsReceiver::spawn_receiver(&ctx, exchanges)?; let statistics_receiver: Mutex = @@ -506,13 +540,13 @@ impl DataExchangeManager { pub fn get_flight_receiver( &self, params: &ExchangeParams, - ) -> Result> { + ) -> Result> { let queries_coordinator_guard = self.queries_coordinator.lock(); let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() }; match queries_coordinator.get_mut(¶ms.get_query_id()) { None => Err(ErrorCode::Internal("Query not exists.")), - Some(coordinator) => coordinator.get_flight_receiver(params), + Some(coordinator) => coordinator.take_flight_receiver(params), } } @@ -550,15 +584,34 @@ struct QueryInfo { query_executor: Option>, } -static FLIGHT_SENDER: u8 = 1; -static FLIGHT_RECEIVER: u8 = 2; +#[derive(Hash, Eq, PartialEq)] +pub struct FragmentExchangeIdentifier { + target: String, + fragment: usize, +} + +#[derive(Hash, Eq, PartialEq)] +pub enum ExchangeIdentifier { + Statistics(String), + DataSender(FragmentExchangeIdentifier), + DataReceiver(FragmentExchangeIdentifier), +} + +impl ExchangeIdentifier { + pub fn fragment_sender(target: String, fragment: usize) -> Self { + ExchangeIdentifier::DataSender(FragmentExchangeIdentifier { target, fragment }) + } + + pub fn fragment_receiver(target: String, fragment: usize) -> Self { + ExchangeIdentifier::DataReceiver(FragmentExchangeIdentifier { target, fragment }) + } +} struct QueryCoordinator { info: Option, fragments_coordinator: HashMap>, - statistics_exchanges: HashMap, - fragment_exchanges: HashMap<(String, usize, u8), FlightExchange>, + exchanges: HashMap, } impl QueryCoordinator { @@ -566,24 +619,67 @@ impl QueryCoordinator { QueryCoordinator { info: None, fragments_coordinator: HashMap::new(), - fragment_exchanges: HashMap::new(), - statistics_exchanges: HashMap::new(), + exchanges: HashMap::new(), + } + } + + pub fn take_statistics_senders(&mut self) -> Vec { + let mut statistics_senders = Vec::with_capacity(1); + + for (identifier, exchange) in &mut self.exchanges { + if let ExchangeIdentifier::Statistics(_) = identifier { + statistics_senders.push(exchange.take_as_sender()); + } + } + + statistics_senders + } + + pub fn take_statistics_receivers(&mut self) -> Vec { + let mut statistics_receivers = Vec::with_capacity(self.exchanges.len()); + + for (identifier, exchange) in &mut self.exchanges { + if let ExchangeIdentifier::Statistics(_) = identifier { + statistics_receivers.push(exchange.take_as_receiver()); + } + } + + statistics_receivers + } + + pub fn assert_leak_fragment_exchanges(&self) { + for (identifier, exchange) in &self.exchanges { + if !matches!(identifier, ExchangeIdentifier::Statistics(_)) { + assert!(matches!( + exchange, + FlightExchange::MovedSender(_) | FlightExchange::MovedReceiver(_) + )); + } } } pub fn add_statistics_exchange( &mut self, target: String, - ) -> Result, Status>>> { + begin: usize, + ) -> Result { let (tx, rx) = async_channel::bounded(8); - match self - .statistics_exchanges - .insert(target, FlightExchange::create_sender(tx)) - { - None => Ok(rx), - Some(_) => Err(ErrorCode::Internal( - "statistics exchanges can only have one", - )), + let identifier = ExchangeIdentifier::Statistics(target); + + match self.exchanges.entry(identifier) { + Entry::Vacant(v) => { + let state = FlightDataAckState::create(10, rx); + v.insert(FlightExchange::create_sender(state.clone(), tx)); + FlightDataAckStream::create(state, begin) + } + Entry::Occupied(mut v) => match v.get_mut() { + FlightExchange::MovedSender(v) => { + FlightDataAckStream::create(v.state.clone(), begin) + } + _ => Err(ErrorCode::Internal( + "statistics exchanges can only have one", + )), + }, } } @@ -592,7 +688,8 @@ impl QueryCoordinator { exchanges: HashMap, ) -> Result<()> { for (source, exchange) in exchanges.into_iter() { - if self.statistics_exchanges.insert(source, exchange).is_some() { + let identifier = ExchangeIdentifier::Statistics(source); + if self.exchanges.insert(identifier, exchange).is_some() { return Err(ErrorCode::Internal( "Internal error, statistics exchange can only have one.", )); @@ -606,13 +703,24 @@ impl QueryCoordinator { &mut self, target: String, fragment: usize, - ) -> Result, Status>>> { + begin: usize, + ) -> Result { let (tx, rx) = async_channel::bounded(8); - self.fragment_exchanges.insert( - (target, fragment, FLIGHT_SENDER), - FlightExchange::create_sender(tx), - ); - Ok(rx) + let identifier = ExchangeIdentifier::fragment_sender(target, fragment); + + match self.exchanges.entry(identifier) { + Entry::Vacant(v) => { + let state = FlightDataAckState::create(10, rx); + v.insert(FlightExchange::create_sender(state.clone(), tx)); + FlightDataAckStream::create(state, begin) + } + Entry::Occupied(mut v) => match v.get_mut() { + FlightExchange::MovedSender(v) => { + FlightDataAckStream::create(v.state.clone(), begin) + } + _ => Err(ErrorCode::Internal("fragment exchange can only have one")), + }, + } } pub fn add_fragment_exchanges( @@ -620,83 +728,101 @@ impl QueryCoordinator { exchanges: HashMap<(String, usize), FlightExchange>, ) -> Result<()> { for ((source, fragment), exchange) in exchanges.into_iter() { - self.fragment_exchanges - .insert((source, fragment, FLIGHT_RECEIVER), exchange); + let identifier = ExchangeIdentifier::fragment_receiver(source, fragment); + + self.exchanges.insert(identifier, exchange); } Ok(()) } pub fn get_flight_senders(&mut self, params: &ExchangeParams) -> Result> { + let mut fragments_exchanges = Vec::with_capacity(self.exchanges.len()); + match params { - ExchangeParams::MergeExchange(params) => Ok(self - .fragment_exchanges - .extract_if(|(_, f, r), _| f == ¶ms.fragment_id && *r == FLIGHT_SENDER) - .map(|(_, v)| v.convert_to_sender()) - .collect::>()), - ExchangeParams::ShuffleExchange(params) => { - let mut exchanges = Vec::with_capacity(params.destination_ids.len()); + ExchangeParams::MergeExchange(params) => { + for (identifier, exchange) in &mut self.exchanges { + if let ExchangeIdentifier::DataSender(v) = identifier { + if v.fragment != params.fragment_id { + continue; + } - for destination in ¶ms.destination_ids { - exchanges.push(match destination == ¶ms.executor_id { - true => Ok(FlightSender::create(async_channel::bounded(1).0)), - false => match self.fragment_exchanges.remove(&( - destination.clone(), - params.fragment_id, - FLIGHT_SENDER, - )) { - Some(exchange_channel) => Ok(exchange_channel.convert_to_sender()), - None => Err(ErrorCode::UnknownFragmentExchange(format!( - "Unknown fragment exchange channel, {}, {}", - destination, params.fragment_id - ))), - }, - }?); + fragments_exchanges.push(exchange.take_as_sender()); + } } + } + ExchangeParams::ShuffleExchange(params) => { + for destination in ¶ms.destination_ids { + if destination == ¶ms.executor_id { + let dummy = FlightSender::create(async_channel::bounded(1).0); + fragments_exchanges.push(dummy); + continue; + } + + let target = destination.clone(); + let fragment = params.fragment_id; + let identifier = ExchangeIdentifier::fragment_sender(target, fragment); + if let Some(v) = self.exchanges.get_mut(&identifier) { + fragments_exchanges.push(v.take_as_sender()); + continue; + } - Ok(exchanges) + return Err(ErrorCode::UnknownFragmentExchange(format!( + "Unknown fragment exchange channel, {}, {}", + destination, params.fragment_id + ))); + } } - } + }; + + Ok(fragments_exchanges) } - pub fn get_flight_receiver( + pub fn take_flight_receiver( &mut self, params: &ExchangeParams, - ) -> Result> { + ) -> Result> { + let mut fragments_exchanges = Vec::with_capacity(self.exchanges.len()); + match params { - ExchangeParams::MergeExchange(params) => Ok(self - .fragment_exchanges - .extract_if(|(_, f, r), _| f == ¶ms.fragment_id && *r == FLIGHT_RECEIVER) - .map(|((source, _, _), v)| (source.clone(), v.convert_to_receiver())) - .collect::>()), - ExchangeParams::ShuffleExchange(params) => { - let mut exchanges = Vec::with_capacity(params.destination_ids.len()); + ExchangeParams::MergeExchange(params) => { + for (identifier, exchange) in &mut self.exchanges { + if let ExchangeIdentifier::DataReceiver(v) = identifier { + if v.fragment != params.fragment_id { + continue; + } - for destination in ¶ms.destination_ids { - exchanges.push(( - destination.clone(), - match destination == ¶ms.executor_id { - true => Ok(FlightReceiver::create(async_channel::bounded(1).1)), - false => match self.fragment_exchanges.remove(&( - destination.clone(), - params.fragment_id, - FLIGHT_RECEIVER, - )) { - Some(v) => Ok(v.convert_to_receiver()), - _ => Err(ErrorCode::UnknownFragmentExchange(format!( - "Unknown fragment flight receiver, {}, {}", - destination, params.fragment_id - ))), - }, - }?, - )); + fragments_exchanges.push((v.target.clone(), exchange.take_as_receiver())); + } } + } + ExchangeParams::ShuffleExchange(params) => { + for destination in ¶ms.destination_ids { + if destination == ¶ms.executor_id { + let dummy = RetryableFlightReceiver::dummy(); + fragments_exchanges.push((destination.clone(), dummy)); + continue; + } - Ok(exchanges) + let source = destination.clone(); + let fragment = params.fragment_id; + let identifier = ExchangeIdentifier::fragment_receiver(source, fragment); + if let Some(v) = self.exchanges.get_mut(&identifier) { + let receiver = v.take_as_receiver(); + fragments_exchanges.push((destination.clone(), receiver)); + continue; + } + + return Err(ErrorCode::UnknownFragmentExchange(format!( + "Unknown fragment flight receiver, {}, {}", + destination, params.fragment_id + ))); + } } - } - } + }; + Ok(fragments_exchanges) + } pub fn prepare_pipeline(&mut self, fragments: &QueryFragments) -> Result<()> { let query_info = self.info.as_ref().expect("expect query info"); let query_context = query_info.query_ctx.clone(); @@ -839,28 +965,30 @@ impl QueryCoordinator { let settings = ExecutorSettings::try_create(info.query_ctx.clone())?; let executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?; - assert!(self.fragment_exchanges.is_empty()); + self.assert_leak_fragment_exchanges(); let info_mut = self.info.as_mut().expect("Query info is None"); info_mut.query_executor = Some(executor.clone()); let query_id = info_mut.query_id.clone(); let query_ctx = info_mut.query_ctx.clone(); - let request_server_exchanges = std::mem::take(&mut self.statistics_exchanges); - if request_server_exchanges.len() != 1 { + let ctx = query_ctx.clone(); + let mut statistics_senders = self.take_statistics_senders(); + + let Some(statistics_sender) = statistics_senders.pop() else { + return Err(ErrorCode::Internal( + "Request server must less than 1 if is not request server.", + )); + }; + + if !statistics_senders.is_empty() { return Err(ErrorCode::Internal( "Request server must less than 1 if is not request server.", )); } - let ctx = query_ctx.clone(); - let (_, request_server_exchange) = request_server_exchanges.into_iter().next().unwrap(); - let mut statistics_sender = StatisticsSender::spawn( - &query_id, - ctx, - request_server_exchange, - executor.get_inner(), - ); + let mut statistics_sender = + StatisticsSender::spawn(&query_id, ctx, statistics_sender, executor.get_inner()); let span = if let Some(parent) = SpanContext::current_local_parent() { Span::root("Distributed-Executor", parent) diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_source_reader.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_source_reader.rs index 88a1a139c21b4..daa7c4bfec835 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_source_reader.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_source_reader.rs @@ -27,15 +27,15 @@ use databend_common_pipeline_core::processors::ProcessorPtr; use databend_common_pipeline_core::PipeItem; use log::info; +use crate::servers::flight::flight_client::RetryableFlightReceiver; use crate::servers::flight::v1::exchange::serde::ExchangeDeserializeMeta; use crate::servers::flight::v1::packets::DataPacket; -use crate::servers::flight::FlightReceiver; pub struct ExchangeSourceReader { finished: AtomicBool, output: Arc, output_data: Vec, - flight_receiver: FlightReceiver, + flight_receiver: RetryableFlightReceiver, source: String, destination: String, fragment: usize, @@ -44,7 +44,7 @@ pub struct ExchangeSourceReader { impl ExchangeSourceReader { pub fn create( output: Arc, - flight_receiver: FlightReceiver, + flight_receiver: RetryableFlightReceiver, source: &str, destination: &str, fragment: usize, @@ -156,7 +156,7 @@ impl Processor for ExchangeSourceReader { } pub fn create_reader_item( - flight_receiver: FlightReceiver, + flight_receiver: RetryableFlightReceiver, source: &str, destination: &str, fragment: usize, diff --git a/src/query/service/src/servers/flight/v1/exchange/statistics_receiver.rs b/src/query/service/src/servers/flight/v1/exchange/statistics_receiver.rs index 84b89ab89b344..bfb8b3291e069 100644 --- a/src/query/service/src/servers/flight/v1/exchange/statistics_receiver.rs +++ b/src/query/service/src/servers/flight/v1/exchange/statistics_receiver.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::sync::Arc; use databend_common_base::base::tokio::sync::broadcast::channel; @@ -25,8 +24,8 @@ use databend_common_exception::Result; use futures_util::future::select; use futures_util::future::Either; +use crate::servers::flight::flight_client::RetryableFlightReceiver; use crate::servers::flight::v1::packets::DataPacket; -use crate::servers::flight::FlightExchange; use crate::sessions::QueryContext; pub struct StatisticsReceiver { @@ -38,14 +37,13 @@ pub struct StatisticsReceiver { impl StatisticsReceiver { pub fn spawn_receiver( ctx: &Arc, - statistics_exchanges: HashMap, + statistics_exchanges: Vec, ) -> Result { let (shutdown_tx, _shutdown_rx) = channel(2); let mut exchange_handler = Vec::with_capacity(statistics_exchanges.len()); let runtime = Runtime::with_worker_threads(2, Some(String::from("StatisticsReceiver")))?; - for (_source, exchange) in statistics_exchanges.into_iter() { - let rx = exchange.convert_to_receiver(); + for rx in statistics_exchanges.into_iter() { exchange_handler.push(runtime.spawn({ let ctx = ctx.clone(); let shutdown_rx = shutdown_tx.subscribe(); diff --git a/src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs b/src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs index b26e8f4d2854d..95d0af8a730fa 100644 --- a/src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs +++ b/src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs @@ -29,7 +29,6 @@ use log::warn; use crate::pipelines::executor::PipelineExecutor; use crate::servers::flight::v1::packets::DataPacket; use crate::servers::flight::v1::packets::ProgressInfo; -use crate::servers::flight::FlightExchange; use crate::servers::flight::FlightSender; use crate::sessions::QueryContext; @@ -43,11 +42,10 @@ impl StatisticsSender { pub fn spawn( query_id: &str, ctx: Arc, - exchange: FlightExchange, + tx: FlightSender, executor: Arc, ) -> Self { let spawner = ctx.clone(); - let tx = exchange.convert_to_sender(); let (shutdown_flag_sender, shutdown_flag_receiver) = async_channel::bounded(1); let handle = spawner.spawn({ @@ -231,8 +229,4 @@ impl StatisticsSender { progress_info } - - // fn fetch_profiling(ctx: &Arc) -> Result> { - // // ctx.get_exchange_manager() - // } } diff --git a/src/query/service/src/servers/flight/v1/flight_service.rs b/src/query/service/src/servers/flight/v1/flight_service.rs index c9a255f8bda44..a1c2d0cd8367c 100644 --- a/src/query/service/src/servers/flight/v1/flight_service.rs +++ b/src/query/service/src/servers/flight/v1/flight_service.rs @@ -111,8 +111,16 @@ impl FlightOperation for DatabendQueryFlightService { "request_server_exchange" => { let target = request.get_metadata("x-target")?; let query_id = request.get_metadata("x-query-id")?; + let continue_from = request + .get_metadata("x-continue-from")? + .parse::() + .unwrap(); Ok(RawResponse::new(Box::pin( - DataExchangeManager::instance().handle_statistics_exchange(query_id, target)?, + DataExchangeManager::instance().handle_statistics_exchange( + query_id, + target, + continue_from, + )?, ))) } "exchange_fragment" => { @@ -122,10 +130,17 @@ impl FlightOperation for DatabendQueryFlightService { .get_metadata("x-fragment-id")? .parse::() .unwrap(); - + let continue_from = request + .get_metadata("x-continue-from")? + .parse::() + .unwrap(); Ok(RawResponse::new(Box::pin( - DataExchangeManager::instance() - .handle_exchange_fragment(query_id, target, fragment)?, + DataExchangeManager::instance().handle_exchange_fragment( + query_id, + target, + fragment, + continue_from, + )?, ))) } "health" => Ok(RawResponse::new(build_health_response())), diff --git a/src/query/service/src/servers/flight/v1/packets/packet_executor.rs b/src/query/service/src/servers/flight/v1/packets/packet_executor.rs index c555184d82912..29999b7727b31 100644 --- a/src/query/service/src/servers/flight/v1/packets/packet_executor.rs +++ b/src/query/service/src/servers/flight/v1/packets/packet_executor.rs @@ -14,7 +14,7 @@ use crate::servers::flight::v1::packets::QueryFragment; -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct QueryFragments { pub query_id: String, pub fragments: Vec, diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 84f90218bd66c..3273fc9ec2e87 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -825,6 +825,18 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("max_flight_connection_retry_times", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "The maximum retry count for cluster flight. Disable if 0.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=30)), + }), + ("flight_connection_retry_interval", DefaultSettingValue { + value: UserSettingValue::UInt64(60), + desc: "The retry interval of cluster flight is in seconds.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=900)), + }), ("random_function_seed", DefaultSettingValue { value: UserSettingValue::UInt64(0), desc: "Seed for random function", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 4cdeb95972337..c8fae40181d0b 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -689,4 +689,12 @@ impl Settings { pub fn get_random_function_seed(&self) -> Result { Ok(self.try_get_u64("random_function_seed")? == 1) } + + pub fn get_flight_retry_interval(&self) -> Result { + Ok(self.try_get_u64("flight_connection_retry_interval")? as usize) + } + + pub fn get_max_flight_retry_times(&self) -> Result { + Ok(self.try_get_u64("max_flight_connection_retry_times")? as usize) + } }