From 518c60d3ec4c4b0d6ee7b26aebb6525bd24b91af Mon Sep 17 00:00:00 2001 From: Jakub Dyszkiewicz Date: Wed, 25 Sep 2024 19:51:06 +0200 Subject: [PATCH] fix(xds): preserve deleted resources --- .../gateway/gateway_route_generator_test.go | 3 +- .../gateway/listener_generator_test.go | 3 +- pkg/xds/server/v3/components.go | 28 +++++--- pkg/xds/server/v3/reconcile.go | 65 +++++++++++++++---- pkg/xds/server/v3/reconcile_test.go | 12 ++-- pkg/xds/server/v3/resource_warming_forcer.go | 6 +- 6 files changed, 82 insertions(+), 35 deletions(-) diff --git a/pkg/plugins/runtime/gateway/gateway_route_generator_test.go b/pkg/plugins/runtime/gateway/gateway_route_generator_test.go index d9592bae482e..ccd94c00dc6a 100644 --- a/pkg/plugins/runtime/gateway/gateway_route_generator_test.go +++ b/pkg/plugins/runtime/gateway/gateway_route_generator_test.go @@ -3,6 +3,7 @@ package gateway_test import ( "context" "path" + "sync" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" . "github.com/onsi/ginkgo/v2" @@ -41,7 +42,7 @@ var _ = Describe("Gateway Route", func() { if err != nil { return nil, err } - reconciler := xds_server.DefaultReconciler(rt, serverCtx, statsCallbacks) + reconciler := xds_server.DefaultReconciler(rt, serverCtx, statsCallbacks, &sync.Mutex{}) // We expect there to be a Dataplane fixture named // "default" in the current mesh. diff --git a/pkg/plugins/runtime/gateway/listener_generator_test.go b/pkg/plugins/runtime/gateway/listener_generator_test.go index c87dedfbee40..dce960c81322 100644 --- a/pkg/plugins/runtime/gateway/listener_generator_test.go +++ b/pkg/plugins/runtime/gateway/listener_generator_test.go @@ -3,6 +3,7 @@ package gateway_test import ( "context" "path" + "sync" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" . "github.com/onsi/ginkgo/v2" @@ -26,7 +27,7 @@ var _ = Describe("Gateway Listener", func() { if err != nil { return nil, err } - reconciler := xds_server.DefaultReconciler(rt, serverCtx, statsCallbacks) + reconciler := xds_server.DefaultReconciler(rt, serverCtx, statsCallbacks, &sync.Mutex{}) Expect(StoreInlineFixture(rt, []byte(gateway))).To(Succeed()) diff --git a/pkg/xds/server/v3/components.go b/pkg/xds/server/v3/components.go index 1dc2b5cca1eb..4ad2e77fb39c 100644 --- a/pkg/xds/server/v3/components.go +++ b/pkg/xds/server/v3/components.go @@ -2,6 +2,7 @@ package v3 import ( "context" + "sync" "time" envoy_service_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" @@ -39,9 +40,10 @@ func RegisterXDS( authCallbacks := auth.NewCallbacks(rt.ReadOnlyResourceManager(), authenticator, auth.DPNotFoundRetry{}) // no need to retry on DP Not Found because we are creating DP in DataplaneLifecycle callback metadataTracker := xds_callbacks.NewDataplaneMetadataTracker() - reconciler := DefaultReconciler(rt, xdsContext, statsCallbacks) - ingressReconciler := DefaultIngressReconciler(rt, xdsContext, statsCallbacks) - egressReconciler := DefaultEgressReconciler(rt, xdsContext, statsCallbacks) + snapshotCacheMux := &sync.Mutex{} + reconciler := DefaultReconciler(rt, xdsContext, statsCallbacks, snapshotCacheMux) + ingressReconciler := DefaultIngressReconciler(rt, xdsContext, statsCallbacks, snapshotCacheMux) + egressReconciler := DefaultEgressReconciler(rt, xdsContext, statsCallbacks, snapshotCacheMux) watchdogFactory, err := xds_sync.DefaultDataplaneWatchdogFactory(rt, metadataTracker, reconciler, ingressReconciler, egressReconciler, xdsMetrics, envoyCpCtx, envoy_common.APIV3) if err != nil { return err @@ -58,7 +60,7 @@ func RegisterXDS( util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(xds_callbacks.NewDataplaneSyncTracker(watchdogFactory.New))), util_xds_v3.AdaptCallbacks(DefaultDataplaneStatusTracker(rt, envoyCpCtx.Secrets)), util_xds_v3.AdaptCallbacks(xds_callbacks.NewNackBackoff(rt.Config().XdsServer.NACKBackoff.Duration)), - newResourceWarmingForcer(xdsContext.Cache(), xdsContext.Hasher()), + newResourceWarmingForcer(xdsContext.Cache(), xdsContext.Hasher(), snapshotCacheMux), } if cb := rt.XDS().ServerCallbacks; cb != nil { @@ -76,6 +78,7 @@ func DefaultReconciler( rt core_runtime.Runtime, xdsContext XdsContext, statsCallbacks util_xds.StatsCallbacks, + snapshotCacheMux *sync.Mutex, ) xds_sync.SnapshotReconciler { resolver := xds_template.SequentialResolver( &xds_template.SimpleProxyTemplateResolver{ @@ -89,8 +92,9 @@ func DefaultReconciler( ResourceSetHooks: rt.XDS().Hooks.ResourceSetHooks(), ProxyTemplateResolver: resolver, }, - cacher: &simpleSnapshotCacher{xdsContext.Hasher(), xdsContext.Cache()}, - statsCallbacks: statsCallbacks, + cacher: &simpleSnapshotCacher{xdsContext.Hasher(), xdsContext.Cache()}, + statsCallbacks: statsCallbacks, + snapshotCacheMux: snapshotCacheMux, } } @@ -98,6 +102,7 @@ func DefaultIngressReconciler( rt core_runtime.Runtime, xdsContext XdsContext, statsCallbacks util_xds.StatsCallbacks, + snapshotCacheMux *sync.Mutex, ) xds_sync.SnapshotReconciler { resolver := &xds_template.StaticProxyTemplateResolver{ Template: &mesh_proto.ProxyTemplate{ @@ -114,8 +119,9 @@ func DefaultIngressReconciler( ResourceSetHooks: rt.XDS().Hooks.ResourceSetHooks(), ProxyTemplateResolver: resolver, }, - cacher: &simpleSnapshotCacher{xdsContext.Hasher(), xdsContext.Cache()}, - statsCallbacks: statsCallbacks, + cacher: &simpleSnapshotCacher{xdsContext.Hasher(), xdsContext.Cache()}, + statsCallbacks: statsCallbacks, + snapshotCacheMux: snapshotCacheMux, } } @@ -123,6 +129,7 @@ func DefaultEgressReconciler( rt core_runtime.Runtime, xdsContext XdsContext, statsCallbacks util_xds.StatsCallbacks, + snapshotCacheMux *sync.Mutex, ) xds_sync.SnapshotReconciler { resolver := &xds_template.StaticProxyTemplateResolver{ Template: &mesh_proto.ProxyTemplate{ @@ -139,8 +146,9 @@ func DefaultEgressReconciler( ResourceSetHooks: rt.XDS().Hooks.ResourceSetHooks(), ProxyTemplateResolver: resolver, }, - cacher: &simpleSnapshotCacher{xdsContext.Hasher(), xdsContext.Cache()}, - statsCallbacks: statsCallbacks, + cacher: &simpleSnapshotCacher{xdsContext.Hasher(), xdsContext.Cache()}, + statsCallbacks: statsCallbacks, + snapshotCacheMux: snapshotCacheMux, } } diff --git a/pkg/xds/server/v3/reconcile.go b/pkg/xds/server/v3/reconcile.go index aeb2b59c92f1..db7abb8bee6a 100644 --- a/pkg/xds/server/v3/reconcile.go +++ b/pkg/xds/server/v3/reconcile.go @@ -2,6 +2,7 @@ package v3 import ( "context" + "sync" envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types" @@ -26,9 +27,10 @@ var reconcileLog = core.Log.WithName("xds").WithName("reconcile") var _ xds_sync.SnapshotReconciler = &reconciler{} type reconciler struct { - generator snapshotGenerator - cacher snapshotCacher - statsCallbacks util_xds.StatsCallbacks + generator snapshotGenerator + cacher snapshotCacher + statsCallbacks util_xds.StatsCallbacks + snapshotCacheMux *sync.Mutex } func (r *reconciler) Clear(proxyId *model.ProxyId) error { @@ -57,6 +59,17 @@ func (r *reconciler) Reconcile(ctx context.Context, xdsCtx xds_context.Context, return false, errors.Wrapf(err, "failed to generate a snapshot") } + for _, resources := range snapshot.Resources { + for name, resource := range resources.Items { + if err := validateResource(resource.Resource); err != nil { + return false, errors.Wrapf(err, "invalid resource %q", name) + } + } + } + + r.snapshotCacheMux.Lock() + defer r.snapshotCacheMux.Unlock() + // To avoid assigning a new version every time, compare with // the previous snapshot and reuse its version whenever possible, // fallback to UUID otherwise @@ -65,6 +78,12 @@ func (r *reconciler) Reconcile(ctx context.Context, xdsCtx xds_context.Context, previous = &envoy_cache.Snapshot{} } + preserveDeletedResources(snapshot, previous) + + if err := snapshot.Consistent(); err != nil { + return false, errors.Wrap(err, "inconsistent snapshot") + } + snapshot, changed := autoVersion(previous, snapshot) resKey := proxy.Id.ToResourceKey() @@ -79,17 +98,6 @@ func (r *reconciler) Reconcile(ctx context.Context, xdsCtx xds_context.Context, return false, nil } - for _, resources := range snapshot.Resources { - for name, resource := range resources.Items { - if err := validateResource(resource.Resource); err != nil { - return false, errors.Wrapf(err, "invalid resource %q", name) - } - } - } - - if err := snapshot.Consistent(); err != nil { - return false, errors.Wrap(err, "inconsistent snapshot") - } log.Info("config has changed", "versions", changed) if err := r.cacher.Cache(ctx, node, snapshot); err != nil { @@ -102,6 +110,35 @@ func (r *reconciler) Reconcile(ctx context.Context, xdsCtx xds_context.Context, return true, nil } +func preserveDeletedResources(snapshot *envoy_cache.Snapshot, previous *envoy_cache.Snapshot) { + snapshotEndpoints := snapshot.GetResources(envoy_resource.EndpointType) + // core.Log.Info("preserveDeletedResources", "previous", previous.GetResources(envoy_resource.EndpointType), "current", snapshotEndpoints) + for name, res := range previous.GetResources(envoy_resource.EndpointType) { + if _, ok := snapshotEndpoints[name]; !ok { + // core.Log.Info("preserving endpoints", "clusterName", name) + if snapshot.Resources[envoy_types.Endpoint].Items == nil { + snapshot.Resources[envoy_types.Endpoint].Items = map[string]envoy_types.ResourceWithTTL{} + } + snapshot.Resources[envoy_types.Endpoint].Items[name] = envoy_types.ResourceWithTTL{ + Resource: res, + } + } + } + + snapshotClusters := snapshot.GetResources(envoy_resource.ClusterType) + for name, res := range previous.GetResources(envoy_resource.ClusterType) { + if _, ok := snapshotClusters[name]; !ok { + // core.Log.Info("preserving cluster", "clusterName", name) + if snapshot.Resources[envoy_types.Cluster].Items == nil { + snapshot.Resources[envoy_types.Cluster].Items = map[string]envoy_types.ResourceWithTTL{} + } + snapshot.Resources[envoy_types.Cluster].Items[name] = envoy_types.ResourceWithTTL{ + Resource: res, + } + } + } +} + func validateResource(r envoy_types.Resource) error { switch v := r.(type) { // Newer go-control-plane versions have `ValidateAll()` method, that accumulates as many validation errors as possible. diff --git a/pkg/xds/server/v3/reconcile_test.go b/pkg/xds/server/v3/reconcile_test.go index 2225dd511f7f..60d1daaf7e2d 100644 --- a/pkg/xds/server/v3/reconcile_test.go +++ b/pkg/xds/server/v3/reconcile_test.go @@ -2,6 +2,7 @@ package v3 import ( "context" + "sync" envoy_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -140,6 +141,7 @@ var _ = Describe("Reconcile", func() { }), &simpleSnapshotCacher{xdsContext.Hasher(), xdsContext.Cache()}, statsCallbacks, + &sync.Mutex{}, } // given @@ -227,14 +229,8 @@ var _ = Describe("Reconcile", func() { Not(Equal(routeV1)), Not(BeEmpty()), )) - Expect(snapshot.GetVersion(resource.ClusterType)).To(SatisfyAll( - Not(Equal(clusterV1)), - Not(BeEmpty()), - )) - Expect(snapshot.GetVersion(resource.EndpointType)).To(SatisfyAll( - Not(Equal(endpointV1)), - Not(BeEmpty()), - )) + Expect(snapshot.GetVersion(resource.ClusterType)).To(Equal(clusterV1)) + Expect(snapshot.GetVersion(resource.EndpointType)).To(Equal(endpointV1)) Expect(snapshot.GetVersion(resource.SecretType)).To(SatisfyAll( Not(Equal(secretV1)), Not(BeEmpty()), diff --git a/pkg/xds/server/v3/resource_warming_forcer.go b/pkg/xds/server/v3/resource_warming_forcer.go index 5064420faf53..6f43164f05c1 100644 --- a/pkg/xds/server/v3/resource_warming_forcer.go +++ b/pkg/xds/server/v3/resource_warming_forcer.go @@ -66,14 +66,16 @@ type resourceWarmingForcer struct { sync.Mutex lastEndpointNonces map[xds.StreamID]string nodeIDs map[xds.StreamID]string + snapshotCacheMux *sync.Mutex } -func newResourceWarmingForcer(cache envoy_cache.SnapshotCache, hasher envoy_cache.NodeHash) *resourceWarmingForcer { +func newResourceWarmingForcer(cache envoy_cache.SnapshotCache, hasher envoy_cache.NodeHash, snapshotCacheMux *sync.Mutex) *resourceWarmingForcer { return &resourceWarmingForcer{ cache: cache, hasher: hasher, lastEndpointNonces: map[xds.StreamID]string{}, nodeIDs: map[xds.StreamID]string{}, + snapshotCacheMux: snapshotCacheMux, } } @@ -115,6 +117,8 @@ func (r *resourceWarmingForcer) OnStreamRequest(streamID xds.StreamID, request * } func (r *resourceWarmingForcer) forceNewEndpointsVersion(nodeID string) error { + r.snapshotCacheMux.Lock() + defer r.snapshotCacheMux.Unlock() snapshot, err := r.cache.GetSnapshot(nodeID) if err != nil { return nil // GetSnapshot returns an error if there is no snapshot. We don't need to force on a new snapshot