Skip to content

Commit

Permalink
feat(kuma-dp): add a separate component to handle kuma-sidecar readin…
Browse files Browse the repository at this point in the history
…ess probes (backport of #11107) (#11238)
  • Loading branch information
kumahq[bot] committed Aug 30, 2024
1 parent 500bece commit e21c751
Show file tree
Hide file tree
Showing 26 changed files with 513 additions and 18 deletions.
14 changes: 14 additions & 0 deletions app/kuma-dp/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/envoy"
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/meshmetrics"
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/metrics"
"github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/readiness"
kuma_cmd "github.com/kumahq/kuma/pkg/cmd"
"github.com/kumahq/kuma/pkg/config"
kumadp "github.com/kumahq/kuma/pkg/config/app/kuma-dp"
Expand Down Expand Up @@ -183,6 +184,7 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
bootstrap, kumaSidecarConfiguration, err := rootCtx.BootstrapGenerator(gracefulCtx, opts.Config.ControlPlane.URL, opts.Config, envoy.BootstrapParams{
Dataplane: opts.Dataplane,
DNSPort: cfg.DNS.EnvoyDNSPort,
ReadinessPort: cfg.Dataplane.ReadinessPort,
EnvoyVersion: *envoyVersion,
Workdir: cfg.DataplaneRuntime.SocketDir,
DynamicMetadata: rootCtx.BootstrapDynamicMetadata,
Expand Down Expand Up @@ -236,6 +238,15 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {

observabilityComponents := setupObservability(kumaSidecarConfiguration, bootstrap, cfg)
components = append(components, observabilityComponents...)

var readinessReporter *readiness.Reporter
if cfg.Dataplane.ReadinessPort > 0 {
readinessReporter = readiness.NewReporter(
bootstrap.GetAdmin().GetAddress().GetSocketAddress().GetAddress(),
cfg.Dataplane.ReadinessPort)
components = append(components, readinessReporter)
}

if err := rootCtx.ComponentManager.Add(components...); err != nil {
return err
}
Expand Down Expand Up @@ -265,6 +276,9 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
if draining {
runLog.Info("already drained, exit immediately")
} else {
if readinessReporter != nil {
readinessReporter.Terminating()
}
runLog.Info("draining Envoy connections")
if err := envoyComponent.FailHealthchecks(); err != nil {
runLog.Error(err, "could not drain connections")
Expand Down
1 change: 1 addition & 0 deletions app/kuma-dp/pkg/dataplane/envoy/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var runLog = core.Log.WithName("kuma-dp").WithName("run").WithName("envoy")
type BootstrapParams struct {
Dataplane rest.Resource
DNSPort uint32
ReadinessPort uint32
EnvoyVersion EnvoyVersion
DynamicMetadata map[string]string
Workdir string
Expand Down
1 change: 1 addition & 0 deletions app/kuma-dp/pkg/dataplane/envoy/remote_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (b *remoteBootstrap) requestForBootstrap(ctx context.Context, client *http.
},
DynamicMetadata: params.DynamicMetadata,
DNSPort: params.DNSPort,
ReadinessPort: params.ReadinessPort,
OperatingSystem: b.operatingSystem,
Features: b.features,
Resources: resources,
Expand Down
111 changes: 111 additions & 0 deletions app/kuma-dp/pkg/dataplane/readiness/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package readiness

import (
"context"
"fmt"
"net"
"net/http"
"sync/atomic"
"time"

"github.com/asaskevich/govalidator"
"github.com/bakito/go-log-logr-adapter/adapter"

"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/core/runtime/component"
)

const (
pathPrefixReady = "/ready"
stateReady = "READY"
stateTerminating = "TERMINATING"
)

// Reporter reports the health status of this Kuma Dataplane Proxy
type Reporter struct {
localListenAddr string
localListenPort uint32
isTerminating atomic.Bool
}

var logger = core.Log.WithName("readiness")

func NewReporter(localIPAddr string, localListenPort uint32) *Reporter {
return &Reporter{
localListenPort: localListenPort,
localListenAddr: localIPAddr,
}
}

func (r *Reporter) Start(stop <-chan struct{}) error {
protocol := "tcp"
addr := r.localListenAddr
if govalidator.IsIPv6(addr) {
protocol = "tcp6"
addr = fmt.Sprintf("[%s]", addr)
}
lis, err := net.Listen(protocol, fmt.Sprintf("%s:%d", addr, r.localListenPort))
if err != nil {
return err
}

defer func() {
_ = lis.Close()
}()

logger.Info("starting readiness reporter", "addr", lis.Addr().String())

mux := http.NewServeMux()
mux.HandleFunc(pathPrefixReady, r.handleReadiness)
server := &http.Server{
ReadHeaderTimeout: time.Second,
Handler: mux,
ErrorLog: adapter.ToStd(logger),
}

errCh := make(chan error)
go func() {
if err := server.Serve(lis); err != nil {
errCh <- err
}
}()

select {
case err := <-errCh:
return err
case <-stop:
logger.Info("stopping readiness reporter")
return server.Shutdown(context.Background())
}
}

func (r *Reporter) Terminating() {
r.isTerminating.Store(true)
}

func (r *Reporter) handleReadiness(writer http.ResponseWriter, req *http.Request) {
state := stateReady
stateHTTPStatus := http.StatusOK
if r.isTerminating.Load() {
state = stateTerminating
stateHTTPStatus = http.StatusServiceUnavailable
}

stateBytes := []byte(state)
writer.Header().Set("content-type", "text/plain")
writer.Header().Set("content-length", fmt.Sprintf("%d", len(stateBytes)))
writer.Header().Set("cache-control", "no-cache, max-age=0")
writer.Header().Set("x-powered-by", "kuma-dp")
writer.WriteHeader(stateHTTPStatus)
_, err := writer.Write(stateBytes)
logger.V(1).Info("responding readiness state", "state", state, "client", req.RemoteAddr)
if err != nil {
logger.Info("[WARNING] could not write response", "err", err)
}
}

func (r *Reporter) NeedLeaderElection() bool {
return false
}

var _ component.Component = &Reporter{}
15 changes: 11 additions & 4 deletions pkg/config/app/kuma-dp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ var DefaultConfig = func() Config {
},
},
Dataplane: Dataplane{
Mesh: "",
Name: "", // Dataplane name must be set explicitly
DrainTime: config_types.Duration{Duration: 30 * time.Second},
ProxyType: "dataplane",
Mesh: "",
Name: "", // Dataplane name must be set explicitly
DrainTime: config_types.Duration{Duration: 30 * time.Second},
ProxyType: "dataplane",
ReadinessPort: 9902,
},
DataplaneRuntime: DataplaneRuntime{
BinaryPath: "envoy",
Expand Down Expand Up @@ -132,6 +133,8 @@ type Dataplane struct {
ProxyType string `json:"proxyType,omitempty" envconfig:"kuma_dataplane_proxy_type"`
// Drain time for listeners.
DrainTime config_types.Duration `json:"drainTime,omitempty" envconfig:"kuma_dataplane_drain_time"`
// Port that exposes kuma-dp readiness status on localhost, set this value to 0 to provide readiness by "/ready" endpoint from Envoy adminAPI
ReadinessPort uint32 `json:"readinessPort,omitempty" envconfig:"kuma_readiness_port"`
}

func (d *Dataplane) PostProcess() error {
Expand Down Expand Up @@ -305,6 +308,10 @@ func (d *Dataplane) Validate() error {
errs = multierr.Append(errs, errors.Errorf(".DrainTime must be positive"))
}

if d.ReadinessPort > 65353 {
return errors.New(".ReadinessPort has to be in [0, 65353] range")
}

return errs
}

Expand Down
1 change: 1 addition & 0 deletions pkg/config/app/kuma-dp/testdata/default-config.golden.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ controlPlane:
dataplane:
drainTime: 30s
proxyType: dataplane
readinessPort: 9902
dataplaneRuntime:
binaryPath: envoy
dynamicConfiguration:
Expand Down
10 changes: 10 additions & 0 deletions pkg/core/xds/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
// Supported Envoy node metadata fields.
FieldDataplaneAdminPort = "dataplane.admin.port"
FieldDataplaneAdminAddress = "dataplane.admin.address"
FieldDataplaneReadinessPort = "dataplane.readinessReporter.port"
FieldDataplaneDNSPort = "dataplane.dns.port"
FieldDataplaneDataplaneResource = "dataplane.resource"
FieldDynamicMetadata = "dynamicMetadata"
Expand Down Expand Up @@ -52,6 +53,7 @@ type DataplaneMetadata struct {
Resource model.Resource
AdminPort uint32
AdminAddress string
ReadinessPort uint32
DNSPort uint32
DynamicMetadata map[string]string
ProxyType mesh_proto.ProxyType
Expand Down Expand Up @@ -113,6 +115,13 @@ func (m *DataplaneMetadata) GetAdminPort() uint32 {
return m.AdminPort
}

func (m *DataplaneMetadata) GetReadinessPort() uint32 {
if m == nil {
return 0
}
return m.ReadinessPort
}

func (m *DataplaneMetadata) GetAdminAddress() string {
if m == nil {
return ""
Expand Down Expand Up @@ -154,6 +163,7 @@ func DataplaneMetadataFromXdsMetadata(xdsMetadata *structpb.Struct) *DataplaneMe
}
metadata.AdminPort = uint32Metadata(xdsMetadata, FieldDataplaneAdminPort)
metadata.AdminAddress = xdsMetadata.Fields[FieldDataplaneAdminAddress].GetStringValue()
metadata.ReadinessPort = uint32Metadata(xdsMetadata, FieldDataplaneReadinessPort)
metadata.DNSPort = uint32Metadata(xdsMetadata, FieldDataplaneDNSPort)
if value := xdsMetadata.Fields[FieldDataplaneDataplaneResource]; value != nil {
res, err := rest.YAML.UnmarshalCore([]byte(value.GetStringValue()))
Expand Down
12 changes: 9 additions & 3 deletions pkg/core/xds/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ var _ = Describe("DataplaneMetadataFromXdsMetadata", func() {
StringValue: "8000",
},
},
"dataplane.readinessReporter.port": {
Kind: &structpb.Value_StringValue{
StringValue: "9300",
},
},
"systemCaPath": {
Kind: &structpb.Value_StringValue{
StringValue: "/etc/certs/cert.pem",
Expand All @@ -50,9 +55,10 @@ var _ = Describe("DataplaneMetadataFromXdsMetadata", func() {
},
},
expected: xds.DataplaneMetadata{
AdminPort: 1234,
DNSPort: 8000,
SystemCaPath: "/etc/certs/cert.pem",
AdminPort: 1234,
DNSPort: 8000,
SystemCaPath: "/etc/certs/cert.pem",
ReadinessPort: 9300,
},
}),
Entry("should ignore dependencies version provided through metadata if version is not set at all", testCase{
Expand Down
1 change: 1 addition & 0 deletions pkg/xds/bootstrap/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (b *bootstrapGenerator) Generate(ctx context.Context, request types.Bootstr
},
DynamicMetadata: request.DynamicMetadata,
DNSPort: request.DNSPort,
ReadinessPort: request.ReadinessPort,
ProxyType: request.ProxyType,
Features: request.Features,
Resources: request.Resources,
Expand Down
1 change: 1 addition & 0 deletions pkg/xds/bootstrap/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type configParameters struct {
Service string
AdminAddress string
AdminPort uint32
ReadinessPort uint32
AdminAccessLogPath string
XdsHost string
XdsPort uint32
Expand Down
3 changes: 3 additions & 0 deletions pkg/xds/bootstrap/template_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ func genConfig(parameters configParameters, proxyConfig xds.Proxy, enableReloada
if parameters.DNSPort != 0 {
res.Node.Metadata.Fields[core_xds.FieldDataplaneDNSPort] = util_proto.MustNewValueForStruct(strconv.Itoa(int(parameters.DNSPort)))
}
if parameters.ReadinessPort != 0 {
res.Node.Metadata.Fields[core_xds.FieldDataplaneReadinessPort] = util_proto.MustNewValueForStruct(strconv.Itoa(int(parameters.ReadinessPort)))
}
if parameters.ProxyType != "" {
res.Node.Metadata.Fields[core_xds.FieldDataplaneProxyType] = util_proto.MustNewValueForStruct(parameters.ProxyType)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/xds/bootstrap/types/bootstrap_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type BootstrapRequest struct {
CaCert string `json:"caCert"`
DynamicMetadata map[string]string `json:"dynamicMetadata"`
DNSPort uint32 `json:"dnsPort,omitempty"`
ReadinessPort uint32 `json:"readinessPort,omitempty"`
OperatingSystem string `json:"operatingSystem"`
Features []string `json:"features"`
Resources ProxyResources `json:"resources"`
Expand Down
4 changes: 4 additions & 0 deletions pkg/xds/envoy/names/resource_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func GetMetricsHijackerClusterName() string {
return Join("kuma", "metrics", "hijacker")
}

func GetDPPReadinessClusterName() string {
return Join("kuma", "readiness")
}

func GetInternalClusterNamePrefix() string {
return "_"
}
Expand Down
Loading

0 comments on commit e21c751

Please sign in to comment.