diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index 497945f7e9ba..f6a5acf4ee11 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -146,7 +146,7 @@ public class HoodieCompactionConfig extends HoodieConfig { .markAdvanced() .withDocumentation("Compaction strategy decides which file groups are picked up for " + "compaction during each compaction run. By default. Hudi picks the log file " - + "with most accumulated unmerged data"); + + "with most accumulated unmerged data. A composite strategy can be configured by providing a comma-separated list of strategy class names, format: strategy_class_name1,strategy_class_name2,..."); public static final ConfigProperty TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = ConfigProperty .key("hoodie.compaction.daybased.target.partitions") @@ -408,8 +408,12 @@ public Builder approxRecordSize(int recordSizeEstimate) { return this; } - public Builder withCompactionStrategy(CompactionStrategy compactionStrategy) { - compactionConfig.setValue(COMPACTION_STRATEGY, compactionStrategy.getClass().getName()); + public Builder withCompactionStrategy(CompactionStrategy... 
compactionStrategies) { + StringBuilder compactionStrategyBuilder = new StringBuilder(); + for (CompactionStrategy compactionStrategy : compactionStrategies) { + compactionStrategyBuilder.append(compactionStrategy.getClass().getName()).append(","); + } + compactionConfig.setValue(COMPACTION_STRATEGY, compactionStrategyBuilder.toString()); return this; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 19b3aa09082a..68efe732b9ae 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -83,6 +83,7 @@ import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; +import org.apache.hudi.table.action.compact.strategy.CompositeCompactionStrategy; import org.apache.hudi.table.storage.HoodieStorageLayout; import org.apache.orc.CompressionKind; @@ -1655,7 +1656,11 @@ public int getInlineCompactDeltaSecondsMax() { } public CompactionStrategy getCompactionStrategy() { - return ReflectionUtils.loadClass(getString(HoodieCompactionConfig.COMPACTION_STRATEGY)); + String compactionStrategiesStr = getString(HoodieCompactionConfig.COMPACTION_STRATEGY); + String[] compactionStrategyArr = compactionStrategiesStr.split(","); + List compactionStrategies = Arrays.stream(compactionStrategyArr) + .map(className -> (CompactionStrategy) ReflectionUtils.loadClass(className)).collect(Collectors.toList()); + return compactionStrategies.size() == 1 ? 
compactionStrategies.get(0) : new CompositeCompactionStrategy(compactionStrategies); } public Long getTargetIOPerCompactionInMB() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java index f7390cdd56f5..a84d6ad7b633 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java @@ -88,7 +88,7 @@ public HoodieCompactionPlan generateCompactionPlan(String compactionInstant) thr int allPartitionSize = partitionPaths.size(); // filter the partition paths if needed to reduce list status - partitionPaths = filterPartitionPathsByStrategy(writeConfig, partitionPaths); + partitionPaths = filterPartitionPathsByStrategy(partitionPaths); LOG.info("Strategy: {} matched {} partition paths from all {} partitions", writeConfig.getCompactionStrategy().getClass().getSimpleName(), partitionPaths.size(), allPartitionSize); if (partitionPaths.isEmpty()) { @@ -185,7 +185,7 @@ public HoodieCompactionPlan generateCompactionPlan(String compactionInstant) thr protected abstract boolean filterLogCompactionOperations(); - protected List filterPartitionPathsByStrategy(HoodieWriteConfig writeConfig, List partitionPaths) { + protected List filterPartitionPathsByStrategy(List partitionPaths) { return partitionPaths; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java index 445c8eb77566..a93ece710b02 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,21 +41,25 @@ public class HoodieCompactionPlanGenerator operations) { // Filter the compactions with the passed in filter. This lets us choose most effective // compactions only - return writeConfig.getCompactionStrategy().generateCompactionPlan(writeConfig, operations, + return compactionStrategy.generateCompactionPlan(writeConfig, operations, CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList())); } @Override - protected List filterPartitionPathsByStrategy(HoodieWriteConfig writeConfig, List partitionPaths) { - return writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitionPaths); + protected List filterPartitionPathsByStrategy(List partitionPaths) { + return compactionStrategy.filterPartitionPaths(writeConfig, partitionPaths); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java new file mode 100644 index 000000000000..e0875bca9d04 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.action.compact.strategy; + +import org.apache.hudi.avro.model.HoodieCompactionOperation; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.config.HoodieWriteConfig; + +import java.util.List; + +/** + * CompositeCompactionStrategy chains multiple compaction strategies together. + * The order of the strategies in the chain is important as the output of one strategy is passed as input to the next. 
+ */ +public class CompositeCompactionStrategy extends CompactionStrategy { + + private List strategies; + + public CompositeCompactionStrategy(List strategies) { + this.strategies = strategies; + } + + @Override + public List orderAndFilter(HoodieWriteConfig writeConfig, List operations, List pendingCompactionPlans) { + List finalOperations = operations; + for (CompactionStrategy strategy : strategies) { + finalOperations = strategy.orderAndFilter(writeConfig, finalOperations, pendingCompactionPlans); + } + return finalOperations; + } + + @Override + public List filterPartitionPaths(HoodieWriteConfig writeConfig, List allPartitionPaths) { + List finalPartitionPaths = allPartitionPaths; + for (CompactionStrategy strategy : strategies) { + finalPartitionPaths = strategy.filterPartitionPaths(writeConfig, finalPartitionPaths); + } + return finalPartitionPaths; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("CompactionStrategyChain ["); + for (CompactionStrategy strategy : strategies) { + builder.append(strategy.getClass()); + builder.append(" ===> "); + } + builder.append("]"); + return builder.toString(); + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java index a67f51face4f..a368d43da253 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java @@ -63,7 +63,7 @@ public void testUnBounded() { HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp") .withCompactionConfig(HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).build()).build(); List 
operations = createCompactionOperations(writeConfig, sizesMap); - List returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); + List returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>()); assertEquals(operations, returned, "UnBounded should not re-order or filter"); } @@ -79,7 +79,7 @@ public void testBoundedIOSimple() { HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build()) .build(); List operations = createCompactionOperations(writeConfig, sizesMap); - List returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); + List returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>()); assertTrue(returned.size() < operations.size(), "BoundedIOCompaction should have resulted in fewer compactions"); assertEquals(2, returned.size(), "BoundedIOCompaction should have resulted in 2 compactions being chosen"); @@ -103,7 +103,7 @@ public void testLogFileSizeCompactionSimple() { .withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024).build()) .build(); List operations = createCompactionOperations(writeConfig, sizesMap); - List returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); + List returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>()); assertTrue(returned.size() < operations.size(), "LogFileSizeBasedCompactionStrategy should have resulted in fewer compactions"); @@ -137,7 +137,7 @@ public void testDayBasedCompactionSimple() { HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(1).build()).build(); - List filterPartitions = strategy.filterPartitionPaths(writeConfig, Arrays.asList(partitionPaths)); + List filterPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, Arrays.asList(partitionPaths)); assertEquals(1, filterPartitions.size(), "DayBasedCompactionStrategy should have resulted in fewer partitions"); List operations = createCompactionOperationsForPartition(writeConfig, sizesMap, keyToPartitionMap, filterPartitions); @@ -182,11 +182,11 @@ public void testDayBasedCompactionWithIOBounded() { .build()) .build(); - List filterPartitions = strategy.filterPartitionPaths(writeConfig, Arrays.asList(partitionPaths)); + List filterPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, Arrays.asList(partitionPaths)); assertEquals(1, filterPartitions.size(), "DayBasedCompactionStrategy should have resulted in fewer partitions"); List operations = createCompactionOperationsForPartition(writeConfig, sizesMap, keyToPartitionMap, filterPartitions); - List returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); + List returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>()); assertEquals(1, returned.size(), "DayBasedAndBoundedIOCompactionStrategy should have resulted in fewer compactions"); @@ -241,7 +241,7 @@ public void testBoundedPartitionAwareCompactionSimple() { HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(2).build()).build(); List operations = createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap); - List returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); + List returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>()); assertTrue(returned.size() < operations.size(), "BoundedPartitionAwareCompactionStrategy should have resulted in fewer compactions"); @@ -290,7 +290,7 @@ public void testUnboundedPartitionAwareCompactionSimple() { 
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(2).build()).build(); List operations = createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap); - List returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); + List returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>()); assertTrue(returned.size() < operations.size(), "UnBoundedPartitionAwareCompactionStrategy should not include last " @@ -312,7 +312,7 @@ public void testLogFileLengthBasedCompactionStrategy() { .withCompactionLogFileNumThreshold(2).build()) .build(); List operations = createCompactionOperations(writeConfig, sizesMap); - List returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); + List returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>()); assertTrue(returned.size() < operations.size(), "LogFileLengthBasedCompactionStrategy should have resulted in fewer compactions"); @@ -331,8 +331,43 @@ public void testLogFileLengthBasedCompactionStrategy() { // TOTAL_IO_MB: ( 120 + 90 ) * 2 + 521 + 521 + 60 + 10 + 80 assertEquals(1594, (long) returnedSize, "Should chose the first 2 compactions which should result in a total IO of 1594 MB"); + } + @Test + public void testCompositeCompactionStrategy() { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig( + HoodieCompactionConfig.newBuilder().withCompactionStrategy(new NumStrategy(), new PrefixStrategy()).withTargetIOPerCompactionInMB(1024) + .withCompactionLogFileNumThreshold(2).build()).build(); + List allPartitionPaths = Arrays.asList( + "2017/01/01", "2018/01/02", "2017/02/01" + ); + List returned = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, allPartitionPaths); + // filter by num first and 
then filter by prefix + assertEquals(1, returned.size()); + assertEquals("2017/01/01", returned.get(0)); + + writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig( + HoodieCompactionConfig.newBuilder().withCompactionStrategy(new PrefixStrategy(), new NumStrategy()).withTargetIOPerCompactionInMB(1024) + .withCompactionLogFileNumThreshold(2).build()).build(); + returned = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, allPartitionPaths); + // filter by prefix first and then filter by num + assertEquals(2, returned.size()); + assertEquals("2017/01/01", returned.get(0)); + assertEquals("2017/02/01", returned.get(1)); + } + public static class NumStrategy extends CompactionStrategy { + @Override + public List filterPartitionPaths(HoodieWriteConfig writeConfig, List allPartitionPaths) { + return allPartitionPaths.stream().limit(2).collect(Collectors.toList()); + } + } + + public static class PrefixStrategy extends CompactionStrategy { + @Override + public List filterPartitionPaths(HoodieWriteConfig writeConfig, List allPartitionPaths) { + return allPartitionPaths.stream().filter(s -> s.startsWith("2017")).collect(Collectors.toList()); + } } @Test diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java index d178fdd8e0d1..839b5e0d9de5 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java @@ -40,6 +40,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -277,9 +278,10 @@ public 
static DFSPropertiesConfiguration readConfig(Configuration hadoopConfig, */ public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr, int parallelism, Option compactionStrategyClass, TypedProperties properties) { - HoodieCompactionConfig compactionConfig = compactionStrategyClass + Option strategyOpt = compactionStrategyClass.map(ReflectionUtils::loadClass); + HoodieCompactionConfig compactionConfig = strategyOpt .map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false) - .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build()) + .withCompactionStrategy(strategy).build()) .orElseGet(() -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build()); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 341642942eed..5acadd8109e2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -53,6 +53,7 @@ import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StorageSchemes; +import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider; import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig; import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig; @@ -395,9 +396,10 @@ public static JavaSparkContext buildSparkContext(String appName, String sparkMas */ public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr, int parallelism, Option compactionStrategyClass, TypedProperties properties) { - HoodieCompactionConfig compactionConfig = compactionStrategyClass + 
Option strategyOpt = compactionStrategyClass.map(ReflectionUtils::loadClass); + HoodieCompactionConfig compactionConfig = strategyOpt .map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false) - .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build()) + .withCompactionStrategy(strategy).build()) .orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build()); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)