Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add signatures for exactly once sink #151

Merged
merged 3 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.slf4j.LoggerFactory;

/** Base class for developing a BigQuery sink. */
abstract class BigQueryBaseSink implements Sink {
abstract class BigQueryBaseSink<IN> implements Sink<IN> {

protected final Logger logger = LoggerFactory.getLogger(getClass());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
/**
* Sink to write data into a BigQuery table using {@link BigQueryDefaultWriter}.
*
* <p>Depending on the checkpointing mode, this sink will offer at-least-once consistency guarantee.
* <p>Depending on the checkpointing mode, this sink offers following consistency guarantees:
* <li>{@link CheckpointingMode#EXACTLY_ONCE}: at-least-once write consistency.
* <li>{@link CheckpointingMode#AT_LEAST_ONCE}: at-least-once write consistency.
* <li>Checkpointing disabled: no consistency guarantee.
* <li>Checkpointing disabled (NOT RECOMMENDED!): no consistency guarantee.
*/
class BigQueryDefaultSink extends BigQueryBaseSink {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (C) 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.sink;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import com.google.cloud.flink.bigquery.sink.committer.BigQueryCommittable;
import com.google.cloud.flink.bigquery.sink.writer.BigQueryWriterState;

import java.io.IOException;
import java.util.Collection;

/**
* Sink to write data into a BigQuery table using {@link BigQueryBufferedWriter}.
*
* <p>Depending on the checkpointing mode, this writer offers following consistency guarantees:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we can just use this writer with checkpointing in ALO mode, does this mean that the BigQueryDefaultSink is redundant now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question, and the answer is no. Fundamental critical difference is the write streams they use. ALO's writer uses default stream (data instantly written to BQ table), while this writer first buffers data and then requires an explicit commit (BQ's FlushRows API) so that data is written to the table. Hence, our ALO solution ensures data is instantly available in BQ at the cost of duplicates, while exactly once solution ensures data consistency but at the cost of availability (data is written to BQ only at Flink application's checkpoints).

* <li>{@link CheckpointingMode#EXACTLY_ONCE}: exactly-once write consistency.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(here and below) should this be CheckpointingMode#EXACTLY_ONCE or DeliveryGuarantee#EXACTLY_ONCE?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CheckpointingMode

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DeliveryGuarantee controls which BQ sink is used, ALO or exactly once. CheckpointingMode controls Flink's internal consistency handling. There is semantic overlap between the two, but the user needs to set both at separate stages. So, we have documented the outcome for different combinations.

* <li>{@link CheckpointingMode#AT_LEAST_ONCE}: at-least-once write consistency.
* <li>Checkpointing disabled: no consistency guarantee.
*
* @param <IN> Type of records written to BigQuery
*/
public class BigQueryExactlyOnceSink<IN> extends BigQueryBaseSink<IN>
implements TwoPhaseCommittingStatefulSink<IN, BigQueryWriterState, BigQueryCommittable> {

BigQueryExactlyOnceSink(BigQuerySinkConfig sinkConfig) {
super(sinkConfig);
}

@Override
public PrecommittingStatefulSinkWriter<IN, BigQueryWriterState, BigQueryCommittable>
createWriter(InitContext context) throws IOException {
throw new UnsupportedOperationException("createWriter not implemented");
}

@Override
public PrecommittingStatefulSinkWriter<IN, BigQueryWriterState, BigQueryCommittable>
restoreWriter(InitContext context, Collection<BigQueryWriterState> recoveredState)
throws IOException {
throw new UnsupportedOperationException("restoreWriter not implemented");
}

@Override
public Committer<BigQueryCommittable> createCommitter() throws IOException {
throw new UnsupportedOperationException("createCommitter not implemented");
}

@Override
public SimpleVersionedSerializer<BigQueryCommittable> getCommittableSerializer() {
throw new UnsupportedOperationException("getCommittableSerializer not implemented");
}

@Override
public SimpleVersionedSerializer<BigQueryWriterState> getWriterStateSerializer() {
throw new UnsupportedOperationException("getWriterStateSerializer not implemented");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@
* <p>With {@link DeliveryGuarantee#AT_LEAST_ONCE}, the Sink added to Flink job will be {@link
* BigQueryDefaultSink}.
*
* <p>Eventual data consistency at destination is also dependent on checkpointing mode. With {@link
* CheckpointingMode#AT_LEAST_ONCE} or {@link CheckpointingMode#EXACTLY_ONCE}, the {@link
* BigQueryDefaultSink} will offer at-least-once consistency. We recommend enabling checkpointing to
* avoid any unexpected behavior.
* <p>With {@link DeliveryGuarantee#EXACTLY_ONCE}, the Sink added to Flink job will be {@link
* BigQueryExactlyOnceSink}.
*
* <p>Support for exactly-once consistency in BigQuerySink will be offered soon!
* <p>Eventual data consistency at destination is also dependent on checkpointing mode. Look at
* {@link BigQueryDefaultSink} and {@link BigQueryExactlyOnceSink} for write consistencies offered
* across combinations of {@link CheckpointingMode} and sink's {@link DeliveryGuarantee}. It is
* recommended that checkpointing is enabled to avoid unexpected behavior.
*/
public class BigQuerySink {

Expand All @@ -44,8 +45,11 @@ public static Sink get(BigQuerySinkConfig sinkConfig, StreamExecutionEnvironment
if (sinkConfig.getDeliveryGuarantee() == DeliveryGuarantee.AT_LEAST_ONCE) {
return new BigQueryDefaultSink(sinkConfig);
}
if (sinkConfig.getDeliveryGuarantee() == DeliveryGuarantee.EXACTLY_ONCE) {
return new BigQueryExactlyOnceSink(sinkConfig);
}
LOG.error(
"Only at-least-once write consistency is supported in BigQuery sink. Found {}",
"BigQuery sink does not support {} delivery guarantee. Use AT_LEAST_ONCE or EXACTLY_ONCE.",
sinkConfig.getDeliveryGuarantee());
throw new UnsupportedOperationException(
String.format("%s is not supported", sinkConfig.getDeliveryGuarantee()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.sink;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;

import java.io.IOException;
import java.util.Collection;

/**
* A combination of {@link TwoPhaseCommittingSink} and {@link StatefulSink}.
*
* <p>Interface for a sink that supports TPC protocol and statefulness.
*
* @param <IN> Type of the sink's input.
* @param <WriterStateT> Type of the sink writer's state.
* @param <CommittableT> Type of the committables.
*/
@Internal
public interface TwoPhaseCommittingStatefulSink<IN, WriterStateT, CommittableT>
extends TwoPhaseCommittingSink<IN, CommittableT>, StatefulSink<IN, WriterStateT> {

@Override
PrecommittingStatefulSinkWriter<IN, WriterStateT, CommittableT> createWriter(
Sink.InitContext context) throws IOException;

@Override
PrecommittingStatefulSinkWriter<IN, WriterStateT, CommittableT> restoreWriter(
Sink.InitContext context, Collection<WriterStateT> recoveredState) throws IOException;

/**
* A combination of {@link PrecommittingSinkWriter} and {@link StatefulSinkWriter}.
*
* <p>Interface for a writer that supports TPC protocol and statefulness.
*
* @param <IN> Type of the sink's input.
* @param <WriterStateT> Type of the sink writer's state.
* @param <CommittableT> Type of the committables.
*/
interface PrecommittingStatefulSinkWriter<IN, WriterStateT, CommittableT>
extends TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, CommittableT>,
StatefulSink.StatefulSinkWriter<IN, WriterStateT> {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (C) 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.sink.committer;

import com.google.cloud.flink.bigquery.sink.state.BigQueryStreamState;

/**
* Information required for a commit operation, passed from {@link BigQueryBufferedWriter} to {@link
* BigQueryCommitter}.
*/
public class BigQueryCommittable extends BigQueryStreamState {

private final long producerId;

public BigQueryCommittable(long producerId, String streamName, long streamOffset) {
super(streamName, streamOffset);
this.producerId = producerId;
}

public long getProducerId() {
return producerId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.sink.committer;

import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Serializer and deserializer for {@link BigQueryCommittable}. */
public class BigQueryCommittableSerializer
implements SimpleVersionedSerializer<BigQueryCommittable> {

@Override
public int getVersion() {
return 1;
}

@Override
public byte[] serialize(BigQueryCommittable committable) throws IOException {
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {
out.writeLong(committable.getProducerId());
out.writeUTF(committable.getStreamName());
out.writeLong(committable.getStreamOffset());
out.flush();
return baos.toByteArray();
}
}

@Override
public BigQueryCommittable deserialize(int version, byte[] serialized) throws IOException {
try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
final DataInputStream in = new DataInputStream(bais)) {
final Long producerId = in.readLong();
final String streamName = in.readUTF();
final long streamOffset = in.readLong();
BigQueryCommittable committable =
new BigQueryCommittable(producerId, streamName, streamOffset);
return committable;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (C) 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.sink.committer;

import org.apache.flink.api.connector.sink2.Committer;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;

/**
* Committer implementation for {@link BigQueryExactlyOnceSink}.
*
* <p>The committer is responsible for committing records buffered in BigQuery write stream to
* BigQuery table.
*/
public class BigQueryCommitter implements Committer<BigQueryCommittable>, Closeable {

@Override
public void commit(Collection<CommitRequest<BigQueryCommittable>> commitRequests)
throws IOException, InterruptedException {}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (C) 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.sink.state;

/** State representation of a BigQuery write stream. */
public abstract class BigQueryStreamState {

protected final String streamName;
protected final long streamOffset;

public BigQueryStreamState(String streamName, long streamOffset) {
this.streamName = streamName;
this.streamOffset = streamOffset;
}

public String getStreamName() {
return streamName;
}

public long getStreamOffset() {
return streamOffset;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2024 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.flink.bigquery.sink.throttle;

/** Limits the rate at which an operation can be performed. */

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we document why this is needed? i assume it's because the storage write api doesnt have built in throttling?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public interface Throttler {

/** Limits the rate by waiting if necessary. */
void throttle();
}
Loading
Loading