-
Notifications
You must be signed in to change notification settings - Fork 67
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DBZ-8193 Batch write to AWS Kinesis #121
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Kubha99 Thanks for the PR! It is definitely good start but needs additional polishing. Please take a look at the comments.
Also wrt the build failure, in this case it was in Redis - it is unfortunately a transient environment error that we were not able to fix yet so it does not relate to the PR.
@@ -61,6 +66,7 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu | |||
private Optional<String> endpointOverride; | |||
private Optional<String> credentialsProfile; | |||
private static final int DEFAULT_RETRIES = 5; | |||
private static final int batchSize = 500; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PLease make the batch size configurable
} | ||
|
||
// Handle Error | ||
boolean notSuccesfull = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
boolean notSuccesfull = true; | |
boolean notSuccesful = true; |
@@ -61,6 +66,7 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu | |||
private Optional<String> endpointOverride; | |||
private Optional<String> credentialsProfile; | |||
private static final int DEFAULT_RETRIES = 5; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make the retry coun configurable
|
||
// Split the records into batches of size 500 | ||
String streamName = records.get(0).destination(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add guard just in case the records
is empty list. Should not happen but better to apply it as defensive policy.
Also what would happen if there will be records for multiple tables/destinations in the list?
PutRecordsResponse response = recordsSent(batchRequest, streamName); | ||
attempts++; | ||
if (response.failedRecordCount() > 0) { | ||
Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a WARN
message with response details
LOGGER.trace("Response Receieved: " + putRecordsResponse); | ||
return putRecordsResponse; | ||
} | ||
|
||
private boolean recordSent(ChangeEvent<Object, Object> record) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the unsused method
The test has failed with heap space. Please double check if there are any memory leaks. I'll trigger the test re-execution. |
Added batch write to AWS Kinesis by switching to PutRecords API.