Skip to content

Commit

Permalink
Fail the comparison between ordering values with different types
Browse files Browse the repository at this point in the history
  • Loading branch information
linliu-code committed Sep 18, 2024
1 parent c87663d commit 04be023
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
Expand Down Expand Up @@ -157,44 +156,6 @@ public void close() {
records.clear();
}

/**
* Compares two {@link Comparable}s. If both are numbers, converts them to {@link Long} for comparison.
* If one of the {@link Comparable}s is a String, assumes that both are String values for comparison.
*
* @param readerContext {@link HoodieReaderContext} instance.
* @param o1 {@link Comparable} object.
* @param o2 other {@link Comparable} object to compare to.
* @return comparison result.
*/
@VisibleForTesting
static int compareTo(HoodieReaderContext readerContext, Comparable o1, Comparable o2) {
// TODO(HUDI-7848): fix the delete records to contain the correct ordering value type
// so this util with the number comparison is not necessary.
try {
return o1.compareTo(o2);
} catch (ClassCastException e) {
boolean isO1LongOrInteger = (o1 instanceof Long || o1 instanceof Integer);
boolean isO2LongOrInteger = (o2 instanceof Long || o2 instanceof Integer);
boolean isO1DoubleOrFloat = (o1 instanceof Double || o1 instanceof Float);
boolean isO2DoubleOrFloat = (o2 instanceof Double || o2 instanceof Float);
if (isO1LongOrInteger && isO2LongOrInteger) {
Long o1LongValue = ((Number) o1).longValue();
Long o2LongValue = ((Number) o2).longValue();
return o1LongValue.compareTo(o2LongValue);
} else if ((isO1LongOrInteger && isO2DoubleOrFloat)
|| (isO1DoubleOrFloat && isO2LongOrInteger)) {
Double o1DoubleValue = ((Number) o1).doubleValue();
Double o2DoubleValue = ((Number) o2).doubleValue();
return o1DoubleValue.compareTo(o2DoubleValue);
} else {
return readerContext.compareTo(o1, o2);
}
} catch (Throwable e) {
throw new HoodieException("Cannot compare values: "
+ o1 + "(" + o1.getClass() + "), " + o2 + "(" + o2.getClass() + ")", e);
}
}

