
Commit

problem seems to be with the old read path, not with fg reader
Jonathan Vexler committed Sep 20, 2024
1 parent 5ace882 commit 080f024
Showing 3 changed files with 5 additions and 7 deletions.
@@ -142,10 +142,6 @@ default Comparable<?> getOrderingValue() {
     return 0;
   }
 
-  static String getAvroPayloadForMergeMode(String mergeMode) {
-    return getAvroPayloadForMergeMode(RecordMergeMode.valueOf(mergeMode));
-  }
-
   static String getAvroPayloadForMergeMode(RecordMergeMode mergeMode) {
     switch (mergeMode) {
       case EVENT_TIME_ORDERING:
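
With the String overload removed, a caller that still holds the merge mode as a string now has to convert it to the enum itself before asking for the default payload class. A minimal, hypothetical call site (the variable names are illustrative, not part of this commit):

    // Hypothetical caller: the String overload is gone, so convert the
    // string form to RecordMergeMode before looking up the payload class.
    String mergeModeName = "EVENT_TIME_ORDERING";
    String payloadClass =
        HoodieRecordPayload.getAvroPayloadForMergeMode(RecordMergeMode.valueOf(mergeModeName));
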
@@ -265,7 +265,7 @@ public static class Config implements Serializable {
     @Parameter(names = {"--payload-class"}, description = "Deprecated. Use --merge-mode for overwrite or event time merging."
         + " Subclass of HoodieRecordPayload, that works off a GenericRecord. Implement your own, if you want to do something "
         + "other than overwriting existing value")
-    public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
+    public String payloadClassName = null;
 
     @Parameter(names = {"--merge-mode", "--record-merge-mode"}, description = "mode to merge records with")
     public RecordMergeMode recordMergeMode = RecordMergeMode.OVERWRITE_WITH_LATEST;
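
Changing the default from OverwriteWithLatestAvroPayload.class.getName() to null lets downstream code tell whether --payload-class was set explicitly; when it was not, the payload class can be derived from --merge-mode instead, which is what the change in the next file does (see the resolution line added below).
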
@@ -34,6 +34,7 @@
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.exception.HoodieException;
@@ -82,6 +83,7 @@ public static Option<JavaRDD<HoodieRecord>> createHoodieRecords(HoodieStreamer.C
     boolean useConsistentLogicalTimestamp = ConfigUtils.getBooleanWithAltKeys(
         props, KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
     Set<String> partitionColumns = getPartitionColumns(props);
+    String payloadClassName = StringUtils.isNullOrEmpty(cfg.payloadClassName) ? HoodieRecordPayload.getAvroPayloadForMergeMode(cfg.recordMergeMode) : cfg.payloadClassName;
     return avroRDDOptional.map(avroRDD -> {
       SerializableSchema avroSchema = new SerializableSchema(schemaProvider.getTargetSchema());
       SerializableSchema processedAvroSchema = new SerializableSchema(isDropPartitionColumns(props) ? HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
@@ -98,9 +100,9 @@ public static Option<JavaRDD<HoodieRecord>> createHoodieRecords(HoodieStreamer.C
         try {
           HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec));
           GenericRecord gr = isDropPartitionColumns(props) ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
-          HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
+          HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(payloadClassName, gr,
               (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, useConsistentLogicalTimestamp))
-              : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
+              : DataSourceUtils.createPayload(payloadClassName, gr);
           return Either.left(new HoodieAvroRecord<>(hoodieKey, payload));
         } catch (Exception e) {
           return generateErrorRecordOrThrowException(genRec, e, shouldErrorTable);
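
Taken together, the three files make the payload class track the configured merge mode whenever --payload-class is left unset. A minimal sketch of the resolved flow, assembled from the lines in this diff (key generation, schema handling, and error-table plumbing omitted; orderingVal stands in for the nested-field lookup shown above):

    // Resolve the payload class once per batch: an explicit --payload-class wins,
    // otherwise fall back to the class implied by the configured merge mode.
    String payloadClassName = StringUtils.isNullOrEmpty(cfg.payloadClassName)
        ? HoodieRecordPayload.getAvroPayloadForMergeMode(cfg.recordMergeMode)
        : cfg.payloadClassName;

    // Every record is then built against that one resolved class, with or without
    // an ordering value depending on whether records should be combined.
    HoodieRecordPayload payload = shouldCombine
        ? DataSourceUtils.createPayload(payloadClassName, gr, orderingVal)
        : DataSourceUtils.createPayload(payloadClassName, gr);
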

