Skip to content

Commit

Permalink
Add signatures for exactly once sink (#151)
Browse files Browse the repository at this point in the history
Interfaces and classes that define the structure of exactly-once sink
feature.
  • Loading branch information
jayehwhyehentee committed Sep 13, 2024
1 parent 730acd9 commit 83226c5
Show file tree
Hide file tree
Showing 17 changed files with 617 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.slf4j.LoggerFactory;

/** Base class for developing a BigQuery sink. */
abstract class BigQueryBaseSink implements Sink {
abstract class BigQueryBaseSink<IN> implements Sink<IN> {

protected final Logger logger = LoggerFactory.getLogger(getClass());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
/**
* Sink to write data into a BigQuery table using {@link BigQueryDefaultWriter}.
*
* <p>Depending on the checkpointing mode, this sink will offer at-least-once consistency guarantee.
* <p>Depending on the checkpointing mode, this sink offers the following consistency guarantees:
* <li>{@link CheckpointingMode#EXACTLY_ONCE}: at-least-once write consistency.
* <li>{@link CheckpointingMode#AT_LEAST_ONCE}: at-least-once write consistency.
* <li>Checkpointing disabled: no consistency guarantee.
* <li>Checkpointing disabled (NOT RECOMMENDED!): no consistency guarantee.
*/
class BigQueryDefaultSink extends BigQueryBaseSink {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (C) 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.sink;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import com.google.cloud.flink.bigquery.sink.committer.BigQueryCommittable;
import com.google.cloud.flink.bigquery.sink.writer.BigQueryWriterState;

import java.io.IOException;
import java.util.Collection;

/**
* Sink to write data into a BigQuery table using {@link BigQueryBufferedWriter}.
*
* <p>Depending on the checkpointing mode, this writer offers the following consistency guarantees:
* <li>{@link CheckpointingMode#EXACTLY_ONCE}: exactly-once write consistency.
* <li>{@link CheckpointingMode#AT_LEAST_ONCE}: at-least-once write consistency.
* <li>Checkpointing disabled: no consistency guarantee.
*
* @param <IN> Type of records written to BigQuery
*/
public class BigQueryExactlyOnceSink<IN> extends BigQueryBaseSink<IN>
implements TwoPhaseCommittingStatefulSink<IN, BigQueryWriterState, BigQueryCommittable> {

BigQueryExactlyOnceSink(BigQuerySinkConfig sinkConfig) {
super(sinkConfig);
}

@Override
public PrecommittingStatefulSinkWriter<IN, BigQueryWriterState, BigQueryCommittable>
createWriter(InitContext context) throws IOException {
throw new UnsupportedOperationException("createWriter not implemented");
}

@Override
public PrecommittingStatefulSinkWriter<IN, BigQueryWriterState, BigQueryCommittable>
restoreWriter(InitContext context, Collection<BigQueryWriterState> recoveredState)
throws IOException {
throw new UnsupportedOperationException("restoreWriter not implemented");
}

@Override
public Committer<BigQueryCommittable> createCommitter() throws IOException {
throw new UnsupportedOperationException("createCommitter not implemented");
}

@Override
public SimpleVersionedSerializer<BigQueryCommittable> getCommittableSerializer() {
throw new UnsupportedOperationException("getCommittableSerializer not implemented");
}

@Override
public SimpleVersionedSerializer<BigQueryWriterState> getWriterStateSerializer() {
throw new UnsupportedOperationException("getWriterStateSerializer not implemented");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@
* <p>With {@link DeliveryGuarantee#AT_LEAST_ONCE}, the Sink added to Flink job will be {@link
* BigQueryDefaultSink}.
*
* <p>Eventual data consistency at destination is also dependent on checkpointing mode. With {@link
* CheckpointingMode#AT_LEAST_ONCE} or {@link CheckpointingMode#EXACTLY_ONCE}, the {@link
* BigQueryDefaultSink} will offer at-least-once consistency. We recommend enabling checkpointing to
* avoid any unexpected behavior.
* <p>With {@link DeliveryGuarantee#EXACTLY_ONCE}, the Sink added to Flink job will be {@link
* BigQueryExactlyOnceSink}.
*
* <p>Support for exactly-once consistency in BigQuerySink will be offered soon!
* <p>Eventual data consistency at destination is also dependent on checkpointing mode. Look at
* {@link BigQueryDefaultSink} and {@link BigQueryExactlyOnceSink} for write consistencies offered
* across combinations of {@link CheckpointingMode} and sink's {@link DeliveryGuarantee}. It is
* recommended that checkpointing is enabled to avoid unexpected behavior.
*/
public class BigQuerySink {

Expand All @@ -44,8 +45,11 @@ public static Sink get(BigQuerySinkConfig sinkConfig, StreamExecutionEnvironment
if (sinkConfig.getDeliveryGuarantee() == DeliveryGuarantee.AT_LEAST_ONCE) {
return new BigQueryDefaultSink(sinkConfig);
}
if (sinkConfig.getDeliveryGuarantee() == DeliveryGuarantee.EXACTLY_ONCE) {
return new BigQueryExactlyOnceSink(sinkConfig);
}
LOG.error(
"Only at-least-once write consistency is supported in BigQuery sink. Found {}",
"BigQuery sink does not support {} delivery guarantee. Use AT_LEAST_ONCE or EXACTLY_ONCE.",
sinkConfig.getDeliveryGuarantee());
throw new UnsupportedOperationException(
String.format("%s is not supported", sinkConfig.getDeliveryGuarantee()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.sink;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;

import java.io.IOException;
import java.util.Collection;

/**
* A combination of {@link TwoPhaseCommittingSink} and {@link StatefulSink}.
*
* <p>Interface for a sink that supports TPC protocol and statefulness.
*
* @param <IN> Type of the sink's input.
* @param <WriterStateT> Type of the sink writer's state.
* @param <CommittableT> Type of the committables.
*/
@Internal
public interface TwoPhaseCommittingStatefulSink<IN, WriterStateT, CommittableT>
extends TwoPhaseCommittingSink<IN, CommittableT>, StatefulSink<IN, WriterStateT> {

@Override
PrecommittingStatefulSinkWriter<IN, WriterStateT, CommittableT> createWriter(
Sink.InitContext context) throws IOException;

@Override
PrecommittingStatefulSinkWriter<IN, WriterStateT, CommittableT> restoreWriter(
Sink.InitContext context, Collection<WriterStateT> recoveredState) throws IOException;

/**
* A combination of {@link PrecommittingSinkWriter} and {@link StatefulSinkWriter}.
*
* <p>Interface for a writer that supports TPC protocol and statefulness.
*
* @param <IN> Type of the sink's input.
* @param <WriterStateT> Type of the sink writer's state.
* @param <CommittableT> Type of the committables.
*/
interface PrecommittingStatefulSinkWriter<IN, WriterStateT, CommittableT>
extends TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, CommittableT>,
StatefulSink.StatefulSinkWriter<IN, WriterStateT> {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (C) 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.sink.committer;

import com.google.cloud.flink.bigquery.sink.state.BigQueryStreamState;

/**
* Information required for a commit operation, passed from {@link BigQueryBufferedWriter} to {@link
* BigQueryCommitter}.
*/
public class BigQueryCommittable extends BigQueryStreamState {

private final long producerId;

public BigQueryCommittable(long producerId, String streamName, long streamOffset) {
super(streamName, streamOffset);
this.producerId = producerId;
}

public long getProducerId() {
return producerId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.sink.committer;

import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Serializer and deserializer for {@link BigQueryCommittable}. */
public class BigQueryCommittableSerializer
implements SimpleVersionedSerializer<BigQueryCommittable> {

@Override
public int getVersion() {
return 1;
}

@Override
public byte[] serialize(BigQueryCommittable committable) throws IOException {
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {
out.writeLong(committable.getProducerId());
out.writeUTF(committable.getStreamName());
out.writeLong(committable.getStreamOffset());
out.flush();
return baos.toByteArray();
}
}

@Override
public BigQueryCommittable deserialize(int version, byte[] serialized) throws IOException {
try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
final DataInputStream in = new DataInputStream(bais)) {
final Long producerId = in.readLong();
final String streamName = in.readUTF();
final long streamOffset = in.readLong();
BigQueryCommittable committable =
new BigQueryCommittable(producerId, streamName, streamOffset);
return committable;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (C) 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.sink.committer;

import org.apache.flink.api.connector.sink2.Committer;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;

/**
* Committer implementation for {@link BigQueryExactlyOnceSink}.
*
* <p>The committer is responsible for committing records buffered in BigQuery write stream to
* BigQuery table.
*/
public class BigQueryCommitter implements Committer<BigQueryCommittable>, Closeable {

@Override
public void commit(Collection<CommitRequest<BigQueryCommittable>> commitRequests)
throws IOException, InterruptedException {}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (C) 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.flink.bigquery.sink.state;

/** State representation of a BigQuery write stream. */
public abstract class BigQueryStreamState {

protected final String streamName;
protected final long streamOffset;

public BigQueryStreamState(String streamName, long streamOffset) {
this.streamName = streamName;
this.streamOffset = streamOffset;
}

public String getStreamName() {
return streamName;
}

public long getStreamOffset() {
return streamOffset;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2024 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.flink.bigquery.sink.throttle;

/** Limits the rate at which an operation can be performed. */
public interface Throttler {

/** Limits the rate by waiting if necessary. */
void throttle();
}
Loading

0 comments on commit 83226c5

Please sign in to comment.