Skip to content

Commit

Permalink
Fix Graceful Shutdown of Interceptor and Scaler (#732)
Browse files Browse the repository at this point in the history
Co-authored-by: Tom Kerkhove <[email protected]>
  • Loading branch information
t0rr3sp3dr0 and tomkerkhove committed Jul 14, 2023
1 parent ecd4a41 commit a3039f6
Show file tree
Hide file tree
Showing 15 changed files with 134 additions and 122 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ This changelog keeps track of work items that have been completed and are ready
- **Scaler**: reimplement custom interceptor metrics ([#718](https://github.com/kedacore/http-add-on/issues/718))
- **Operator**: Remove ScaledObject `name` & `app` custom labels ([#717](https://github.com/kedacore/http-add-on/issues/717))
- **Interceptor**: fatal error: concurrent map iteration and map write ([#726](https://github.com/kedacore/http-add-on/issues/726))
- **Interceptor**: Provide graceful shutdown for http servers on SIGINT and SIGTERM ([#731](https://github.com/kedacore/http-add-on/issues/731))
- **Scaler**: Provide graceful shutdown for grpc server on SIGINT and SIGTERM ([#731](https://github.com/kedacore/http-add-on/issues/731))

### Deprecations

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ require (
github.com/kelseyhightower/envconfig v1.4.0
github.com/onsi/ginkgo/v2 v2.11.0
github.com/onsi/gomega v1.27.8
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
github.com/tj/assert v0.0.3
go.uber.org/zap v1.24.0
Expand Down Expand Up @@ -56,6 +55,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions interceptor/forward_wait_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newDeployReplicasForwardWaitFunc(
if err != nil {
// if we didn't get the initial deployment state, bail out
return 0, fmt.Errorf(
"error getting state for deployment %s/%s (%s)",
"error getting state for deployment %s/%s: %w",
deployNS,
deployName,
err,
Expand All @@ -62,7 +62,7 @@ func newDeployReplicasForwardWaitFunc(
// otherwise, if the context is marked done before
// we're done waiting, fail.
return 0, fmt.Errorf(
"context marked done while waiting for deployment %s to reach > 0 replicas (%w)",
"context marked done while waiting for deployment %s to reach > 0 replicas: %w",
deployName,
ctx.Err(),
)
Expand Down
97 changes: 46 additions & 51 deletions interceptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"fmt"
"math/rand"
"net/http"
Expand Down Expand Up @@ -41,14 +42,14 @@ func main() {
fmt.Println("Error building logger", err)
os.Exit(1)
}

timeoutCfg := config.MustParseTimeouts()
servingCfg := config.MustParseServing()
if err := config.Validate(*servingCfg, *timeoutCfg); err != nil {
lggr.Error(err, "invalid configuration")
os.Exit(1)
}
ctx := util.ContextWithLogger(context.Background(), lggr)
ctx, ctxDone := context.WithCancel(ctx)

lggr.Info(
"starting interceptor",
"timeoutConfig",
Expand Down Expand Up @@ -94,73 +95,67 @@ func main() {

lggr.Info("Interceptor starting")

errGrp, ctx := errgroup.WithContext(ctx)
ctx := ctrl.SetupSignalHandler()
ctx = util.ContextWithLogger(ctx, lggr)

eg, ctx := errgroup.WithContext(ctx)

// start the deployment cache updater
errGrp.Go(func() error {
defer ctxDone()
err := deployCache.Start(ctx)
lggr.Error(err, "deployment cache watcher failed")
return err
eg.Go(func() error {
lggr.Info("starting the deployment cache")

deployCache.Start(ctx)
return nil
})

// start the update loop that updates the routing table from
// the ConfigMap that the operator updates as HTTPScaledObjects
// enter and exit the system
errGrp.Go(func() error {
defer ctxDone()
err := routingTable.Start(ctx)
lggr.Error(err, "config map routing table updater failed")
return err
eg.Go(func() error {
lggr.Info("starting the routing table")

if err := routingTable.Start(ctx); !util.IsIgnoredErr(err) {
lggr.Error(err, "routing table failed")
return err
}

return nil
})

// start the administrative server. this is the server
// that serves the queue size API
errGrp.Go(func() error {
defer ctxDone()
lggr.Info(
"starting the admin server",
"port",
adminPort,
)
err := runAdminServer(
ctx,
lggr,
q,
adminPort,
)
lggr.Error(err, "admin server failed")
return err
eg.Go(func() error {
lggr.Info("starting the admin server", "port", adminPort)

if err := runAdminServer(ctx, lggr, q, adminPort); !util.IsIgnoredErr(err) {
lggr.Error(err, "admin server failed")
return err
}

return nil
})

// start the proxy server. this is the server that
// accepts, holds and forwards user requests
errGrp.Go(func() error {
defer ctxDone()
lggr.Info(
"starting the proxy server",
"port",
proxyPort,
)
err := runProxyServer(
ctx,
lggr,
q,
waitFunc,
routingTable,
timeoutCfg,
proxyPort,
)
lggr.Error(err, "proxy server failed")
return err
eg.Go(func() error {
lggr.Info("starting the proxy server", "port", proxyPort)

if err := runProxyServer(ctx, lggr, q, waitFunc, routingTable, timeoutCfg, proxyPort); !util.IsIgnoredErr(err) {
lggr.Error(err, "proxy server failed")
return err
}

return nil
})

build.PrintComponentInfo(lggr, "Interceptor")

// errGrp.Wait() should hang forever for healthy admin and proxy servers.
// if it returns an error, log and exit immediately.
waitErr := errGrp.Wait()
lggr.Error(waitErr, "error with interceptor")
os.Exit(1)
if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
lggr.Error(err, "fatal error")
os.Exit(1)
}

lggr.Info("Bye!")
}

func runAdminServer(
Expand Down
3 changes: 1 addition & 2 deletions interceptor/proxy_handlers_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -360,7 +359,7 @@ func splitHostPort(hostPortStr string) (string, int, error) {
host := spl[0]
port, err := strconv.Atoi(spl[1])
if err != nil {
return "", 0, errors.Wrap(err, "port was invalid")
return "", 0, fmt.Errorf("port was invalid: %w", err)
}
return host, port, nil
}
3 changes: 1 addition & 2 deletions operator/controllers/http/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net/http"

"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -28,7 +27,7 @@ func pingInterceptors(
k8s.EndpointsFuncForControllerClient(cl),
)
if err != nil {
return errors.Wrap(err, "pingInterceptors")
return fmt.Errorf("pingInterceptors: %w", err)
}
errGrp, _ := errgroup.WithContext(ctx)
for _, endpointURL := range endpointURLs {
Expand Down
10 changes: 7 additions & 3 deletions pkg/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package http

import (
"context"
"fmt"
"net/http"

"github.com/kedacore/http-add-on/pkg/util"
)

func ServeContext(ctx context.Context, addr string, hdl http.Handler) error {
Expand All @@ -14,9 +15,12 @@ func ServeContext(ctx context.Context, addr string, hdl http.Handler) error {

go func() {
<-ctx.Done()
if err := srv.Shutdown(ctx); err != nil {
fmt.Println("failed shutting down server:", err)

if err := srv.Shutdown(context.Background()); err != nil {
logger := util.LoggerFromContext(ctx)
logger.Error(err, "failed shutting down server")
}
}()

return srv.ListenAndServe()
}
6 changes: 1 addition & 5 deletions pkg/k8s/deployment_cache_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
Expand All @@ -32,11 +31,8 @@ func (i *InformerBackedDeploymentCache) MarshalJSON() ([]byte, error) {
return json.Marshal(&depls)
}

func (i *InformerBackedDeploymentCache) Start(ctx context.Context) error {
func (i *InformerBackedDeploymentCache) Start(ctx context.Context) {
i.deplInformer.Informer().Run(ctx.Done())
return errors.Wrap(
ctx.Err(), "deployment cache informer was stopped",
)
}

func (i *InformerBackedDeploymentCache) Get(
Expand Down
3 changes: 1 addition & 2 deletions pkg/k8s/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net/url"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand All @@ -29,7 +28,7 @@ func EndpointsForService(
) ([]*url.URL, error) {
endpoints, err := endpointsFn(ctx, ns, serviceName)
if err != nil {
return nil, errors.Wrap(err, "pkg.k8s.EndpointsForService")
return nil, fmt.Errorf("pkg.k8s.EndpointsForService: %w", err)
}
ret := []*url.URL{}
for _, subset := range endpoints.Subsets {
Expand Down
2 changes: 1 addition & 1 deletion pkg/net/dial_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func DialContextWithRetry(coreDialer *net.Dialer, backoff wait.Backoff) DialCont
select {
case <-ctx.Done():
t.Stop()
return nil, fmt.Errorf("context timed out: %s", ctx.Err())
return nil, fmt.Errorf("context timed out: %w", ctx.Err())
case <-t.C:
t.Stop()
}
Expand Down
15 changes: 2 additions & 13 deletions pkg/queue/queue_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/url"

"github.com/go-logr/logr"
"github.com/pkg/errors"
)

const countsPath = "/queue"
Expand Down Expand Up @@ -66,22 +65,12 @@ func GetCounts(
interceptorURL.Path = countsPath
resp, err := httpCl.Get(interceptorURL.String())
if err != nil {
errMsg := fmt.Sprintf(
"requesting the queue counts from %s",
interceptorURL.String(),
)
return nil, errors.Wrap(err, errMsg)
return nil, fmt.Errorf("requesting the queue counts from %s: %w", interceptorURL.String(), err)
}
defer resp.Body.Close()
counts := NewCounts()
if err := json.NewDecoder(resp.Body).Decode(counts); err != nil {
return nil, errors.Wrap(
err,
fmt.Sprintf(
"decoding response from the interceptor at %s",
interceptorURL.String(),
),
)
return nil, fmt.Errorf("decoding response from the interceptor at %s: %w", interceptorURL.String(), err)
}

return counts, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/routing/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package routing

import (
"context"
"errors"
"net/http"
"sync"
"time"

"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
Expand Down
25 changes: 25 additions & 0 deletions pkg/util/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package util

import (
"context"
"errors"
"net/http"
)

// ignoredErrs enumerates the error values that IsIgnoredErr treats as
// ignorable: a nil error, context cancellation, and an HTTP server that
// was closed.
var ignoredErrs = []error{
	nil,
	context.Canceled,
	http.ErrServerClosed,
}

// IsIgnoredErr reports whether err matches any entry of ignoredErrs.
// Matching is done with errors.Is, so an error that wraps one of the
// listed values is ignored as well. A nil err matches the nil entry and
// therefore yields true.
func IsIgnoredErr(err error) bool {
	matched := false
	// Stop at the first match; later entries are not consulted.
	for idx := 0; idx < len(ignoredErrs) && !matched; idx++ {
		matched = errors.Is(err, ignoredErrs[idx])
	}
	return matched
}
Loading

0 comments on commit a3039f6

Please sign in to comment.