Skip to content

Commit

Permalink
debug
Browse files Browse the repository at this point in the history
  • Loading branch information
dqhl76 committed Sep 4, 2024
1 parent 689b639 commit f8189d7
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions src/query/service/src/servers/flight/flight_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,17 +639,21 @@ impl FlightDataAckStream {
begin: usize,
) -> Result<FlightDataAckStream> {
// reset begin
info!("Create FlightDataAckStream hold lock");
let mut state_guard = state.lock();
state_guard.seq.store(begin, Ordering::SeqCst);
state_guard.may_retry = true;
drop(state_guard);
info!("Create FlightDataAckStream release lock");
Ok(FlightDataAckStream { state })
}
}

impl Drop for FlightDataAckStream {
fn drop(&mut self) {
info!("Drop FlightDataAckStream");
let state_should_retry = {
info!("Drop stage1 hold lock");
let mut state = self.state.lock();
if state.may_retry {
state.may_retry = false;
Expand All @@ -659,17 +663,19 @@ impl Drop for FlightDataAckStream {
false
}
};

info!("Drop stage1 release lock");
if state_should_retry {
let weak = Arc::downgrade(&self.state);
GlobalIORuntime::instance().spawn(async move {
info!("Drop stage2 begin, wait for 60");
tokio::time::sleep(Duration::from_secs(60)).await;

if let Some(ss) = weak.upgrade() {
info!("Drop stage2 hold lock");
let ss = ss.lock();
if !ss.may_retry {
ss.receiver.close();
}
info!("Drop stage2 release lock");
}
});
}
Expand All @@ -680,6 +686,9 @@ impl Stream for FlightDataAckStream {
type Item = Result<Arc<FlightData>, Status>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.state.lock().poll_next(cx)
info!("Poll next hold lock");
let res = self.state.lock().poll_next(cx);
info!("Poll next release lock");
res
}
}

0 comments on commit f8189d7

Please sign in to comment.