
Buffered writer for exactly-once sink #156

Merged · 10 commits · Sep 20, 2024
@@ -38,6 +38,6 @@ class BigQueryDefaultSink extends BigQueryBaseSink {
public SinkWriter createWriter(InitContext context) {
checkParallelism(context.getNumberOfParallelSubtasks());
return new BigQueryDefaultWriter(
-                context.getSubtaskId(), connectOptions, schemaProvider, serializer, tablePath);
+                context.getSubtaskId(), tablePath, connectOptions, schemaProvider, serializer);
}
}
@@ -31,7 +31,7 @@
* <p>Depending on the checkpointing mode, this writer offers the following consistency guarantees:
* <li>{@link CheckpointingMode#EXACTLY_ONCE}: exactly-once write consistency.
* <li>{@link CheckpointingMode#AT_LEAST_ONCE}: at-least-once write consistency.
- * <li>Checkpointing disabled: no consistency guarantee.
+ * <li>Checkpointing disabled (NOT RECOMMENDED!): no consistency guarantee.
*
* @param <IN> Type of records written to BigQuery
*/
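For context, which guarantee from the list above applies follows from the job's checkpointing configuration. A minimal sketch using the standard Flink API (shown for illustration; the interval is an assumption and this snippet is not part of this diff):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpoint every 60 seconds with exactly-once semantics; this selects the
    // sink's exactly-once write path described in the javadoc above.
    env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);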
@@ -0,0 +1,59 @@
/*
* Copyright 2024 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.flink.bigquery.sink.throttle;

import com.google.cloud.flink.bigquery.sink.BigQueryExactlyOnceSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
* Throttler implementation for BigQuery write stream creation.
*
 * <p>Each {@link BigQueryBufferedWriter} will invoke BigQuery's CreateWriteStream API before its
 * initial write to a BigQuery table. This API, however, requires a low QPS (~3) for best
 * performance in steady state, since write stream creation is an expensive operation for the
 * BigQuery storage backend. Hence, this throttler is responsible for distributing writers into
 * buckets that correspond to a specific "wait" duration before calling the CreateWriteStream API.
 *
 * <p>Note that the actual separation between CreateWriteStream invocations across all writers
 * will not ensure an exact QPS of 3, because not all writers are initialized at the same
 * instant, nor do they all identify the need to create a write stream after some uniform fixed
 * duration. Given these uncontrolled variations, this throttler aims to achieve ~3 QPS on a
 * best effort basis.
*/
public class WriteStreamCreationThrottler implements Throttler {

public static final int MAX_BUCKETS = BigQueryExactlyOnceSink.MAX_SINK_PARALLELISM / 3;

[Review comment] i don't see where BigQueryExactlyOnceSink.MAX_SINK_PARALLELISM is defined. looks like it's not a part of this PR

private static final Logger LOG = LoggerFactory.getLogger(WriteStreamCreationThrottler.class);
private final int writerId;

public WriteStreamCreationThrottler(int writerId) {
this.writerId = writerId;
}

public void throttle() {
int waitSeconds = writerId % MAX_BUCKETS;
try {
// Sleep does nothing if input is 0 or less.

[Review comment] we should probably log here that we're throttling and for how long
[Author reply] Done

TimeUnit.SECONDS.sleep(waitSeconds);
} catch (InterruptedException e) {
LOG.warn("Throttle attempt interrupted in subtask {}", writerId);
Thread.currentThread().interrupt();
}
}
}
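For illustration, a hypothetical call site for this throttler in a writer's first-write path. If MAX_SINK_PARALLELISM were, say, 128 (its value is defined outside this diff, as the review comment above notes), MAX_BUCKETS would be 42, spreading writers across waits of 0 to 41 seconds, i.e. roughly three CreateWriteStream calls per second:

    // Hypothetical usage; "subtaskId" is assumed to be this writer's subtask index.
    Throttler throttler = new WriteStreamCreationThrottler(subtaskId);
    throttler.throttle(); // sleeps (subtaskId % MAX_BUCKETS) seconds
    // ... now invoke the CreateWriteStream API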
@@ -25,6 +25,7 @@
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
+import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.services.BigQueryServices;
import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory;
@@ -70,23 +71,34 @@ abstract class BaseWriter<IN> implements SinkWriter<IN> {

// Number of bytes to be sent in the next append request.
private long appendRequestSizeBytes;
-    private BigQueryServices.StorageWriteClient writeClient;
protected final int subtaskId;
private final String tablePath;
private final BigQueryConnectOptions connectOptions;
private final ProtoSchema protoSchema;
private final BigQueryProtoSerializer serializer;
-    private final Queue<ApiFuture> appendResponseFuturesQueue;
private final ProtoRows.Builder protoRowsBuilder;

+    final Queue<AppendInfo> appendResponseFuturesQueue;
+    // Initialization of writeClient has been deferred to first append call. BigQuery's best
+    // practices suggest that client connections should be opened when needed.
+    BigQueryServices.StorageWriteClient writeClient;
StreamWriter streamWriter;
String streamName;
long totalRecordsSeen;
+    // In exactly-once mode, "totalRecordsWritten" actually represents records appended to a
+    // write stream by this writer. Only at a checkpoint, when the sink's commit is invoked,
+    // are the records in a stream committed to the table. Hence, the number of records
+    // written to the BigQuery table equals "totalRecordsWritten" only upon checkpoint
+    // completion.
long totalRecordsWritten;

BaseWriter(
int subtaskId,
String tablePath,
BigQueryConnectOptions connectOptions,
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer) {
this.subtaskId = subtaskId;
this.tablePath = tablePath;
this.connectOptions = connectOptions;
this.protoSchema = getProtoSchema(schemaProvider);
this.serializer = serializer;
@@ -102,7 +114,7 @@ public void flush(boolean endOfInput) {
if (appendRequestSizeBytes > 0) {
append();
}
logger.debug("Validating all pending append responses in subtask {}", subtaskId);
logger.info("Validating all pending append responses in subtask {}", subtaskId);
validateAppendResponses(true);
}

@@ -125,10 +137,10 @@ public void close() {
}

/** Invoke BigQuery storage API for appending data to a table. */
-    abstract ApiFuture sendAppendRequest(ProtoRows protoRows);
+    abstract void sendAppendRequest(ProtoRows protoRows);

/** Checks append response for errors. */
-    abstract void validateAppendResponse(ApiFuture<AppendRowsResponse> appendResponseFuture);
+    abstract void validateAppendResponse(AppendInfo appendInfo);

/** Add serialized record to append request. */
void addToAppendRequest(ByteString protoRow) {
@@ -138,24 +150,43 @@ void addToAppendRequest(ByteString protoRow) {

/** Send append request to BigQuery storage and prepare for next append request. */
void append() {
-        ApiFuture responseFuture = sendAppendRequest(protoRowsBuilder.build());
-        appendResponseFuturesQueue.add(responseFuture);
+        sendAppendRequest(protoRowsBuilder.build());
protoRowsBuilder.clear();
appendRequestSizeBytes = 0L;
}

/** Creates a StreamWriter for appending to BigQuery table. */
-    StreamWriter createStreamWriter(boolean enableConnectionPool) {
-        logger.debug("Creating BigQuery StreamWriter in subtask {}", subtaskId);
+    void createStreamWriter(boolean enableConnectionPool) {
        try {
-            writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite();
-            return writeClient.createStreamWriter(streamName, protoSchema, enableConnectionPool);
+            logger.debug("Creating BigQuery StreamWriter in subtask {}", subtaskId);

[Review comment] i think it might be worth logging this at the info level. thoughts?
[Author reply] Done

+            if (writeClient == null) {
+                writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite();
+            }
+            streamWriter =
+                    writeClient.createStreamWriter(streamName, protoSchema, enableConnectionPool);
} catch (IOException e) {
logger.error("Unable to create StreamWriter for stream {}", streamName);
throw new BigQueryConnectorException("Unable to create StreamWriter", e);
}
}

/** Creates a write stream and StreamWriter for appending to BigQuery table. */
void createWriteStreamAndStreamWriter(

[Review comment] is there a simple way to refactor this so that it invokes createStreamWriter + some additional logic instead of duplicating code here?
[Author reply] Yep, thanks

WriteStream.Type streamType, boolean enableConnectionPool) {
logger.info("Creating BigQuery write stream and StreamWriter in subtask {}", subtaskId);
try {
if (writeClient == null) {
writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite();
}
streamName = writeClient.createWriteStream(tablePath, streamType).getName();
streamWriter =
writeClient.createStreamWriter(streamName, protoSchema, enableConnectionPool);
} catch (IOException e) {
logger.error("Unable to connect to BigQuery in subtask {}", streamName);
throw new BigQueryConnectorException("Unable to create write stream", e);
}
}
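
    // A hedged sketch of the refactor agreed in the thread above (illustrative, not
    // the code merged here): create the write stream, then delegate to
    // createStreamWriter so lazy client initialization and error handling live in
    // one place. The method name is hypothetical.
    void createWriteStreamThenStreamWriter(
            WriteStream.Type streamType, boolean enableConnectionPool) {
        logger.info("Creating BigQuery write stream in subtask {}", subtaskId);
        try {
            if (writeClient == null) {
                writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite();
            }
            streamName = writeClient.createWriteStream(tablePath, streamType).getName();
        } catch (IOException e) {
            throw new BigQueryConnectorException("Unable to create write stream", e);
        }
        createStreamWriter(enableConnectionPool);
    }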

/** Checks if serialized record can fit in current append request. */
boolean fitsInAppendRequest(ByteString protoRow) {
return appendRequestSizeBytes + getProtoRowBytes(protoRow) <= MAX_APPEND_REQUEST_BYTES;
@@ -196,17 +227,55 @@ private int getProtoRowBytes(ByteString protoRow) {
* order, we proceed to check the next response only after the previous one has arrived.
*/
void validateAppendResponses(boolean waitForResponse) {
-        ApiFuture<AppendRowsResponse> appendResponseFuture;
-        while ((appendResponseFuture = appendResponseFuturesQueue.peek()) != null) {
-            if (waitForResponse || appendResponseFuture.isDone()) {
+        while (!appendResponseFuturesQueue.isEmpty()) {
+            AppendInfo appendInfo = appendResponseFuturesQueue.peek();
+            if (waitForResponse || appendInfo.getFuture().isDone()) {
                appendResponseFuturesQueue.poll();
-                validateAppendResponse(appendResponseFuture);
+                validateAppendResponse(appendInfo);
} else {
break;
}
}
}
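
    // For context on the exactly-once flow (a hedged sketch, not part of this PR):
    // responses validated above only confirm appends to the buffered write stream;
    // rows become visible in the table when the committer flushes the stream at the
    // offset recorded in a checkpoint. The types below are the public BigQuery
    // Storage v1 API (FlushRowsRequest/FlushRowsResponse, BigQueryWriteClient) plus
    // protobuf's Int64Value; this method and "checkpointedOffset" are hypothetical.
    void flushUpToCheckpointedOffset(BigQueryWriteClient client, long checkpointedOffset) {
        FlushRowsRequest request =
                FlushRowsRequest.newBuilder()
                        .setWriteStream(streamName)
                        .setOffset(Int64Value.of(checkpointedOffset))
                        .build();
        FlushRowsResponse response = client.flushRows(request);
        // Rows appended up to this offset are now table-visible.
        logger.info("Flushed stream {} up to offset {}", streamName, response.getOffset());
    }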

void logAndThrowFatalException(Throwable error) {
logger.error(String.format("AppendRows request failed in subtask %d", subtaskId), error);
throw new BigQueryConnectorException("Error while writing to BigQuery", error);
}

void logAndThrowFatalException(String errorMessage) {
logger.error(
String.format(
"AppendRows request failed in subtask %d\n%s", subtaskId, errorMessage));
throw new BigQueryConnectorException(
String.format("Error while writing to BigQuery\n%s", errorMessage));
}

static class AppendInfo {
private final ApiFuture<AppendRowsResponse> future;
private final long expectedOffset;
private final long recordsAppended;

AppendInfo(
ApiFuture<AppendRowsResponse> future, long expectedOffset, long recordsAppended) {
this.future = future;
this.expectedOffset = expectedOffset;
this.recordsAppended = recordsAppended;
}

public ApiFuture<AppendRowsResponse> getFuture() {
return future;
}

public long getExpectedOffset() {
return expectedOffset;
}

public long getRecordsAppended() {
return recordsAppended;
}
}
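
    // A hypothetical validation routine for an exactly-once writer (illustrative,
    // not this PR's actual implementation): compare BigQuery's returned offset with
    // the expected offset recorded at append time. The response accessors follow
    // the BigQuery Storage v1 AppendRowsResponse API.
    void checkAppendResult(AppendInfo appendInfo) throws Exception {
        AppendRowsResponse response = appendInfo.getFuture().get();
        if (response.hasError()) {
            logAndThrowFatalException(response.getError().getMessage());
        }
        long actualOffset = response.getAppendResult().getOffset().getValue();
        if (actualOffset != appendInfo.getExpectedOffset()) {
            // A mismatch means appends were lost or duplicated on this stream.
            logAndThrowFatalException(
                    String.format(
                            "Expected offset %d, found %d",
                            appendInfo.getExpectedOffset(), actualOffset));
        }
        totalRecordsWritten += appendInfo.getRecordsAppended();
    }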

/**
* The following "getters" expose some internal fields required for testing.
*