
Consumer lag monitoring


Consumer lag is the difference between the offset of the last message produced to a partition and the offset of the last message consumed from it; it is expressed as a number of messages rather than as a time unit.

librdkafka automatically monitors consumer lag for RD_KAFKA_CONSUMER handles and exposes the information in the statistics JSON object emitted through the stats_cb. The statistics callback is set up with rd_kafka_conf_set_stats_cb() and is called at the configured interval (statistics.interval.ms) from an rd_kafka_poll() call.
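
A minimal sketch of the setup (the 60-second interval, the bare polling loop and the abbreviated error handling are illustrative, not requirements):

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Called from rd_kafka_poll() every statistics.interval.ms.
 * 'json' is the statistics JSON document described on this page.
 * Returning 0 lets librdkafka free 'json' after the callback returns. */
static int stats_cb (rd_kafka_t *rk, char *json, size_t json_len,
                     void *opaque) {
        /* A real application would parse the JSON here and pick out
         * topics.<topic>.partitions.<N>.consumer_lag. */
        fprintf(stderr, "Stats (%zu bytes): %.*s\n",
                json_len, (int)json_len, json);
        return 0; /* librdkafka retains ownership of 'json' */
}

int main (void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* Emit statistics every 60 seconds (0, the default, disables them). */
        rd_kafka_conf_set(conf, "statistics.interval.ms", "60000",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set_stats_cb(conf, stats_cb);

        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                                      errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "Failed to create consumer: %s\n", errstr);
                return 1;
        }

        /* Callbacks, including stats_cb, are served from poll calls. */
        while (1)
                rd_kafka_poll(rk, 1000);
}
```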

The partition high and low message watermarks are pulled from the broker at regular intervals (statistics.interval.ms), so the cached values may be slightly out of date and the derived lag is not strictly exact. librdkafka automatically compensates for this and will never return a negative number for consumer lag.
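
Conceptually the compensation amounts to never letting a stale cached high watermark produce a negative lag. A simplified sketch (the function name is hypothetical and the field names are taken from the statistics JSON shown below; this is not librdkafka's actual internal code):

```c
#include <stdint.h>

/* Simplified illustration only: the cached hi_offset may be slightly stale,
 * so a naive subtraction could go negative; clamp the result at zero. */
static int64_t calc_consumer_lag (int64_t hi_offset, int64_t next_offset) {
        int64_t lag = hi_offset - next_offset;
        return lag < 0 ? 0 : lag;
}
```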

Example JSON excerpt for topic "test", partition 0, with a current lag of 238952 messages:

....
  "topics": {
    "test": {
      "topic": "test",
      "metadata_age": 991,
      "partitions": {
        "0": {
          "partition": 0,
          "leader": 1,
.....
          "fetch_state": "active",
          "query_offset": -2,
          "next_offset": 342044334, <==== NEXT MESSAGE TO BE FETCHED FROM BROKER
          "app_offset": 0,
          "stored_offset": 342044333, <==== MOST RECENTLY STORED/HANDLED MESSAGE OFFSET
.....
          "lo_offset": 24290101,   <==== BROKER'S OLDEST MESSAGE (OFFSET) FOR PARTITION
          "hi_offset": 342283286,  <==== BROKER'S MOST RECENT MESSAGE FOR PARTITION
          "consumer_lag": 238952,  <==== CONSUMER LAG
.....
   },
....
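
As a sanity check against the excerpt above, the reported consumer_lag matches the distance between the broker's high watermark and the next offset to be fetched (an observation about these particular numbers, not a statement of librdkafka's exact internal formula):

```c
/* Values copied from the excerpt above. */
int64_t hi_offset    = 342283286;   /* broker's most recent message offset */
int64_t next_offset  = 342044334;   /* next message to be fetched          */
int64_t consumer_lag = hi_offset - next_offset;   /* = 238952, as reported */
```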