fix(xds): preserve deleted resources
jakubdyszkiewicz committed Sep 25, 2024
1 parent 9c5165b commit 518c60d
Showing 6 changed files with 82 additions and 35 deletions.
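In short: `Reconcile` now carries clusters and endpoints that disappeared from a freshly generated snapshot over from the previous one (`preserveDeletedResources`), and a single `sync.Mutex` (`snapshotCacheMux`) is shared between the three reconcilers and the resource warming forcer so that their read-modify-write cycles against the snapshot cache can no longer interleave.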
3 changes: 2 additions & 1 deletion pkg/plugins/runtime/gateway/gateway_route_generator_test.go
@@ -3,6 +3,7 @@ package gateway_test
import (
"context"
"path"
"sync"

"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
. "github.com/onsi/ginkgo/v2"
@@ -41,7 +42,7 @@ var _ = Describe("Gateway Route", func() {
if err != nil {
return nil, err
}
-reconciler := xds_server.DefaultReconciler(rt, serverCtx, statsCallbacks)
+reconciler := xds_server.DefaultReconciler(rt, serverCtx, statsCallbacks, &sync.Mutex{})

// We expect there to be a Dataplane fixture named
// "default" in the current mesh.
3 changes: 2 additions & 1 deletion pkg/plugins/runtime/gateway/listener_generator_test.go
@@ -3,6 +3,7 @@ package gateway_test
import (
"context"
"path"
"sync"

"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
. "github.com/onsi/ginkgo/v2"
@@ -26,7 +27,7 @@ var _ = Describe("Gateway Listener", func() {
if err != nil {
return nil, err
}
-reconciler := xds_server.DefaultReconciler(rt, serverCtx, statsCallbacks)
+reconciler := xds_server.DefaultReconciler(rt, serverCtx, statsCallbacks, &sync.Mutex{})

Expect(StoreInlineFixture(rt, []byte(gateway))).To(Succeed())
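Both gateway test suites (this file and gateway_route_generator_test.go) build a standalone reconciler, so each passes its own throwaway `&sync.Mutex{}` to satisfy the new `DefaultReconciler` parameter; in production, `RegisterXDS` in components.go below shares one mutex across all reconcilers and the warming forcer.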

28 changes: 18 additions & 10 deletions pkg/xds/server/v3/components.go
@@ -2,6 +2,7 @@ package v3

import (
"context"
"sync"
"time"

envoy_service_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
@@ -39,9 +40,10 @@ func RegisterXDS(
authCallbacks := auth.NewCallbacks(rt.ReadOnlyResourceManager(), authenticator, auth.DPNotFoundRetry{}) // no need to retry on DP Not Found because we are creating DP in DataplaneLifecycle callback

metadataTracker := xds_callbacks.NewDataplaneMetadataTracker()
-reconciler := DefaultReconciler(rt, xdsContext, statsCallbacks)
-ingressReconciler := DefaultIngressReconciler(rt, xdsContext, statsCallbacks)
-egressReconciler := DefaultEgressReconciler(rt, xdsContext, statsCallbacks)
+snapshotCacheMux := &sync.Mutex{}
+reconciler := DefaultReconciler(rt, xdsContext, statsCallbacks, snapshotCacheMux)
+ingressReconciler := DefaultIngressReconciler(rt, xdsContext, statsCallbacks, snapshotCacheMux)
+egressReconciler := DefaultEgressReconciler(rt, xdsContext, statsCallbacks, snapshotCacheMux)
watchdogFactory, err := xds_sync.DefaultDataplaneWatchdogFactory(rt, metadataTracker, reconciler, ingressReconciler, egressReconciler, xdsMetrics, envoyCpCtx, envoy_common.APIV3)
if err != nil {
return err
@@ -58,7 +60,7 @@ func RegisterXDS(
util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(xds_callbacks.NewDataplaneSyncTracker(watchdogFactory.New))),
util_xds_v3.AdaptCallbacks(DefaultDataplaneStatusTracker(rt, envoyCpCtx.Secrets)),
util_xds_v3.AdaptCallbacks(xds_callbacks.NewNackBackoff(rt.Config().XdsServer.NACKBackoff.Duration)),
-newResourceWarmingForcer(xdsContext.Cache(), xdsContext.Hasher()),
+newResourceWarmingForcer(xdsContext.Cache(), xdsContext.Hasher(), snapshotCacheMux),
}

if cb := rt.XDS().ServerCallbacks; cb != nil {
@@ -76,6 +78,7 @@ func DefaultReconciler(
rt core_runtime.Runtime,
xdsContext XdsContext,
statsCallbacks util_xds.StatsCallbacks,
+snapshotCacheMux *sync.Mutex,
) xds_sync.SnapshotReconciler {
resolver := xds_template.SequentialResolver(
&xds_template.SimpleProxyTemplateResolver{
@@ -89,15 +92,17 @@
ResourceSetHooks: rt.XDS().Hooks.ResourceSetHooks(),
ProxyTemplateResolver: resolver,
},
cacher: &simpleSnapshotCacher{xdsContext.Hasher(), xdsContext.Cache()},
statsCallbacks: statsCallbacks,
cacher: &simpleSnapshotCacher{xdsContext.Hasher(), xdsContext.Cache()},
statsCallbacks: statsCallbacks,
snapshotCacheMux: snapshotCacheMux,
}
}

func DefaultIngressReconciler(
rt core_runtime.Runtime,
xdsContext XdsContext,
statsCallbacks util_xds.StatsCallbacks,
+snapshotCacheMux *sync.Mutex,
) xds_sync.SnapshotReconciler {
resolver := &xds_template.StaticProxyTemplateResolver{
Template: &mesh_proto.ProxyTemplate{
@@ -114,15 +119,17 @@
ResourceSetHooks: rt.XDS().Hooks.ResourceSetHooks(),
ProxyTemplateResolver: resolver,
},
cacher: &simpleSnapshotCacher{xdsContext.Hasher(), xdsContext.Cache()},
statsCallbacks: statsCallbacks,
cacher: &simpleSnapshotCacher{xdsContext.Hasher(), xdsContext.Cache()},
statsCallbacks: statsCallbacks,
snapshotCacheMux: snapshotCacheMux,
}
}

func DefaultEgressReconciler(
rt core_runtime.Runtime,
xdsContext XdsContext,
statsCallbacks util_xds.StatsCallbacks,
+snapshotCacheMux *sync.Mutex,
) xds_sync.SnapshotReconciler {
resolver := &xds_template.StaticProxyTemplateResolver{
Template: &mesh_proto.ProxyTemplate{
@@ -139,8 +146,9 @@
ResourceSetHooks: rt.XDS().Hooks.ResourceSetHooks(),
ProxyTemplateResolver: resolver,
},
cacher: &simpleSnapshotCacher{xdsContext.Hasher(), xdsContext.Cache()},
statsCallbacks: statsCallbacks,
cacher: &simpleSnapshotCacher{xdsContext.Hasher(), xdsContext.Cache()},
statsCallbacks: statsCallbacks,
snapshotCacheMux: snapshotCacheMux,
}
}
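Why `RegisterXDS` creates one mutex and hands it to all three reconcilers and the warming forcer: each of them runs a read-modify-write cycle against the same snapshot cache (get the current snapshot, derive a new one, set it back), and the cache only synchronizes individual calls, not a caller's whole cycle. A minimal, self-contained sketch of the hazard — every name in it is illustrative, not from this commit:

```go
package main

import (
	"fmt"
	"sync"
)

// fakeCache stands in for go-control-plane's SnapshotCache: it protects each
// individual Get/Set call, but not a caller's whole Get -> modify -> Set cycle.
type fakeCache struct {
	mu       sync.Mutex
	versions map[string]int
}

func (c *fakeCache) Get(node string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.versions[node]
}

func (c *fakeCache) Set(node string, v int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.versions[node] = v
}

func main() {
	cache := &fakeCache{versions: map[string]int{"node-1": 0}}
	snapshotCacheMux := &sync.Mutex{} // the shared lock introduced by this commit

	var wg sync.WaitGroup
	bump := func() { // models one reconcile (or warming-forcer) pass
		defer wg.Done()
		snapshotCacheMux.Lock() // without this pair, concurrent cycles lose updates
		defer snapshotCacheMux.Unlock()
		cache.Set("node-1", cache.Get("node-1")+1)
	}
	for i := 0; i < 100; i++ {
		wg.Add(2)
		go bump() // "reconciler"
		go bump() // "resourceWarmingForcer"
	}
	wg.Wait()
	fmt.Println(cache.Get("node-1")) // always 200 with the shared mutex held
}
```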

65 changes: 51 additions & 14 deletions pkg/xds/server/v3/reconcile.go
@@ -2,6 +2,7 @@ package v3

import (
"context"
"sync"

envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types"
@@ -26,9 +27,10 @@ var reconcileLog = core.Log.WithName("xds").WithName("reconcile")
var _ xds_sync.SnapshotReconciler = &reconciler{}

type reconciler struct {
-generator      snapshotGenerator
-cacher         snapshotCacher
-statsCallbacks util_xds.StatsCallbacks
+generator        snapshotGenerator
+cacher           snapshotCacher
+statsCallbacks   util_xds.StatsCallbacks
+snapshotCacheMux *sync.Mutex
}

func (r *reconciler) Clear(proxyId *model.ProxyId) error {
@@ -57,6 +59,17 @@ func (r *reconciler) Reconcile(ctx context.Context, xdsCtx xds_context.Context,
return false, errors.Wrapf(err, "failed to generate a snapshot")
}

+for _, resources := range snapshot.Resources {
+for name, resource := range resources.Items {
+if err := validateResource(resource.Resource); err != nil {
+return false, errors.Wrapf(err, "invalid resource %q", name)
+}
+}
+}
+
+r.snapshotCacheMux.Lock()
+defer r.snapshotCacheMux.Unlock()

// To avoid assigning a new version every time, compare with
// the previous snapshot and reuse its version whenever possible,
// fallback to UUID otherwise
@@ -65,6 +78,12 @@
previous = &envoy_cache.Snapshot{}
}

+preserveDeletedResources(snapshot, previous)
+
+if err := snapshot.Consistent(); err != nil {
+return false, errors.Wrap(err, "inconsistent snapshot")
+}

snapshot, changed := autoVersion(previous, snapshot)

resKey := proxy.Id.ToResourceKey()
@@ -79,17 +98,6 @@
return false, nil
}

-for _, resources := range snapshot.Resources {
-for name, resource := range resources.Items {
-if err := validateResource(resource.Resource); err != nil {
-return false, errors.Wrapf(err, "invalid resource %q", name)
-}
-}
-}
-
-if err := snapshot.Consistent(); err != nil {
-return false, errors.Wrap(err, "inconsistent snapshot")
-}
log.Info("config has changed", "versions", changed)

if err := r.cacher.Cache(ctx, node, snapshot); err != nil {
@@ -102,6 +110,35 @@
return true, nil
}

+func preserveDeletedResources(snapshot *envoy_cache.Snapshot, previous *envoy_cache.Snapshot) {
+snapshotEndpoints := snapshot.GetResources(envoy_resource.EndpointType)
+// core.Log.Info("preserveDeletedResources", "previous", previous.GetResources(envoy_resource.EndpointType), "current", snapshotEndpoints)
+for name, res := range previous.GetResources(envoy_resource.EndpointType) {
+if _, ok := snapshotEndpoints[name]; !ok {
+// core.Log.Info("preserving endpoints", "clusterName", name)
+if snapshot.Resources[envoy_types.Endpoint].Items == nil {
+snapshot.Resources[envoy_types.Endpoint].Items = map[string]envoy_types.ResourceWithTTL{}
+}
+snapshot.Resources[envoy_types.Endpoint].Items[name] = envoy_types.ResourceWithTTL{
+Resource: res,
+}
+}
+}
+
+snapshotClusters := snapshot.GetResources(envoy_resource.ClusterType)
+for name, res := range previous.GetResources(envoy_resource.ClusterType) {
+if _, ok := snapshotClusters[name]; !ok {
+// core.Log.Info("preserving cluster", "clusterName", name)
+if snapshot.Resources[envoy_types.Cluster].Items == nil {
+snapshot.Resources[envoy_types.Cluster].Items = map[string]envoy_types.ResourceWithTTL{}
+}
+snapshot.Resources[envoy_types.Cluster].Items[name] = envoy_types.ResourceWithTTL{
+Resource: res,
+}
+}
+}
+}

func validateResource(r envoy_types.Resource) error {
switch v := r.(type) {
// Newer go-control-plane versions have `ValidateAll()` method, that accumulates as many validation errors as possible.
12 changes: 4 additions & 8 deletions pkg/xds/server/v3/reconcile_test.go
@@ -2,6 +2,7 @@ package v3

import (
"context"
"sync"

envoy_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
@@ -140,6 +141,7 @@ var _ = Describe("Reconcile", func() {
}),
&simpleSnapshotCacher{xdsContext.Hasher(), xdsContext.Cache()},
statsCallbacks,
&sync.Mutex{},
}

// given
@@ -227,14 +229,8 @@
Not(Equal(routeV1)),
Not(BeEmpty()),
))
-Expect(snapshot.GetVersion(resource.ClusterType)).To(SatisfyAll(
-Not(Equal(clusterV1)),
-Not(BeEmpty()),
-))
-Expect(snapshot.GetVersion(resource.EndpointType)).To(SatisfyAll(
-Not(Equal(endpointV1)),
-Not(BeEmpty()),
-))
+Expect(snapshot.GetVersion(resource.ClusterType)).To(Equal(clusterV1))
+Expect(snapshot.GetVersion(resource.EndpointType)).To(Equal(endpointV1))
Expect(snapshot.GetVersion(resource.SecretType)).To(SatisfyAll(
Not(Equal(secretV1)),
Not(BeEmpty()),
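The flipped assertions are the observable effect of `preserveDeletedResources`: deleting a backend no longer removes its cluster and endpoint entries from the snapshot, so `autoVersion` finds the CDS and EDS sets unchanged and reuses `clusterV1` and `endpointV1` instead of minting new versions.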
6 changes: 5 additions & 1 deletion pkg/xds/server/v3/resource_warming_forcer.go
@@ -66,14 +66,16 @@ type resourceWarmingForcer struct {
sync.Mutex
lastEndpointNonces map[xds.StreamID]string
nodeIDs map[xds.StreamID]string
+snapshotCacheMux *sync.Mutex
}

-func newResourceWarmingForcer(cache envoy_cache.SnapshotCache, hasher envoy_cache.NodeHash) *resourceWarmingForcer {
+func newResourceWarmingForcer(cache envoy_cache.SnapshotCache, hasher envoy_cache.NodeHash, snapshotCacheMux *sync.Mutex) *resourceWarmingForcer {
return &resourceWarmingForcer{
cache: cache,
hasher: hasher,
lastEndpointNonces: map[xds.StreamID]string{},
nodeIDs: map[xds.StreamID]string{},
+snapshotCacheMux: snapshotCacheMux,
}
}

@@ -115,6 +117,8 @@ func (r *resourceWarmingForcer) OnStreamRequest(streamID xds.StreamID, request *
}

func (r *resourceWarmingForcer) forceNewEndpointsVersion(nodeID string) error {
+r.snapshotCacheMux.Lock()
+defer r.snapshotCacheMux.Unlock()
snapshot, err := r.cache.GetSnapshot(nodeID)
if err != nil {
return nil // GetSnapshot returns an error if there is no snapshot. We don't need to force on a new snapshot
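Note that the forcer now holds two unrelated locks: its embedded `sync.Mutex` still guards only the per-stream bookkeeping maps, while the injected `snapshotCacheMux` serializes the get-snapshot, bump-endpoints-version, set-snapshot cycle in `forceNewEndpointsVersion` against the reconcilers. A compilable sketch of that layout — field types simplified, with `streamID` standing in for the real `xds.StreamID`:

```go
package sketch

import "sync"

type streamID = int64 // stand-in for the real xds.StreamID type

type warmingForcer struct {
	sync.Mutex                             // guards only the two maps below
	lastEndpointNonces map[streamID]string // streamID -> last seen EDS nonce
	nodeIDs            map[streamID]string // streamID -> Envoy node ID
	snapshotCacheMux   *sync.Mutex         // shared process-wide; covers snapshot read-modify-writes
}
```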