/**
* Merge two log data records if needed.
*
Expand Down Expand Up @@ -247,8 +208,18 @@ protected Option<Pair<T, Map<String, Object>>> doProcessNextDataRecord(T record,
}
Comparable incomingOrderingValue = readerContext.getOrderingValue(
Option.of(record), metadata, readerSchema, props);
if (compareTo(readerContext, incomingOrderingValue, existingOrderingValue) > 0) {
return Option.of(Pair.of(record, metadata));

try {
if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
return Option.of(Pair.of(record, metadata));
}
} catch (ClassCastException e) {
throw new HoodieException(String.format(
"Cannot compare values: %s(%s), %s(%s)",
incomingOrderingValue,
incomingOrderingValue.getClass(),
existingOrderingValue,
existingOrderingValue.getClass()));
}
return Option.empty();
case CUSTOM:
Expand Down Expand Up @@ -416,8 +387,17 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
if (isDeleteRecordWithNaturalOrder(newer, newOrderingValue)) {
return Option.empty();
}
if (compareTo(readerContext, oldOrderingValue, newOrderingValue) > 0) {
return older;
try {
if (oldOrderingValue.compareTo(newOrderingValue) > 0) {
return older;
}
} catch (ClassCastException e) {
throw new HoodieException(String.format(
"Cannot compare values: %s(%s), %s(%s)",
oldOrderingValue,
oldOrderingValue.getClass(),
newOrderingValue,
newOrderingValue.getClass()));
}
return newer;
case CUSTOM:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.hudi.storage.StorageConfiguration;

import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -61,13 +60,11 @@

import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
import static org.apache.hudi.common.model.HoodieRecordMerger.OVERWRITE_MERGER_STRATEGY_UUID;
import static org.apache.hudi.common.model.WriteOperationType.BULK_INSERT;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
import static org.apache.hudi.common.table.read.HoodieBaseFileGroupRecordBuffer.compareTo;
import static org.apache.hudi.common.testutils.HoodieTestUtils.getLogFileListFromFileSlice;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -100,71 +97,6 @@ public abstract void validateRecordsInFileGroup(String tablePath,

public abstract Comparable getComparableUTF8String(String value);

@Test
public void testCompareToComparable() throws Exception {
Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING));
// Prepare a table for initializing reader context
try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
commitToTable(dataGen.generateInserts("001", 1), BULK_INSERT.value(), writeConfigs);
}
StorageConfiguration<?> storageConf = getStorageConf();
String tablePath = getBasePath();
HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, tablePath);
Schema avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
HoodieReaderContext<T> readerContext = getHoodieReaderContext(tablePath, avroSchema, storageConf);

// Test same type
assertEquals(1, compareTo(readerContext, Boolean.TRUE, Boolean.FALSE));
assertEquals(0, compareTo(readerContext, Boolean.TRUE, Boolean.TRUE));
assertEquals(-1, compareTo(readerContext, Boolean.FALSE, Boolean.TRUE));
assertEquals(1, compareTo(readerContext, 20, 15));
assertEquals(0, compareTo(readerContext, 15, 15));
assertEquals(-1, compareTo(readerContext, 10, 15));
assertEquals(1, compareTo(readerContext, 1.1f, 1.0f));
assertEquals(0, compareTo(readerContext, 1.0f, 1.0f));
assertEquals(-1, compareTo(readerContext, 0.9f, 1.0f));
assertEquals(1, compareTo(readerContext, 1.1, 1.0));
assertEquals(0, compareTo(readerContext, 1.0, 1.0));
assertEquals(-1, compareTo(readerContext, 0.9, 1.0));
assertEquals(1, compareTo(readerContext, 1.1, 1));
assertEquals(-1, compareTo(readerContext, 0.9, 1));
assertEquals(1, compareTo(readerContext, "value2", "value1"));
assertEquals(0, compareTo(readerContext, "value1", "value1"));
assertEquals(-1, compareTo(readerContext, "value1", "value2"));
// Test different types which are comparable
assertEquals(1, compareTo(readerContext, Long.MAX_VALUE / 2L, 10));
assertEquals(1, compareTo(readerContext, 20, 10L));
assertEquals(0, compareTo(readerContext, 10L, 10));
assertEquals(0, compareTo(readerContext, 10, 10L));
assertEquals(-1, compareTo(readerContext, 10, Long.MAX_VALUE));
assertEquals(-1, compareTo(readerContext, 10L, 20));
assertEquals(1, compareTo(readerContext, 10.01f, 10));
assertEquals(1, compareTo(readerContext, 10.01f, 10L));
assertEquals(1, compareTo(readerContext, 10.01, 10));
assertEquals(1, compareTo(readerContext, 10.01, 10L));
assertEquals(1, compareTo(readerContext, 11L, 10.99f));
assertEquals(1, compareTo(readerContext, 11, 10.99));
// Throw exception if comparing Double with Float which have different precision
assertThrows(IllegalArgumentException.class, () -> compareTo(readerContext, 10.01f, 10.0));
assertThrows(IllegalArgumentException.class, () -> compareTo(readerContext, 10.01, 10.0f));
assertEquals(0, compareTo(readerContext, 10.0, 10L));
assertEquals(0, compareTo(readerContext, 10.0f, 10L));
assertEquals(0, compareTo(readerContext, 10.0, 10));
assertEquals(0, compareTo(readerContext, 10.0f, 10));
assertEquals(-1, compareTo(readerContext, 9.99f, 10));
assertEquals(-1, compareTo(readerContext, 9.99f, 10L));
assertEquals(-1, compareTo(readerContext, 9.99, 10));
assertEquals(-1, compareTo(readerContext, 9.99, 10L));
assertEquals(-1, compareTo(readerContext, 10L, 10.01f));
assertEquals(-1, compareTo(readerContext, 10, 10.01));
assertEquals(1, compareTo(readerContext, getComparableUTF8String("value2"), "value1"));
assertEquals(1, compareTo(readerContext, "value2", getComparableUTF8String("value1")));
assertEquals(0, compareTo(readerContext, getComparableUTF8String("value1"), "value1"));
assertEquals(0, compareTo(readerContext, "value1", getComparableUTF8String("value1")));
assertEquals(-1, compareTo(readerContext, getComparableUTF8String("value1"), "value2"));
assertEquals(-1, compareTo(readerContext, "value1", getComparableUTF8String("value2")));
}

private static Stream<Arguments> testArguments() {
return Stream.of(
arguments(RecordMergeMode.OVERWRITE_WITH_LATEST, "avro"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,11 @@ select id, name, price, ts, dt from h1_p order by id;

merge into h1_p t0
using (
select 1 as id, '_delete' as name, 10 as price, 1000 as ts, '2021-05-07' as dt
select 1 as id, '_delete' as name, 10 as price, 1000L as ts, '2021-05-07' as dt
union
select 2 as id, '_update' as name, 12 as price, 1001 as ts, '2021-05-07' as dt
select 2 as id, '_update' as name, 12 as price, 1001L as ts, '2021-05-07' as dt
union
select 6 as id, '_insert' as name, 10 as price, 1000 as ts, '2021-05-08' as dt
select 6 as id, '_insert' as name, 10 as price, 1000L as ts, '2021-05-08' as dt
) s0
on s0.id = t0.id
when matched and s0.name = '_update'
Expand Down

0 comments on commit 04be023

Please sign in to comment.