Skip to content

Commit

Permalink
panic interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
lspgn committed Dec 9, 2023
1 parent 8982326 commit 9b2bfd0
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 8 deletions.
11 changes: 11 additions & 0 deletions cmd/goflow2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ func main() {
log.Fatalf("producer %s does not exist", *Produce)
}

// intercept panic and generate an error
flowProducer = producer.WrapPanicProducer(flowProducer)
// wrap producer with Prometheus metrics
flowProducer = metrics.WrapPromProducer(flowProducer)

Expand Down Expand Up @@ -293,6 +295,15 @@ func main() {
l := l.WithError(err)
if errors.Is(err, netflow.ErrorTemplateNotFound) {
l.Warn("template error")
} else if errors.Is(err, producer.ProducerError) {
var pErrMsg *producer.ProducerErrorMessage
if errors.As(err, &pErrMsg) {
l = l.WithFields(log.Fields{
"message": pErrMsg.Msg,
"stacktrace": string(pErrMsg.Stacktrace),
})
}
l.Error("producer error")
} else if errors.Is(err, net.ErrClosed) {
l.Info("closed receiver")
} else {
Expand Down
57 changes: 57 additions & 0 deletions producer/panic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package producer

import (
"fmt"
"runtime/debug"
)

var (
ProducerError = fmt.Errorf("producer error")
)

type ProducerErrorMessage struct {
Msg interface{}
Inner string
Stacktrace []byte
}

func (e *ProducerErrorMessage) Error() string {
return fmt.Sprintf("%s", e.Inner)
}

func (e *ProducerErrorMessage) Unwrap() []error {
return []error{ProducerError,}
}

type PanicProducerWrapper struct {
wrapped ProducerInterface
}

func (p *PanicProducerWrapper) Produce(msg interface{}, args *ProduceArgs) (flowMessageSet []ProducerMessage, err error) {

defer func() {
if pErr := recover(); pErr != nil {

pErrC, _ := pErr.(string)
err = &ProducerErrorMessage{Msg:msg, Inner: pErrC, Stacktrace: debug.Stack()}
}
}()

flowMessageSet, err = p.wrapped.Produce(msg, args)
return flowMessageSet, err
}


func (p *PanicProducerWrapper) Close() {
p.wrapped.Close()
}

func (p *PanicProducerWrapper) Commit(flowMessageSet []ProducerMessage) {
p.wrapped.Commit(flowMessageSet)
}

func WrapPanicProducer(wrapped ProducerInterface) ProducerInterface {
return &PanicProducerWrapper{
wrapped: wrapped,
}
}
8 changes: 0 additions & 8 deletions producer/proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,6 @@ func (p *ProtoProducer) getSamplingRateSystem(args *producer.ProduceArgs) Sampli
}

func (p *ProtoProducer) Produce(msg interface{}, args *producer.ProduceArgs) (flowMessageSet []producer.ProducerMessage, err error) {

defer func() {
pErr := recover()
if pErr != nil {
err = fmt.Errorf("produce panic %v %v", pErr, msg)
}
}()

tr := uint64(args.TimeReceived.UnixNano())
sa, _ := args.SamplerAddress.Unmap().MarshalBinary()
switch msgConv := msg.(type) {
Expand Down

0 comments on commit 9b2bfd0

Please sign in to comment.