Consumer segfaults if broker doesn't support OffsetFetch #4789

Open
7 tasks done
xJakub opened this issue Jul 19, 2024 · 0 comments · May be fixed by #4790
xJakub commented Jul 19, 2024

Description

The consumer crashes in rd_kafka_assignment_handle_OffsetFetch when the broker doesn't support OffsetFetch. In that case no response is parsed, so the offsets variable stays NULL (see info locals below), but the error-handling code at rdkafka_assignment.c:319 doesn't account for this case.

Thread 3 "rdk:main" received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7ffff60006c0 (LWP 242811)]
0x00005555555fe10b in rd_kafka_assignment_handle_OffsetFetch (rk=0x5555557f4770, rkb=0x5555557fbe30, err=RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, reply=<optimized out>, 
    request=<optimized out>, opaque=<optimized out>) at rdkafka_assignment.c:319
319	                        rd_kafka_consumer_err(
(gdb) bt
#0  0x00005555555fe10b in rd_kafka_assignment_handle_OffsetFetch (rk=0x5555557f4770, rkb=0x5555557fbe30, err=RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, reply=<optimized out>, 
    request=<optimized out>, opaque=<optimized out>) at rdkafka_assignment.c:319
#1  0x00005555555a5054 in rd_kafka_buf_callback (rk=0x5555557f4770, rkb=0x5555557fbe30, err=RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, response=0x0, request=0x7ffff001a8b0)
    at rdkafka_buf.c:509
#2  0x00005555555abf10 in rd_kafka_op_handle_std (rk=<optimized out>, rkq=<optimized out>, rko=<optimized out>, cb_type=<optimized out>) at rdkafka_op.c:905
#3  0x00005555555abfb0 in rd_kafka_op_handle (rk=0x5555557f4770, rkq=0x7ffff5ffbd50, rko=0x7fffe4002500, cb_type=RD_KAFKA_Q_CB_CALLBACK, opaque=0x5555557f4770, 
    callback=0x555555574ba0 <rd_kafka_poll_cb>) at rdkafka_op.c:945
#4  0x00005555555a86f4 in rd_kafka_q_serve (rkq=0x5555557f29c0, timeout_ms=<optimized out>, max_cnt=max_cnt@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_CALLBACK, 
    callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:578
#5  0x0000555555577d0d in rd_kafka_thread_main (arg=0x5555557f4770) at rdkafka.c:2143
#6  0x00007ffff749c86b in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:444
#7  0x00007ffff7529c3c in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:78
(gdb) info locals
offsets = 0x0
req_assignment_version = <optimized out>
allow_retry = <optimized out>

How to reproduce

Build a Kafka broker that doesn't advertise OffsetFetch support. The following patch, applied to the Apache Kafka repository, removes OffsetFetch from the broker's ApiVersions response:

diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index 4157641903..e86258efff 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -20,6 +20,7 @@ import kafka.network
 import kafka.network.RequestChannel
 import org.apache.kafka.common.feature.SupportedVersionRange
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
+import org.apache.kafka.common.message.ApiVersionsResponseData
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.ApiVersionsResponse
 import org.apache.kafka.server.ClientMetricsManager
@@ -170,9 +171,17 @@ class DefaultApiVersionManager(
         enableUnstableLastVersion,
         clientTelemetryEnabled)
     }
+
+    val apiVersionsWithoutOffsetFetch = new ApiVersionsResponseData.ApiVersionCollection
+    apiVersions.forEach { apiVersion =>
+      if (apiVersion.apiKey() != ApiKeys.OFFSET_FETCH.id) {
+        apiVersionsWithoutOffsetFetch.add(apiVersion.duplicate())
+      }
+    }
+
     new ApiVersionsResponse.Builder().
       setThrottleTimeMs(throttleTimeMs).
-      setApiVersions(apiVersions).
+      setApiVersions(apiVersionsWithoutOffsetFetch).
       setSupportedFeatures(brokerFeatures.supportedFeatures).
       setFinalizedFeatures(finalizedFeatures.finalizedFeatures()).
       setFinalizedFeaturesEpoch(finalizedFeatures.finalizedFeaturesEpoch()).

Create a topic and try to consume from it with the consumer in librdkafka's examples/ directory:

$ ./consumer localhost:9092 some_group topic
% Subscribed to 1 topic(s), waiting for rebalance and messages...
Segmentation fault (core dumped)

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka version (release number or git tag): 6eaf89fb124c421b66b43b195879d458a3a31f86
  • Apache Kafka version: 931bb62a2340a563e598e522b196c07b870c285c
  • librdkafka client configuration: This is the sample consumer from examples
  • Operating system: Ubuntu 24.04 LTS
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts: not really relevant in this case
[2024-07-22 10:10:46,906] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group some_group in Empty state. Created a new member id rdkafka-f92e226f-542d-4290-b35d-eb770ea8e0db and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2024-07-22 10:10:46,911] INFO [GroupCoordinator 0]: Preparing to rebalance group some_group in state PreparingRebalance with old generation 4 (__consumer_offsets-24) (reason: Adding new member rdkafka-f92e226f-542d-4290-b35d-eb770ea8e0db with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
[2024-07-22 10:10:46,913] INFO [GroupCoordinator 0]: Stabilized group some_group generation 5 (__consumer_offsets-24) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2024-07-22 10:10:46,921] INFO [GroupCoordinator 0]: Assignment received from leader rdkafka-f92e226f-542d-4290-b35d-eb770ea8e0db for group some_group for generation 5. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2024-07-22 10:11:31,928] INFO [GroupCoordinator 0]: Member rdkafka-f92e226f-542d-4290-b35d-eb770ea8e0db in group some_group has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2024-07-22 10:11:31,928] INFO [GroupCoordinator 0]: Preparing to rebalance group some_group in state PreparingRebalance with old generation 5 (__consumer_offsets-24) (reason: removing member rdkafka-f92e226f-542d-4290-b35d-eb770ea8e0db on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2024-07-22 10:11:31,929] INFO [GroupCoordinator 0]: Group some_group with generation 6 is now empty (__consumer_offsets-24) (kafka.coordinator.group.GroupCoordinator)
  • Critical issue: not a critical issue, but it can be used to trigger segfaults in services that allow connecting to external Kafka clusters.
xJakub linked a pull request on Jul 19, 2024 that will close this issue