-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Buffered writer for exactly once sink #156
Buffered writer for exactly once sink #156
Conversation
@clmccart kindly review this PR |
try { | ||
writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite(); | ||
return writeClient.createStreamWriter(streamName, protoSchema, enableConnectionPool); | ||
logger.debug("Creating BigQuery StreamWriter in subtask {}", subtaskId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think it might be worth logging this at the info level. thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} catch (IOException e) { | ||
logger.error("Unable to create StreamWriter for stream {}", streamName); | ||
throw new BigQueryConnectorException("Unable to create StreamWriter", e); | ||
} | ||
} | ||
|
||
/** Creates a write stream and StreamWriter for appending to BigQuery table. */ | ||
void createWriteStreamAndStreamWriter( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a simple way to refactor this so that it invokes createStreamWriter + some additional logic instead of duplicating code here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, thanks
@Override | ||
public void write(IN element, Context context) { | ||
totalRecordsSeen++; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this might be out of scope for this PR but it might be worth trcking the element sizes here as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ie, totalRecordsSeenSize
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@prashastia this should go into our document tracking Flink metrics
public void throttle() { | ||
int waitSeconds = writerId % MAX_BUCKETS; | ||
try { | ||
// Sleep does nothing if input is 0 or less. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should probably log here that we're throttling and for how long
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
*/ | ||
public class WriteStreamCreationThrottler implements Throttler { | ||
|
||
public static final int MAX_BUCKETS = BigQueryExactlyOnceSink.MAX_SINK_PARALLELISM / 3; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i dont see where BigQueryExactlyOnceSink.MAX_SINK_PARALLELISM is defined. looks like it's not apart of this PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its not.
Line 39 in 8254a5e
public static final int MAX_SINK_PARALLELISM = 128; |
public class WriteStreamCreationThrottlerTest { | ||
|
||
@Test | ||
public void testThrottle() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a few more tests here for some of the edge cases (ie, weird subtaskid values, etc)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Create writer used by exactly once sink, following the two-phase commit protocol. This writer uses BQ's buffered stream for appending data to the table.
Note to reviewers: this PR is bloated by tests
/gcbrun