Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-7244 Event Hubs topic routing #57

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package io.debezium.server.eventhubs;

import java.util.HashMap;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -16,8 +15,6 @@
import com.azure.messaging.eventhubs.models.CreateBatchOptions;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;

public class BatchManager {
private static final Logger LOGGER = LoggerFactory.getLogger(BatchManager.class);
Expand All @@ -32,22 +29,18 @@ public class BatchManager {
// Prepare CreateBatchOptions for N partitions
private final HashMap<Integer, CreateBatchOptions> batchOptions = new HashMap<>();
private final HashMap<Integer, EventDataBatchProxy> batches = new HashMap<>();
private List<ChangeEvent<Object, Object>> records;
private DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer;
private final Integer partitionCount;

public BatchManager(EventHubProducerClient producer, String configurePartitionId,
String configuredPartitionKey, Integer maxBatchSize) {
String configuredPartitionKey, Integer partitionCount, Integer maxBatchSize) {
this.producer = producer;
this.configuredPartitionId = configurePartitionId;
this.configuredPartitionKey = configuredPartitionKey;
this.partitionCount = partitionCount;
this.maxBatchSize = maxBatchSize;
}

public void initializeBatch(List<ChangeEvent<Object, Object>> records,
DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer) {
this.records = records;
this.committer = committer;

public void initializeBatch() {
if (!configuredPartitionId.isEmpty() || !configuredPartitionKey.isEmpty()) {
CreateBatchOptions op = new CreateBatchOptions();

Expand Down Expand Up @@ -99,7 +92,7 @@ public void closeAndEmitBatches() {
batches.forEach((partitionId, batch) -> {
if (batch.getCount() > 0) {
LOGGER.trace("Dispatching {} events.", batch.getCount());
emitBatchToEventHub(records, committer, batch);
emitBatchToEventHub(batch);
}
});
}
Expand All @@ -118,15 +111,26 @@ public void sendEventToPartitionId(EventData eventData, Integer recordIndex, Int
LOGGER.debug("Maximum batch size reached, dispatching {} events.", batch.getCount());

// Max size reached, dispatch the batch to EventHub
emitBatchToEventHub(records, committer, batch);
emitBatchToEventHub(batch);
// Renew the batch proxy so we can continue.
batch = new EventDataBatchProxy(producer, batchOptions.get(partitionId));
batches.put(partitionId, batch);
}
}

private void emitBatchToEventHub(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer,
EventDataBatchProxy batch) {
public Integer getPartitionCount() {
return partitionCount;
}

public String getEventHubName() {
return producer.getEventHubName();
}

public void closeProducer() {
producer.close();
}

private void emitBatchToEventHub(EventDataBatchProxy batch) {
final int batchEventSize = batch.getCount();
if (batchEventSize > 0) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
*/
package io.debezium.server.eventhubs;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
Expand Down Expand Up @@ -52,71 +54,78 @@ public class EventHubsChangeConsumer extends BaseChangeConsumer
private static final String PROP_MAX_BATCH_SIZE = PROP_PREFIX + "maxbatchsize";

private String connectionString;
private String eventHubName;
private String[] eventHubNames;
private String configuredPartitionId;
private String configuredPartitionKey;
private Integer maxBatchSize;
private Integer partitionCount;

// connection string format -
// Endpoint=sb://<NAMESPACE>/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<ACCESS_KEY>;EntityPath=<HUB_NAME>
private static final String CONNECTION_STRING_FORMAT = "%s;EntityPath=%s";

private EventHubProducerClient producer = null;
private BatchManager batchManager = null;
private Map<String, BatchManager> batchManagers = new HashMap<>();

@Inject
@CustomConsumerBuilder
Instance<EventHubProducerClient> customProducer;

@PostConstruct
void connect() {
if (customProducer.isResolvable()) {
producer = customProducer.get();
LOGGER.info("Obtained custom configured Event Hubs client for namespace '{}'",
customProducer.get().getFullyQualifiedNamespace());
return;
}

final Config config = ConfigProvider.getConfig();

// Required config
connectionString = config.getValue(PROP_CONNECTION_STRING_NAME, String.class);
eventHubName = config.getValue(PROP_EVENTHUB_NAME, String.class);
eventHubNames = config.getValue(PROP_EVENTHUB_NAME, String.class).split(",");

// optional config
// Optional config
maxBatchSize = config.getOptionalValue(PROP_MAX_BATCH_SIZE, Integer.class).orElse(0);
configuredPartitionId = config.getOptionalValue(PROP_PARTITION_ID, String.class).orElse("");
configuredPartitionKey = config.getOptionalValue(PROP_PARTITION_KEY, String.class).orElse("");

String finalConnectionString = String.format(CONNECTION_STRING_FORMAT, connectionString, eventHubName);

try {
producer = new EventHubClientBuilder().connectionString(finalConnectionString).buildProducerClient();
batchManager = new BatchManager(producer, configuredPartitionId, configuredPartitionKey, maxBatchSize);
createBatchManagersFromEventHubNames();
}
catch (Exception e) {
throw new DebeziumException(e);
}
}

LOGGER.info("Using default Event Hubs client for namespace '{}'", producer.getFullyQualifiedNamespace());
private void createBatchManagersFromEventHubNames() {
if (customProducer.isResolvable()) {
EventHubProducerClient producer = customProducer.get();
int partitionCount = (int) producer.getPartitionIds().stream().count();
validatePartitionId(partitionCount, eventHubNames[0]);

// Retrieve available partition count for the EventHub
partitionCount = (int) producer.getPartitionIds().stream().count();
LOGGER.trace("Event Hub '{}' has {} partitions available", producer.getEventHubName(), partitionCount);
BatchManager batchManager = new BatchManager(producer, configuredPartitionId, configuredPartitionKey, partitionCount, maxBatchSize);
batchManagers.put(eventHubNames[0], batchManager);

if (!configuredPartitionId.isEmpty() && Integer.parseInt(configuredPartitionId) > partitionCount - 1) {
throw new IndexOutOfBoundsException(
String.format("Target partition id %s does not exist in target EventHub %s", configuredPartitionId, eventHubName));
LOGGER.info("Obtained custom configured Event Hubs client ({} partitions in hub) for namespace '{}'",
partitionCount,
customProducer.get().getFullyQualifiedNamespace());

return;
}

for (String hubName : eventHubNames) {
String finalConnectionString = String.format(CONNECTION_STRING_FORMAT, connectionString, hubName);
EventHubProducerClient producer = new EventHubClientBuilder().connectionString(finalConnectionString).buildProducerClient();

int partitionCount = (int) producer.getPartitionIds().stream().count();
validatePartitionId(partitionCount, hubName);

BatchManager batchManager = new BatchManager(producer, configuredPartitionId, configuredPartitionKey, partitionCount, maxBatchSize);
batchManagers.put(hubName, batchManager);
LOGGER.info("Obtained Event Hubs client for event hub '{}' ({} partitions)", hubName, partitionCount);
}
}

@PreDestroy
void close() {
try {
producer.close();
LOGGER.info("Closed Event Hubs producer client");
batchManagers.values().forEach(BatchManager::closeProducer);
LOGGER.info("Closed Event Hubs producer clients");
}
catch (Exception e) {
LOGGER.warn("Exception while closing Event Hubs producer: {}", e);
LOGGER.warn("Exception while closing Event Hubs producers: {}", e);
}
}

Expand All @@ -126,7 +135,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records,
throws InterruptedException {
LOGGER.trace("Event Hubs sink adapter processing change events");

batchManager.initializeBatch(records, committer);
batchManagers.values().forEach(BatchManager::initializeBatch);

for (int recordIndex = 0; recordIndex < records.size();) {
int start = recordIndex;
Expand Down Expand Up @@ -171,13 +180,25 @@ else if (!configuredPartitionKey.isEmpty()) {
}
}

// Check that the target partition exists.
if (targetPartitionId < BatchManager.BATCH_INDEX_FOR_NO_PARTITION_ID || targetPartitionId > partitionCount - 1) {
throw new IndexOutOfBoundsException(
String.format("Target partition id %d does not exist in target EventHub %s", targetPartitionId, eventHubName));
}

try {
String destinationHub = record.destination();
BatchManager batchManager = batchManagers.get(destinationHub);

if (batchManager == null) {
batchManager = batchManagers.get(eventHubNames[0]);

if (batchManager == null) {
throw new DebeziumException(String.format("Could not find batch manager for destination hub {}, nor for the default configured event hub {}",
destinationHub, eventHubNames[0]));
}
}

// Check that the target partition exists.
if (targetPartitionId < BatchManager.BATCH_INDEX_FOR_NO_PARTITION_ID || targetPartitionId > batchManager.getPartitionCount() - 1) {
throw new IndexOutOfBoundsException(
String.format("Target partition id %d does not exist in target EventHub %s", targetPartitionId, batchManager.getEventHubName()));
}

batchManager.sendEventToPartitionId(eventData, recordIndex, targetPartitionId);
}
catch (IllegalArgumentException e) {
Expand All @@ -195,7 +216,7 @@ else if (!configuredPartitionKey.isEmpty()) {
}
}

batchManager.closeAndEmitBatches();
batchManagers.values().forEach(BatchManager::closeAndEmitBatches);

LOGGER.trace("Marking {} records as processed.", records.size());
for (ChangeEvent<Object, Object> record : records) {
Expand All @@ -204,4 +225,11 @@ else if (!configuredPartitionKey.isEmpty()) {
committer.markBatchFinished();
LOGGER.trace("Batch marked finished");
}

private void validatePartitionId(int partitionCount, String eventHubName) {
if (!configuredPartitionId.isEmpty() && Integer.parseInt(configuredPartitionId) > partitionCount - 1) {
throw new IndexOutOfBoundsException(
String.format("Target partition id %s does not exist in target EventHub %s", configuredPartitionId, eventHubName));
}
}
}
Loading