Skip to content

Commit

Permalink
DBZ-7575 Move future waiting inside the for loop
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugaruT authored and jpechane committed Aug 20, 2024
1 parent c84cb30 commit 11cab71
Showing 1 changed file with 6 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@
package io.debezium.server.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
Expand Down Expand Up @@ -91,7 +88,6 @@ void stop() {
public void handleBatch(final List<ChangeEvent<Object, Object>> records,
final RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
List<Future<RecordMetadata>> futures = new ArrayList<>();
for (ChangeEvent<Object, Object> record : records) {
try {
LOGGER.trace("Received event '{}'", record);
Expand All @@ -108,27 +104,19 @@ public void handleBatch(final List<ChangeEvent<Object, Object>> records,
LOGGER.trace("Sent message with offset: {}", metadata.offset());
}
});
futures.add(recordMetadataFuture);
committer.markProcessed(record);
}
catch (Exception e) {
throw new DebeziumException(e);
}
}

for (Future<RecordMetadata> future : futures) {
try {
if (waitMessageDeliveryTimeout == 0) {
future.get();
recordMetadataFuture.get();
}
else {
future.get(waitMessageDeliveryTimeout, TimeUnit.MILLISECONDS);
recordMetadataFuture.get(waitMessageDeliveryTimeout, TimeUnit.MILLISECONDS);
}
committer.markProcessed(record);
}
catch (TimeoutException | ExecutionException e) {
throw new DebeziumException("Error while waiting for Kafka send operations to complete", e);
catch (Exception e) {
throw new DebeziumException(e);
}
}

committer.markBatchFinished();
}

Expand Down

0 comments on commit 11cab71

Please sign in to comment.