Skip to content

Commit

Permalink
feat: add retry for do_action
Browse files Browse the repository at this point in the history
  • Loading branch information
dqhl76 committed Aug 22, 2024
1 parent 0357267 commit 4914933
Showing 1 changed file with 31 additions and 8 deletions.
39 changes: 31 additions & 8 deletions src/query/service/src/clusters/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ use futures::future::Either;
use futures::Future;
use futures::StreamExt;
use log::error;
use log::info;
use log::warn;
use rand::thread_rng;
use rand::Rng;
use serde::Deserialize;
use serde::Serialize;
use tokio::time::sleep;

use crate::servers::flight::FlightClient;

Expand All @@ -79,7 +81,7 @@ pub trait ClusterHelper {

fn get_nodes(&self) -> Vec<Arc<NodeInfo>>;

async fn do_action<T: Serialize + Send, Res: for<'de> Deserialize<'de> + Send>(
async fn do_action<T: Serialize + Send + Clone, Res: for<'de> Deserialize<'de> + Send>(
&self,
path: &str,
message: HashMap<String, T>,
Expand Down Expand Up @@ -116,7 +118,7 @@ impl ClusterHelper for Cluster {
self.nodes.to_vec()
}

async fn do_action<T: Serialize + Send, Res: for<'de> Deserialize<'de> + Send>(
async fn do_action<T: Serialize + Send + Clone, Res: for<'de> Deserialize<'de> + Send>(
&self,
path: &str,
message: HashMap<String, T>,
Expand Down Expand Up @@ -145,12 +147,33 @@ impl ClusterHelper for Cluster {
let node_secret = node.secret.clone();

async move {
let mut conn = create_client(&config, &flight_address).await?;
Ok::<_, ErrorCode>((
id,
conn.do_action::<_, Res>(path, node_secret, message, timeout)
.await?,
))
let mut attempt = 0;
let max_attempts = 2;

loop {
let mut conn = create_client(&config, &flight_address).await?;
match conn
.do_action::<_, Res>(
path,
node_secret.clone(),
message.clone(),
timeout,
)
.await
{
Ok(result) => return Ok((id, result)),
Err(e)
if e.code() == ErrorCode::CANNOT_CONNECT_NODE
&& attempt < max_attempts =>
{
// only retry when error is network problem
info!("retry do_action, attempt: {}", attempt);
attempt += 1;
sleep(Duration::from_secs(1)).await;
}
Err(e) => return Err(e),
}
}
}
});
}
Expand Down

0 comments on commit 4914933

Please sign in to comment.