[HUDI-7848] Fail comparisons between different typed ordering values #11961

Open · wants to merge 4 commits into base: master
@@ -34,7 +34,6 @@
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
@@ -157,44 +156,6 @@ public void close() {
records.clear();
}

/**
* Compares two {@link Comparable}s, falling back to numeric widening when the direct
* comparison throws a {@link ClassCastException}: two integral values are compared as
* {@link Long}, mixed integral and floating-point values as {@link Double}, and anything
* else is delegated to {@link HoodieReaderContext#compareTo} (e.g., String vs UTF8String).
*
* @param readerContext {@link HoodieReaderContext} instance.
* @param o1 {@link Comparable} object.
* @param o2 other {@link Comparable} object to compare to.
* @return comparison result.
*/
@VisibleForTesting
static int compareTo(HoodieReaderContext readerContext, Comparable o1, Comparable o2) {
// TODO(HUDI-7848): fix the delete records to contain the correct ordering value type
// so this util with the number comparison is not necessary.
try {
return o1.compareTo(o2);
} catch (ClassCastException e) {
boolean isO1LongOrInteger = (o1 instanceof Long || o1 instanceof Integer);
boolean isO2LongOrInteger = (o2 instanceof Long || o2 instanceof Integer);
boolean isO1DoubleOrFloat = (o1 instanceof Double || o1 instanceof Float);
boolean isO2DoubleOrFloat = (o2 instanceof Double || o2 instanceof Float);
if (isO1LongOrInteger && isO2LongOrInteger) {
Long o1LongValue = ((Number) o1).longValue();
Long o2LongValue = ((Number) o2).longValue();
return o1LongValue.compareTo(o2LongValue);
} else if ((isO1LongOrInteger && isO2DoubleOrFloat)
|| (isO1DoubleOrFloat && isO2LongOrInteger)) {
Double o1DoubleValue = ((Number) o1).doubleValue();
Double o2DoubleValue = ((Number) o2).doubleValue();
return o1DoubleValue.compareTo(o2DoubleValue);
} else {
return readerContext.compareTo(o1, o2);
}
} catch (Throwable e) {
throw new HoodieException("Cannot compare values: "
+ o1 + "(" + o1.getClass() + "), " + o2 + "(" + o2.getClass() + ")", e);
}
}
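For context, a minimal sketch (not part of this PR) of the raw-Comparable pitfall the removed util papered over: java.lang.Long.compareTo accepts only a Long, so calling it through the raw Comparable interface with an Integer argument fails at runtime.

public class RawComparablePitfall {
public static void main(String[] args) {
// Sketch only: mixed boxed numeric types cannot be compared directly.
Comparable existing = 1000L; // ordering value stored as a Long
Comparable incoming = 1000; // same logical value arriving as an Integer
// Long.compareTo expects a Long; through the raw interface it receives an Integer:
existing.compareTo(incoming); // throws ClassCastException at runtime
}
}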

/**
* Merge two log data records if needed.
*
@@ -247,8 +208,18 @@ protected Option<Pair<T, Map<String, Object>>> doProcessNextDataRecord(T record,
}
Comparable incomingOrderingValue = readerContext.getOrderingValue(
Option.of(record), metadata, readerSchema, props);
if (compareTo(readerContext, incomingOrderingValue, existingOrderingValue) > 0) {
return Option.of(Pair.of(record, metadata));

try {
if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
return Option.of(Pair.of(record, metadata));
}
} catch (ClassCastException e) {
throw new HoodieException(String.format(
"Cannot compare values: %s(%s), %s(%s)",
incomingOrderingValue,
incomingOrderingValue.getClass(),
existingOrderingValue,
existingOrderingValue.getClass()), e);
Comment on lines +216 to +222 (Contributor):
In MergeIntoHoodieTableCommand you need to validate the input schema first. If the schema is incompatible, the error should be thrown there. Deferring the error to this point does not help the user, as this is not the root cause.
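To illustrate the suggestion, a minimal sketch of such an upfront check (hypothetical: MergeIntoHoodieTableCommand is Scala, and the helper name, signature, and placement are assumptions, not code from this PR). It compares the Avro type of the ordering (pre-combine) field in the source schema against the table schema before the merge runs.

import org.apache.avro.Schema;
import org.apache.hudi.exception.HoodieException;

// Hypothetical pre-merge validation: fail the MERGE INTO up front when the
// source provides an ordering (pre-combine) field whose type differs from the table's.
final class OrderingFieldTypeCheck {
// Simplification: assumes non-nullable fields; real Avro schemas often wrap
// types in unions with null, which a production check would unwrap first.
static void validate(Schema tableSchema, Schema sourceSchema, String orderingField) {
Schema.Type expected = tableSchema.getField(orderingField).schema().getType();
Schema.Type actual = sourceSchema.getField(orderingField).schema().getType();
if (expected != actual) {
throw new HoodieException(String.format(
"Incompatible type for ordering field '%s': table expects %s but source provides %s",
orderingField, expected, actual));
}
}
}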

}
return Option.empty();
case CUSTOM:
@@ -416,8 +387,17 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
if (isDeleteRecordWithNaturalOrder(newer, newOrderingValue)) {
return Option.empty();
}
if (compareTo(readerContext, oldOrderingValue, newOrderingValue) > 0) {
return older;
try {
if (oldOrderingValue.compareTo(newOrderingValue) > 0) {
return older;
}
} catch (ClassCastException e) {
throw new HoodieException(String.format(
"Cannot compare values: %s(%s), %s(%s)",
oldOrderingValue,
oldOrderingValue.getClass(),
newOrderingValue,
newOrderingValue.getClass()), e);
}
return newer;
case CUSTOM:
@@ -48,7 +48,6 @@
import org.apache.hudi.storage.StorageConfiguration;

import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -62,13 +61,11 @@

import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
import static org.apache.hudi.common.model.HoodieRecordMerger.OVERWRITE_MERGER_STRATEGY_UUID;
import static org.apache.hudi.common.model.WriteOperationType.BULK_INSERT;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
import static org.apache.hudi.common.table.read.HoodieBaseFileGroupRecordBuffer.compareTo;
import static org.apache.hudi.common.testutils.HoodieTestUtils.getLogFileListFromFileSlice;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -109,71 +106,6 @@ public void validateRecordsInFileGroup(String tablePath,

public abstract Comparable getComparableUTF8String(String value);

@Test
public void testCompareToComparable() throws Exception {
Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs(RecordMergeMode.EVENT_TIME_ORDERING));
// Prepare a table for initializing reader context
try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
commitToTable(dataGen.generateInserts("001", 1), BULK_INSERT.value(), writeConfigs);
}
StorageConfiguration<?> storageConf = getStorageConf();
String tablePath = getBasePath();
HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, tablePath);
Schema avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
HoodieReaderContext<T> readerContext = getHoodieReaderContext(tablePath, avroSchema, storageConf);

// Test same type
assertEquals(1, compareTo(readerContext, Boolean.TRUE, Boolean.FALSE));
assertEquals(0, compareTo(readerContext, Boolean.TRUE, Boolean.TRUE));
assertEquals(-1, compareTo(readerContext, Boolean.FALSE, Boolean.TRUE));
assertEquals(1, compareTo(readerContext, 20, 15));
assertEquals(0, compareTo(readerContext, 15, 15));
assertEquals(-1, compareTo(readerContext, 10, 15));
assertEquals(1, compareTo(readerContext, 1.1f, 1.0f));
assertEquals(0, compareTo(readerContext, 1.0f, 1.0f));
assertEquals(-1, compareTo(readerContext, 0.9f, 1.0f));
assertEquals(1, compareTo(readerContext, 1.1, 1.0));
assertEquals(0, compareTo(readerContext, 1.0, 1.0));
assertEquals(-1, compareTo(readerContext, 0.9, 1.0));
assertEquals(1, compareTo(readerContext, 1.1, 1));
assertEquals(-1, compareTo(readerContext, 0.9, 1));
assertEquals(1, compareTo(readerContext, "value2", "value1"));
assertEquals(0, compareTo(readerContext, "value1", "value1"));
assertEquals(-1, compareTo(readerContext, "value1", "value2"));
// Test different types which are comparable
assertEquals(1, compareTo(readerContext, Long.MAX_VALUE / 2L, 10));
assertEquals(1, compareTo(readerContext, 20, 10L));
assertEquals(0, compareTo(readerContext, 10L, 10));
assertEquals(0, compareTo(readerContext, 10, 10L));
assertEquals(-1, compareTo(readerContext, 10, Long.MAX_VALUE));
assertEquals(-1, compareTo(readerContext, 10L, 20));
assertEquals(1, compareTo(readerContext, 10.01f, 10));
assertEquals(1, compareTo(readerContext, 10.01f, 10L));
assertEquals(1, compareTo(readerContext, 10.01, 10));
assertEquals(1, compareTo(readerContext, 10.01, 10L));
assertEquals(1, compareTo(readerContext, 11L, 10.99f));
assertEquals(1, compareTo(readerContext, 11, 10.99));
// Throws an exception when comparing a Double with a Float, since they have different precision
assertThrows(IllegalArgumentException.class, () -> compareTo(readerContext, 10.01f, 10.0));
assertThrows(IllegalArgumentException.class, () -> compareTo(readerContext, 10.01, 10.0f));
assertEquals(0, compareTo(readerContext, 10.0, 10L));
assertEquals(0, compareTo(readerContext, 10.0f, 10L));
assertEquals(0, compareTo(readerContext, 10.0, 10));
assertEquals(0, compareTo(readerContext, 10.0f, 10));
assertEquals(-1, compareTo(readerContext, 9.99f, 10));
assertEquals(-1, compareTo(readerContext, 9.99f, 10L));
assertEquals(-1, compareTo(readerContext, 9.99, 10));
assertEquals(-1, compareTo(readerContext, 9.99, 10L));
assertEquals(-1, compareTo(readerContext, 10L, 10.01f));
assertEquals(-1, compareTo(readerContext, 10, 10.01));
assertEquals(1, compareTo(readerContext, getComparableUTF8String("value2"), "value1"));
assertEquals(1, compareTo(readerContext, "value2", getComparableUTF8String("value1")));
assertEquals(0, compareTo(readerContext, getComparableUTF8String("value1"), "value1"));
assertEquals(0, compareTo(readerContext, "value1", getComparableUTF8String("value1")));
assertEquals(-1, compareTo(readerContext, getComparableUTF8String("value1"), "value2"));
assertEquals(-1, compareTo(readerContext, "value1", getComparableUTF8String("value2")));
}

private static Stream<Arguments> testArguments() {
return Stream.of(
arguments(RecordMergeMode.OVERWRITE_WITH_LATEST, "avro"),
@@ -224,11 +224,11 @@ select id, name, price, ts, dt from h1_p order by id;

merge into h1_p t0
using (
select 1 as id, '_delete' as name, 10 as price, 1000 as ts, '2021-05-07' as dt
select 1 as id, '_delete' as name, 10 as price, 1000L as ts, '2021-05-07' as dt
union
select 2 as id, '_update' as name, 12 as price, 1001 as ts, '2021-05-07' as dt
select 2 as id, '_update' as name, 12 as price, 1001L as ts, '2021-05-07' as dt
union
select 6 as id, '_insert' as name, 10 as price, 1000 as ts, '2021-05-08' as dt
select 6 as id, '_insert' as name, 10 as price, 1000L as ts, '2021-05-08' as dt
) s0
on s0.id = t0.id
when matched and s0.name = '_update'
@@ -21,7 +21,6 @@ import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
import org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers, HoodieSparkUtils, ScalaAssertionSupport}

import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.apache.spark.sql.internal.SQLConf

@@ -1242,4 +1241,62 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSupport
})
}
}

test("Test MergeInto For PreCombineField With Different Types") {
spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
Seq("mor").foreach { tableType =>
val tableName1 = generateTableName
spark.sql(
s"""
| create table $tableName1 (
| id int,
| name string,
| price double,
| v long,
| dt string
| ) using hudi
| tblproperties (
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'v',
| hoodie.compaction.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
| )
| partitioned by(dt)
| location '${tmp.getCanonicalPath}/$tableName1'
""".stripMargin)

// Insert data; pre-combine field value type is long.
spark.sql(
s"""
| merge into $tableName1 as t0
| using (
| select 1 as id, 'a1' as name, 10 as price, 1001L as v, '2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when not matched and s0.id % 2 = 1 then insert *
""".stripMargin
)
checkAnswer(s"select id,name,price,dt,v from $tableName1")(
Seq(1, "a1", 10, "2021-03-21", 1001)
)

// Update the matched row with a pre-combine field value of type short; the merge should fail.
checkExceptionContain(
s"""
| merge into $tableName1 as t0
| using (
| select 1 as id, 'a1' as name, 12 as price, 1001S as v, '2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when matched then update set
| id = s0.id, name = s0.name, price = s0.price, v = s0.v, dt = s0.dt
| when not matched then insert *
| """.stripMargin) (
"Merge into Hoodie table command failed"
)
}
})
}
}