Consumer offset management

Magnus Edenhill edited this page May 31, 2018 · 10 revisions

librdkafka currently supports two consumer offset management methods:

  • broker based offsets (default, requires broker 0.8.2 or later)
  • local file based offsets

Enabling offset management

Offset management is configured through topic configuration properties and enabled by passing start_offset as RD_KAFKA_OFFSET_STORED to rd_kafka_consume_start().

The various rdkafka tools, such as rdkafka_example and kafkacat, accept the -o stored command-line argument.

Terminology

  • Commit - Offset committed to permanent storage (broker, file). When the consumer restarts, this is the offset it resumes consuming from. The committed offset should be last_message_offset+1.
  • Store - Offsets to be committed are stored in memory until the next call to commit() (without offsets specified) or the next auto commit.

Note, however, that the permanent offset storages (broker, file) are themselves referred to as offset stores, and that RD_KAFKA_OFFSET_STORED actually represents the committed offset, not the stored offset.
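The difference between storing and committing can be illustrated with a small model. This is a minimal sketch and not librdkafka code: the offset_tracker type, the tracker_* functions and the TRACKER_NO_OFFSET marker are hypothetical names introduced only for this illustration.

```c
#include <assert.h>
#include <stdint.h>

#define TRACKER_NO_OFFSET ((int64_t)-1) /* hypothetical "nothing yet" marker */

/* Hypothetical toy model of one partition's offsets; not a librdkafka type. */
typedef struct {
    int64_t stored;    /* held in memory, waiting for the next commit */
    int64_t committed; /* written to permanent storage (broker or file) */
} offset_tracker;

offset_tracker tracker_init(void) {
    offset_tracker t = { TRACKER_NO_OFFSET, TRACKER_NO_OFFSET };
    return t;
}

/* Store: remember last_message_offset+1 in memory only. */
void tracker_store(offset_tracker *t, int64_t last_message_offset) {
    assert(last_message_offset >= 0);
    t->stored = last_message_offset + 1;
}

/* Commit: copy the stored offset to (simulated) permanent storage. */
void tracker_commit(offset_tracker *t) {
    if (t->stored != TRACKER_NO_OFFSET)
        t->committed = t->stored;
}
```

In this model, storing the offset of message 41 leaves committed untouched. Only after tracker_commit() does committed become 42, which is where a restarted consumer would resume.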

Configuration properties

  • enable.auto.commit - If true (default), periodically commit offset of the last message handed to the application. The committed offset will be used when the process restarts to pick up where it left off.
  • auto.commit.interval.ms - The frequency in milliseconds at which the consumer offsets are committed (written) to offset storage.
  • enable.auto.offset.store - If true (default), the client automatically stores the offset+1 of each message just before passing the message to the application. The offset is stored in memory and will be used by the next call to commit() (without explicit offsets specified) or the next auto commit. If false and enable.auto.commit=true, the application must itself call rd_kafka_offset_store() to store the offset that the next auto commit will write.
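The effect of enable.auto.offset.store on what an auto commit can write is sketched below. This is an illustrative model under stated assumptions, not librdkafka's implementation: toy_consumer, toy_deliver, toy_app_store and TOY_NO_OFFSET are hypothetical names, and toy_app_store merely stands in for the application's own call to rd_kafka_offset_store().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define TOY_NO_OFFSET ((int64_t)-1) /* hypothetical "nothing stored" marker */

/* Hypothetical toy consumer; models the properties above, not librdkafka. */
typedef struct {
    bool    auto_offset_store; /* models enable.auto.offset.store */
    int64_t stored;            /* in-memory offset the next auto commit would write */
} toy_consumer;

toy_consumer toy_consumer_new(bool auto_offset_store) {
    toy_consumer c = { auto_offset_store, TOY_NO_OFFSET };
    return c;
}

/* The client is about to hand message `offset` to the application. */
void toy_deliver(toy_consumer *c, int64_t offset) {
    assert(offset >= 0);
    if (c->auto_offset_store)
        c->stored = offset + 1; /* stored just before the application sees it */
}

/* The application stores the offset itself once it has finished processing. */
void toy_app_store(toy_consumer *c, int64_t offset) {
    assert(offset >= 0);
    c->stored = offset + 1;
}
```

With auto_offset_store set to true, delivering message 10 immediately makes offset 11 eligible for the next auto commit, even if the application has not finished processing the message. With it set to false, nothing is eligible until toy_app_store(&c, 10) is called, so an auto commit cannot run ahead of the application.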

Broker based offsets (default)

The OffsetCommit API was added in Apache Kafka 0.8.2, so this method requires a broker of that version or later. This is the preferred method.

With this method, offsets are written to the Kafka cluster through the Kafka protocol. This is not to be confused with Zookeeper based offsets, which the official 0.8 Scala Kafka clients use; the new 0.9 Java client uses broker based offset storage exclusively.

The consumer group id must be configured using the group.id configuration property. No additional configuration is required for this method.

Local file based offsets (deprecated)

Offsets are written to a local file, defaulting to {offset.store.path}/topicname-partition*.offset.

Topic configuration properties

  • offset.store.method - Offset commit store method: 'file' - local file store (offset.store.path, et al.), 'broker' - broker commit store (requires Apache Kafka 0.8.2 or later on the broker).
  • offset.store.path - Path to local file for storing offsets. If the path is a directory a filename will be automatically generated in that directory based on the topic and partition. Defaults to the current directory.
  • offset.store.sync.interval.ms - fsync() interval for the offset file, in milliseconds. Use -1 to disable syncing, and 0 for immediate sync after each write.
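The three cases of offset.store.sync.interval.ms described above can be summarised as a small decision function. This is only a sketch of the documented semantics: offset_file_sync_due is a hypothetical helper and not part of librdkafka, and treating every negative value like -1 is an assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper, not part of librdkafka: decide whether the offset
 * file should be fsync()ed after a write.
 *   sync_interval_ms  models offset.store.sync.interval.ms
 *   ms_since_sync     milliseconds elapsed since the previous sync */
bool offset_file_sync_due(int sync_interval_ms, int64_t ms_since_sync) {
    assert(ms_since_sync >= 0);
    if (sync_interval_ms < 0)  /* -1: syncing disabled (assumption: any negative value) */
        return false;
    if (sync_interval_ms == 0) /* 0: sync immediately after each write */
        return true;
    return ms_since_sync >= sync_interval_ms; /* otherwise wait for the interval */
}
```

For example, with an interval of 100 ms a write that happens 99 ms after the last sync is not synced, while one at 100 ms or later is.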