Skip to content

Commit

Permalink
catalog: Combine epoch and deploy generation (#29269)
Browse files Browse the repository at this point in the history
Previously, the catalog used both the epoch and the deploy generation
as fencing tokens. However, they were both written to the catalog as
separate entries and at different times. This complicated the fencing
logic and often resulted in fencing errors with less information than
we wanted.

This commit combines the epoch and deploy generation into a single
fencing token that is always written to the catalog atomically. It
utilizes the `FenceToken` update kind which was added in a previous
commit. Since the epoch and fence token must be available before the
catalog is opened (i.e. before migrations are run), this commit needs
to handle both epochs and fence tokens. In the next release we can
fully remove the `Epoch` update kind.

Additionally, fence errors can indicate with certainty whether the
fence is due to the deploy generation or epoch. Therefore, we can
remove the halt varint of `unwrap_or_terminate` which was used when
we weren't sure if the deploy generation had increased.

Works towards resolving #29199

---------

Co-authored-by: Dennis Felsing <[email protected]>
  • Loading branch information
jkosh44 and def- committed Sep 6, 2024
1 parent 2e0a9a9 commit cacaf5a
Show file tree
Hide file tree
Showing 12 changed files with 488 additions and 330 deletions.
4 changes: 2 additions & 2 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,7 @@ steps:
composition: retain-history

- id: data-ingest
label: "Data Ingest"
label: "Data Ingest %N"
depends_on: build-aarch64
timeout_in_minutes: 90
parallelism: 2
Expand Down Expand Up @@ -1472,7 +1472,7 @@ steps:
sanitizer: skip

- id: txn-wal-fencing
label: Txn-wal fencing
label: Txn-wal fencing %N
depends_on: build-aarch64
timeout_in_minutes: 120
parallelism: 2
Expand Down
3 changes: 3 additions & 0 deletions misc/python/materialize/cli/ci_annotate_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@
| fatal:\ userauth_pubkey
# TODO(def-) Remove in ~3 weeks
| .* incompatible\ persist\ version\ \d+\.\d+\.\d+(-dev\.\d+)?,\ current:\ \d+\.\d+\.\d+(-dev\.\d+)?,\ make\ sure\ to\ upgrade\ the\ catalog\ one\ version\ forward\ at\ a\ time
# Fences without incrementing deploy generation
| txn-wal-fencing-mz_first-.* \| .*unable\ to\ confirm\ leadership
| txn-wal-fencing-mz_first-.* \| .*fenced\ by\ envd
)
""",
re.VERBOSE | re.MULTILINE,
Expand Down
86 changes: 37 additions & 49 deletions src/adapter/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use mz_compute_client::controller::error::{
};
use mz_controller_types::ClusterId;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{exit, halt, soft_assert_no_log};
use mz_ore::{exit, soft_assert_no_log};
use mz_repr::{RelationDesc, RowIterator, ScalarType};
use mz_sql::names::FullItemName;
use mz_sql::plan::StatementDesc;
Expand Down Expand Up @@ -284,71 +284,59 @@ where
fn unwrap_or_terminate(self, context: &str) -> T {
match self {
Ok(t) => t,
Err(e) => match e.should_terminate_gracefully() {
Terminate::Gracefully => exit!(0, "{context}: {e:?}"),
Terminate::Halt => halt!("{context}: {e:?}"),
Terminate::Panic => panic!("{context}: {e:?}"),
},
Err(e) if e.should_terminate_gracefully() => exit!(0, "{context}: {e:?}"),
Err(e) => panic!("{context}: {e:?}"),
}
}

fn maybe_terminate(self, context: &str) -> Self {
if let Err(e) = &self {
match e.should_terminate_gracefully() {
Terminate::Gracefully => exit!(0, "{context}: {e:?}"),
Terminate::Halt => halt!("{context}: {e:?}"),
Terminate::Panic => {}
if e.should_terminate_gracefully() {
exit!(0, "{context}: {e:?}");
}
}

self
}
}

/// An enum describing how to terminate
enum Terminate {
Gracefully,
Halt,
Panic,
}

/// A trait for errors that should halt or terminate gracefully rather than
/// panic the process.
/// A trait for errors that should terminate gracefully rather than panic
/// the process.
trait ShouldTerminateGracefully {
/// Reports whether the error should halt of terminate the process
/// gracefully rather than panic.
fn should_terminate_gracefully(&self) -> Terminate;
/// Reports whether the error should terminate the process gracefully
/// rather than panic.
fn should_terminate_gracefully(&self) -> bool;
}

impl ShouldTerminateGracefully for AdapterError {
fn should_terminate_gracefully(&self) -> Terminate {
fn should_terminate_gracefully(&self) -> bool {
match self {
AdapterError::Catalog(e) => e.should_terminate_gracefully(),
_ => Terminate::Panic,
_ => false,
}
}
}

impl ShouldTerminateGracefully for mz_catalog::memory::error::Error {
fn should_terminate_gracefully(&self) -> Terminate {
fn should_terminate_gracefully(&self) -> bool {
match &self.kind {
mz_catalog::memory::error::ErrorKind::Durable(e) => e.should_terminate_gracefully(),
_ => Terminate::Panic,
_ => false,
}
}
}

impl ShouldTerminateGracefully for mz_catalog::durable::CatalogError {
fn should_terminate_gracefully(&self) -> Terminate {
fn should_terminate_gracefully(&self) -> bool {
match &self {
Self::Durable(e) => e.should_terminate_gracefully(),
_ => Terminate::Panic,
_ => false,
}
}
}

impl ShouldTerminateGracefully for DurableCatalogError {
fn should_terminate_gracefully(&self) -> Terminate {
fn should_terminate_gracefully(&self) -> bool {
match self {
DurableCatalogError::Fence(err) => err.should_terminate_gracefully(),
DurableCatalogError::IncompatibleDataVersion { .. }
Expand All @@ -359,22 +347,22 @@ impl ShouldTerminateGracefully for DurableCatalogError {
| DurableCatalogError::DuplicateKey
| DurableCatalogError::UniquenessViolation
| DurableCatalogError::Storage(_)
| DurableCatalogError::Internal(_) => Terminate::Panic,
| DurableCatalogError::Internal(_) => false,
}
}
}

impl ShouldTerminateGracefully for FenceError {
fn should_terminate_gracefully(&self) -> Terminate {
fn should_terminate_gracefully(&self) -> bool {
match self {
FenceError::DeployGeneration { .. } => Terminate::Gracefully,
FenceError::Epoch { .. } | FenceError::MigrationUpper { .. } => Terminate::Halt,
FenceError::DeployGeneration { .. } => true,
FenceError::Epoch { .. } | FenceError::MigrationUpper { .. } => false,
}
}
}

impl<T> ShouldTerminateGracefully for StorageError<T> {
fn should_terminate_gracefully(&self) -> Terminate {
fn should_terminate_gracefully(&self) -> bool {
match self {
StorageError::ResourceExhausted(_)
| StorageError::CollectionMetadataAlreadyExists(_)
Expand All @@ -397,78 +385,78 @@ impl<T> ShouldTerminateGracefully for StorageError<T> {
| StorageError::ShuttingDown(_)
| StorageError::MissingSubsourceReference { .. }
| StorageError::RtrTimeout(_)
| StorageError::RtrDropFailure(_) => Terminate::Panic,
| StorageError::RtrDropFailure(_) => false,
}
}
}

impl ShouldTerminateGracefully for DataflowCreationError {
fn should_terminate_gracefully(&self) -> Terminate {
fn should_terminate_gracefully(&self) -> bool {
match self {
DataflowCreationError::SinceViolation(_)
| DataflowCreationError::InstanceMissing(_)
| DataflowCreationError::CollectionMissing(_)
| DataflowCreationError::MissingAsOf
| DataflowCreationError::EmptyAsOfForSubscribe
| DataflowCreationError::EmptyAsOfForCopyTo => Terminate::Panic,
| DataflowCreationError::EmptyAsOfForCopyTo => false,
}
}
}

impl ShouldTerminateGracefully for CollectionUpdateError {
fn should_terminate_gracefully(&self) -> Terminate {
fn should_terminate_gracefully(&self) -> bool {
match self {
CollectionUpdateError::InstanceMissing(_)
| CollectionUpdateError::CollectionMissing(_) => Terminate::Panic,
| CollectionUpdateError::CollectionMissing(_) => false,
}
}
}

impl ShouldTerminateGracefully for PeekError {
fn should_terminate_gracefully(&self) -> Terminate {
fn should_terminate_gracefully(&self) -> bool {
match self {
PeekError::SinceViolation(_)
| PeekError::InstanceMissing(_)
| PeekError::CollectionMissing(_)
| PeekError::ReplicaMissing(_) => Terminate::Panic,
| PeekError::ReplicaMissing(_) => false,
}
}
}

impl ShouldTerminateGracefully for ReadPolicyError {
fn should_terminate_gracefully(&self) -> Terminate {
fn should_terminate_gracefully(&self) -> bool {
match self {
ReadPolicyError::InstanceMissing(_)
| ReadPolicyError::CollectionMissing(_)
| ReadPolicyError::WriteOnlyCollection(_) => Terminate::Panic,
| ReadPolicyError::WriteOnlyCollection(_) => false,
}
}
}

impl ShouldTerminateGracefully for SubscribeTargetError {
fn should_terminate_gracefully(&self) -> Terminate {
fn should_terminate_gracefully(&self) -> bool {
match self {
SubscribeTargetError::InstanceMissing(_)
| SubscribeTargetError::SubscribeMissing(_)
| SubscribeTargetError::ReplicaMissing(_)
| SubscribeTargetError::SubscribeAlreadyStarted => Terminate::Panic,
| SubscribeTargetError::SubscribeAlreadyStarted => false,
}
}
}

impl ShouldTerminateGracefully for TransformError {
fn should_terminate_gracefully(&self) -> Terminate {
fn should_terminate_gracefully(&self) -> bool {
match self {
TransformError::Internal(_)
| TransformError::IdentifierMissing(_)
| TransformError::CallerShouldPanic(_) => Terminate::Panic,
| TransformError::CallerShouldPanic(_) => false,
}
}
}

impl ShouldTerminateGracefully for InstanceMissing {
fn should_terminate_gracefully(&self) -> Terminate {
Terminate::Panic
fn should_terminate_gracefully(&self) -> bool {
false
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/catalog/src/durable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ pub trait OpenableDurableCatalogState: Debug + Send {
/// NB: We may remove this in later iterations of Pv2.
async fn epoch(&mut self) -> Result<Epoch, CatalogError>;

/// Get the most recent deployment generation written to the catalog.
/// Get the most recent deployment generation written to the catalog. Not necessarily the
/// deploy generation of this instance.
async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;

/// Get the `enable_0dt_deployment` config value of this instance.
Expand Down
14 changes: 0 additions & 14 deletions src/catalog/src/durable/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,20 +138,6 @@ pub enum FenceError {
}

impl FenceError {
pub fn deploy_generation(current_generation: u64, fence_generation: u64) -> Self {
Self::DeployGeneration {
current_generation,
fence_generation,
}
}

pub fn epoch(current_epoch: Epoch, fence_epoch: Epoch) -> Self {
Self::Epoch {
current_epoch,
fence_epoch,
}
}

pub fn migration(err: UpperMismatch<Timestamp>) -> Self {
Self::MigrationUpper {
expected_upper: antichain_to_timestamp(err.expected),
Expand Down
2 changes: 0 additions & 2 deletions src/catalog/src/durable/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ pub(crate) async fn initialize(
tx: &mut Transaction<'_>,
options: &BootstrapArgs,
initial_ts: EpochMillis,
deploy_generation: u64,
catalog_content_version: String,
) -> Result<(), CatalogError> {
// Collect audit events so we can commit them once at the very end.
Expand Down Expand Up @@ -656,7 +655,6 @@ pub(crate) async fn initialize(

for (key, value) in [
(USER_VERSION_KEY.to_string(), CATALOG_VERSION),
(DEPLOY_GENERATION.to_string(), deploy_generation),
(SYSTEM_CONFIG_SYNCED_KEY.to_string(), 0),
] {
tx.insert_config(key, value)?;
Expand Down
Loading

0 comments on commit cacaf5a

Please sign in to comment.