
Producing messages

Matias Fontanini edited this page Aug 23, 2017 · 2 revisions

Message builder

To build a message, use the MessageBuilder class, which lets you set each field of the message to whatever value you want.

// Construct a message builder for the kafka topic some_topic
MessageBuilder builder("some_topic");

// The key and value we'll use
const string key = "some_key";
const string payload = "Hello world!";

// Set the partition, key and payload. These can be easily chained.
builder.partition(10).key(key).payload(payload);

// Ready to be used!

Note that if you don't set a partition explicitly, it defaults to the unassigned partition (-1), so whichever partitioner callback is configured will be called to determine the partition to use.
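
The fallback described above can be modeled in a few lines of plain C++. This is only a sketch of the decision, not cppkafka's or librdkafka's code: kUnassignedPartition, Partitioner and resolve_partition are hypothetical names introduced for illustration, and the key is a plain std::string rather than a Buffer.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>

// Hypothetical stand-ins for illustration; none of these names belong to cppkafka.
constexpr std::int32_t kUnassignedPartition = -1;

using Partitioner =
    std::function<std::int32_t(const std::string& key, std::int32_t partition_count)>;

// Returns the explicitly requested partition when one was set;
// otherwise defers to the partitioner callback.
std::int32_t resolve_partition(std::int32_t requested,
                               const std::string& key,
                               std::int32_t partition_count,
                               const Partitioner& partitioner) {
    assert(partition_count > 0);
    if (requested != kUnassignedPartition) {
        return requested;
    }
    return partitioner(key, partition_count);
}
```

With a partitioner that maps a key to its length modulo the partition count, resolve_partition(10, "some_key", 16, p) yields 10 because the partition was set, while resolve_partition(kUnassignedPartition, "some_key", 16, p) asks the callback.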

MessageBuilder uses Buffer objects for the key and the payload. This means that whatever was used to build those buffers (in the example above, the key and payload variables) needs to be kept in scope until the message is sent to the producer.
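
To see why the source objects must outlive the builder, here is a minimal sketch of a non-owning view. BufferView, make_view and copy_out are hypothetical names, not cppkafka's Buffer. The sketch only assumes that the buffer stores a pointer and a size rather than its own copy of the bytes.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical non-owning view, similar in spirit to a buffer that
// references someone else's bytes. Not cppkafka's Buffer class.
struct BufferView {
    const char* data;
    std::size_t size;
};

// Points the view at the string's storage; nothing is copied here.
BufferView make_view(const std::string& source) {
    return BufferView{source.data(), source.size()};
}

// Copies the viewed bytes out. This is only valid while the
// source string is still alive and unmodified.
std::string copy_out(const BufferView& view) {
    return std::string(view.data, view.size);
}
```

If the view were built from a temporary string, the pointer would dangle as soon as that temporary was destroyed, and reading through it would be undefined behavior. Keeping key and payload as named variables until the message has been produced avoids this.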

Producing messages

Once you've set up a MessageBuilder, producing its message takes a single call:

// Build a producer from some Configuration object
Producer producer(config);

const string key = "some_key";
const string payload = "Hello world!";

// Build and produce the message
producer.produce(MessageBuilder("some_topic").key(key).payload(payload));

Using a partitioner callback

If your messages contain a key, you probably want to control which partition each key is mapped to. This can be done via a partitioner callback, which is set on the topic configuration used as the default in the producer's configuration. The callback receives the topic the message is being produced to, the message's key and the topic's partition count, and it has to return a number between 0 and the number of partitions - 1, indicating the partition the message should be written to.

As an example, have a look at the following code:

// Construct some configuration
Configuration config = {
    // Some configs...
};

// Now let's define our default topic configuration
TopicConfiguration default_topic_config;

// Create our partitioner callback
const auto callback = [](const Topic&, const Buffer& key, int32_t partition_count) {
    // We'll convert the key into an int and perform modulo
    // over the number of partitions. stoi takes a string, so
    // the Buffer is converted explicitly first.
    return stoi(string(key)) % partition_count;
};

// Now set the partitioner callback on the topic config
default_topic_config.set_partitioner_callback(callback);

// Finally, mark our topic config as the default one
config.set_default_topic_configuration(move(default_topic_config));

// Create a producer
Producer producer(config);

// Produce a message. Don't specify a partition so that
// our callback defines which one it will be mapped to.
const string key = "42";
const string payload = "foo";
producer.produce(MessageBuilder("test").key(key).payload(payload));
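
The callback above assumes every key is a non-negative integer: stoi throws on a non-numeric key, and a negative key produces a negative remainder. Below is a hedged sketch of a more defensive mapping, written against plain std::string so it compiles without cppkafka. partition_for_key is a hypothetical helper, and the std::hash fallback is an illustrative choice, not something the library prescribes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical helper: maps a key to a partition in [0, partition_count).
// Fully numeric keys use modulo; any other key falls back to a hash.
std::int32_t partition_for_key(const std::string& key, std::int32_t partition_count) {
    assert(partition_count > 0);
    try {
        std::size_t consumed = 0;
        const long long value = std::stoll(key, &consumed);
        if (consumed == key.size()) {
            // In C++ the remainder takes the sign of the dividend,
            // so wrap negative results back into range.
            const long long remainder = value % partition_count;
            return static_cast<std::int32_t>(
                remainder < 0 ? remainder + partition_count : remainder);
        }
    } catch (const std::invalid_argument&) {
        // The key is not a number; use the hash fallback below.
    } catch (const std::out_of_range&) {
        // The number does not fit in long long; use the hash fallback below.
    }
    const std::size_t hashed = std::hash<std::string>{}(key);
    return static_cast<std::int32_t>(hashed % static_cast<std::size_t>(partition_count));
}
```

Inside a partitioner callback, the Buffer key would first be converted to a string and then passed to this helper. Note that std::hash is not guaranteed to give the same values across standard library implementations, so producers built with different toolchains could disagree on the partition for non-numeric keys. A fixed, documented hash function is the safer choice when that matters.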