From 57c19c7aefa8793af82a5ca8366169dbd3c21797 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Wed, 11 Sep 2024 13:17:33 -0400 Subject: [PATCH 1/2] Ensure dropped transactions are reverted in a timely manner The old method of queuing dropped transactions for revert and then actually doing the revert the next time the transaction is used in an async context can leave idle transactions unreverted for a long time. This may seem ok if the connection is not otherwise being used. However, this can cause `SERIALIZABLE READ ONLY DEFERRABLE` transactions to block indefinitely waiting to acquire a snapshot, since the database sees that there is a writable transaction in progress indefinitely. The new method ensures transactions are always reverted soon after they are dropped by running the revert in a background task. If the connection is acquired again before the revert has completed, it will block on the completion of the background task, to ensure the connection is not used while the old transaction is still in progress. 
--- src/data_source/storage/sql/transaction.rs | 71 +++++++++++------ src/task.rs | 88 ++++++++++++++++------ 2 files changed, 109 insertions(+), 50 deletions(-) diff --git a/src/data_source/storage/sql/transaction.rs b/src/data_source/storage/sql/transaction.rs index 0c9bb88ec..8a92f93b9 100644 --- a/src/data_source/storage/sql/transaction.rs +++ b/src/data_source/storage/sql/transaction.rs @@ -35,6 +35,7 @@ use crate::{ update::{self, ReadOnly}, }, merklized_state::{MerklizedState, UpdateStateData}, + task::Task, types::HeightIndexed, Header, Payload, QueryError::{self, NotFound}, @@ -42,7 +43,10 @@ use crate::{ }; use anyhow::{bail, ensure, Context}; use ark_serialize::CanonicalSerialize; -use async_std::{sync::MutexGuard, task::sleep}; +use async_std::{ + sync::{Arc, MutexGuard}, + task::sleep, +}; use async_trait::async_trait; use bit_vec::BitVec; use committable::Committable; @@ -67,37 +71,54 @@ use std::{ pub(super) struct Connection { #[deref] #[deref_mut] - client: Client, - revert_queued: bool, + client: Arc, + + // When the connection is dropped in a synchronous context, we spawn a task to revert the + // in-progress transaction, so the revert can run asynchronously without blocking the dropping + // thread. If such a revert is in progress, a handle to the task will be stored here. The handle + // _must_ be awaited to allow the revert to finish before the connection can be used again. + revert: Option>>, } impl From for Connection { fn from(client: Client) -> Self { Self { - client, - revert_queued: false, + client: Arc::new(client), + revert: None, } } } impl Connection { + /// Prepare the connection for use. + /// + /// This will wait for any async operations to finish which were spawned when the connection was + /// dropped in a synchronous context. This _must_ be called the first time the connection is + /// invoked in an async context after being dropped in a sync context. 
async fn acquire(&mut self) -> anyhow::Result<()> { - if self.revert_queued { - // If a revert was queued when this connection was released in a synchronous context, - // execute it now that we are in an async context. - self.revert().await?; - self.revert_queued = false; + if let Some(revert) = self.revert.take() { + // If a revert was started when this connection was released in a synchronous context, + // we must wait for it to finish before reusing the connection. + revert.join().await?; } Ok(()) } - fn queue_revert(&mut self) { - self.revert_queued = true; - } + /// Spawn a revert to run asynchronously. + fn spawn_revert(&mut self) { + // Consistency check. + assert!( + self.revert.is_none(), + "attempting to queue revert while a queued revert is in progress; this should not be possible", + ); - async fn revert(&mut self) -> anyhow::Result<()> { - self.client.batch_execute("ROLLBACK").await?; - Ok(()) + // Get a client that is not connected to the lifetime of this reference, so we can run the + // revert command in the background. 
+ let client = self.client.clone(); + self.revert = Some(Task::spawn("revert postgres transaction", async move { + client.batch_execute("ROLLBACK").await?; + Ok(()) + })); } #[cfg(test)] @@ -113,7 +134,7 @@ impl Connection { #[derivative(Debug)] pub struct Transaction<'a> { inner: MutexGuard<'a, Connection>, - committed: bool, + finalized: bool, } impl<'a> Transaction<'a> { @@ -126,7 +147,7 @@ impl<'a> Transaction<'a> { .await?; Ok(Self { inner, - committed: false, + finalized: false, } .into()) } @@ -138,7 +159,7 @@ impl<'a> Transaction<'a> { .await?; Ok(Self { inner, - committed: false, + finalized: false, }) } } @@ -146,25 +167,25 @@ impl<'a> Transaction<'a> { impl<'a> update::Transaction for Transaction<'a> { async fn commit(mut self) -> anyhow::Result<()> { self.inner.batch_execute("COMMIT").await?; - self.committed = true; + self.finalized = true; Ok(()) } fn revert(mut self) -> impl Future + Send { async move { - self.inner.revert().await.unwrap(); + self.inner.batch_execute("ROLLBACK").await.unwrap(); + self.finalized = true; } } } impl<'a> Drop for Transaction<'a> { fn drop(&mut self) { - if !self.committed { + if !self.finalized { // Since `drop` is synchronous, we can't execute the asynchronous revert process here, // at least not without blocking the current thread (which may be an async executor // thread, blocking other unrelated futures and causing deadlocks). Instead, we will - // simply queue the transaction to be reverted the next time this connection is invoked - // in an async context. - self.inner.queue_revert(); + // revert the transaction asynchronously. 
+ self.inner.spawn_revert(); } } } diff --git a/src/task.rs b/src/task.rs index 4f3b5811c..765d128a2 100644 --- a/src/task.rs +++ b/src/task.rs @@ -16,16 +16,11 @@ use async_std::{ sync::Arc, task::{spawn, JoinHandle}, }; +use derivative::Derivative; use futures::future::Future; use std::fmt::Display; use tracing::{info_span, Instrument}; -#[derive(Debug)] -struct BackgroundTaskInner { - name: String, - handle: JoinHandle<()>, -} - /// A background task which is cancelled on [`Drop`] /// /// This handle can be cloned; cloning it does not clone the underlying task. There may be many @@ -33,10 +28,9 @@ struct BackgroundTaskInner { /// dropped. #[derive(Clone, Debug)] pub struct BackgroundTask { - // The task handle is an `Option` so we can `take()` out of it during `drop`, where we have a - // mutable reference but need to move out of the underlying task handle to cancel it. This will - // always be `Some` except during cancellation. - inner: Option>, + // A handle to the inner task. This exists solely so that we can hold it and have it be dropped + // when the last clone of this object is dropped. + _inner: Arc>, } impl BackgroundTask { @@ -51,39 +45,83 @@ impl BackgroundTask { pub fn spawn(name: impl Display, future: F) -> Self where F: Future + Send + 'static, + { + // Ignore the output of the background task. + let future = async move { + future.await; + }; + Self { + _inner: Arc::new(Task::spawn(name, future)), + } + } +} + +#[derive(Derivative)] +#[derivative(Debug(bound = ""))] +struct TaskInner { + name: String, + #[derivative(Debug = "ignore")] + handle: JoinHandle, +} + +/// A task handle which can be joined, but is cancelled on [`Drop`] +#[derive(Derivative)] +#[derivative(Debug(bound = ""))] +pub struct Task { + // The task handle is an `Option` so we can `take()` out of it during `join` and `drop`. This + // will always be `Some` except during joining or cancellation. 
+ inner: Option>, +} + +impl Task { + /// Spawn a task, which will be cancelled when dropped. + /// + /// The caller should ensure that `future` yields back to the executor fairly frequently, to + /// ensure timely cancellation in case the task is dropped. If an operation in `future` may run + /// for a long time without blocking or yielding, consider using + /// [`yield_now`](async_std::task::yield_now) periodically, or using + /// [`spawn`](async_std::task::spawn) or [`spawn_blocking`](async_std::task::spawn_blocking) to + /// run long operations in a sub-task. + pub fn spawn(name: impl Display, future: F) -> Self + where + F: Future + Send + 'static, { let name = name.to_string(); let handle = { let span = info_span!("task", name); spawn( async move { - tracing::info!("spawning background task"); - future.await; - tracing::info!("completed background task"); + tracing::info!("spawning task"); + let res = future.await; + tracing::info!("completed task"); + res } .instrument(span), ) }; Self { - inner: Some(Arc::new(BackgroundTaskInner { name, handle })), + inner: Some(TaskInner { name, handle }), } } + + /// Wait for the task to complete and get its output. + pub async fn join(mut self) -> T { + // We take here so that we will not attempt to cancel the joined task when this handle is + // dropped at the end of the function. We can unwrap here because `inner` is only `None` + // during `join` or `drop`. Since `join` consumes `self`, it is not possible that `join` + // already ran, and of course `self` has not been dropped yet. + let inner = self.inner.take().unwrap(); + inner.handle.await + } } -impl Drop for BackgroundTask { +impl Drop for Task { fn drop(&mut self) { - // `inner` should never be [`None`] here, we only `take` it because we are given `&mut - // self` (so that this can be called from [`drop`]) and thus we cannot move out of `self`. - // Nevertheless, it doesn't hurt to explicitly check for [`Some`]. 
if let Some(inner) = self.inner.take() { - // Check if this is the last instance of the [`Arc`] and, if so, cancel the underlying - // task. - if let Some(inner) = Arc::into_inner(inner) { - tracing::info!(name = inner.name, "cancelling background task"); - async_std::task::block_on(inner.handle.cancel()); - tracing::info!(name = inner.name, "cancelled background task"); - } + tracing::info!(name = inner.name, "cancelling task"); + async_std::task::block_on(inner.handle.cancel()); + tracing::info!(name = inner.name, "cancelled task"); } } } From 75ac51d96b8eecb271b2a0c0f1a5ef031ff739c1 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Wed, 11 Sep 2024 13:34:37 -0400 Subject: [PATCH 2/2] Publish `task` module This silences warnings where certain functions are unused with certain feature flags. Also, it makes sense to publish this as it is a generally useful thing that clients might make use of. Eventually, it would make sense to move this to its own crate. --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index e4a2824b8..d58e44c22 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -413,7 +413,7 @@ pub mod metrics; pub mod node; mod resolvable; pub mod status; -mod task; +pub mod task; pub mod testing; pub mod types;