-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add signatures for exactly once sink #151
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
/* | ||
* Copyright (C) 2024 Google Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package com.google.cloud.flink.bigquery.sink; | ||
|
||
import org.apache.flink.api.connector.sink2.Committer; | ||
import org.apache.flink.core.io.SimpleVersionedSerializer; | ||
|
||
import com.google.cloud.flink.bigquery.sink.committer.BigQueryCommittable; | ||
import com.google.cloud.flink.bigquery.sink.writer.BigQueryWriterState; | ||
|
||
import java.io.IOException; | ||
import java.util.Collection; | ||
|
||
/** | ||
* Sink to write data into a BigQuery table using {@link BigQueryBufferedWriter}. | ||
* | ||
* <p>Depending on the checkpointing mode, this writer offers following consistency guarantees: | ||
* <li>{@link CheckpointingMode#EXACTLY_ONCE}: exactly-once write consistency. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (here and below) should this be CheckpointingMode#EXACTLY_ONCE or DeliveryGuarantee#EXACTLY_ONCE? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CheckpointingMode There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
* <li>{@link CheckpointingMode#AT_LEAST_ONCE}: at-least-once write consistency. | ||
* <li>Checkpointing disabled: no consistency guarantee. | ||
* | ||
* @param <IN> Type of records written to BigQuery | ||
*/ | ||
public class BigQueryExactlyOnceSink<IN> extends BigQueryBaseSink<IN> | ||
implements TwoPhaseCommittingStatefulSink<IN, BigQueryWriterState, BigQueryCommittable> { | ||
|
||
BigQueryExactlyOnceSink(BigQuerySinkConfig sinkConfig) { | ||
super(sinkConfig); | ||
} | ||
|
||
@Override | ||
public PrecommittingStatefulSinkWriter<IN, BigQueryWriterState, BigQueryCommittable> | ||
createWriter(InitContext context) throws IOException { | ||
throw new UnsupportedOperationException("createWriter not implemented"); | ||
} | ||
|
||
@Override | ||
public PrecommittingStatefulSinkWriter<IN, BigQueryWriterState, BigQueryCommittable> | ||
restoreWriter(InitContext context, Collection<BigQueryWriterState> recoveredState) | ||
throws IOException { | ||
throw new UnsupportedOperationException("restoreWriter not implemented"); | ||
} | ||
|
||
@Override | ||
public Committer<BigQueryCommittable> createCommitter() throws IOException { | ||
throw new UnsupportedOperationException("createCommitter not implemented"); | ||
} | ||
|
||
@Override | ||
public SimpleVersionedSerializer<BigQueryCommittable> getCommittableSerializer() { | ||
throw new UnsupportedOperationException("getCommittableSerializer not implemented"); | ||
} | ||
|
||
@Override | ||
public SimpleVersionedSerializer<BigQueryWriterState> getWriterStateSerializer() { | ||
throw new UnsupportedOperationException("getWriterStateSerializer not implemented"); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Copyright (C) 2024 Google Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package com.google.cloud.flink.bigquery.sink; | ||
|
||
import org.apache.flink.annotation.Internal; | ||
import org.apache.flink.api.connector.sink2.Sink; | ||
import org.apache.flink.api.connector.sink2.StatefulSink; | ||
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; | ||
|
||
import java.io.IOException; | ||
import java.util.Collection; | ||
|
||
/** | ||
* A combination of {@link TwoPhaseCommittingSink} and {@link StatefulSink}. | ||
* | ||
* <p>Interface for a sink that supports TPC protocol and statefulness. | ||
* | ||
* @param <IN> Type of the sink's input. | ||
* @param <WriterStateT> Type of the sink writer's state. | ||
* @param <CommittableT> Type of the committables. | ||
*/ | ||
@Internal | ||
public interface TwoPhaseCommittingStatefulSink<IN, WriterStateT, CommittableT> | ||
extends TwoPhaseCommittingSink<IN, CommittableT>, StatefulSink<IN, WriterStateT> { | ||
|
||
@Override | ||
PrecommittingStatefulSinkWriter<IN, WriterStateT, CommittableT> createWriter( | ||
Sink.InitContext context) throws IOException; | ||
|
||
@Override | ||
PrecommittingStatefulSinkWriter<IN, WriterStateT, CommittableT> restoreWriter( | ||
Sink.InitContext context, Collection<WriterStateT> recoveredState) throws IOException; | ||
|
||
/** | ||
* A combination of {@link PrecommittingSinkWriter} and {@link StatefulSinkWriter}. | ||
* | ||
* <p>Interface for a writer that supports TPC protocol and statefulness. | ||
* | ||
* @param <IN> Type of the sink's input. | ||
* @param <WriterStateT> Type of the sink writer's state. | ||
* @param <CommittableT> Type of the committables. | ||
*/ | ||
interface PrecommittingStatefulSinkWriter<IN, WriterStateT, CommittableT> | ||
extends TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, CommittableT>, | ||
StatefulSink.StatefulSinkWriter<IN, WriterStateT> {} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright (C) 2024 Google Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package com.google.cloud.flink.bigquery.sink.committer; | ||
|
||
import com.google.cloud.flink.bigquery.sink.state.BigQueryStreamState; | ||
|
||
/** | ||
* Information required for a commit operation, passed from {@link BigQueryBufferedWriter} to {@link | ||
* BigQueryCommitter}. | ||
*/ | ||
public class BigQueryCommittable extends BigQueryStreamState { | ||
|
||
private final long producerId; | ||
|
||
public BigQueryCommittable(long producerId, String streamName, long streamOffset) { | ||
super(streamName, streamOffset); | ||
this.producerId = producerId; | ||
} | ||
|
||
public long getProducerId() { | ||
return producerId; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Copyright (C) 2024 Google Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package com.google.cloud.flink.bigquery.sink.committer; | ||
|
||
import org.apache.flink.core.io.SimpleVersionedSerializer; | ||
|
||
import java.io.ByteArrayInputStream; | ||
import java.io.ByteArrayOutputStream; | ||
import java.io.DataInputStream; | ||
import java.io.DataOutputStream; | ||
import java.io.IOException; | ||
|
||
/** Serializer and deserializer for {@link BigQueryCommittable}. */ | ||
public class BigQueryCommittableSerializer | ||
implements SimpleVersionedSerializer<BigQueryCommittable> { | ||
|
||
@Override | ||
public int getVersion() { | ||
return 1; | ||
} | ||
|
||
@Override | ||
public byte[] serialize(BigQueryCommittable committable) throws IOException { | ||
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); | ||
final DataOutputStream out = new DataOutputStream(baos)) { | ||
out.writeLong(committable.getProducerId()); | ||
out.writeUTF(committable.getStreamName()); | ||
out.writeLong(committable.getStreamOffset()); | ||
out.flush(); | ||
return baos.toByteArray(); | ||
} | ||
} | ||
|
||
@Override | ||
public BigQueryCommittable deserialize(int version, byte[] serialized) throws IOException { | ||
try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized); | ||
final DataInputStream in = new DataInputStream(bais)) { | ||
final Long producerId = in.readLong(); | ||
final String streamName = in.readUTF(); | ||
final long streamOffset = in.readLong(); | ||
BigQueryCommittable committable = | ||
new BigQueryCommittable(producerId, streamName, streamOffset); | ||
return committable; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Copyright (C) 2024 Google Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package com.google.cloud.flink.bigquery.sink.committer; | ||
|
||
import org.apache.flink.api.connector.sink2.Committer; | ||
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.util.Collection; | ||
|
||
/** | ||
* Committer implementation for {@link BigQueryExactlyOnceSink}. | ||
* | ||
* <p>The committer is responsible for committing records buffered in BigQuery write stream to | ||
* BigQuery table. | ||
*/ | ||
public class BigQueryCommitter implements Committer<BigQueryCommittable>, Closeable { | ||
|
||
@Override | ||
public void commit(Collection<CommitRequest<BigQueryCommittable>> commitRequests) | ||
throws IOException, InterruptedException {} | ||
|
||
@Override | ||
public void close() {} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright (C) 2024 Google Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package com.google.cloud.flink.bigquery.sink.state; | ||
|
||
/** State representation of a BigQuery write stream. */ | ||
public abstract class BigQueryStreamState { | ||
|
||
protected final String streamName; | ||
protected final long streamOffset; | ||
|
||
public BigQueryStreamState(String streamName, long streamOffset) { | ||
this.streamName = streamName; | ||
this.streamOffset = streamOffset; | ||
} | ||
|
||
public String getStreamName() { | ||
return streamName; | ||
} | ||
|
||
public long getStreamOffset() { | ||
return streamOffset; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright 2024 The Apache Software Foundation. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.flink.bigquery.sink.throttle; | ||
|
||
/** Limits the rate at which an operation can be performed. */ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we document why this is needed? i assume it's because the storage write api doesnt have built in throttling? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will be documented in a future PR: https://github.com/jayehwhyehentee/flink-bigquery-connector/blob/exo_writer/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/throttle/WriteStreamCreationThrottler.java#L25-L38. Also, we will add this in the root README |
||
public interface Throttler { | ||
|
||
/** Limits the rate by waiting if necessary. */ | ||
void throttle(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we can just use this writer with checkpointing in ALO mode, does this mean that the BigQueryDefaultSink is redundant now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great question, and the answer is no. Fundamental critical difference is the write streams they use. ALO's writer uses default stream (data instantly written to BQ table), while this writer first buffers data and then requires an explicit commit (BQ's FlushRows API) so that data is written to the table. Hence, our ALO solution ensures data is instantly available in BQ at the cost of duplicates, while exactly once solution ensures data consistency but at the cost of availability (data is written to BQ only at Flink application's checkpoints).