[HUDI-8215] Support composite compaction strategy #11963

Open · wants to merge 3 commits into base: master
Changes from 2 commits
@@ -146,7 +146,7 @@ public class HoodieCompactionConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Compaction strategy decides which file groups are picked up for "
+ "compaction during each compaction run. By default. Hudi picks the log file "
+ "with most accumulated unmerged data");
+ "with most accumulated unmerged data. Support composite strategy by providing, format: strategy_class_name1,strategy_class_name2,...");
Contributor comment: The strategy can be composed with multiple strategies by concatenating the class names with ','.
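For illustration only, a minimal sketch of setting the comma-separated value programmatically (the raw key and TypedProperties usage here are assumptions for illustration; LogFileSizeBasedCompactionStrategy and DayBasedCompactionStrategy are existing Hudi strategies):

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.table.action.compact.strategy.DayBasedCompactionStrategy;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;

// Hypothetical sketch: compose two strategies via the raw config key.
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.compaction.strategy",
    LogFileSizeBasedCompactionStrategy.class.getName() + ","
        + DayBasedCompactionStrategy.class.getName());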


public static final ConfigProperty<String> TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = ConfigProperty
.key("hoodie.compaction.daybased.target.partitions")
@@ -408,8 +408,12 @@ public Builder approxRecordSize(int recordSizeEstimate) {
return this;
}

public Builder withCompactionStrategy(CompactionStrategy compactionStrategy) {
compactionConfig.setValue(COMPACTION_STRATEGY, compactionStrategy.getClass().getName());
public Builder withCompactionStrategy(CompactionStrategy... compactionStrategies) {
StringBuilder compactionStrategyBuilder = new StringBuilder();
for (CompactionStrategy compactionStrategy : compactionStrategies) {
compactionStrategyBuilder.append(compactionStrategy.getClass().getName()).append(",");
Contributor comment: Can we trim out the trailing ','?

}
compactionConfig.setValue(COMPACTION_STRATEGY, compactionStrategyBuilder.toString());
return this;
}
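To the trailing-',' comment above: one possible cleanup, sketched here under the assumption that COMPACTION_STRATEGY is the same config property used in the builder above (this is not the PR's code):

// Sketch: build the comma-separated list without a trailing separator.
public Builder withCompactionStrategy(CompactionStrategy... compactionStrategies) {
  String joined = java.util.Arrays.stream(compactionStrategies)
      .map(strategy -> strategy.getClass().getName())
      .collect(java.util.stream.Collectors.joining(","));
  compactionConfig.setValue(COMPACTION_STRATEGY, joined);
  return this;
}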

@@ -83,6 +83,7 @@
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.hudi.table.action.compact.strategy.CompositeCompactionStrategy;
import org.apache.hudi.table.storage.HoodieStorageLayout;

import org.apache.orc.CompressionKind;
@@ -1655,7 +1656,11 @@ public int getInlineCompactDeltaSecondsMax() {
}

public CompactionStrategy getCompactionStrategy() {
return ReflectionUtils.loadClass(getString(HoodieCompactionConfig.COMPACTION_STRATEGY));
String compactionStrategiesStr = getString(HoodieCompactionConfig.COMPACTION_STRATEGY);
String[] compactionStrategyArr = compactionStrategiesStr.split(",");
List<CompactionStrategy> compactionStrategies = Arrays.stream(compactionStrategyArr)
.map(className -> (CompactionStrategy) ReflectionUtils.loadClass(className)).collect(Collectors.toList());
return new CompositeCompactionStrategy(compactionStrategies);
TheR1sing3un marked this conversation as resolved.
}
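One observation on the parse above (a note, not part of the PR): Java's String.split discards trailing empty strings, so the trailing ',' currently emitted by the builder happens to be tolerated here:

// "a,b,".split(",") drops the trailing empty token.
String[] parts = "StrategyA,StrategyB,".split(",");
System.out.println(parts.length); // prints 2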

public Long getTargetIOPerCompactionInMB() {
@@ -88,7 +88,7 @@ public HoodieCompactionPlan generateCompactionPlan(String compactionInstant) thr
int allPartitionSize = partitionPaths.size();

// filter the partition paths if needed to reduce list status
partitionPaths = filterPartitionPathsByStrategy(writeConfig, partitionPaths);
partitionPaths = filterPartitionPathsByStrategy(partitionPaths);
LOG.info("Strategy: {} matched {} partition paths from all {} partitions",
writeConfig.getCompactionStrategy().getClass().getSimpleName(), partitionPaths.size(), allPartitionSize);
if (partitionPaths.isEmpty()) {
@@ -185,7 +185,7 @@ public HoodieCompactionPlan generateCompactionPlan(String compactionInstant) thr

protected abstract boolean filterLogCompactionOperations();

protected List<String> filterPartitionPathsByStrategy(HoodieWriteConfig writeConfig, List<String> partitionPaths) {
protected List<String> filterPartitionPathsByStrategy(List<String> partitionPaths) {
return partitionPaths;
}

@@ -27,6 +27,7 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,21 +41,25 @@ public class HoodieCompactionPlanGenerator<T extends HoodieRecordPayload, I, K,

private static final Logger LOG = LoggerFactory.getLogger(HoodieCompactionPlanGenerator.class);

private final CompactionStrategy compactionStrategy;

public HoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
this.compactionStrategy = writeConfig.getCompactionStrategy();
LOG.info("Compaction Strategy used is: " + compactionStrategy.toString());
}

@Override
protected HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, List<HoodieCompactionOperation> operations) {
// Filter the compactions with the passed in filter. This lets us choose most effective
// compactions only
return writeConfig.getCompactionStrategy().generateCompactionPlan(writeConfig, operations,
return compactionStrategy.generateCompactionPlan(writeConfig, operations,
CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
}

@Override
protected List<String> filterPartitionPathsByStrategy(HoodieWriteConfig writeConfig, List<String> partitionPaths) {
return writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitionPaths);
protected List<String> filterPartitionPathsByStrategy(List<String> partitionPaths) {
return compactionStrategy.filterPartitionPaths(writeConfig, partitionPaths);
}

@Override
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.table.action.compact.strategy;

import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.config.HoodieWriteConfig;

import java.util.List;

/**
* CompositeCompactionStrategy chains multiple compaction strategies together.
* The order of the strategies in the chain is important as the output of one strategy is passed as input to the next.
Contributor comment: Can we address that the composition predicate is AND instead of OR?

*/
public class CompositeCompactionStrategy extends CompactionStrategy {

private List<CompactionStrategy> strategies;

public CompositeCompactionStrategy(List<CompactionStrategy> strategies) {
this.strategies = strategies;
}

@Override
public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
List<HoodieCompactionOperation> finalOperations = operations;
for (CompactionStrategy strategy : strategies) {
finalOperations = strategy.orderAndFilter(writeConfig, finalOperations, pendingCompactionPlans);
}
return finalOperations;
}

@Override
public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) {
List<String> finalPartitionPaths = allPartitionPaths;
for (CompactionStrategy strategy : strategies) {
finalPartitionPaths = strategy.filterPartitionPaths(writeConfig, finalPartitionPaths);
}
return finalPartitionPaths;
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("CompactionStrategyChain [");
for (CompactionStrategy strategy : strategies) {
builder.append(strategy.getClass());
builder.append(" ===> ");
}
builder.append("]");
return builder.toString();
}
}
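To the reviewer's AND-vs-OR question, a self-contained sketch (plain functions standing in for CompactionStrategy implementations; values are made up) showing that chaining only narrows the partition list, and that order matters:

import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

public class ChainSemanticsSketch {
  public static void main(String[] args) {
    // Stand-ins for two filterPartitionPaths implementations.
    UnaryOperator<List<String>> keepFirstTwo =
        paths -> paths.stream().limit(2).collect(Collectors.toList());
    UnaryOperator<List<String>> keep2017 =
        paths -> paths.stream().filter(p -> p.startsWith("2017")).collect(Collectors.toList());

    List<String> partitions = Arrays.asList("2017/01/01", "2018/01/02", "2017/02/01");

    // Each stage filters the previous stage's output (an AND), so:
    // keepFirstTwo then keep2017 -> [2017/01/01]
    System.out.println(keep2017.apply(keepFirstTwo.apply(partitions)));
    // keep2017 then keepFirstTwo -> [2017/01/01, 2017/02/01]
    System.out.println(keepFirstTwo.apply(keep2017.apply(partitions)));
  }
}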
@@ -63,7 +63,7 @@ public void testUnBounded() {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).build()).build();
List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>());
assertEquals(operations, returned, "UnBounded should not re-order or filter");
}

@@ -79,7 +79,7 @@ public void testBoundedIOSimple() {
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
.build();
List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>());

assertTrue(returned.size() < operations.size(), "BoundedIOCompaction should have resulted in fewer compactions");
assertEquals(2, returned.size(), "BoundedIOCompaction should have resulted in 2 compactions being chosen");
@@ -103,7 +103,7 @@ public void testLogFileSizeCompactionSimple() {
.withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024).build())
.build();
List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>());

assertTrue(returned.size() < operations.size(),
"LogFileSizeBasedCompactionStrategy should have resulted in fewer compactions");
@@ -137,7 +137,7 @@ public void testDayBasedCompactionSimple() {
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(1).build()).build();

List<String> filterPartitions = strategy.filterPartitionPaths(writeConfig, Arrays.asList(partitionPaths));
List<String> filterPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, Arrays.asList(partitionPaths));
assertEquals(1, filterPartitions.size(), "DayBasedCompactionStrategy should have resulted in fewer partitions");

List<HoodieCompactionOperation> operations = createCompactionOperationsForPartition(writeConfig, sizesMap, keyToPartitionMap, filterPartitions);
@@ -182,11 +182,11 @@ public void testDayBasedCompactionWithIOBounded() {
.build())
.build();

List<String> filterPartitions = strategy.filterPartitionPaths(writeConfig, Arrays.asList(partitionPaths));
List<String> filterPartitions = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, Arrays.asList(partitionPaths));
assertEquals(1, filterPartitions.size(), "DayBasedCompactionStrategy should have resulted in fewer partitions");

List<HoodieCompactionOperation> operations = createCompactionOperationsForPartition(writeConfig, sizesMap, keyToPartitionMap, filterPartitions);
List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>());

assertEquals(1, returned.size(),
"DayBasedAndBoundedIOCompactionStrategy should have resulted in fewer compactions");
@@ -241,7 +241,7 @@ public void testBoundedPartitionAwareCompactionSimple() {
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(2).build()).build();
List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap);
List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>());

assertTrue(returned.size() < operations.size(),
"BoundedPartitionAwareCompactionStrategy should have resulted in fewer compactions");
@@ -290,7 +290,7 @@ public void testUnboundedPartitionAwareCompactionSimple() {
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(2).build()).build();
List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap);
List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>());

assertTrue(returned.size() < operations.size(),
"UnBoundedPartitionAwareCompactionStrategy should not include last "
@@ -312,7 +312,7 @@ public void testLogFileLengthBasedCompactionStrategy() {
.withCompactionLogFileNumThreshold(2).build())
.build();
List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
List<HoodieCompactionOperation> returned = writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new ArrayList<>());

assertTrue(returned.size() < operations.size(),
"LogFileLengthBasedCompactionStrategy should have resulted in fewer compactions");
@@ -331,8 +331,43 @@ public void testLogFileLengthBasedCompactionStrategy() {
// TOTAL_IO_MB: ( 120 + 90 ) * 2 + 521 + 521 + 60 + 10 + 80
assertEquals(1594, (long) returnedSize,
"Should chose the first 2 compactions which should result in a total IO of 1594 MB");
}

@Test
public void testCompositeCompactionStrategy() {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
HoodieCompactionConfig.newBuilder().withCompactionStrategy(new NumStrategy(), new PrefixStrategy()).withTargetIOPerCompactionInMB(1024)
.withCompactionLogFileNumThreshold(2).build()).build();
List<String> allPartitionPaths = Arrays.asList(
"2017/01/01", "2018/01/02", "2017/02/01"
);
List<String> returned = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, allPartitionPaths);
// filter by num first and then filter by prefix
assertEquals(1, returned.size());
assertEquals("2017/01/01", returned.get(0));

writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
HoodieCompactionConfig.newBuilder().withCompactionStrategy(new PrefixStrategy(), new NumStrategy()).withTargetIOPerCompactionInMB(1024)
.withCompactionLogFileNumThreshold(2).build()).build();
returned = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, allPartitionPaths);
// filter by prefix first and then filter by num
assertEquals(2, returned.size());
assertEquals("2017/01/01", returned.get(0));
assertEquals("2017/02/01", returned.get(1));
}

public static class NumStrategy extends CompactionStrategy {
@Override
public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) {
return allPartitionPaths.stream().limit(2).collect(Collectors.toList());
}
}

public static class PrefixStrategy extends CompactionStrategy {
@Override
public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) {
return allPartitionPaths.stream().filter(s -> s.startsWith("2017")).collect(Collectors.toList());
}
}

@Test
@@ -40,6 +40,7 @@
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -277,9 +278,10 @@ public static DFSPropertiesConfiguration readConfig(Configuration hadoopConfig,
*/
public static SparkRDDWriteClient<HoodieRecordPayload> createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
HoodieCompactionConfig compactionConfig = compactionStrategyClass
Option<CompactionStrategy> strategyOpt = compactionStrategyClass.map(ReflectionUtils::loadClass);
HoodieCompactionConfig compactionConfig = strategyOpt
.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
.withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
.withCompactionStrategy(strategy).build())
.orElseGet(() -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
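A hypothetical call-site sketch for the helper above (table path, parallelism, and properties are invented values; the strategy class is an existing Hudi strategy):

// Hypothetical usage: a single strategy class name is reflectively loaded
// and passed to the varargs withCompactionStrategy(...) builder.
SparkRDDWriteClient<HoodieRecordPayload> client = createHoodieClient(
    jsc, "/tmp/hudi_table", schemaStr, 2,
    Option.of("org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy"),
    new TypedProperties());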
@@ -53,6 +53,7 @@
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StorageSchemes;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
@@ -395,9 +396,10 @@ public static JavaSparkContext buildSparkContext(String appName, String sparkMas
*/
public static SparkRDDWriteClient<HoodieRecordPayload> createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
HoodieCompactionConfig compactionConfig = compactionStrategyClass
Option<CompactionStrategy> strategyOpt = compactionStrategyClass.map(ReflectionUtils::loadClass);
HoodieCompactionConfig compactionConfig = strategyOpt
.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
.withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
.withCompactionStrategy(strategy).build())
.orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)