From 55a55dd40e9b90b754e7558b04d279c97f44f789 Mon Sep 17 00:00:00 2001 From: AiRanthem Date: Fri, 27 Sep 2024 16:20:11 +0800 Subject: [PATCH] fix conversation Signed-off-by: AiRanthem --- apis/apps/v1alpha1/uniteddeployment_types.go | 11 ++++- .../uniteddeployment/adapter/adapter_util.go | 6 +-- pkg/controller/uniteddeployment/allocator.go | 4 +- .../uniteddeployment/allocator_test.go | 7 ++++ .../uniteddeployment_controller.go | 13 +++--- .../uniteddeployment_controller_utils.go | 2 +- .../resource_version_expectation.go | 42 ------------------- 7 files changed, 29 insertions(+), 56 deletions(-) diff --git a/apis/apps/v1alpha1/uniteddeployment_types.go b/apis/apps/v1alpha1/uniteddeployment_types.go index 51dddea478..2616f541d5 100644 --- a/apis/apps/v1alpha1/uniteddeployment_types.go +++ b/apis/apps/v1alpha1/uniteddeployment_types.go @@ -340,8 +340,15 @@ func (s *UnitedDeploymentStatus) GetSubsetStatus(subset string) *UnitedDeploymen return &s.SubsetStatuses[i] } } - s.SubsetStatuses = append(s.SubsetStatuses, UnitedDeploymentSubsetStatus{Name: subset}) - return &s.SubsetStatuses[len(s.SubsetStatuses)-1] + return nil +} + +func (u *UnitedDeployment) InitSubsetStatuses() { + for _, subset := range u.Spec.Topology.Subsets { + if u.Status.GetSubsetStatus(subset.Name) == nil { + u.Status.SubsetStatuses = append(u.Status.SubsetStatuses, UnitedDeploymentSubsetStatus{Name: subset.Name}) + } + } } // UnitedDeploymentCondition describes current state of a UnitedDeployment. 
diff --git a/pkg/controller/uniteddeployment/adapter/adapter_util.go b/pkg/controller/uniteddeployment/adapter/adapter_util.go index 30b4faefd2..039a8aabe8 100644 --- a/pkg/controller/uniteddeployment/adapter/adapter_util.go +++ b/pkg/controller/uniteddeployment/adapter/adapter_util.go @@ -19,12 +19,12 @@ package adapter import ( "fmt" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/validation" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" podutil "k8s.io/kubernetes/pkg/api/v1/pod" - - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "k8s.io/kubernetes/pkg/controller" ) func getSubsetPrefix(controllerName, subsetName string) string { @@ -104,7 +104,7 @@ func CalculateUpdatedReplicas(podList []*corev1.Pod, updatedRevision string) (up revision := getRevision(&pod.ObjectMeta) // Only count pods that are updated and are not terminating - if revision == updatedRevision && pod.GetDeletionTimestamp() == nil { + if revision == updatedRevision && controller.IsPodActive(pod) { updatedReplicas++ if podutil.IsPodReady(pod) { updatedReadyReplicas++ diff --git a/pkg/controller/uniteddeployment/allocator.go b/pkg/controller/uniteddeployment/allocator.go index 8c0192dfa3..32d2711de5 100644 --- a/pkg/controller/uniteddeployment/allocator.go +++ b/pkg/controller/uniteddeployment/allocator.go @@ -72,7 +72,7 @@ func NewReplicaAllocator(ud *appsv1alpha1.UnitedDeployment) ReplicaAllocator { // NotPendingReplicas refers to the number of Pods that an unschedulable subset can safely accommodate. // Exceeding this number may lead to scheduling failures within that subset. // This value is only effective in the Adaptive scheduling strategy. 
-func getNotPendingReplicasMap(nameToSubset *map[string]*Subset) map[string]int32 { +func getSubsetRunningReplicas(nameToSubset *map[string]*Subset) map[string]int32 { if nameToSubset == nil { return nil } @@ -292,7 +292,7 @@ func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32, nameTo numSubset := len(ac.Spec.Topology.Subsets) minReplicasMap := make(map[string]int32, numSubset) maxReplicasMap := make(map[string]int32, numSubset) - notPendingReplicasMap := getNotPendingReplicasMap(nameToSubset) + notPendingReplicasMap := getSubsetRunningReplicas(nameToSubset) for index, subset := range ac.Spec.Topology.Subsets { minReplicas := int32(0) maxReplicas := int32(math.MaxInt32) diff --git a/pkg/controller/uniteddeployment/allocator_test.go b/pkg/controller/uniteddeployment/allocator_test.go index 1b933b7fc8..a1a5753be7 100644 --- a/pkg/controller/uniteddeployment/allocator_test.go +++ b/pkg/controller/uniteddeployment/allocator_test.go @@ -348,30 +348,37 @@ func TestAdaptiveElasticAllocator(t *testing.T) { } } cases := []struct { + name string totalReplicas, minReplicas, maxReplicas, pendingPods int32 subset1Replicas, subset2Replicas int32 }{ { + name: "5 pending pods > maxReplicas -> 0, 10", totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 5, subset1Replicas: 0, subset2Replicas: 10, }, { + name: "4 pending pods = maxReplicas -> 0, 10", totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 4, subset1Replicas: 0, subset2Replicas: 10, }, { + name: "3 pending pods < maxReplicas -> 1, 9", totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 3, subset1Replicas: 1, subset2Replicas: 9, }, { + name: "2 pending pods = minReplicas -> 2, 8", totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 2, subset1Replicas: 2, subset2Replicas: 8, }, { + name: "1 pending pods < minReplicas -> 3, 7", totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 1, subset1Replicas: 3, subset2Replicas: 7, }, { + name: "no pending 
pods -> 4, 6", totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 0, subset1Replicas: 4, subset2Replicas: 6, }, diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller.go b/pkg/controller/uniteddeployment/uniteddeployment_controller.go index 50f4abbc2c..8a150aa32f 100644 --- a/pkg/controller/uniteddeployment/uniteddeployment_controller.go +++ b/pkg/controller/uniteddeployment/uniteddeployment_controller.go @@ -211,11 +211,10 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci klog.InfoS("resource version not up-to-date, requeue in 1s", "resourceVersion", instance.GetResourceVersion()) return reconcile.Result{RequeueAfter: time.Second}, nil } + klog.InfoS("Updated Resource observed", "unitedDeployment", klog.KObj(instance), "ResourceVersion", instance.GetResourceVersion()) oldStatus := instance.Status.DeepCopy() - for _, subset := range instance.Spec.Topology.Subsets { - instance.Status.GetSubsetStatus(subset.Name) // ensure subset status exists - } + instance.InitSubsetStatuses() currentRevision, updatedRevision, _, collisionCount, err := r.constructUnitedDeploymentRevisions(instance) if err != nil { klog.ErrorS(err, "Failed to construct controller revision of UnitedDeployment", "unitedDeployment", klog.KObj(instance)) @@ -251,10 +250,8 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci nextUpdate := getNextUpdate(instance, nextReplicas, nextPartitions) klog.V(4).InfoS("Got UnitedDeployment next update", "unitedDeployment", klog.KObj(instance), "nextUpdate", nextUpdate) - ResourceVersionExpectation.Expect(instance) newStatus, err := r.manageSubsets(instance, nameToSubset, nextUpdate, currentRevision, updatedRevision, subsetType) if err != nil { - ResourceVersionExpectation.Delete(instance) klog.ErrorS(err, "Failed to update UnitedDeployment", "unitedDeployment", klog.KObj(instance)) r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", 
eventTypeSubsetsUpdate), err.Error()) return reconcile.Result{}, err @@ -454,7 +451,11 @@ func (r *ReconcileUnitedDeployment) classifySubsetBySubsetName(ud *appsv1alpha1. func (r *ReconcileUnitedDeployment) updateStatus(instance *appsv1alpha1.UnitedDeployment, newStatus, oldStatus *appsv1alpha1.UnitedDeploymentStatus, nameToSubset *map[string]*Subset, nextReplicas, nextPartition *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, collisionCount int32, control ControlInterface) error { newStatus = r.calculateStatus(newStatus, nameToSubset, nextReplicas, nextPartition, currentRevision, updatedRevision, collisionCount, control) - _, err := r.updateUnitedDeployment(instance, oldStatus, newStatus) + newObj, err := r.updateUnitedDeployment(instance, oldStatus, newStatus) + if err == nil && newObj != nil { + ResourceVersionExpectation.Expect(newObj) + klog.InfoS("new resource version expected", "UnitedDeployment", klog.KObj(newObj), "ResourceVersion", newObj.GetResourceVersion()) + } return err } diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller_utils.go b/pkg/controller/uniteddeployment/uniteddeployment_controller_utils.go index cd13d9b08d..9675915be2 100644 --- a/pkg/controller/uniteddeployment/uniteddeployment_controller_utils.go +++ b/pkg/controller/uniteddeployment/uniteddeployment_controller_utils.go @@ -137,4 +137,4 @@ func getUnitedDeploymentKey(ud *appsv1alpha1.UnitedDeployment) string { return ud.GetNamespace() + "/" + ud.GetName() } -var ResourceVersionExpectation = expectations.NewReallyNewerResourceVersionExpectation() +var ResourceVersionExpectation = expectations.NewResourceVersionExpectation() diff --git a/pkg/util/expectations/resource_version_expectation.go b/pkg/util/expectations/resource_version_expectation.go index 4d5e753870..fc11c21fc2 100644 --- a/pkg/util/expectations/resource_version_expectation.go +++ b/pkg/util/expectations/resource_version_expectation.go @@ -36,13 +36,6 @@ func 
NewResourceVersionExpectation() ResourceVersionExpectation { return &realResourceVersionExpectation{objectVersions: make(map[types.UID]*objectCacheVersions, 100)} } -func NewReallyNewerResourceVersionExpectation() ResourceVersionExpectation { - return &reallyNewerResourceVersionExpectation{ - realResourceVersionExpectation: realResourceVersionExpectation{ - objectVersions: make(map[types.UID]*objectCacheVersions, 100), - }} -} - type realResourceVersionExpectation struct { sync.Mutex objectVersions map[types.UID]*objectCacheVersions @@ -126,38 +119,3 @@ func isResourceVersionNewer(old, new string) bool { return newCount >= oldCount } - -type reallyNewerResourceVersionExpectation struct { - realResourceVersionExpectation -} - -func (r *reallyNewerResourceVersionExpectation) Observe(obj metav1.Object) { - r.Lock() - defer r.Unlock() - - expectations := r.objectVersions[obj.GetUID()] - if expectations == nil { - return - } - if isResourceVersionReallyNewer(r.objectVersions[obj.GetUID()].version, obj.GetResourceVersion()) { - delete(r.objectVersions, obj.GetUID()) - } -} - -func isResourceVersionReallyNewer(old, new string) bool { - if len(old) == 0 { - return true - } - - oldCount, err := strconv.ParseUint(old, 10, 64) - if err != nil { - return true - } - - newCount, err := strconv.ParseUint(new, 10, 64) - if err != nil { - return false - } - - return newCount > oldCount -}