Skip to content

Commit

Permalink
[Delta Uniform] Delete Iceberg Metadata during Vacuum (#3614)
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

## Description

Hide the Iceberg `metadata` directory from Vacuum only when Uniform
(Universal Format) is enabled on the table; when Uniform is disabled,
the directory is no longer protected and can be garbage-collected.

## How was this patch tested?

Unit tests (UTs).
  • Loading branch information
ChengJi-db committed Aug 28, 2024
1 parent 851ddf9 commit d94a9fe
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,15 @@ object DeltaTableUtils extends PredicateHelper
private val logThrottler = new LogThrottler()

/** Whether a path should be hidden for delta-related file operations, such as Vacuum and Fsck. */
def isHiddenDirectory(partitionColumnNames: Seq[String], pathName: String): Boolean = {
def isHiddenDirectory(
partitionColumnNames: Seq[String],
pathName: String,
shouldIcebergMetadataDirBeHidden: Boolean = true): Boolean = {
// Names of the form partitionCol=[value] are partition directories, and should be
// GCed even if they'd normally be hidden. The _db_index directory contains (bloom filter)
// indexes and these must be GCed when the data they are tied to is GCed.
// metadata name is reserved for converted iceberg metadata with delta universal format
pathName.equals("metadata") ||
(shouldIcebergMetadataDirBeHidden && pathName.equals("metadata")) ||
(pathName.startsWith(".") || pathName.startsWith("_")) &&
!pathName.startsWith("_delta_index") && !pathName.startsWith("_change_data") &&
!partitionColumnNames.exists(c => pathName.startsWith(c ++ "="))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,11 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
}.toDF("path")
}

def getFilesFromInventory(basePath: String,
partitionColumns: Seq[String],
inventory: DataFrame): Dataset[SerializableFileStatus] = {
def getFilesFromInventory(
basePath: String,
partitionColumns: Seq[String],
inventory: DataFrame,
shouldIcebergMetadataDirBeHidden: Boolean): Dataset[SerializableFileStatus] = {
implicit val fileNameAndSizeEncoder: Encoder[SerializableFileStatus] =
org.apache.spark.sql.Encoders.product[SerializableFileStatus]

Expand All @@ -167,7 +169,9 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
.flatMap {
row =>
val path = row.getString(0)
if(!DeltaTableUtils.isHiddenDirectory(partitionColumns, path)) {
if(!DeltaTableUtils.isHiddenDirectory(
partitionColumns, path, shouldIcebergMetadataDirBeHidden)
) {
Seq(SerializableFileStatus(path,
row.getLong(1), row.getBoolean(2), row.getLong(3)))
} else {
Expand Down Expand Up @@ -249,14 +253,24 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {

val partitionColumns = snapshot.metadata.partitionSchema.fieldNames
val parallelism = spark.sessionState.conf.parallelPartitionDiscoveryParallelism
val shouldIcebergMetadataDirBeHidden = UniversalFormat.icebergEnabled(snapshot.metadata)
val allFilesAndDirsWithDuplicates = inventory match {
case Some(inventoryDF) => getFilesFromInventory(basePath, partitionColumns, inventoryDF)
case Some(inventoryDF) =>
getFilesFromInventory(
basePath, partitionColumns, inventoryDF, shouldIcebergMetadataDirBeHidden
)
case None => DeltaFileOperations.recursiveListDirs(
spark,
Seq(basePath),
hadoopConf,
hiddenDirNameFilter = DeltaTableUtils.isHiddenDirectory(partitionColumns, _),
hiddenFileNameFilter = DeltaTableUtils.isHiddenDirectory(partitionColumns, _),
hiddenDirNameFilter =
DeltaTableUtils.isHiddenDirectory(
partitionColumns, _, shouldIcebergMetadataDirBeHidden
),
hiddenFileNameFilter =
DeltaTableUtils.isHiddenDirectory(
partitionColumns, _, shouldIcebergMetadataDirBeHidden
),
fileListingParallelism = Option(parallelism)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,8 @@ class DeltaVacuumSuite
}
}

test("hidden metadata dir") {

test("gc metadata dir when uniform disabled") {
withEnvironment { (tempDir, clock) =>
spark.emptyDataset[Int].write.format("delta").save(tempDir)
val deltaLog = DeltaLog.forTable(spark, tempDir, clock)
Expand All @@ -1053,7 +1054,9 @@ class DeltaVacuumSuite

AdvanceClock(defaultTombstoneInterval + 1000),
GC(dryRun = false, Seq(tempDir)),
CheckFiles(Seq("metadata", "metadata/file1.json"))
CheckFiles(Seq("metadata/file1.json"), exist = false),
GC(dryRun = false, Seq(tempDir)), // Second GC clears empty dir
CheckFiles(Seq("metadata"), exist = false)
)
}
}
Expand Down

0 comments on commit d94a9fe

Please sign in to comment.