diff --git a/src/storage-controller/src/collection_mgmt.rs b/src/storage-controller/src/collection_mgmt.rs index 556042ae1448..8f731a551564 100644 --- a/src/storage-controller/src/collection_mgmt.rs +++ b/src/storage-controller/src/collection_mgmt.rs @@ -74,10 +74,12 @@ use mz_ore::retry::Retry; use mz_ore::task::AbortOnDropHandle; use mz_ore::vec::VecExt; use mz_persist_client::read::ReadHandle; +use mz_persist_client::write::WriteHandle; use mz_persist_types::Codec64; use mz_repr::{Diff, GlobalId, Row, TimestampManipulation}; -use mz_storage_client::client::{TimestamplessUpdate, Update}; +use mz_storage_client::client::TimestamplessUpdate; use mz_storage_client::controller::{MonotonicAppender, StorageWriteOp}; +use mz_storage_types::controller::InvalidUpper; use mz_storage_types::parameters::STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT; use mz_storage_types::sources::SourceData; use timely::progress::{Antichain, Timestamp}; @@ -85,7 +87,7 @@ use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::{Duration, Instant}; use tracing::{debug, error, info}; -use crate::{persist_handles, StorageError}; +use crate::StorageError; // Note(parkmycar): The capacity here was chosen arbitrarily. const CHANNEL_CAPACITY: usize = 4096; @@ -140,8 +142,6 @@ where append_only_collections: Arc, WriteTask, ShutdownSender)>>>, - write_handle: persist_handles::PersistMonotonicWriteWorker, - /// Amount of time we'll wait before sending a batch of inserts to Persist, for user /// collections. user_batch_duration_ms: Arc, @@ -161,11 +161,7 @@ impl CollectionManager where T: Timestamp + Lattice + Codec64 + From + TimestampManipulation, { - pub(super) fn new( - read_only: bool, - write_handle: persist_handles::PersistMonotonicWriteWorker, - now: NowFn, - ) -> CollectionManager { + pub(super) fn new(read_only: bool, now: NowFn) -> CollectionManager { let batch_duration_ms: u64 = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT .as_millis() .try_into() @@ -180,7 +176,6 @@ where hacky_always_false_watch: (always_false_tx, always_false_rx), differential_collections: Arc::new(Mutex::new(BTreeMap::new())), append_only_collections: Arc::new(Mutex::new(BTreeMap::new())), - write_handle, user_batch_duration_ms: Arc::new(AtomicU64::new(batch_duration_ms)), now, } @@ -211,6 +206,7 @@ where pub(super) fn register_differential_collection( &self, id: GlobalId, + write_handle: WriteHandle, read_handle_fn: R, force_writable: bool, ) where @@ -238,7 +234,7 @@ where // Spawns a new task so we can write to this collection. let writer_and_handle = DifferentialWriteTask::spawn( id, - self.write_handle.clone(), + write_handle, read_handle_fn, read_only_rx, self.now.clone(), @@ -258,7 +254,12 @@ where /// /// The [CollectionManager] will automatically advance the upper of every /// registered collection every second. - pub(super) fn register_append_only_collection(&self, id: GlobalId, force_writable: bool) { + pub(super) fn register_append_only_collection( + &self, + id: GlobalId, + write_handle: WriteHandle, + force_writable: bool, + ) { let mut guard = self .append_only_collections .lock() @@ -279,7 +280,7 @@ where // Spawns a new task so we can write to this collection. let writer_and_handle = append_only_write_task( id, - self.write_handle.clone(), + write_handle, Arc::clone(&self.user_batch_duration_ms), self.now.clone(), read_only_rx, @@ -442,7 +443,7 @@ where /// The collection that we are writing to. 
id: GlobalId, - write_handle: persist_handles::PersistMonotonicWriteWorker, + write_handle: WriteHandle, /// For getting a [`ReadHandle`] to sync our state to persist contents. read_handle_fn: R, @@ -501,7 +502,7 @@ where /// handles for interacting with it. fn spawn( id: GlobalId, - write_handle: persist_handles::PersistMonotonicWriteWorker, + write_handle: WriteHandle, read_handle_fn: R, read_only_watch: watch::Receiver, now: NowFn, @@ -610,31 +611,28 @@ where return ControlFlow::Continue(()); } - let request = vec![(self.id, vec![], self.current_upper.clone(), now.clone())]; - assert!(!self.read_only); - match self.write_handle.compare_and_append(request).await { + let res = self + .write_handle + .compare_and_append_batch( + &mut [], + Antichain::from_elem(self.current_upper.clone()), + Antichain::from_elem(now.clone()), + ) + .await + .expect("valid usage"); + match res { // All good! - Ok(Ok(())) => { + Ok(()) => { tracing::debug!(%self.id, "bumped upper of differential collection"); self.current_upper = now; } - Ok(Err(StorageError::InvalidUppers(failed_ids))) => { + Err(err) => { // Someone else wrote to the collection or bumped the upper. We // need to sync to latest persist state and potentially patch up // our `to_write`, based on what we learn and `desired`. - assert_eq!( - failed_ids.len(), - 1, - "received errors for more than one collection" - ); - assert_eq!( - failed_ids[0].id, self.id, - "received errors for a different collection" - ); - - let actual_upper = if let Some(ts) = failed_ids[0].current_upper.as_option() { + let actual_upper = if let Some(ts) = err.current.as_option() { ts.clone() } else { return ControlFlow::Break("upper is the empty antichain".to_string()); @@ -646,17 +644,6 @@ where self.sync_to_persist().await; } - Ok(Err(err)) => { - panic!( - "unexpected error while trying to bump upper of {}: {:?}", - self.id, err - ); - } - // Sender hung up, this seems fine and can happen when shutting down. - Err(_recv_error) => { - // Exit the run loop because there is no other work we can do. - return ControlFlow::Break("persist worker is gone".to_string()); - } } ControlFlow::Continue(()) @@ -785,47 +772,35 @@ where loop { // Append updates to persist! - let updates_to_write = self - .to_write - .iter() - .map(|(row, diff)| Update { - row: row.clone(), - timestamp: self.current_upper.clone(), - diff: diff.clone(), - }) - .collect(); - let now = T::from((self.now)()); let new_upper = std::cmp::max( now, TimestampManipulation::step_forward(&self.current_upper), ); - let request = vec![( - self.id, - updates_to_write, - self.current_upper.clone(), - new_upper.clone(), - )]; + let updates_to_write = self + .to_write + .iter() + .map(|(row, diff)| { + ( + (SourceData(Ok(row.clone())), ()), + self.current_upper.clone(), + diff.clone(), + ) + }) + .collect::>(); assert!(!self.read_only); - let append_result = match self.write_handle.compare_and_append(request.clone()).await { - // We got a response! - Ok(append_result) => append_result, - // Failed to receive which means the worker shutdown. - Err(_recv_error) => { - // Sender hung up, this seems fine and can happen when - // shutting down. - notify_listeners(responders, || { - Err(StorageError::ShuttingDown("PersistMonotonicWriteWorker")) - }); - - // End the task since we can no longer send writes to persist. 
- return ControlFlow::Break("sender hung up".to_string()); - } - }; - - match append_result { + let res = self + .write_handle + .compare_and_append( + updates_to_write, + Antichain::from_elem(self.current_upper.clone()), + Antichain::from_elem(new_upper.clone()), + ) + .await + .expect("valid usage"); + match res { // Everything was successful! Ok(()) => { // Notify all of our listeners. @@ -843,22 +818,11 @@ where break; } // Failed to write to some collections, - Err(StorageError::InvalidUppers(failed_ids)) => { + Err(err) => { // Someone else wrote to the collection. We need to read // from persist and update to_write based on that and the // desired state. - - assert_eq!( - failed_ids.len(), - 1, - "received errors for more than one collection" - ); - assert_eq!( - failed_ids[0].id, self.id, - "received errors for a different collection" - ); - - let actual_upper = if let Some(ts) = failed_ids[0].current_upper.as_option() { + let actual_upper = if let Some(ts) = err.current.as_option() { ts.clone() } else { return ControlFlow::Break("upper is the empty antichain".to_string()); @@ -869,11 +833,16 @@ where // We've exhausted all of our retries, notify listeners and // break out of the retry loop so we can wait for more data. if retries.next().await.is_none() { + let invalid_upper = InvalidUpper { + id: self.id, + current_upper: err.current, + }; notify_listeners(responders, || { - Err(StorageError::InvalidUppers(failed_ids.clone())) + Err(StorageError::InvalidUppers(vec![invalid_upper.clone()])) }); error!( - "exhausted retries when appending to managed collection {failed_ids:?}" + "exhausted retries when appending to managed collection {}", + self.id ); break; } @@ -882,14 +851,7 @@ where self.sync_to_persist().await; - debug!("Retrying invalid-uppers error while appending to differential collection {failed_ids:?}"); - } - // Uh-oh, something else went wrong! - Err(other) => { - panic!( - "Unhandled error while appending to managed collection {:?}: {:?}", - self.id, other - ) + debug!("Retrying invalid-uppers error while appending to differential collection {}", self.id); } } } @@ -940,7 +902,7 @@ where /// TODO(parkmycar): Maybe add prometheus metrics for each collection? fn append_only_write_task( id: GlobalId, - write_handle: persist_handles::PersistMonotonicWriteWorker, + mut write_handle: WriteHandle, user_batch_duration_ms: Arc, now: NowFn, read_only: watch::Receiver, @@ -1040,70 +1002,11 @@ where .flatten() .map(|(row, diff)| TimestamplessUpdate { row, diff }) .collect(); - let request = vec![(id, rows, T::from(now()))]; + let at_least = T::from(now()); - // We'll try really hard to succeed, but eventually stop. - // - // Note: it's very rare we should ever need to retry, and if we need to - // retry it should only take 1 or 2 attempts. We set `max_tries` to be - // high though because if we hit some edge case we want to try hard to - // commit the data. - let retries = Retry::default() - .initial_backoff(Duration::from_secs(1)) - .clamp_backoff(Duration::from_secs(3)) - .factor(1.25) - .max_tries(20) - .into_retry_stream(); - let mut retries = Box::pin(retries); - - 'append_retry: loop { - let append_result = match write_handle.monotonic_append(request.clone()).await { - // We got a response! - Ok(append_result) => append_result, - // Failed to receive which means the worker shutdown. - Err(_recv_error) => { - // Sender hung up, this seems fine and can happen when shutting down. 
- notify_listeners(responders, || Err(StorageError::ShuttingDown("PersistMonotonicWriteWorker"))); - - // End the task since we can no longer send writes to persist. - break 'run; - } - }; - - match append_result { - // Everything was successful! - Ok(()) => { - // Notify all of our listeners. - notify_listeners(responders, || Ok(())); - // Break out of the retry loop so we can wait for more data. - break 'append_retry; - }, - // Failed to write to some collections, - Err(StorageError::InvalidUppers(failed_ids)) => { - // It's fine to retry invalid-uppers errors here, since - // monotonic appends do not specify a particular upper or - // timestamp. - - assert_eq!(failed_ids.len(), 1, "received errors for more than one collection"); - assert_eq!(failed_ids[0].id, id, "received errors for a different collection"); - - // We've exhausted all of our retries, notify listeners - // and break out of the retry loop so we can wait for more - // data. - if retries.next().await.is_none() { - notify_listeners(responders, || Err(StorageError::InvalidUppers(failed_ids.clone()))); - error!("exhausted retries when appending to managed collection {failed_ids:?}"); - break 'append_retry; - } - - debug!("Retrying invalid-uppers error while appending to managed collection {failed_ids:?}"); - } - // Uh-oh, something else went wrong! - Err(other) => { - panic!("Unhandled error while appending to managed collection {id:?}: {other:?}") - } - } - } + monotonic_append(&mut write_handle, rows, at_least).await; + // Notify all of our listeners. + notify_listeners(responders, || Ok(())); // Wait until our artificial latency has completed. // @@ -1127,21 +1030,14 @@ where // Update our collection. let now = T::from(now()); - let updates = vec![(id, vec![], now.clone())]; + let updates = vec![]; + let at_least = now.clone(); // Failures don't matter when advancing collections' uppers. This might // fail when a clusterd happens to be writing to this concurrently. // Advancing uppers here is best-effort and only needs to succeed if no // one else is advancing it; contention proves otherwise. - match write_handle.monotonic_append(updates).await { - // All good! - Ok(_append_result) => (), - // Sender hung up, this seems fine and can happen when shutting down. - Err(_recv_error) => { - // Exit the run loop because there is no other work we can do. - break 'run; - } - } + monotonic_append(&mut write_handle, updates, at_least).await; }, } } @@ -1153,6 +1049,55 @@ where (tx, handle.abort_on_drop(), shutdown_tx) } +async fn monotonic_append( + write_handle: &mut WriteHandle, + updates: Vec, + at_least: T, +) { + let mut expected_upper = write_handle.shared_upper(); + loop { + if updates.is_empty() && expected_upper.is_empty() { + // Ignore timestamp advancement for + // closed collections. TODO? 
Make this a + // correctable error + return; + } + + let upper = expected_upper + .into_option() + .expect("cannot append data to closed collection"); + + let lower = if upper.less_than(&at_least) { + at_least.clone() + } else { + upper.clone() + }; + + let new_upper = TimestampManipulation::step_forward(&lower); + let updates = updates + .iter() + .map(|TimestamplessUpdate { row, diff }| { + ((SourceData(Ok(row.clone())), ()), lower.clone(), diff) + }) + .collect::>(); + let res = write_handle + .compare_and_append( + updates, + Antichain::from_elem(upper), + Antichain::from_elem(new_upper), + ) + .await + .expect("valid usage"); + match res { + Ok(()) => return, + Err(err) => { + expected_upper = err.current; + continue; + } + } + } +} + // Helper method for notifying listeners. fn notify_listeners( responders: impl IntoIterator>, diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index 57776bf3af33..0ec94eb859b9 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -50,7 +50,7 @@ use mz_repr::adt::timestamp::CheckedTimestamp; use mz_repr::{ColumnName, Datum, Diff, GlobalId, RelationDesc, Row, TimestampManipulation}; use mz_storage_client::client::{ ProtoStorageCommand, ProtoStorageResponse, RunIngestionCommand, RunSinkCommand, Status, - StatusUpdate, StorageCommand, StorageResponse, TimestamplessUpdate, Update, + StatusUpdate, StorageCommand, StorageResponse, TimestamplessUpdate, }; use mz_storage_client::controller::{ CollectionDescription, DataSource, DataSourceOther, ExportDescription, ExportState, @@ -137,8 +137,6 @@ pub struct Controller + Tim /// Write handle for table shards. pub(crate) persist_table_worker: persist_handles::PersistTableWriteWorker, - /// Write handle for monotonic shards. - pub(crate) persist_monotonic_worker: persist_handles::PersistMonotonicWriteWorker, /// A shared TxnsCache running in a task and communicated with over a channel. txns_read: TxnsRead, txns_metrics: Arc, @@ -262,7 +260,15 @@ where } DataSource::IngestionExport { .. } => (), DataSource::Introspection(i) => { - introspections_to_run.push((*id, i)); + let write_handle = self + .open_data_handles( + id, + collection.collection_metadata.data_shard, + collection.collection_metadata.relation_desc.clone(), + &persist_client, + ) + .await; + introspections_to_run.push((*id, i, write_handle)); } DataSource::Webhook => (), DataSource::Other(DataSourceOther::TableWrites) => { @@ -287,14 +293,20 @@ where }; } - for (id, introspection_type) in introspections_to_run { + for (id, introspection_type, mut write_handle) in introspections_to_run { + let recent_upper = write_handle.shared_upper(); // Introspection collections are registered with the // CollectionManager when they are created. We only bring ourselves // up to date with collection contents or do any preparatory // consolidation work when we actually start writing to them! - self.prepare_introspection_collection(id, introspection_type) - .await - .expect("cannot fail to prepare introspection collections now"); + self.prepare_introspection_collection( + id, + introspection_type, + recent_upper, + Some(&mut write_handle), + ) + .await + .expect("cannot fail to prepare introspection collections now"); } // We first do any cleanup/truncation work above and then allow the @@ -718,19 +730,31 @@ where // Install the collection state in the appropriate spot. 
match &collection_state.data_source { - DataSource::Introspection(_) => { + DataSource::Introspection(typ) => { debug!(data_source = ?collection_state.data_source, meta = ?metadata, "registering {} with persist monotonic worker", id); - self.persist_monotonic_worker.register(id, write); + // We always register the collection with the collection manager, + // regardless of read-only mode. The CollectionManager itself is + // aware of read-only mode and will not attempt to write before told + // to do so. + // + self.register_introspection_collection(id, *typ, write) + .await?; self.collections.insert(id, collection_state); } DataSource::Webhook => { debug!(data_source = ?collection_state.data_source, meta = ?metadata, "registering {} with persist monotonic worker", id); - self.persist_monotonic_worker.register(id, write); self.collections.insert(id, collection_state); new_source_statistic_entries.insert(id); // This collection of statistics is periodically aggregated into // `source_statistics`. new_webhook_statistic_entries.insert(id); + // Register the collection so our manager knows about it. + // + // NOTE: Maybe this shouldn't be in the collection manager, + // and collection manager should only be responsible for + // built-in introspection collections? + self.collection_manager + .register_append_only_collection(id, write, false); } DataSource::IngestionExport { ingestion_id, @@ -883,26 +907,8 @@ where DataSource::IngestionExport { .. } => unreachable!( "ingestion exports do not execute directly, but instead schedule their source to be re-executed" ), - DataSource::Introspection(i) => { - - - // We always register the collection with the collection manager, - // regardless of read-only mode. The CollectionManager itself is - // aware of read-only mode and will not attempt to write before told - // to do so. - // - self.register_introspection_collection(id, *i) - .await?; - - } - DataSource::Webhook => { - // Register the collection so our manager knows about it. - // - // NOTE: Maybe this shouldn't be in the collection manager, - // and collection manager should only be responsible for - // built-in introspection collections? - self.collection_manager.register_append_only_collection(id, false); - } + DataSource::Introspection(_) => {} + DataSource::Webhook => {} DataSource::Progress | DataSource::Other(_) => {} }; } @@ -1630,12 +1636,7 @@ where &mut self, id: GlobalId, ) -> Result, StorageError> { - let upper = self - .persist_monotonic_worker - .recent_upper(id) - .await - .expect("sender hung up")?; - + let upper = self.recent_upper(id).await?; let res = match upper.as_option() { Some(f) if f > &T::minimum() => { let as_of = f.step_back().unwrap(); @@ -1925,12 +1926,9 @@ where // could probably use some love and maybe get merged together? let unregister_notif = self.collection_manager.unregister_collection(id); - let monotonic_worker = self.persist_monotonic_worker.clone(); let drop_fut = async move { // Wait for the collection manager to stop writing. unregister_notif.await; - // Wait for the montonic worker to drop the handle. 
- monotonic_worker.drop_handle(id).await; }; let drop_fut = drop_fut.boxed(); @@ -2416,14 +2414,8 @@ where persist_handles::PersistTableWriteWorker::new_txns(txns) }; let txns_read = TxnsRead::start::(txns_client.clone(), txns_id).await; - let persist_monotonic_worker = persist_handles::PersistMonotonicWriteWorker::new(); - let collection_manager_write_handle = persist_monotonic_worker.clone(); - let collection_manager = collection_mgmt::CollectionManager::new( - read_only, - collection_manager_write_handle, - now.clone(), - ); + let collection_manager = collection_mgmt::CollectionManager::new(read_only, now.clone()); let introspection_ids = Arc::new(Mutex::new(BTreeMap::new())); @@ -2443,7 +2435,6 @@ where collections: BTreeMap::default(), exports: BTreeMap::default(), persist_table_worker, - persist_monotonic_worker, txns_read, txns_metrics, stashed_response: None, @@ -2774,6 +2765,33 @@ where .map(|(id, e)| (*id, e)) } + async fn recent_upper(&self, id: GlobalId) -> Result, StorageError> { + let metadata = &self.storage_collections.collection_metadata(id)?; + let persist_client = self + .persist + .open(metadata.persist_location.clone()) + .await + .unwrap(); + // Duplicate part of open_data_handles here because we don't need the + // fetch_recent_upper call. The pubsub-updated shared_upper is enough. + let diagnostics = Diagnostics { + shard_name: id.to_string(), + handle_purpose: format!("controller data for {}", id), + }; + // NB: Opening a WriteHandle is cheap if it's never used in a + // compare_and_append operation. + let write = persist_client + .open_writer::( + metadata.data_shard, + Arc::new(metadata.relation_desc.clone()), + Arc::new(UnitSchema), + diagnostics.clone(), + ) + .await + .expect("invalid persist usage"); + Ok(write.shared_upper()) + } + /// Opens a write and critical since handles for the given `shard`. /// /// `since` is an optional `since` that the read handle will be forwarded to if it is less than @@ -2824,6 +2842,7 @@ where &mut self, id: GlobalId, introspection_type: IntrospectionType, + mut write_handle: WriteHandle, ) -> Result<(), StorageError> { tracing::info!(%id, ?introspection_type, "registering introspection collection"); @@ -2877,6 +2896,8 @@ where fut.boxed() }; + let recent_upper = write_handle.shared_upper(); + // Types of storage-managed/introspection collections: // // Append-only: Only accepts blind writes, writes that can @@ -2892,7 +2913,6 @@ where // of the collection stays constant if the thing that is // mirrored doesn’t change in cardinality. At steady state, // updates always come in pairs of retractions/additions. 
- match introspection_type { // For these, we first register the collection and then prepare it, // because the code that prepares differential collection expects to @@ -2905,13 +2925,19 @@ where | IntrospectionType::StorageSinkStatistics => { self.collection_manager.register_differential_collection( id, + write_handle, read_handle_fn, force_writable, ); if !self.read_only { - self.prepare_introspection_collection(id, introspection_type) - .await?; + self.prepare_introspection_collection( + id, + introspection_type, + recent_upper, + None, + ) + .await?; } } @@ -2926,12 +2952,20 @@ where | IntrospectionType::SinkStatusHistory | IntrospectionType::PrivatelinkConnectionStatusHistory => { if !self.read_only { - self.prepare_introspection_collection(id, introspection_type) - .await?; + self.prepare_introspection_collection( + id, + introspection_type, + recent_upper, + Some(&mut write_handle), + ) + .await?; } - self.collection_manager - .register_append_only_collection(id, force_writable); + self.collection_manager.register_append_only_collection( + id, + write_handle, + force_writable, + ); } // Same as our other differential collections, but for these the @@ -2943,13 +2977,19 @@ where | IntrospectionType::ComputeHydrationTimes => { self.collection_manager.register_differential_collection( id, + write_handle, read_handle_fn, force_writable, ); if !self.read_only { - self.prepare_introspection_collection(id, introspection_type) - .await?; + self.prepare_introspection_collection( + id, + introspection_type, + recent_upper, + None, + ) + .await?; } } @@ -2961,12 +3001,20 @@ where | IntrospectionType::StatementLifecycleHistory | IntrospectionType::SqlText => { if !self.read_only { - self.prepare_introspection_collection(id, introspection_type) - .await?; + self.prepare_introspection_collection( + id, + introspection_type, + recent_upper, + Some(&mut write_handle), + ) + .await?; } - self.collection_manager - .register_append_only_collection(id, force_writable); + self.collection_manager.register_append_only_collection( + id, + write_handle, + force_writable, + ); } } @@ -2982,19 +3030,21 @@ where &mut self, id: GlobalId, introspection_type: IntrospectionType, + recent_upper: Antichain, + write_handle: Option<&mut WriteHandle>, ) -> Result<(), StorageError> { tracing::info!(%id, ?introspection_type, "preparing introspection collection for writes"); match introspection_type { IntrospectionType::ShardMapping => { - self.initialize_shard_mapping().await; + // Done by the `self.append_shard_mappings` call. } IntrospectionType::Frontiers | IntrospectionType::ReplicaFrontiers => { // Differential collections start with an empty // desired state. No need to manually reset. 
} IntrospectionType::StorageSourceStatistics => { - let prev = self.snapshot_statistics(id).await; + let prev = self.snapshot_statistics(id, recent_upper).await; let scraper_token = statistics::spawn_statistics_scraper::< statistics::SourceStatistics, @@ -3022,7 +3072,7 @@ where .insert(id, Box::new((scraper_token, web_token))); } IntrospectionType::StorageSinkStatistics => { - let prev = self.snapshot_statistics(id).await; + let prev = self.snapshot_statistics(id, recent_upper).await; let scraper_token = statistics::spawn_statistics_scraper::<_, SinkStatisticsUpdate, _>( @@ -3041,8 +3091,12 @@ where self.introspection_tokens.insert(id, scraper_token); } IntrospectionType::SourceStatusHistory => { + let write_handle = write_handle.expect("filled in by caller"); let last_status_per_id = self - .partially_truncate_status_history(IntrospectionType::SourceStatusHistory) + .partially_truncate_status_history( + IntrospectionType::SourceStatusHistory, + write_handle, + ) .await; let status_col = collection_status::MZ_SOURCE_STATUS_HISTORY_DESC @@ -3066,8 +3120,12 @@ where ) } IntrospectionType::SinkStatusHistory => { + let write_handle = write_handle.expect("filled in by caller"); let last_status_per_id = self - .partially_truncate_status_history(IntrospectionType::SinkStatusHistory) + .partially_truncate_status_history( + IntrospectionType::SinkStatusHistory, + write_handle, + ) .await; let status_col = collection_status::MZ_SINK_STATUS_HISTORY_DESC @@ -3091,8 +3149,10 @@ where ) } IntrospectionType::PrivatelinkConnectionStatusHistory => { + let write_handle = write_handle.expect("filled in by caller"); self.partially_truncate_status_history( IntrospectionType::PrivatelinkConnectionStatusHistory, + write_handle, ) .await; } @@ -3129,14 +3189,7 @@ where /// // TODO(guswynn): we need to be more careful about the update time we get here: // - async fn snapshot_statistics(&mut self, id: GlobalId) -> Vec { - let upper = self - .persist_monotonic_worker - .recent_upper(id) - .await - .expect("missing collection") - .expect("missing collection"); - + async fn snapshot_statistics(&mut self, id: GlobalId, upper: Antichain) -> Vec { match upper.as_option() { Some(f) if f > &T::minimum() => { let as_of = f.step_back().unwrap(); @@ -3171,34 +3224,6 @@ where .retain(|k, _| self.exports.contains_key(k)); } - /// Initializes the data expressing which global IDs correspond to which - /// shards. Necessary because we cannot write any of these mappings that we - /// discover before the shard mapping collection exists. - /// - /// # Panics - /// - If `IntrospectionType::ShardMapping` is not associated with a - /// `GlobalId` in `self.introspection_ids`. - /// - If `IntrospectionType::ShardMapping`'s `GlobalId` is not registered as - /// a managed collection. - async fn initialize_shard_mapping(&mut self) { - let id = - self.introspection_ids.lock().expect("poisoned lock")[&IntrospectionType::ShardMapping]; - - let mut row_buf = Row::default(); - let collection_metadatas = self.storage_collections.active_collection_metadatas(); - let mut updates = Vec::with_capacity(collection_metadatas.len()); - for (global_id, CollectionMetadata { data_shard, .. 
}) in collection_metadatas { - let mut packer = row_buf.packer(); - packer.push(Datum::from(global_id.to_string().as_str())); - packer.push(Datum::from(data_shard.to_string().as_str())); - updates.push((row_buf.clone(), 1)); - } - - self.collection_manager - .differential_append(id, updates) - .await; - } - /// Effectively truncates the status history shard except for the most /// recent updates from each ID. /// @@ -3212,6 +3237,7 @@ where async fn partially_truncate_status_history( &mut self, collection: IntrospectionType, + write_handle: &mut WriteHandle, ) -> BTreeMap { let (keep_n, occurred_at_col, id_col) = match collection { IntrospectionType::SourceStatusHistory => ( @@ -3254,12 +3280,7 @@ where let id = self.introspection_ids.lock().expect("poisoned")[&collection]; - let upper = self - .persist_monotonic_worker - .recent_upper(id) - .await - .expect("missing collection") - .expect("missing collection"); + let upper = write_handle.fetch_recent_upper().await.clone(); let mut rows = match upper.as_option() { Some(f) if f > &T::minimum() => { @@ -3351,56 +3372,36 @@ where // Re-pack all rows let mut packer = row_buf.packer(); packer.extend(unpacked_row.into_iter()); - - let update = Update { - row: row_buf.clone(), - timestamp: expected_upper.clone(), - diff: -1, - }; - - update + ( + (SourceData(Ok(row_buf.clone())), ()), + expected_upper.clone(), + -1, + ) }) - .collect(); - - let command = (id, updates, expected_upper.clone(), new_upper); + .collect::>(); - let res = self - .persist_monotonic_worker - .compare_and_append(vec![command]) + let res = write_handle + .compare_and_append( + updates, + Antichain::from_elem(expected_upper.clone()), + Antichain::from_elem(new_upper), + ) .await - .expect("command must succeed"); + .expect("usage was valid"); match res { Ok(_) => { // All good, yay! } - Err(storage_err) => { - match storage_err { - StorageError::InvalidUppers(failed_ids) => { - assert_eq!( - failed_ids.len(), - 1, - "received errors for more than one collection" - ); - assert_eq!( - failed_ids[0].id, id, - "received errors for a different collection" - ); - - // This is fine, it just means the upper moved because - // of continual upper advancement or because seomeone - // already appended some more retractions/updates. - // - // NOTE: We might want to attempt these partial - // retractions on an interval, instead of only when - // starting up! - info!(%id, ?expected_upper, current_upper = ?failed_ids[0].current_upper, "failed to append partial truncation"); - } - // Uh-oh, something else went wrong! - other => { - panic!("Unhandled error while appending to managed collection {id:?}: {other:?}") - } - } + Err(err) => { + // This is fine, it just means the upper moved because + // of continual upper advancement or because seomeone + // already appended some more retractions/updates. + // + // NOTE: We might want to attempt these partial + // retractions on an interval, instead of only when + // starting up! + info!(%id, ?expected_upper, current_upper = ?err.current, "failed to append partial truncation"); } } @@ -3424,11 +3425,6 @@ where /// Use a `diff` of 1 to append a new entry; -1 to retract an existing /// entry. /// - /// However, data is written iff we know of the `GlobalId` of the - /// `IntrospectionType::ShardMapping` collection; in other cases, data is - /// dropped on the floor. In these cases, the data is later written by - /// [`Self::initialize_shard_mapping`]. - /// /// # Panics /// - If `self.collections` does not have an entry for `global_id`. 
/// - If `IntrospectionType::ShardMapping`'s `GlobalId` is not registered as @@ -3441,15 +3437,12 @@ where { mz_ore::soft_assert_or_log!(diff == -1 || diff == 1, "use 1 for insert or -1 for delete"); - let id = match self + let id = *self .introspection_ids .lock() .expect("poisoned") .get(&IntrospectionType::ShardMapping) - { - Some(id) => *id, - _ => return, - }; + .expect("should be registered before this call"); let mut updates = vec![]; // Pack updates into rows diff --git a/src/storage-controller/src/persist_handles.rs b/src/storage-controller/src/persist_handles.rs index f25c2d547b85..d5d5b4ac215f 100644 --- a/src/storage-controller/src/persist_handles.rs +++ b/src/storage-controller/src/persist_handles.rs @@ -10,7 +10,7 @@ //! A tokio tasks (and support machinery) for dealing with the persist handles //! that the storage controller needs to hold. -use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Debug; use std::fmt::Write; use std::sync::Arc; @@ -514,341 +514,3 @@ where } } } - -#[derive(Debug, Clone)] -pub struct PersistMonotonicWriteWorker { - inner: Arc>, -} - -/// Commands for [PersistMonotonicWriteWorker]. -#[derive(Debug)] -enum PersistMonotonicWriteCmd { - Register(GlobalId, WriteHandle), - Update(GlobalId, WriteHandle), - DropHandle { - /// Object that we want to drop our handle for. - id: GlobalId, - /// Notifies us when all resources have been cleaned up. - tx: oneshot::Sender<()>, - }, - RecentUpper( - GlobalId, - tokio::sync::oneshot::Sender, StorageError>>, - ), - Append( - Vec<(GlobalId, Vec>, T, T)>, - tokio::sync::oneshot::Sender>>, - ), - /// Appends `Vec` to `GlobalId` at, essentially, - /// `max(write_frontier, T)`. - MonotonicAppend( - Vec<(GlobalId, Vec, T)>, - tokio::sync::oneshot::Sender>>, - ), - Shutdown, -} - -impl PersistMonotonicWriteWorker { - pub(crate) fn new() -> Self { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(tracing::Span, _)>(); - - mz_ore::task::spawn(|| "PersistMonotonicWriteWorker", async move { - let mut write_handles = - BTreeMap::>::new(); - - let mut shutdown = false; - while let Some(cmd) = rx.recv().await { - // Peel off all available commands. - // We do this in case we can consolidate commands. - // It would be surprising to receive multiple concurrent `Append` commands, - // but we might receive multiple *empty* `Append` commands. - let mut commands = VecDeque::new(); - commands.push_back(cmd); - while let Ok(cmd) = rx.try_recv() { - commands.push_back(cmd); - } - - // Accumulated updates and upper frontier. - let mut all_updates = BTreeMap::default(); - let mut all_responses = Vec::default(); - - while let Some((span, command)) = commands.pop_front() { - match command { - PersistMonotonicWriteCmd::Register(id, write_handle) => { - let previous = write_handles.insert(id, write_handle); - if previous.is_some() { - panic!("already registered a WriteHandle for collection {:?}", id); - } - } - PersistMonotonicWriteCmd::Update(id, write_handle) => { - write_handles.insert(id, write_handle).expect("PersistMonotonicWriteCmd::Update only valid for updating extant write handles"); - } - PersistMonotonicWriteCmd::DropHandle { id, tx } => { - // n.b. this should only remove the - // handle from the persist worker and - // not take any additional action such - // as closing the shard it's connected - // to because dataflows might still be - // using it. - write_handles.remove(&id); - // We don't care if our listener went away. 
- let _ = tx.send(()); - } - PersistMonotonicWriteCmd::RecentUpper(id, response) => { - let write = write_handles.get_mut(&id); - - if let Some(write) = write { - let upper = write.fetch_recent_upper().await; - let _ = response.send(Ok(upper.clone())); - } else { - let _ = response.send(Err(StorageError::IdentifierMissing(id))); - } - } - PersistMonotonicWriteCmd::Append(updates, response) => { - let mut ids = BTreeSet::new(); - for (id, update, expected_upper, new_upper) in updates { - ids.insert(id); - let (old_span, updates, old_expected_upper, old_new_upper) = - all_updates.entry(id).or_insert_with(|| { - ( - span.clone(), - Vec::default(), - Antichain::from_elem(T::minimum()), - Antichain::from_elem(T::minimum()), - ) - }); - - if old_span.id() != span.id() { - // Link in any spans for `Append` - // operations that we lump together by - // doing this. This is not ideal, - // because we only have a true tracing - // history for the "first" span that we - // process, but it's better than - // nothing. - old_span.follows_from(span.id()); - } - updates.extend(update); - old_new_upper.join_assign(&Antichain::from_elem(new_upper)); - old_expected_upper - .join_assign(&Antichain::from_elem(expected_upper)); - } - all_responses.push((ids, response)); - } - PersistMonotonicWriteCmd::MonotonicAppend(updates, response) => { - let mut updates_outer = Vec::with_capacity(updates.len()); - for (id, update, at_least) in updates { - let current_upper = write_handles[&id].upper().clone(); - if update.is_empty() && current_upper.is_empty() { - // Ignore timestamp advancement for - // closed collections. TODO? Make this a - // correctable error - continue; - } - - let current_upper = current_upper - .into_option() - .expect("cannot append data to closed collection"); - - let lower = if current_upper.less_than(&at_least) { - at_least - } else { - current_upper.clone() - }; - - let upper = TimestampManipulation::step_forward(&lower); - let update = update - .into_iter() - .map(|TimestamplessUpdate { row, diff }| Update { - row, - diff, - timestamp: lower.clone(), - }) - .collect::>(); - - updates_outer.push((id, update, current_upper, upper)); - } - commands.push_front(( - span, - PersistMonotonicWriteCmd::Append(updates_outer, response), - )); - } - PersistMonotonicWriteCmd::Shutdown => { - shutdown = true; - } - } - } - - let result = append_work(&mut write_handles, all_updates).await; - - for (ids, response) in all_responses { - let result = match &result { - Err(bad_ids) => { - let filtered: Vec<_> = bad_ids - .iter() - .filter(|(id, _)| ids.contains(id)) - .cloned() - .map(|(id, current_upper)| InvalidUpper { id, current_upper }) - .collect(); - if filtered.is_empty() { - Ok(()) - } else { - Err(StorageError::InvalidUppers(filtered)) - } - } - Ok(()) => Ok(()), - }; - // It is not an error for the other end to hang up. - let _ = response.send(result); - } - - if shutdown { - tracing::trace!("shutting down persist write append task"); - break; - } - } - - tracing::info!("PersistMonotonicWriteWorker shutting down"); - }); - - Self { - inner: Arc::new(PersistMonotonicWriteWorkerInner::new(tx)), - } - } - - pub(crate) fn register( - &self, - id: GlobalId, - write_handle: WriteHandle, - ) { - self.send(PersistMonotonicWriteCmd::Register(id, write_handle)) - } - - /// Update the existing write handle associated with `id` to `write_handle`. - /// - /// Note that this should only be called when updating a write handle; to - /// initially associate an `id` to a write handle, use [`Self::register`]. 
- /// - /// # Panics - /// - If `id` is not currently associated with any write handle. - #[allow(dead_code)] - pub(crate) fn update(&self, id: GlobalId, write_handle: WriteHandle) { - self.send(PersistMonotonicWriteCmd::Update(id, write_handle)) - } - - /// TODO! - pub(crate) fn recent_upper( - &self, - id: GlobalId, - ) -> tokio::sync::oneshot::Receiver, StorageError>> { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.send(PersistMonotonicWriteCmd::RecentUpper(id, tx)); - rx - } - - /// TODO! - pub(crate) fn compare_and_append( - &self, - updates: Vec<(GlobalId, Vec>, T, T)>, - ) -> tokio::sync::oneshot::Receiver>> { - let (tx, rx) = tokio::sync::oneshot::channel(); - if updates.is_empty() { - tx.send(Ok(())) - .expect("rx has not been dropped at this point"); - rx - } else { - self.send(PersistMonotonicWriteCmd::Append(updates, tx)); - rx - } - } - - /// Appends values to collections associated with `GlobalId`, but lets - /// the persist worker chose timestamps guaranteed to be monotonic and - /// that the time will be at least `T`. - /// - /// This lets the writer influence how far forward the timestamp will be - /// advanced, while still guaranteeing that it will advance. - /// - /// Note it is still possible for the append operation to fail in the - /// face of contention from other writers. - /// - /// # Panics - /// - If appending non-empty `TimelessUpdate` to closed collections - /// (i.e. those with empty uppers), whose uppers cannot be - /// monotonically increased. - /// - /// Collections with empty uppers can continue receiving empty - /// updates, i.e. those used soley to advance collections' uppers. - pub(crate) fn monotonic_append( - &self, - updates: Vec<(GlobalId, Vec, T)>, - ) -> tokio::sync::oneshot::Receiver>> { - let (tx, rx) = tokio::sync::oneshot::channel(); - if updates.is_empty() { - tx.send(Ok(())) - .expect("rx has not been dropped at this point"); - rx - } else { - self.send(PersistMonotonicWriteCmd::MonotonicAppend(updates, tx)); - rx - } - } - - /// Drops the handle associated with `id` from this worker. - /// - /// Note that this does not perform any other cleanup, such as finalizing - /// the handle's shard. - pub(crate) fn drop_handle(&self, id: GlobalId) -> BoxFuture<'static, ()> { - let (tx, rx) = oneshot::channel(); - self.send(PersistMonotonicWriteCmd::DropHandle { id, tx }); - Box::pin(rx.map(|_| ())) - } - - fn send(&self, cmd: PersistMonotonicWriteCmd) { - self.inner.send(cmd); - } -} - -/// Contains the components necessary for sending commands to a `PersistMonotonicWriteWorker`. -/// -/// When `Drop`-ed sends a shutdown command, as such this should _never_ implement `Clone` because -/// if one clone is dropped, the other clones will be unable to send commands. If you need this -/// to be `Clone`-able, wrap it in an `Arc` or `Rc` first. -/// -/// #[derive(Clone)] <-- do not do this. -/// -#[derive(Debug)] -struct PersistMonotonicWriteWorkerInner { - /// Sending side of a channel that we can use to send commands. - tx: UnboundedSender<(tracing::Span, PersistMonotonicWriteCmd)>, -} - -impl Drop for PersistMonotonicWriteWorkerInner -where - T: Timestamp + Lattice + Codec64 + TimestampManipulation, -{ - fn drop(&mut self) { - self.send(PersistMonotonicWriteCmd::Shutdown); - // TODO: Can't easily block on shutdown occurring. 
-    }
-}
-
-impl<T> PersistMonotonicWriteWorkerInner<T>
-where
-    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
-{
-    fn new(tx: UnboundedSender<(tracing::Span, PersistMonotonicWriteCmd<T>)>) -> Self {
-        PersistMonotonicWriteWorkerInner { tx }
-    }
-
-    fn send(&self, cmd: PersistMonotonicWriteCmd<T>) {
-        let mut span = info_span!(parent: None, "PersistMonotonicWriteCmd::send");
-        OpenTelemetryContext::obtain().attach_as_parent_to(&mut span);
-        match self.tx.send((span, cmd)) {
-            Ok(()) => (), // All good!
-            Err(e) => {
-                tracing::trace!("could not forward command: {:?}", e);
-            }
-        }
-    }
-}
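
Illustrative addendum (not part of the patch): every write path this change touches now follows the same protocol — each collection task owns its own WriteHandle, issues compare_and_append against the locally known upper, and on an upper mismatch adopts err.current as the new expectation and retries. The sketch below condenses the monotonic_append helper added in collection_mgmt.rs into a standalone function so that loop is easy to see in isolation. The handle type WriteHandle<SourceData, (), T, Diff>, shared_upper, compare_and_append, and the err.current field are taken from the diff; the function name append_at_least, the flattened (Row, Diff) input, and the exact trait bounds are assumptions made for the sketch.

use differential_dataflow::lattice::Lattice;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::Codec64;
use mz_repr::{Diff, Row, TimestampManipulation};
use mz_storage_types::sources::SourceData;
use timely::progress::{Antichain, Timestamp};
use timely::PartialOrder;

// Hypothetical helper mirroring the `monotonic_append` added above, with the
// old channel/worker indirection gone.
async fn append_at_least<T>(
    write_handle: &mut WriteHandle<SourceData, (), T, Diff>,
    rows: Vec<(Row, Diff)>,
    at_least: T,
) where
    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
    // Start from the locally cached upper, which persist pubsub keeps fresh.
    let mut expected_upper = write_handle.shared_upper();
    loop {
        let upper = expected_upper
            .into_option()
            .expect("cannot append to a closed collection");
        // Write at `max(upper, at_least)` so timestamps stay monotonic.
        let write_ts = if upper.less_than(&at_least) {
            at_least.clone()
        } else {
            upper.clone()
        };
        let new_upper = TimestampManipulation::step_forward(&write_ts);
        let updates = rows
            .iter()
            .map(|(row, diff)| ((SourceData(Ok(row.clone())), ()), write_ts.clone(), *diff))
            .collect::<Vec<_>>();
        match write_handle
            .compare_and_append(
                updates,
                Antichain::from_elem(upper),
                Antichain::from_elem(new_upper),
            )
            .await
            .expect("valid usage")
        {
            Ok(()) => return,
            // Lost the race: someone else advanced the shard. Adopt their
            // upper as the new expectation and try again.
            Err(err) => expected_upper = err.current,
        }
    }
}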