Skip to content

Commit

Permalink
Provide a rebalance callback for cooperative rebalance strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
thynson committed Aug 12, 2021
1 parent f7653f2 commit 21a2226
Showing 1 changed file with 46 additions and 20 deletions.
66 changes: 46 additions & 20 deletions lib/kafka-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,49 @@ var DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY = 500;
var DEFAULT_CONSUME_TIME_OUT = 1000;
util.inherits(KafkaConsumer, Client);

var eagerRebalanceCallback = function(err, assignment) {
// Create the librdkafka error
err = LibrdKafkaError.create(err);
// Emit the event
self.emit('rebalance', err, assignment);

// That's it
try {
if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
self.assign(assignment);
} else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
self.unassign();
}
} catch (e) {
// Ignore exceptions if we are not connected
if (self.isConnected()) {
self.emit('rebalance.error', e);
}
}
}

var cooperativeRebalanceCallback = function(err, assignment) {
// Create the librdkafka error
err = LibrdKafkaError.create(err);
// Emit the event
self.emit('rebalance', err, assignment);

// That's it
try {
if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
self.incrementalAssign(assignment);
} else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
self.incrementalUnassign(assignment);
}
} catch (e) {
// Ignore exceptions if we are not connected
if (self.isConnected()) {
self.emit('rebalance.error', e);
}
}
}


/**
* KafkaConsumer class for reading messages from Kafka
*
Expand Down Expand Up @@ -52,26 +95,9 @@ function KafkaConsumer(conf, topicConf) {

// If rebalance is undefined we don't want any part of this
if (onRebalance && typeof onRebalance === 'boolean') {
conf.rebalance_cb = function(err, assignment) {
// Create the librdkafka error
err = LibrdKafkaError.create(err);
// Emit the event
self.emit('rebalance', err, assignment);

// That's it
try {
if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
self.assign(assignment);
} else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
self.unassign();
}
} catch (e) {
// Ignore exceptions if we are not connected
if (self.isConnected()) {
self.emit('rebalance.error', e);
}
}
};
conf.rebalance_cb = conf['partition.assignment.strategy'] === 'cooperative-sticky'
? cooperativeRebalanceCallback
: eagerRebalanceCallback;
} else if (onRebalance && typeof onRebalance === 'function') {
/*
* Once this is opted in to, that's it. It's going to manually rebalance
Expand Down

0 comments on commit 21a2226

Please sign in to comment.