Skip to content

Commit

Permalink
Test: Use try with resources for the producer
Browse files Browse the repository at this point in the history
  • Loading branch information
rozza committed Feb 28, 2020
1 parent 980ae67 commit c77614d
Showing 1 changed file with 17 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,17 @@ void testASimpleProducerSmokeTest() {
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
producer.beginTransaction();
try(KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
producer.initTransactions();
producer.beginTransaction();

IntStream.range(0, 10).forEach(i -> {
producer.send(new ProducerRecord<>(topicName, i, "Hello, World!"));
});
producer.commitTransaction();
IntStream.range(0, 10).forEach(i -> {
producer.send(new ProducerRecord<>(topicName, i, "Hello, World!"));
});
producer.commitTransaction();

assertProduced(10, topicName);
assertProduced(10, topicName);
}
}

@Test
Expand All @@ -84,14 +85,15 @@ void testSinkSavesAvroDataToMongoDB() {
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, KAFKA.schemaRegistryUrl());
KafkaProducer<String, TweetMsg> producer = new KafkaProducer<>(producerProps);

producer.initTransactions();
producer.beginTransaction();
tweets.forEach(tweet -> producer.send(new ProducerRecord<>(topicName, tweet)));
producer.commitTransaction();
try(KafkaProducer<String, TweetMsg> producer = new KafkaProducer<>(producerProps)) {
producer.initTransactions();
producer.beginTransaction();
tweets.forEach(tweet -> producer.send(new ProducerRecord<>(topicName, tweet)));
producer.commitTransaction();

assertProduced(100, topicName);
assertEquals(100, getCollection().countDocuments());
assertProduced(100, topicName);
assertEquals(100, getCollection().countDocuments());
}
}
}

0 comments on commit c77614d

Please sign in to comment.