From 66daac0e1084e0d335cfd9a023d18d79d46bb0d0 Mon Sep 17 00:00:00 2001 From: lspgn Date: Fri, 8 Dec 2023 20:44:06 -0800 Subject: [PATCH] batch error for decoder (template warnings) --- cmd/goflow2/main.go | 41 +++++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/cmd/goflow2/main.go b/cmd/goflow2/main.go index 69c60e92..cbf44fb6 100644 --- a/cmd/goflow2/main.go +++ b/cmd/goflow2/main.go @@ -287,6 +287,8 @@ func main() { decodeFunc = metrics.PromDecoderWrapper(decodeFunc, listenAddrUrl.Scheme) pipes = append(pipes, p) + bm := utils.NewBatchMute(*ErrInt, *ErrCnt) + // starts receivers // the function either returns an error if err := recv.Start(hostname, int(port), decodeFunc); err != nil { @@ -301,22 +303,33 @@ func main() { case <-q: return case err := <-recv.Errors(): - l := l.WithError(err) - if errors.Is(err, netflow.ErrorTemplateNotFound) { - l.Warn("template error") - } else if errors.Is(err, debug.PanicError) { - var pErrMsg *debug.PanicErrorMessage - if errors.As(err, &pErrMsg) { - l = l.WithFields(log.Fields{ - "message": pErrMsg.Msg, - "stacktrace": string(pErrMsg.Stacktrace), - }) - } - l.Error("intercepted panic") - } else if errors.Is(err, net.ErrClosed) { + if errors.Is(err, net.ErrClosed) { l.Info("closed receiver") - } else { + continue + } else if !errors.Is(err, netflow.ErrorTemplateNotFound) && !errors.Is(err, debug.PanicError) { l.Error("error") + continue + } + + muted, skipped := bm.Increment() + if muted && skipped == 0 { + log.Warn("too many receiver messages, muting") + } else if !muted && skipped > 0 { + log.Warnf("skipped %d receiver messages", skipped) + } else if !muted { + l := l.WithError(err) + if errors.Is(err, netflow.ErrorTemplateNotFound) { + l.Warn("template error") + } else if errors.Is(err, debug.PanicError) { + var pErrMsg *debug.PanicErrorMessage + if errors.As(err, &pErrMsg) { + l = l.WithFields(log.Fields{ + "message": pErrMsg.Msg, + "stacktrace": string(pErrMsg.Stacktrace), + }) + } + l.Error("intercepted panic") + } } }