From a3039f6135c037463c3b62be62c03745653fa8e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20T=C3=B4rres?= Date: Fri, 14 Jul 2023 01:36:29 -0700 Subject: [PATCH] Fix Graceful Shutdown of Interceptor and Scaler (#732) Co-authored-by: Tom Kerkhove --- CHANGELOG.md | 2 + go.mod | 2 +- interceptor/forward_wait_func.go | 4 +- interceptor/main.go | 97 +++++++++---------- .../proxy_handlers_integration_test.go | 3 +- operator/controllers/http/ping.go | 3 +- pkg/http/server.go | 10 +- pkg/k8s/deployment_cache_informer.go | 6 +- pkg/k8s/endpoints.go | 3 +- pkg/net/dial_context.go | 2 +- pkg/queue/queue_rpc.go | 15 +-- pkg/routing/table.go | 2 +- pkg/util/errors.go | 25 +++++ scaler/main.go | 70 +++++++------ scaler/queue_pinger.go | 12 +-- 15 files changed, 134 insertions(+), 122 deletions(-) create mode 100644 pkg/util/errors.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f5bcf16..e6ff3ec4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,8 @@ This changelog keeps track of work items that have been completed and are ready - **Scaler**: remplement custom interceptor metrics ([#718](https://github.com/kedacore/http-add-on/issues/718)) - **Operator**: Remove ScaledObject `name` & `app` custom labels ([#717](https://github.com/kedacore/http-add-on/issues/717)) - **Interceptor**: fatal error: concurrent map iteration and map write ([#726](https://github.com/kedacore/http-add-on/issues/726)) +- **Interceptor**: Provide graceful shutdown for http servers on SIGINT and SIGTERM ([#731](https://github.com/kedacore/http-add-on/issues/731)) +- **Scaler**: Provide graceful shutdown for grpc server on SIGINT and SIGTERM ([#731](https://github.com/kedacore/http-add-on/issues/731)) ### Deprecations diff --git a/go.mod b/go.mod index 215f5a17..89a726b0 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/kelseyhightower/envconfig v1.4.0 github.com/onsi/ginkgo/v2 v2.11.0 github.com/onsi/gomega v1.27.8 - github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.4 github.com/tj/assert v0.0.3 go.uber.org/zap v1.24.0 @@ -56,6 +55,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.16.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect diff --git a/interceptor/forward_wait_func.go b/interceptor/forward_wait_func.go index 749970d2..9ebc69b1 100644 --- a/interceptor/forward_wait_func.go +++ b/interceptor/forward_wait_func.go @@ -36,7 +36,7 @@ func newDeployReplicasForwardWaitFunc( if err != nil { // if we didn't get the initial deployment state, bail out return 0, fmt.Errorf( - "error getting state for deployment %s/%s (%s)", + "error getting state for deployment %s/%s: %w", deployNS, deployName, err, @@ -62,7 +62,7 @@ func newDeployReplicasForwardWaitFunc( // otherwise, if the context is marked done before // we're done waiting, fail. return 0, fmt.Errorf( - "context marked done while waiting for deployment %s to reach > 0 replicas (%w)", + "context marked done while waiting for deployment %s to reach > 0 replicas: %w", deployName, ctx.Err(), ) diff --git a/interceptor/main.go b/interceptor/main.go index 3314ee85..685174c5 100644 --- a/interceptor/main.go +++ b/interceptor/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "fmt" "math/rand" "net/http" @@ -41,14 +42,14 @@ func main() { fmt.Println("Error building logger", err) os.Exit(1) } + timeoutCfg := config.MustParseTimeouts() servingCfg := config.MustParseServing() if err := config.Validate(*servingCfg, *timeoutCfg); err != nil { lggr.Error(err, "invalid configuration") os.Exit(1) } - ctx := util.ContextWithLogger(context.Background(), lggr) - ctx, ctxDone := context.WithCancel(ctx) + lggr.Info( "starting interceptor", "timeoutConfig", @@ -94,73 +95,67 @@ func main() { lggr.Info("Interceptor starting") - errGrp, ctx := errgroup.WithContext(ctx) + ctx := ctrl.SetupSignalHandler() + ctx = util.ContextWithLogger(ctx, lggr) + + eg, ctx := errgroup.WithContext(ctx) // start the deployment cache updater - errGrp.Go(func() error { - defer ctxDone() - err := deployCache.Start(ctx) - lggr.Error(err, "deployment cache watcher failed") - return err + eg.Go(func() error { + lggr.Info("starting the deployment cache") + + deployCache.Start(ctx) + return nil }) // start the update loop that updates the routing table from // the ConfigMap that the operator updates as HTTPScaledObjects // enter and exit the system - errGrp.Go(func() error { - defer ctxDone() - err := routingTable.Start(ctx) - lggr.Error(err, "config map routing table updater failed") - return err + eg.Go(func() error { + lggr.Info("starting the routing table") + + if err := routingTable.Start(ctx); !util.IsIgnoredErr(err) { + lggr.Error(err, "routing table failed") + return err + } + + return nil }) // start the administrative server. this is the server // that serves the queue size API - errGrp.Go(func() error { - defer ctxDone() - lggr.Info( - "starting the admin server", - "port", - adminPort, - ) - err := runAdminServer( - ctx, - lggr, - q, - adminPort, - ) - lggr.Error(err, "admin server failed") - return err + eg.Go(func() error { + lggr.Info("starting the admin server", "port", adminPort) + + if err := runAdminServer(ctx, lggr, q, adminPort); !util.IsIgnoredErr(err) { + lggr.Error(err, "admin server failed") + return err + } + + return nil }) // start the proxy server. this is the server that // accepts, holds and forwards user requests - errGrp.Go(func() error { - defer ctxDone() - lggr.Info( - "starting the proxy server", - "port", - proxyPort, - ) - err := runProxyServer( - ctx, - lggr, - q, - waitFunc, - routingTable, - timeoutCfg, - proxyPort, - ) - lggr.Error(err, "proxy server failed") - return err + eg.Go(func() error { + lggr.Info("starting the proxy server", "port", proxyPort) + + if err := runProxyServer(ctx, lggr, q, waitFunc, routingTable, timeoutCfg, proxyPort); !util.IsIgnoredErr(err) { + lggr.Error(err, "proxy server failed") + return err + } + + return nil }) + build.PrintComponentInfo(lggr, "Interceptor") - // errGrp.Wait() should hang forever for healthy admin and proxy servers. - // if it returns an error, log and exit immediately. - waitErr := errGrp.Wait() - lggr.Error(waitErr, "error with interceptor") - os.Exit(1) + if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) { + lggr.Error(err, "fatal error") + os.Exit(1) + } + + lggr.Info("Bye!") } func runAdminServer( diff --git a/interceptor/proxy_handlers_integration_test.go b/interceptor/proxy_handlers_integration_test.go index 3777a566..d4530534 100644 --- a/interceptor/proxy_handlers_integration_test.go +++ b/interceptor/proxy_handlers_integration_test.go @@ -13,7 +13,6 @@ import ( "time" "github.com/go-logr/logr" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -360,7 +359,7 @@ func splitHostPort(hostPortStr string) (string, int, error) { host := spl[0] port, err := strconv.Atoi(spl[1]) if err != nil { - return "", 0, errors.Wrap(err, "port was invalid") + return "", 0, fmt.Errorf("port was invalid: %w", err) } return host, port, nil } diff --git a/operator/controllers/http/ping.go b/operator/controllers/http/ping.go index f2c4ce50..362a2aed 100644 --- a/operator/controllers/http/ping.go +++ b/operator/controllers/http/ping.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" - "github.com/pkg/errors" "golang.org/x/sync/errgroup" "sigs.k8s.io/controller-runtime/pkg/client" @@ -28,7 +27,7 @@ func pingInterceptors( k8s.EndpointsFuncForControllerClient(cl), ) if err != nil { - return errors.Wrap(err, "pingInterceptors") + return fmt.Errorf("pingInterceptors: %w", err) } errGrp, _ := errgroup.WithContext(ctx) for _, endpointURL := range endpointURLs { diff --git a/pkg/http/server.go b/pkg/http/server.go index de1beff7..501887ea 100644 --- a/pkg/http/server.go +++ b/pkg/http/server.go @@ -2,8 +2,9 @@ package http import ( "context" - "fmt" "net/http" + + "github.com/kedacore/http-add-on/pkg/util" ) func ServeContext(ctx context.Context, addr string, hdl http.Handler) error { @@ -14,9 +15,12 @@ func ServeContext(ctx context.Context, addr string, hdl http.Handler) error { go func() { <-ctx.Done() - if err := srv.Shutdown(ctx); err != nil { - fmt.Println("failed shutting down server:", err) + + if err := srv.Shutdown(context.Background()); err != nil { + logger := util.LoggerFromContext(ctx) + logger.Error(err, "failed shutting down server") } }() + return srv.ListenAndServe() } diff --git a/pkg/k8s/deployment_cache_informer.go b/pkg/k8s/deployment_cache_informer.go index 80ca6b5c..71674381 100644 --- a/pkg/k8s/deployment_cache_informer.go +++ b/pkg/k8s/deployment_cache_informer.go @@ -7,7 +7,6 @@ import ( "time" "github.com/go-logr/logr" - "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" @@ -32,11 +31,8 @@ func (i *InformerBackedDeploymentCache) MarshalJSON() ([]byte, error) { return json.Marshal(&depls) } -func (i *InformerBackedDeploymentCache) Start(ctx context.Context) error { +func (i *InformerBackedDeploymentCache) Start(ctx context.Context) { i.deplInformer.Informer().Run(ctx.Done()) - return errors.Wrap( - ctx.Err(), "deployment cache informer was stopped", - ) } func (i *InformerBackedDeploymentCache) Get( diff --git a/pkg/k8s/endpoints.go b/pkg/k8s/endpoints.go index fc386c9a..c11a9b05 100644 --- a/pkg/k8s/endpoints.go +++ b/pkg/k8s/endpoints.go @@ -5,7 +5,6 @@ import ( "fmt" "net/url" - "github.com/pkg/errors" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -29,7 +28,7 @@ func EndpointsForService( ) ([]*url.URL, error) { endpoints, err := endpointsFn(ctx, ns, serviceName) if err != nil { - return nil, errors.Wrap(err, "pkg.k8s.EndpointsForService") + return nil, fmt.Errorf("pkg.k8s.EndpointsForService: %w", err) } ret := []*url.URL{} for _, subset := range endpoints.Subsets { diff --git a/pkg/net/dial_context.go b/pkg/net/dial_context.go index 3010465b..4b6a70bb 100644 --- a/pkg/net/dial_context.go +++ b/pkg/net/dial_context.go @@ -50,7 +50,7 @@ func DialContextWithRetry(coreDialer *net.Dialer, backoff wait.Backoff) DialCont select { case <-ctx.Done(): t.Stop() - return nil, fmt.Errorf("context timed out: %s", ctx.Err()) + return nil, fmt.Errorf("context timed out: %w", ctx.Err()) case <-t.C: t.Stop() } diff --git a/pkg/queue/queue_rpc.go b/pkg/queue/queue_rpc.go index 4ff3ed9b..53c6c464 100644 --- a/pkg/queue/queue_rpc.go +++ b/pkg/queue/queue_rpc.go @@ -7,7 +7,6 @@ import ( "net/url" "github.com/go-logr/logr" - "github.com/pkg/errors" ) const countsPath = "/queue" @@ -66,22 +65,12 @@ func GetCounts( interceptorURL.Path = countsPath resp, err := httpCl.Get(interceptorURL.String()) if err != nil { - errMsg := fmt.Sprintf( - "requesting the queue counts from %s", - interceptorURL.String(), - ) - return nil, errors.Wrap(err, errMsg) + return nil, fmt.Errorf("requesting the queue counts from %s: %w", interceptorURL.String(), err) } defer resp.Body.Close() counts := NewCounts() if err := json.NewDecoder(resp.Body).Decode(counts); err != nil { - return nil, errors.Wrap( - err, - fmt.Sprintf( - "decoding response from the interceptor at %s", - interceptorURL.String(), - ), - ) + return nil, fmt.Errorf("decoding response from the interceptor at %s: %w", interceptorURL.String(), err) } return counts, nil diff --git a/pkg/routing/table.go b/pkg/routing/table.go index a54ab167..2762d5df 100644 --- a/pkg/routing/table.go +++ b/pkg/routing/table.go @@ -2,11 +2,11 @@ package routing import ( "context" + "errors" "net/http" "sync" "time" - "github.com/pkg/errors" "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" diff --git a/pkg/util/errors.go b/pkg/util/errors.go new file mode 100644 index 00000000..9e448dee --- /dev/null +++ b/pkg/util/errors.go @@ -0,0 +1,25 @@ +package util + +import ( + "context" + "errors" + "net/http" +) + +var ( + ignoredErrs = []error{ + nil, + context.Canceled, + http.ErrServerClosed, + } +) + +func IsIgnoredErr(err error) bool { + for _, ignoredErr := range ignoredErrs { + if errors.Is(err, ignoredErr) { + return true + } + } + + return false +} diff --git a/scaler/main.go b/scaler/main.go index ccceb410..94b4b130 100644 --- a/scaler/main.go +++ b/scaler/main.go @@ -6,6 +6,7 @@ package main import ( "context" + "errors" "fmt" "log" "net" @@ -28,6 +29,7 @@ import ( "github.com/kedacore/http-add-on/pkg/build" "github.com/kedacore/http-add-on/pkg/k8s" pkglog "github.com/kedacore/http-add-on/pkg/log" + "github.com/kedacore/http-add-on/pkg/util" ) // +kubebuilder:rbac:groups="",namespace=keda,resources=configmaps,verbs=get;list;watch @@ -88,49 +90,57 @@ func main() { sharedInformerFactory := informers.NewSharedInformerFactory(httpCl, cfg.ConfigMapCacheRsyncPeriod) httpsoInformer := informershttpv1alpha1.New(sharedInformerFactory, "", nil).HTTPScaledObjects() - ctx, done := context.WithCancel( - context.Background(), - ) - defer done() + ctx := ctrl.SetupSignalHandler() + ctx = util.ContextWithLogger(ctx, lggr) - grp, ctx := errgroup.WithContext(ctx) + eg, ctx := errgroup.WithContext(ctx) // start the deployment informer - grp.Go(func() error { - defer done() - return deployInformer.Start(ctx) + eg.Go(func() error { + lggr.Info("starting the deployment informer") + + deployInformer.Start(ctx) + return nil }) // start the httpso informer - grp.Go(func() error { - defer done() + eg.Go(func() error { + lggr.Info("starting the httpso informer") + httpsoInformer.Informer().Run(ctx.Done()) - return ctx.Err() + return nil }) - grp.Go(func() error { - defer done() - return pinger.start( - ctx, - time.NewTicker(cfg.QueueTickDuration), - deployInformer, - ) + eg.Go(func() error { + lggr.Info("starting the queue pinger") + + if err := pinger.start(ctx, time.NewTicker(cfg.QueueTickDuration), deployInformer); !util.IsIgnoredErr(err) { + lggr.Error(err, "queue pinger failed") + return err + } + + return nil }) - grp.Go(func() error { - defer done() - return startGrpcServer( - ctx, - lggr, - grpcPort, - pinger, - httpsoInformer, - int64(targetPendingRequests), - ) + eg.Go(func() error { + lggr.Info("starting the grpc server") + + if err := startGrpcServer(ctx, lggr, grpcPort, pinger, httpsoInformer, int64(targetPendingRequests)); !util.IsIgnoredErr(err) { + lggr.Error(err, "grpc server failed") + return err + } + + return nil }) build.PrintComponentInfo(lggr, "Scaler") - lggr.Error(grp.Wait(), "one or more of the servers failed") + + if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) { + lggr.Error(err, "fatal error") + os.Exit(1) + } + + lggr.Info("Bye!") } func startGrpcServer( @@ -172,7 +182,7 @@ func startGrpcServer( go func() { <-ctx.Done() - lis.Close() + grpcServer.GracefulStop() }() return grpcServer.Serve(lis) diff --git a/scaler/queue_pinger.go b/scaler/queue_pinger.go index f69a5fb6..6f139613 100644 --- a/scaler/queue_pinger.go +++ b/scaler/queue_pinger.go @@ -4,12 +4,12 @@ package main import ( "context" + "fmt" "net/http" "sync" "time" "github.com/go-logr/logr" - "github.com/pkg/errors" "golang.org/x/sync/errgroup" "github.com/kedacore/http-add-on/pkg/k8s" @@ -93,19 +93,13 @@ func (q *queuePinger) start( ctx.Err(), "context marked done. stopping queuePinger loop", ) - return errors.Wrap( - ctx.Err(), - "context marked done. stopping queuePinger loop", - ) + return fmt.Errorf("context marked done. stopping queuePinger loop: %w", ctx.Err()) // do our regularly scheduled work case <-ticker.C: err := q.fetchAndSaveCounts(ctx) if err != nil { lggr.Error(err, "getting request counts") - return errors.Wrap( - err, - "error getting request counts", - ) + return fmt.Errorf("error getting request counts: %w", err) } // handle changes to the interceptor fleet // Deployment