Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental cooperative rebalancing #283

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open

Incremental cooperative rebalancing #283

wants to merge 1 commit into from

Conversation

a-a-f
Copy link

@a-a-f a-a-f commented Jul 21, 2021

Hello.

I created this (hopefully) small PR not to get it merged but to receive comments about my use case. I hope I don't write in the wrong place. I have an issue and I am exploring three hypotheses to explain it:

  • I'm using cppkafka incorrectly.
  • Something is not ok with librdkafka.
  • Our Kafka broker coordinator is misconfigured and I should check with the staff who administers it.

At my job, as part of a project, we are tasked with adding support for incremental cooperative rebalancing in cppkafka. I built cppkafka from this PR against librdkafka v1.6.1 (yes it's an old version but it's a constraint at my job). I performed the following 2 tests:

  • In the 1st test, I had 1 Kafka topic with 18 partitions and 1 consumer in a consumer group. Then another consumer joined this group. In the end, nothing happened, i.e. the 1st consumer went on receiving messages from 18 partitions while the 2nd consumer got no messages (bad).
  • In the 2nd test, 2 Kafka topics were set up, each with 18 partitions. I had 2 consumers in a consumer group. After taking one consumer offline, the remaining consumer ended up receiving messages from 36 partitions (good).

There is no such odd behavior when switching to the roundrobin rebalancing strategy.

My question is: am I doing anything wrong with cppkafka?

I hope I clearly and concisely stated the problem I have been encountering and am thankful for any help.

Note: in order to make cppkafka build, I had to copy a couple of librdkafka headers manually, namely rdtypes.h and rdkafka_error.h. Indeed the external header generated by librdkafka after performing ./configure && make && make install does not export enough information.

@@ -134,6 +135,21 @@ void Consumer::unassign() {
check_error(error);
}

void Consumer::incremental_assign(const TopicPartitionList& topic_partitions) {
if (topic_partitions.empty()) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this right? The documentation says:

The application must pass the partition list passed to the callback (or a copy of it), even if the list is empty.

Copy link
Owner

@mfontanini mfontanini left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, it took me quite a bit to reply!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants