Skip to content

Commit

Permalink
Merge pull request #693 from EspressoSystems/jb/poor-mans-connection-…
Browse files Browse the repository at this point in the history
…pool

Ensure dropped transactions are reverted in a timely manner
  • Loading branch information
jbearer committed Sep 11, 2024
2 parents 60df4f9 + 75ac51d commit aee574c
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 51 deletions.
71 changes: 46 additions & 25 deletions src/data_source/storage/sql/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,18 @@ use crate::{
update::{self, ReadOnly},
},
merklized_state::{MerklizedState, UpdateStateData},
task::Task,
types::HeightIndexed,
Header, Payload,
QueryError::{self, NotFound},
QueryResult, VidShare,
};
use anyhow::{bail, ensure, Context};
use ark_serialize::CanonicalSerialize;
use async_std::{sync::MutexGuard, task::sleep};
use async_std::{
sync::{Arc, MutexGuard},
task::sleep,
};
use async_trait::async_trait;
use bit_vec::BitVec;
use committable::Committable;
Expand All @@ -67,37 +71,54 @@ use std::{
pub(super) struct Connection {
#[deref]
#[deref_mut]
client: Client,
revert_queued: bool,
client: Arc<Client>,

// When the connection is dropped in a synchronous context, we spawn a task to revert the
// in-progress transaction, so the revert can run asynchronously without blocking the dropping
// thread. If such a revert is in progress, a handle to the task will be stored here. The handle
// _must_ be awaited to allow the revert to finish before the connection can be used again.
revert: Option<Task<anyhow::Result<()>>>,
}

impl From<Client> for Connection {
fn from(client: Client) -> Self {
Self {
client,
revert_queued: false,
client: Arc::new(client),
revert: None,
}
}
}

impl Connection {
/// Prepare the connection for use.
///
/// This will wait for any async operations to finish which were spawned when the connection was
/// dropped in a synchronous context. This _must_ be called the first time the connection is
/// invoked in an async context after being dropped in a sync context.
async fn acquire(&mut self) -> anyhow::Result<()> {
if self.revert_queued {
// If a revert was queued when this connection was released in a synchronous context,
// execute it now that we are in an async context.
self.revert().await?;
self.revert_queued = false;
if let Some(revert) = self.revert.take() {
// If a revert was started when this connection was released in a synchronous context,
// we must wait for it to finish before reusing the connection.
revert.join().await?;
}
Ok(())
}

fn queue_revert(&mut self) {
self.revert_queued = true;
}
/// Spawn a revert to run asynchronously.
fn spawn_revert(&mut self) {
// Consistency check.
assert!(
self.revert.is_none(),
"attempting to queue revert while a queued revert is in progress; this should not be possible",
);

async fn revert(&mut self) -> anyhow::Result<()> {
self.client.batch_execute("ROLLBACK").await?;
Ok(())
// Get a client that is not connected to the lifetime of this reference, so we can run the
// revert command in the background.
let client = self.client.clone();
self.revert = Some(Task::spawn("revert postgres transaction", async move {
client.batch_execute("ROLLBACK").await?;
Ok(())
}));
}

#[cfg(test)]
Expand All @@ -113,7 +134,7 @@ impl Connection {
#[derivative(Debug)]
pub struct Transaction<'a> {
inner: MutexGuard<'a, Connection>,
committed: bool,
finalized: bool,
}

impl<'a> Transaction<'a> {
Expand All @@ -126,7 +147,7 @@ impl<'a> Transaction<'a> {
.await?;
Ok(Self {
inner,
committed: false,
finalized: false,
}
.into())
}
Expand All @@ -138,33 +159,33 @@ impl<'a> Transaction<'a> {
.await?;
Ok(Self {
inner,
committed: false,
finalized: false,
})
}
}

impl<'a> update::Transaction for Transaction<'a> {
async fn commit(mut self) -> anyhow::Result<()> {
self.inner.batch_execute("COMMIT").await?;
self.committed = true;
self.finalized = true;
Ok(())
}
fn revert(mut self) -> impl Future + Send {
async move {
self.inner.revert().await.unwrap();
self.inner.batch_execute("ROLLBACK").await.unwrap();
self.finalized = true;
}
}
}

impl<'a> Drop for Transaction<'a> {
fn drop(&mut self) {
if !self.committed {
if !self.finalized {
// Since `drop` is synchronous, we can't execute the asynchronous revert process here,
// at least not without blocking the current thread (which may be an async executor
// thread, blocking other unrelated futures and causing deadlocks). Instead, we will
// simply queue the transaction to be reverted the next time this connection is invoked
// in an async context.
self.inner.queue_revert();
// revert the transaction asynchronously.
self.inner.spawn_revert();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ pub mod metrics;
pub mod node;
mod resolvable;
pub mod status;
mod task;
pub mod task;
pub mod testing;
pub mod types;

Expand Down
88 changes: 63 additions & 25 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,21 @@ use async_std::{
sync::Arc,
task::{spawn, JoinHandle},
};
use derivative::Derivative;
use futures::future::Future;
use std::fmt::Display;
use tracing::{info_span, Instrument};

#[derive(Debug)]
struct BackgroundTaskInner {
name: String,
handle: JoinHandle<()>,
}

/// A background task which is cancelled on [`Drop`]
///
/// This handle can be cloned; cloning it does not clone the underlying task. There may be many
/// handles to the same background task, and the task will be cancelled when all handles are
/// dropped.
#[derive(Clone, Debug)]
pub struct BackgroundTask {
// The task handle is an `Option` so we can `take()` out of it during `drop`, where we have a
// mutable reference but need to move out of the underlying task handle to cancel it. This will
// always be `Some` except during cancellation.
inner: Option<Arc<BackgroundTaskInner>>,
// A handle to the inner task. This exists solely so that we can hold it and have it be dropped
// when the last clone of this object is dropped.
_inner: Arc<Task<()>>,
}

impl BackgroundTask {
Expand All @@ -51,39 +45,83 @@ impl BackgroundTask {
pub fn spawn<F>(name: impl Display, future: F) -> Self
where
F: Future + Send + 'static,
{
// Ignore the output of the background task.
let future = async move {
future.await;
};
Self {
_inner: Arc::new(Task::spawn(name, future)),
}
}
}

#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
struct TaskInner<T> {
name: String,
#[derivative(Debug = "ignore")]
handle: JoinHandle<T>,
}

/// A task handle which can be joined, but is cancelled on [`Drop`]
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub struct Task<T> {
// The task handle is an `Option` so we can `take()` out of it during `join` and `drop`. This
// will always be `Some` except during joining or cancellation.
inner: Option<TaskInner<T>>,
}

impl<T: Send + 'static> Task<T> {
/// Spawn a task, which will be cancelled when dropped.
///
/// The caller should ensure that `future` yields back to the executor fairly frequently, to
/// ensure timely cancellation in case the task is dropped. If an operation in `future` may run
/// for a long time without blocking or yielding, consider using
/// [`yield_now`](async_std::task::yield_now) periodically, or using
/// [`spawn`](async_std::task::spawn) or [`spawn_blocking`](async_std::task::spawn_blocking) to
/// run long operations in a sub-task.
pub fn spawn<F>(name: impl Display, future: F) -> Self
where
F: Future<Output = T> + Send + 'static,
{
let name = name.to_string();
let handle = {
let span = info_span!("task", name);
spawn(
async move {
tracing::info!("spawning background task");
future.await;
tracing::info!("completed background task");
tracing::info!("spawning task");
let res = future.await;
tracing::info!("completed task");
res
}
.instrument(span),
)
};

Self {
inner: Some(Arc::new(BackgroundTaskInner { name, handle })),
inner: Some(TaskInner { name, handle }),
}
}

/// Wait for the task to complete and get its output.
pub async fn join(mut self) -> T {
// We take here so that we will not attempt to cancel the joined task when this handle is
// dropped at the end of the function. We can unwrap here because `inner` is only `None`
// during `join` or `drop`. Since `join` consumes `self`, it is not possible that `join`
// already ran, and of course `self` has not been dropped yet.
let inner = self.inner.take().unwrap();
inner.handle.await
}
}

impl Drop for BackgroundTask {
impl<T> Drop for Task<T> {
fn drop(&mut self) {
// `inner` should never be [`None`] here, we only `take` it because we are given `&mut
// self` (so that this can be called from [`drop`]) and thus we cannot move out of `self`.
// Nevertheless, it doesn't hurt to explicitly check for [`Some`].
if let Some(inner) = self.inner.take() {
// Check if this is the last instance of the [`Arc`] and, if so, cancel the underlying
// task.
if let Some(inner) = Arc::into_inner(inner) {
tracing::info!(name = inner.name, "cancelling background task");
async_std::task::block_on(inner.handle.cancel());
tracing::info!(name = inner.name, "cancelled background task");
}
tracing::info!(name = inner.name, "cancelling task");
async_std::task::block_on(inner.handle.cancel());
tracing::info!(name = inner.name, "cancelled task");
}
}
}

0 comments on commit aee574c

Please sign in to comment.