-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Buffered writer for exactly once sink #156
Changes from 9 commits
5ab683a
fd9a39b
c5c1042
1c76261
9857b1b
8f864f5
53162da
6fcfc5a
8254a5e
6b38afd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/* | ||
* Copyright 2024 The Apache Software Foundation. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.flink.bigquery.sink.throttle; | ||
|
||
import com.google.cloud.flink.bigquery.sink.BigQueryExactlyOnceSink; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
|
||
/** | ||
* Throttler implementation for BigQuery write stream creation. | ||
* | ||
* <p>Each {@link BigQueryBufferedWriter} will invoke BigQuery's CreateWriteStream API before its | ||
* initial write to a BigQuery table. This API, however, requires a low QPS (~3) for best | ||
* performance in steady state since write stream creation is an expensive operation for BigQuery | ||
* storage backend. Hence, this throttler is responsible for distributing writers into buckets which | ||
* correspond to a specific "wait" duration before calling the CreateWriteStream API. | ||
* | ||
* <p>Note that actual separation between CreateWriteStream invocations across all writers will not | ||
* ensure exact QPS of 3, because neither all writers are initialized at the same instant, nor do | ||
* they all identify the need to create a write stream after some uniform fixed duration. Given | ||
* these uncontrolled variations, this throttler aims to achieve ~3 QPS on a best effort basis. | ||
*/ | ||
public class WriteStreamCreationThrottler implements Throttler { | ||
|
||
public static final int MAX_BUCKETS = BigQueryExactlyOnceSink.MAX_SINK_PARALLELISM / 3; | ||
private static final Logger LOG = LoggerFactory.getLogger(WriteStreamCreationThrottler.class); | ||
private final int writerId; | ||
|
||
public WriteStreamCreationThrottler(int writerId) { | ||
this.writerId = writerId; | ||
} | ||
|
||
public void throttle() { | ||
int waitSeconds = writerId % MAX_BUCKETS; | ||
try { | ||
// Sleep does nothing if input is 0 or less. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should probably log here that we're throttling and for how long There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
TimeUnit.SECONDS.sleep(waitSeconds); | ||
} catch (InterruptedException e) { | ||
LOG.warn("Throttle attempt interrupted in subtask {}", writerId); | ||
Thread.currentThread().interrupt(); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ | |
import com.google.cloud.bigquery.storage.v1.ProtoSchema; | ||
import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter; | ||
import com.google.cloud.bigquery.storage.v1.StreamWriter; | ||
import com.google.cloud.bigquery.storage.v1.WriteStream; | ||
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions; | ||
import com.google.cloud.flink.bigquery.services.BigQueryServices; | ||
import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory; | ||
|
@@ -70,23 +71,34 @@ abstract class BaseWriter<IN> implements SinkWriter<IN> { | |
|
||
// Number of bytes to be sent in the next append request. | ||
private long appendRequestSizeBytes; | ||
private BigQueryServices.StorageWriteClient writeClient; | ||
protected final int subtaskId; | ||
private final String tablePath; | ||
private final BigQueryConnectOptions connectOptions; | ||
private final ProtoSchema protoSchema; | ||
private final BigQueryProtoSerializer serializer; | ||
private final Queue<ApiFuture> appendResponseFuturesQueue; | ||
private final ProtoRows.Builder protoRowsBuilder; | ||
|
||
final Queue<AppendInfo> appendResponseFuturesQueue; | ||
// Initialization of writeClient has been deferred to first append call. BigQuery's best | ||
// practices suggest that client connections should be opened when needed. | ||
BigQueryServices.StorageWriteClient writeClient; | ||
StreamWriter streamWriter; | ||
String streamName; | ||
long totalRecordsSeen; | ||
// In exactly-once mode, "totalRecordsWritten" actually represents records appended to a | ||
// write stream by this writer. Only at a checkpoint, when sink's commit is invoked, will | ||
// the records in a stream get committed to the table. Hence, records written to BigQuery | ||
// table is equal to this "totalRecordsWritten" only upon checkpoint completion. | ||
long totalRecordsWritten; | ||
|
||
BaseWriter( | ||
int subtaskId, | ||
String tablePath, | ||
BigQueryConnectOptions connectOptions, | ||
BigQuerySchemaProvider schemaProvider, | ||
BigQueryProtoSerializer serializer) { | ||
this.subtaskId = subtaskId; | ||
this.tablePath = tablePath; | ||
this.connectOptions = connectOptions; | ||
this.protoSchema = getProtoSchema(schemaProvider); | ||
this.serializer = serializer; | ||
|
@@ -102,7 +114,7 @@ public void flush(boolean endOfInput) { | |
if (appendRequestSizeBytes > 0) { | ||
append(); | ||
} | ||
logger.debug("Validating all pending append responses in subtask {}", subtaskId); | ||
logger.info("Validating all pending append responses in subtask {}", subtaskId); | ||
validateAppendResponses(true); | ||
} | ||
|
||
|
@@ -125,10 +137,10 @@ public void close() { | |
} | ||
|
||
/** Invoke BigQuery storage API for appending data to a table. */ | ||
abstract ApiFuture sendAppendRequest(ProtoRows protoRows); | ||
abstract void sendAppendRequest(ProtoRows protoRows); | ||
|
||
/** Checks append response for errors. */ | ||
abstract void validateAppendResponse(ApiFuture<AppendRowsResponse> appendResponseFuture); | ||
abstract void validateAppendResponse(AppendInfo appendInfo); | ||
|
||
/** Add serialized record to append request. */ | ||
void addToAppendRequest(ByteString protoRow) { | ||
|
@@ -138,24 +150,43 @@ void addToAppendRequest(ByteString protoRow) { | |
|
||
/** Send append request to BigQuery storage and prepare for next append request. */ | ||
void append() { | ||
ApiFuture responseFuture = sendAppendRequest(protoRowsBuilder.build()); | ||
appendResponseFuturesQueue.add(responseFuture); | ||
sendAppendRequest(protoRowsBuilder.build()); | ||
protoRowsBuilder.clear(); | ||
appendRequestSizeBytes = 0L; | ||
} | ||
|
||
/** Creates a StreamWriter for appending to BigQuery table. */ | ||
StreamWriter createStreamWriter(boolean enableConnectionPool) { | ||
logger.debug("Creating BigQuery StreamWriter in subtask {}", subtaskId); | ||
void createStreamWriter(boolean enableConnectionPool) { | ||
try { | ||
writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite(); | ||
return writeClient.createStreamWriter(streamName, protoSchema, enableConnectionPool); | ||
logger.debug("Creating BigQuery StreamWriter in subtask {}", subtaskId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think it might be worth logging this at the info level. thoughts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
if (writeClient == null) { | ||
writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite(); | ||
} | ||
streamWriter = | ||
writeClient.createStreamWriter(streamName, protoSchema, enableConnectionPool); | ||
} catch (IOException e) { | ||
logger.error("Unable to create StreamWriter for stream {}", streamName); | ||
throw new BigQueryConnectorException("Unable to create StreamWriter", e); | ||
} | ||
} | ||
|
||
/** Creates a write stream and StreamWriter for appending to BigQuery table. */ | ||
void createWriteStreamAndStreamWriter( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there a simple way to refactor this so that it invokes createStreamWriter + some additional logic instead of duplicating code here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, thanks |
||
WriteStream.Type streamType, boolean enableConnectionPool) { | ||
logger.info("Creating BigQuery write stream and StreamWriter in subtask {}", subtaskId); | ||
try { | ||
if (writeClient == null) { | ||
writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite(); | ||
} | ||
streamName = writeClient.createWriteStream(tablePath, streamType).getName(); | ||
streamWriter = | ||
writeClient.createStreamWriter(streamName, protoSchema, enableConnectionPool); | ||
} catch (IOException e) { | ||
logger.error("Unable to connect to BigQuery in subtask {}", streamName); | ||
throw new BigQueryConnectorException("Unable to create write stream", e); | ||
} | ||
} | ||
|
||
/** Checks if serialized record can fit in current append request. */ | ||
boolean fitsInAppendRequest(ByteString protoRow) { | ||
return appendRequestSizeBytes + getProtoRowBytes(protoRow) <= MAX_APPEND_REQUEST_BYTES; | ||
|
@@ -196,17 +227,55 @@ private int getProtoRowBytes(ByteString protoRow) { | |
* order, we proceed to check the next response only after the previous one has arrived. | ||
*/ | ||
void validateAppendResponses(boolean waitForResponse) { | ||
ApiFuture<AppendRowsResponse> appendResponseFuture; | ||
while ((appendResponseFuture = appendResponseFuturesQueue.peek()) != null) { | ||
if (waitForResponse || appendResponseFuture.isDone()) { | ||
while (!appendResponseFuturesQueue.isEmpty()) { | ||
AppendInfo appendInfo = appendResponseFuturesQueue.peek(); | ||
if (waitForResponse || appendInfo.getFuture().isDone()) { | ||
appendResponseFuturesQueue.poll(); | ||
validateAppendResponse(appendResponseFuture); | ||
validateAppendResponse(appendInfo); | ||
} else { | ||
break; | ||
} | ||
} | ||
} | ||
|
||
void logAndThrowFatalException(Throwable error) { | ||
logger.error(String.format("AppendRows request failed in subtask %d", subtaskId), error); | ||
throw new BigQueryConnectorException("Error while writing to BigQuery", error); | ||
} | ||
|
||
void logAndThrowFatalException(String errorMessage) { | ||
logger.error( | ||
String.format( | ||
"AppendRows request failed in subtask %d\n%s", subtaskId, errorMessage)); | ||
throw new BigQueryConnectorException( | ||
String.format("Error while writing to BigQuery\n%s", errorMessage)); | ||
} | ||
|
||
static class AppendInfo { | ||
private final ApiFuture<AppendRowsResponse> future; | ||
private final long expectedOffset; | ||
private final long recordsAppended; | ||
|
||
AppendInfo( | ||
ApiFuture<AppendRowsResponse> future, long expectedOffset, long recordsAppended) { | ||
this.future = future; | ||
this.expectedOffset = expectedOffset; | ||
this.recordsAppended = recordsAppended; | ||
} | ||
|
||
public ApiFuture<AppendRowsResponse> getFuture() { | ||
return future; | ||
} | ||
|
||
public long getExpectedOffset() { | ||
return expectedOffset; | ||
} | ||
|
||
public long getRecordsAppended() { | ||
return recordsAppended; | ||
} | ||
} | ||
|
||
/** | ||
* Following "getters" expose some internal fields required for testing. | ||
* | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i dont see where BigQueryExactlyOnceSink.MAX_SINK_PARALLELISM is defined. looks like it's not apart of this PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its not.
flink-bigquery-connector/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryBaseSink.java
Line 39 in 8254a5e