From aaf256a27da5499c27b43746571f4819af213155 Mon Sep 17 00:00:00 2001 From: Shivasish Das <55731562+shivasishdas@users.noreply.github.com> Date: Tue, 17 Aug 2021 12:24:15 +0530 Subject: [PATCH] graceful close of producer (#226) --- internal/brokerstore/brokerstore.go | 23 ++++++++++++++++++----- pkg/messagebroker/kafka.go | 11 +++++++++++ pkg/messagebroker/messagebroker.go | 3 +++ pkg/messagebroker/pulsar.go | 5 +++++ 4 files changed, 37 insertions(+), 5 deletions(-) diff --git a/internal/brokerstore/brokerstore.go b/internal/brokerstore/brokerstore.go index c89cb2ad..473ba124 100644 --- a/internal/brokerstore/brokerstore.go +++ b/internal/brokerstore/brokerstore.go @@ -183,16 +183,25 @@ func (b *BrokerStore) GetProducer(ctx context.Context, op messagebroker.Producer key := NewKey(b.variant, op.Topic) producer, ok := b.producerMap.Load(key.String()) if ok && producer != nil { - logger.Ctx(ctx).Infow("found existing producer, skipping init and re-using", "key", key) - return producer.(messagebroker.Producer), nil + p := producer.(messagebroker.Producer) + if !p.IsClosed(ctx) { + logger.Ctx(ctx).Infow("found existing producer, skipping init and re-using", "key", key) + return p, nil + } } b.partitionLock.Lock(key.String()) // lock defer b.partitionLock.Unlock(key.String()) // unlock producer, ok = b.producerMap.Load(key.String()) // double-check if ok && producer != nil { - return producer.(messagebroker.Producer), nil + p := producer.(messagebroker.Producer) + if !p.IsClosed(ctx) { + logger.Ctx(ctx).Infow("found existing producer, skipping init and re-using", "key", key) + return producer.(messagebroker.Producer), nil + } + return p, nil } + newProducer, perr := messagebroker.NewProducerClient(ctx, b.variant, b.bConfig, @@ -201,8 +210,10 @@ func (b *BrokerStore) GetProducer(ctx context.Context, op messagebroker.Producer if perr != nil { return nil, perr } - producer, _ = b.producerMap.LoadOrStore(key.String(), newProducer) - return producer.(messagebroker.Producer), nil + + // can safely override any previously available values for this producer key + b.producerMap.Store(key.String(), newProducer) + return newProducer.(messagebroker.Producer), nil } // RemoveProducer deletes the producer from the store followed by a shutdown @@ -232,6 +243,8 @@ func (b *BrokerStore) RemoveProducer(ctx context.Context, op messagebroker.Produ wasProducerFound = true b.producerMap.Delete(producer) producer.(messagebroker.Producer).Shutdown(ctx) + // init a new producer after removal so that a non-nil producer is available on subsequent calls + b.GetProducer(ctx, op) } if wasProducerFound { diff --git a/pkg/messagebroker/kafka.go b/pkg/messagebroker/kafka.go index fabe710a..c41cc2b7 100644 --- a/pkg/messagebroker/kafka.go +++ b/pkg/messagebroker/kafka.go @@ -39,6 +39,9 @@ type KafkaBroker struct { POptions *ProducerClientOptions COptions *ConsumerClientOptions AOptions *AdminClientOptions + + // flags + isProducerClosed bool } // newKafkaConsumerClient returns a kafka consumer @@ -756,6 +759,9 @@ func (k *KafkaBroker) IsHealthy(ctx context.Context) (bool, error) { // Shutdown closes the producer func (k *KafkaBroker) Shutdown(ctx context.Context) { + // immediately mark the producer as closed so that it is not re-used during the close operation + k.isProducerClosed = true + messageBrokerOperationCount.WithLabelValues(env, Kafka, "Shutdown").Inc() startTime := time.Now() @@ -772,3 +778,8 @@ func (k *KafkaBroker) Shutdown(ctx context.Context) { logger.Ctx(ctx).Infow("kafka: producer already closed", "topic", k.COptions.Topics) } + +// IsClosed checks if producer has been closed +func (k *KafkaBroker) IsClosed(_ context.Context) bool { + return k.isProducerClosed +} diff --git a/pkg/messagebroker/messagebroker.go b/pkg/messagebroker/messagebroker.go index 4305b5b5..4c63b6ff 100644 --- a/pkg/messagebroker/messagebroker.go +++ b/pkg/messagebroker/messagebroker.go @@ -29,6 +29,9 @@ type Producer interface { // SendMessage sends a message on the topic SendMessage(context.Context, SendMessageToTopicRequest) (*SendMessageToTopicResponse, error) + // IsClosed checks if producer has been closed + IsClosed(context.Context) bool + // Shutdown closes the producer Shutdown(context.Context) } diff --git a/pkg/messagebroker/pulsar.go b/pkg/messagebroker/pulsar.go index 985f0f64..c6794342 100644 --- a/pkg/messagebroker/pulsar.go +++ b/pkg/messagebroker/pulsar.go @@ -313,3 +313,8 @@ func (p *PulsarBroker) IsHealthy(_ context.Context) (bool, error) { func (p *PulsarBroker) Shutdown(ctx context.Context) { panic("implement this!") } + +// IsClosed checks if producer has been closed +func (p *PulsarBroker) IsClosed(_ context.Context) bool { + panic("implement this!") +}