Skip to content

Commit

Permalink
[CherryPick][#23062] Fix Java Examples_Flink in 2.42.0 release. (#23605)
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Oct 12, 2022
1 parent 926fb53 commit c7f2cab
Showing 1 changed file with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ class BatchLoads<DestinationT, ElementT>
private final Coder<ElementT> elementCoder;
private final RowWriterFactory<ElementT, DestinationT> rowWriterFactory;
private final @Nullable String kmsKey;
private final boolean clusteringEnabled;
private final String tempDataset;
private Coder<TableDestination> tableDestinationCoder;

// The maximum number of times to retry failed load or copy jobs.
private int maxRetryJobs = DEFAULT_MAX_RETRY_JOBS;
Expand Down Expand Up @@ -186,9 +186,10 @@ class BatchLoads<DestinationT, ElementT>
this.elementCoder = elementCoder;
this.kmsKey = kmsKey;
this.rowWriterFactory = rowWriterFactory;
this.clusteringEnabled = clusteringEnabled;
schemaUpdateOptions = Collections.emptySet();
this.tempDataset = tempDataset;
this.tableDestinationCoder =
clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of();
}

void setSchemaUpdateOptions(Set<SchemaUpdateOption> schemaUpdateOptions) {
Expand Down Expand Up @@ -493,7 +494,8 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
maxRetryJobs,
kmsKey,
loadJobProjectId))
.withSideInputs(copyJobIdPrefixView));
.withSideInputs(copyJobIdPrefixView))
.setCoder(tableDestinationCoder);

PCollectionList<TableDestination> allSuccessfulWrites =
PCollectionList.of(successfulSinglePartitionWrites).and(successfulMultiPartitionWrites);
Expand Down Expand Up @@ -755,9 +757,6 @@ PCollection<TableDestination> writeSinglePartition(
List<PCollectionView<?>> sideInputs = Lists.newArrayList(loadJobIdPrefixView);
sideInputs.addAll(dynamicDestinations.getSideInputs());

Coder<TableDestination> tableDestinationCoder =
clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of();

Coder<KV<ShardedKey<DestinationT>, WritePartition.Result>> partitionsCoder =
KvCoder.of(
ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
Expand Down

0 comments on commit c7f2cab

Please sign in to comment.