Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-8193 Batch write to AWS Kinesis #121

Open
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

Kubha99
Copy link

@Kubha99 Kubha99 commented Sep 11, 2024

Added batch write to AWS Kinesis by switching to PutRecords API.

@Kubha99 Kubha99 changed the title DBZ-8193 DBZ-8193 Batch write to AWS Kinesis Sep 11, 2024
Copy link
Contributor

@jpechane jpechane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Kubha99 Thanks for the PR! It is definitely good start but needs additional polishing. Please take a look at the comments.
Also wrt the build failure, in this case it was in Redis - it is unfortunately a transient environment error that we were not able to fix yet so it does not relate to the PR.

@@ -61,6 +66,7 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu
private Optional<String> endpointOverride;
private Optional<String> credentialsProfile;
private static final int DEFAULT_RETRIES = 5;
private static final int batchSize = 500;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PLease make the batch size configurable

}

// Handle Error
boolean notSuccesfull = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
boolean notSuccesfull = true;
boolean notSuccesful = true;

@@ -61,6 +66,7 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu
private Optional<String> endpointOverride;
private Optional<String> credentialsProfile;
private static final int DEFAULT_RETRIES = 5;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make the retry coun configurable


// Split the records into batches of size 500
String streamName = records.get(0).destination();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add guard just in case the records is empty list. Should not happen but better to apply it as defensive policy.
Also what would happen if there will be records for multiple tables/destinations in the list?

PutRecordsResponse response = recordsSent(batchRequest, streamName);
attempts++;
if (response.failedRecordCount() > 0) {
Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a WARN message with response details

LOGGER.trace("Response Receieved: " + putRecordsResponse);
return putRecordsResponse;
}

private boolean recordSent(ChangeEvent<Object, Object> record) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the unsused method

@jpechane
Copy link
Contributor

The test has failed with heap space. Please double check if there are any memory leaks. I'll trigger the test re-execution.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants