Skip to content

Commit

Permalink
graceful close of producer (#226)
Browse files Browse the repository at this point in the history
  • Loading branch information
shivasishdas committed Aug 17, 2021
1 parent e8f6f37 commit aaf256a
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 5 deletions.
23 changes: 18 additions & 5 deletions internal/brokerstore/brokerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,25 @@ func (b *BrokerStore) GetProducer(ctx context.Context, op messagebroker.Producer
key := NewKey(b.variant, op.Topic)
producer, ok := b.producerMap.Load(key.String())
if ok && producer != nil {
logger.Ctx(ctx).Infow("found existing producer, skipping init and re-using", "key", key)
return producer.(messagebroker.Producer), nil
p := producer.(messagebroker.Producer)
if !p.IsClosed(ctx) {
logger.Ctx(ctx).Infow("found existing producer, skipping init and re-using", "key", key)
return p, nil
}
}
b.partitionLock.Lock(key.String()) // lock
defer b.partitionLock.Unlock(key.String()) // unlock

producer, ok = b.producerMap.Load(key.String()) // double-check
if ok && producer != nil {
return producer.(messagebroker.Producer), nil
p := producer.(messagebroker.Producer)
if !p.IsClosed(ctx) {
logger.Ctx(ctx).Infow("found existing producer, skipping init and re-using", "key", key)
return producer.(messagebroker.Producer), nil
}
return p, nil
}

newProducer, perr := messagebroker.NewProducerClient(ctx,
b.variant,
b.bConfig,
Expand All @@ -201,8 +210,10 @@ func (b *BrokerStore) GetProducer(ctx context.Context, op messagebroker.Producer
if perr != nil {
return nil, perr
}
producer, _ = b.producerMap.LoadOrStore(key.String(), newProducer)
return producer.(messagebroker.Producer), nil

// can safely override any previously available values for this producer key
b.producerMap.Store(key.String(), newProducer)
return newProducer.(messagebroker.Producer), nil
}

// RemoveProducer deletes the producer from the store followed by a shutdown
Expand Down Expand Up @@ -232,6 +243,8 @@ func (b *BrokerStore) RemoveProducer(ctx context.Context, op messagebroker.Produ
wasProducerFound = true
b.producerMap.Delete(producer)
producer.(messagebroker.Producer).Shutdown(ctx)
// init a new producer after removal so that a non-nil producer is available on subsequent calls
b.GetProducer(ctx, op)
}

if wasProducerFound {
Expand Down
11 changes: 11 additions & 0 deletions pkg/messagebroker/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type KafkaBroker struct {
POptions *ProducerClientOptions
COptions *ConsumerClientOptions
AOptions *AdminClientOptions

// flags
isProducerClosed bool
}

// newKafkaConsumerClient returns a kafka consumer
Expand Down Expand Up @@ -756,6 +759,9 @@ func (k *KafkaBroker) IsHealthy(ctx context.Context) (bool, error) {

// Shutdown closes the producer
func (k *KafkaBroker) Shutdown(ctx context.Context) {
// immediately mark the producer as closed so that it is not re-used during the close operation
k.isProducerClosed = true

messageBrokerOperationCount.WithLabelValues(env, Kafka, "Shutdown").Inc()

startTime := time.Now()
Expand All @@ -772,3 +778,8 @@ func (k *KafkaBroker) Shutdown(ctx context.Context) {

logger.Ctx(ctx).Infow("kafka: producer already closed", "topic", k.COptions.Topics)
}

// IsClosed checks if producer has been closed
func (k *KafkaBroker) IsClosed(_ context.Context) bool {
return k.isProducerClosed
}
3 changes: 3 additions & 0 deletions pkg/messagebroker/messagebroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Producer interface {
// SendMessage sends a message on the topic
SendMessage(context.Context, SendMessageToTopicRequest) (*SendMessageToTopicResponse, error)

// IsClosed checks if producer has been closed
IsClosed(context.Context) bool

// Shutdown closes the producer
Shutdown(context.Context)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/messagebroker/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,8 @@ func (p *PulsarBroker) IsHealthy(_ context.Context) (bool, error) {
func (p *PulsarBroker) Shutdown(ctx context.Context) {
panic("implement this!")
}

// IsClosed checks if producer has been closed
func (p *PulsarBroker) IsClosed(_ context.Context) bool {
panic("implement this!")
}

0 comments on commit aaf256a

Please sign in to comment.