diff --git a/pkg/controller/cloneset/cloneset_status.go b/pkg/controller/cloneset/cloneset_status.go
index f876d5587d..8ffa83f6b6 100644
--- a/pkg/controller/cloneset/cloneset_status.go
+++ b/pkg/controller/cloneset/cloneset_status.go
@@ -19,6 +19,7 @@ package cloneset
 import (
 	"context"
 	"fmt"
 
 	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
 	clonesetcore "github.com/openkruise/kruise/pkg/controller/cloneset/core"
+	"github.com/openkruise/kruise/pkg/util/hotstandby"
@@ -81,35 +82,67 @@ func (r *realStatusUpdater) inconsistentStatus(cs *appsv1alpha1.CloneSet, newSta
 		newStatus.ExpectedUpdatedReplicas != oldStatus.ExpectedUpdatedReplicas ||
 		newStatus.UpdateRevision != oldStatus.UpdateRevision ||
 		newStatus.CurrentRevision != oldStatus.CurrentRevision ||
-		newStatus.LabelSelector != oldStatus.LabelSelector
+		newStatus.LabelSelector != oldStatus.LabelSelector ||
+		newStatus.HotStandbyReplicas != oldStatus.HotStandbyReplicas ||
+		newStatus.HotStandbyReadyReplicas != oldStatus.HotStandbyReadyReplicas ||
+		newStatus.HotStandbyAvailableReplicas != oldStatus.HotStandbyAvailableReplicas ||
+		newStatus.HotStandbyUpdatedReadyReplicas != oldStatus.HotStandbyUpdatedReadyReplicas ||
+		newStatus.HotStandbyUpdatedReplicas != oldStatus.HotStandbyUpdatedReplicas ||
+		newStatus.HotStandbyUpdatedAvailableReplicas != oldStatus.HotStandbyUpdatedAvailableReplicas ||
+		newStatus.HotStandbyExpectedUpdatedReplicas != oldStatus.HotStandbyExpectedUpdatedReplicas
 }
 
 func (r *realStatusUpdater) calculateStatus(cs *appsv1alpha1.CloneSet, newStatus *appsv1alpha1.CloneSetStatus, pods []*v1.Pod) {
 	coreControl := clonesetcore.New(cs)
 	for _, pod := range pods {
-		newStatus.Replicas++
-		if coreControl.IsPodUpdateReady(pod, 0) {
-			newStatus.ReadyReplicas++
-		}
-		if sync.IsPodAvailable(coreControl, pod, cs.Spec.MinReadySeconds) {
-			newStatus.AvailableReplicas++
-		}
-		if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) {
-			newStatus.UpdatedReplicas++
-		}
-		if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) && coreControl.IsPodUpdateReady(pod, 0) {
-			newStatus.UpdatedReadyReplicas++
-		}
-		if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) && sync.IsPodAvailable(coreControl, pod, cs.Spec.MinReadySeconds) {
-			newStatus.UpdatedAvailableReplicas++
+		if !hotstandby.IsHotStandbyPod(pod) {
+			newStatus.Replicas++
+			if coreControl.IsPodUpdateReady(pod, 0) {
+				newStatus.ReadyReplicas++
+			}
+			if sync.IsPodAvailable(coreControl, pod, cs.Spec.MinReadySeconds) {
+				newStatus.AvailableReplicas++
+			}
+			if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) {
+				newStatus.UpdatedReplicas++
+			}
+			if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) && coreControl.IsPodUpdateReady(pod, 0) {
+				newStatus.UpdatedReadyReplicas++
+			}
+			if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) && sync.IsPodAvailable(coreControl, pod, cs.Spec.MinReadySeconds) {
+				newStatus.UpdatedAvailableReplicas++
+			}
+		} else {
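+			// Hot-standby pods are excluded from the normal replica counters and
+			// are accumulated into the dedicated HotStandby* status fields instead.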
+			newStatus.HotStandbyReplicas++
+			if coreControl.IsPodUpdateReady(pod, 0) {
+				newStatus.HotStandbyReadyReplicas++
+			}
+			if sync.IsPodAvailable(coreControl, pod, cs.Spec.MinReadySeconds) {
+				newStatus.HotStandbyAvailableReplicas++
+			}
+			if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) {
+				newStatus.HotStandbyUpdatedReplicas++
+			}
+			if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) && coreControl.IsPodUpdateReady(pod, 0) {
+				newStatus.HotStandbyUpdatedReadyReplicas++
+			}
+			if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) && sync.IsPodAvailable(coreControl, pod, cs.Spec.MinReadySeconds) {
+				newStatus.HotStandbyUpdatedAvailableReplicas++
+			}
+		}
 	}
 
-	// Consider the update revision as stable if revisions of all pods are consistent to it and have the expected number of replicas, no need to wait all of them ready
-	if newStatus.UpdatedReplicas == newStatus.Replicas && newStatus.Replicas == *cs.Spec.Replicas {
+	// Consider the update revision as stable if the revisions of all normal and hot-standby pods are
+	// consistent with it and the expected number of replicas is reached; there is no need to wait for
+	// all of them to be ready.
+	if newStatus.UpdatedReplicas == newStatus.Replicas && newStatus.Replicas == *cs.Spec.Replicas && newStatus.HotStandbyUpdatedReplicas == newStatus.HotStandbyReplicas {
 		newStatus.CurrentRevision = newStatus.UpdateRevision
 	}
 
 	if partition, err := util.CalculatePartitionReplicas(cs.Spec.UpdateStrategy.Partition, cs.Spec.Replicas); err == nil {
 		newStatus.ExpectedUpdatedReplicas = *cs.Spec.Replicas - int32(partition)
 	}
+
+	if cs.Spec.HotStandbyReplicas != nil {
+		newStatus.HotStandbyExpectedUpdatedReplicas = *cs.Spec.HotStandbyReplicas
+	}
 }
diff --git a/pkg/controller/cloneset/sync/cloneset_scale.go b/pkg/controller/cloneset/sync/cloneset_scale.go
index fb56c9a0fb..6c26b9a669 100644
--- a/pkg/controller/cloneset/sync/cloneset_scale.go
+++ b/pkg/controller/cloneset/sync/cloneset_scale.go
@@ -19,6 +19,8 @@ package sync
 import (
 	"context"
 	"fmt"
 	"sort"
 	"sync"
 	"sync/atomic"
+
+	"github.com/openkruise/kruise/pkg/util/hotstandby"
+	"k8s.io/utils/integer"
@@ -68,7 +70,7 @@ func (r *realControl) Scale(
 	}
 
 	// 2. calculate scale numbers
-	diffRes := calculateDiffsWithExpectation(updateCS, pods, currentRevision, updateRevision, revision.IsPodUpdate)
+	diffRes, newAvailableHotStandbyPods := calculateDiffsWithExpectation(updateCS, pods, currentRevision, updateRevision, revision.IsPodUpdate)
 	updatedPods, notUpdatedPods := clonesetutils.GroupUpdateAndNotUpdatePods(pods, updateRevision)
 
 	if diffRes.scaleUpNum > diffRes.scaleUpLimit {
@@ -76,17 +78,19 @@
 	}
 
 	// 3. scale out
-	if diffRes.scaleUpNum > 0 {
+	if diffRes.scaleUpNum > 0 || diffRes.hotStandbyScaleUpNum > 0 {
 		// total number of this creation
 		expectedCreations := diffRes.scaleUpLimit
 		// lack number of current version
 		expectedCurrentCreations := diffRes.scaleUpNumOldRevision
+		// total number of hot-standby pods in this creation
+		expectedHotStandbyCreations := diffRes.hotStandbyScaleUpNum
 
 		klog.V(3).InfoS("CloneSet began to scale out pods, including current revision",
 			"cloneSet", klog.KObj(updateCS), "expectedCreations", expectedCreations, "expectedCurrentCreations", expectedCurrentCreations)
 
 		// available instance-id come from free pvc
-		availableIDs := getOrGenAvailableIDs(expectedCreations, pods, pvcs)
+		availableIDs := getOrGenAvailableIDs(expectedCreations+expectedHotStandbyCreations, pods, pvcs)
 		// existing pvc names
 		existingPVCNames := sets.NewString()
 		for _, pvc := range pvcs {
@@ -94,7 +98,8 @@
 		}
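+		// createPods additionally receives the hot-standby creation count and the
+		// available hot-standby pods that may be recovered into normal pods.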
 		return r.createPods(expectedCreations, expectedCurrentCreations,
-			currentCS, updateCS, currentRevision, updateRevision, availableIDs.List(), existingPVCNames)
+			currentCS, updateCS, currentRevision, updateRevision, availableIDs.List(), existingPVCNames,
+			expectedHotStandbyCreations, newAvailableHotStandbyPods)
 	}
 
 	// 4. try to delete pods already in pre-delete
@@ -137,7 +142,29 @@ func (r *realControl) Scale(
 		klog.V(3).InfoS("CloneSet began to scale in", "cloneSet", klog.KObj(updateCS), "scaleDownNum", diffRes.scaleDownNum,
 			"oldRevision", diffRes.scaleDownNumOldRevision, "deleteReadyLimit", diffRes.deleteReadyLimit)
 
-		podsPreparingToDelete := r.choosePodsToDelete(updateCS, diffRes.scaleDownNum, diffRes.scaleDownNumOldRevision, notUpdatedPods, updatedPods)
+		podsPreparingToDelete := r.choosePodsToDelete(updateCS, diffRes.scaleDownNum, diffRes.scaleDownNumOldRevision,
+			hotstandby.FilterOutNormalPods(notUpdatedPods), hotstandby.FilterOutNormalPods(updatedPods))
+		podsToDelete := make([]*v1.Pod, 0, len(podsPreparingToDelete))
+		for _, pod := range podsPreparingToDelete {
+			if !isPodReady(coreControl, pod) {
+				podsToDelete = append(podsToDelete, pod)
+			} else if diffRes.deleteReadyLimit > 0 {
+				podsToDelete = append(podsToDelete, pod)
+				diffRes.deleteReadyLimit--
+			}
+		}
+
+		return r.deletePods(updateCS, podsToDelete, pvcs)
+	}
+
+	// 7. scale in hot-standby pods
+	if diffRes.hotStandbyScaleDownNum > 0 {
+		klog.V(3).InfoS("CloneSet began to scale in hot-standby pods", "cloneSet", klog.KObj(updateCS),
+			"scaleDownNum", diffRes.hotStandbyScaleDownNum, "oldRevision", diffRes.hotStandbyOldRevCount, "deleteReadyLimit", diffRes.deleteReadyLimit)
+
+		// select hot-standby pods to delete
+		podsPreparingToDelete := r.choosePodsToDelete(updateCS, diffRes.hotStandbyScaleDownNum, diffRes.hotStandbyOldRevCount,
+			hotstandby.FilterOutHotStandbyPods(notUpdatedPods), hotstandby.FilterOutHotStandbyPods(updatedPods))
 		podsToDelete := make([]*v1.Pod, 0, len(podsPreparingToDelete))
 		for _, pod := range podsPreparingToDelete {
 			if !isPodReady(coreControl, pod) {
@@ -191,13 +218,57 @@ func (r *realControl) createPods(
 	currentCS, updateCS *appsv1alpha1.CloneSet,
 	currentRevision, updateRevision string,
 	availableIDs []string, existingPVCNames sets.String,
+	hotStandbyCreations int,
+	newAvailableHotStandbyPods []*v1.Pod,
 ) (bool, error) {
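+	// Recovery-first: available hot-standby pods are patched back to normal before
+	// any brand-new normal pods are created; each successful recovery reduces the
+	// number of normal pods to create, and a replacement hot-standby pod is created below.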
-	// new all pods need to create
+	// recover hot-standby pods to normal pods
+	hotStandbyMaxRecoveryNum := len(newAvailableHotStandbyPods)
+	var realRecoveryNum int
+	if hotStandbyMaxRecoveryNum > 0 {
+		recoveryNum := integer.IntMin(expectedCreations, hotStandbyMaxRecoveryNum)
+		if recoveryNum > 0 {
+			for i := 0; i < hotStandbyMaxRecoveryNum; i++ {
+				if realRecoveryNum >= recoveryNum {
+					break
+				}
+
+				if err := r.PatchHotStandbyPodToNormal(updateCS, newAvailableHotStandbyPods[i]); err == nil {
+					realRecoveryNum++
+				}
+			}
+		}
+	}
+
 	coreControl := clonesetcore.New(updateCS)
-	newPods, err := coreControl.NewVersionedPods(currentCS, updateCS, currentRevision, updateRevision,
-		expectedCreations, expectedCurrentCreations, availableIDs)
-	if err != nil {
-		return false, err
+	var newPods []*v1.Pod
+
+	expectedNormalPodCreations := expectedCreations - realRecoveryNum
+	if expectedNormalPodCreations > 0 {
+		// build all normal pods that need to be created
+		var err error
+		newPods, err = coreControl.NewVersionedPods(currentCS, updateCS, currentRevision, updateRevision,
+			expectedNormalPodCreations, expectedCurrentCreations, availableIDs)
+		if err != nil {
+			return false, err
+		}
+		for _, p := range newPods {
+			hotstandby.PutNormalPodLabel(p)
+		}
+	}
+
+	// build all hot-standby pods that need to be created
+	expectedHotStandbyPodCreations := hotStandbyCreations + realRecoveryNum
+	if expectedHotStandbyPodCreations > 0 {
+		newHotStandbyPods, err := coreControl.NewVersionedPods(currentCS, updateCS, currentRevision, updateRevision,
+			expectedHotStandbyPodCreations, 0, availableIDs)
+		if err != nil {
+			return false, err
+		}
+		for _, p := range newHotStandbyPods {
+			hotstandby.PutHotStandbyPodLabel(p)
+			newPods = append(newPods, p)
+		}
 	}
 
 	podsCreationChan := make(chan *v1.Pod, len(newPods))
@@ -208,7 +279,7 @@ func (r *realControl) createPods(
 	var created int64
 	successPodNames := sync.Map{}
-	_, err = clonesetutils.DoItSlowly(len(newPods), initialBatchSize, func() error {
+	_, err := clonesetutils.DoItSlowly(len(newPods), initialBatchSize, func() error {
 		pod := <-podsCreationChan
 
 		cs := updateCS
@@ -404,3 +475,50 @@ func (r *realControl) choosePodsToDelete(cs *appsv1alpha1.CloneSet, totalDiff in
 
 	return podsToDelete
 }
+
+func (r *realControl) chooseHotStandbyPodsToDelete(cs *appsv1alpha1.CloneSet, totalDiff int, currentRevDiff int, notUpdatedPods, updatedPods []*v1.Pod) []*v1.Pod {
+	coreControl := clonesetcore.New(cs)
+	choose := func(pods []*v1.Pod, diff int) []*v1.Pod {
+		// No need to sort pods if we are about to delete all of them.
+		if diff < len(pods) {
+			var ranker clonesetutils.Ranker
+			if constraints := coreControl.GetPodSpreadConstraint(); len(constraints) > 0 {
+				ranker = clonesetutils.NewSpreadConstraintsRanker(pods, constraints, r.Client)
+			} else {
+				ranker = clonesetutils.NewSameNodeRanker(pods)
+			}
+			sort.Sort(clonesetutils.ActivePodsWithRanks{
+				Pods:   pods,
+				Ranker: ranker,
+				AvailableFunc: func(pod *v1.Pod) bool {
+					return IsPodAvailable(coreControl, pod, cs.Spec.MinReadySeconds)
+				},
+			})
+		} else if diff > len(pods) {
+			klog.Warningf("Diff > len(pods) in chooseHotStandbyPodsToDelete func which is not expected.")
+			return pods
+		}
+		return pods[:diff]
+	}
+
+	// select normal Pods to delete
+	normalNotUpdatedPods := hotstandby.FilterOutNormalPods(notUpdatedPods)
+	normalUpdatedPods := hotstandby.FilterOutNormalPods(updatedPods)
+
+	var podsToDelete []*v1.Pod
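+	// Old-revision pods are deleted first; updated pods are only chosen once the
+	// remaining diff exceeds the old-revision count.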
+	if currentRevDiff >= totalDiff {
+		podsToDelete = choose(normalNotUpdatedPods, totalDiff)
+	} else if currentRevDiff > 0 {
+		podsToDelete = choose(normalNotUpdatedPods, currentRevDiff)
+		podsToDelete = append(podsToDelete, choose(normalUpdatedPods, totalDiff-currentRevDiff)...)
+	} else {
+		podsToDelete = choose(normalUpdatedPods, totalDiff)
+	}
+
+	return podsToDelete
+}
+
+func (r *realControl) PreDelete(cs *appsv1alpha1.CloneSet, podsToDelete []*v1.Pod) error {
+	_, err := r.deletePods(cs, podsToDelete, nil)
+	return err
+}
diff --git a/pkg/controller/cloneset/sync/cloneset_sync_utils.go b/pkg/controller/cloneset/sync/cloneset_sync_utils.go
index b26aebb5ce..3e620ff2f6 100644
--- a/pkg/controller/cloneset/sync/cloneset_sync_utils.go
+++ b/pkg/controller/cloneset/sync/cloneset_sync_utils.go
@@ -18,13 +18,13 @@ package sync
 import (
 	"flag"
 	"math"
 	"reflect"
 
+	"github.com/openkruise/kruise/pkg/util/hotstandby"
 	v1 "k8s.io/api/core/v1"
 	intstrutil "k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/integer"
 
 	appspub "github.com/openkruise/kruise/apis/apps/pub"
 	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
@@ -81,6 +81,20 @@ type expectationDiffs struct {
 	updateNum int
 	// updateMaxUnavailable is the maximum number of ready Pods that can be updating
 	updateMaxUnavailable int
+
+	// hotStandbyUpdateNum is the diff number that hot-standby pods should update
+	// '0' means no need to update
+	// positive number means need to update more Pods to updateRevision
+	// negative number means need to update more Pods to currentRevision (rollback)
+	hotStandbyUpdateNum int
+	// hotStandbyScaleUpNum is a non-negative integer, which indicates the number that hot-standby pods should scale up.
+	hotStandbyScaleUpNum int
+	// hotStandbyScaleDownNum is a non-negative integer, which indicates the number that hot-standby pods should scale down.
+	// It has excluded the number of Pods that are already specified to delete.
+	hotStandbyScaleDownNum int
+	// hotStandbyOldRevCount is the number of old-revision hot-standby Pods;
+	// it is used as the current-revision diff when scaling hot-standby pods down.
+	hotStandbyOldRevCount int
 }
 
 func (e expectationDiffs) isEmpty() bool {
@@ -91,7 +105,7 @@
 type IsPodUpdateFunc func(pod *v1.Pod, updateRevision string) bool
 
 // This is the most important algorithm in cloneset-controller.
 // It calculates the pod numbers to scaling and updating for current CloneSet.
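+// It also returns the updated-revision hot-standby Pods that are currently
+// available, which Scale may recover into normal Pods instead of creating new ones.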
-func calculateDiffsWithExpectation(cs *appsv1alpha1.CloneSet, pods []*v1.Pod, currentRevision, updateRevision string, isPodUpdate IsPodUpdateFunc) (res expectationDiffs) {
+func calculateDiffsWithExpectation(cs *appsv1alpha1.CloneSet, pods []*v1.Pod, currentRevision, updateRevision string, isPodUpdate IsPodUpdateFunc) (res expectationDiffs, newAvailableHotStandbyPods []*v1.Pod) {
 	coreControl := clonesetcore.New(cs)
 	replicas := int(*cs.Spec.Replicas)
 	var partition, maxSurge, maxUnavailable, scaleMaxUnavailable int
@@ -143,8 +157,18 @@
 		}
 	}
 
+	// Split pods into normal pods and hot-standby pods
+	var oldHotStandbyCount int
+	var newHotStandbyCount int
 	for _, p := range pods {
 		if isPodUpdate(p, updateRevision) {
+			if hotstandby.IsHotStandbyPod(p) {
+				newHotStandbyCount++
+				if IsPodAvailable(coreControl, p, cs.Spec.MinReadySeconds) {
+					newAvailableHotStandbyPods = append(newAvailableHotStandbyPods, p)
+				}
+				continue
+			}
 
 			newRevisionCount++
@@ -162,6 +186,11 @@
 		} else {
+			if hotstandby.IsHotStandbyPod(p) {
+				oldHotStandbyCount++
+				continue
+			}
+
 			oldRevisionCount++
 
 			switch state := lifecycle.GetPodLifecycleState(p); state {
@@ -228,7 +257,7 @@
 	}
 
 	// prepare for scale calculation
-	currentTotalCount := len(pods)
+	currentTotalCount := newRevisionCount + oldRevisionCount
 	currentTotalOldCount := oldRevisionCount
 	if shouldScalingExcludePreparingDelete(cs) {
 		currentTotalCount = currentTotalCount - preDeletingOldRevisionCount - preDeletingNewRevisionCount
@@ -265,6 +294,34 @@
 		res.updateMaxUnavailable = maxUnavailable + len(pods) - replicas
 	}
 
+	// calculate hot-standby replicas
+	var hotStandbyReplicas int
+	if cs.Spec.HotStandbyReplicas != nil {
+		hotStandbyReplicas = int(*cs.Spec.HotStandbyReplicas)
+	}
+
+	if oldHotStandbyCount+newHotStandbyCount < hotStandbyReplicas {
+		res.hotStandbyScaleUpNum = hotStandbyReplicas - (oldHotStandbyCount + newHotStandbyCount)
+	} else {
+		res.hotStandbyScaleDownNum = (oldHotStandbyCount + newHotStandbyCount) - hotStandbyReplicas
+	}
+
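+	// Worked example (CloneSetPartitionRollback disabled): with hotStandbyReplicas=3,
+	// oldHotStandbyCount=2 and newHotStandbyCount=1, updateHotStandbyOldDiff=2 and
+	// updateHotStandbyNewDiff=min(1-3, 0)=-2, so hotStandbyUpdateNum=2: two hot-standby
+	// pods still need to be updated to the update revision.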
+	updateHotStandbyOldDiff := oldHotStandbyCount
+	updateHotStandbyNewDiff := newHotStandbyCount - hotStandbyReplicas
+	// If the currentRevision and updateRevision are consistent, Pods can only update to this revision
+	// If the CloneSetPartitionRollback is not enabled, Pods can only update to the new revision
+	if updateRevision == currentRevision || !utilfeature.DefaultFeatureGate.Enabled(features.CloneSetPartitionRollback) {
+		updateHotStandbyOldDiff = integer.IntMax(updateHotStandbyOldDiff, 0)
+		updateHotStandbyNewDiff = integer.IntMin(updateHotStandbyNewDiff, 0)
+	}
+	if util.IntAbs(updateHotStandbyOldDiff) <= util.IntAbs(updateHotStandbyNewDiff) {
+		res.hotStandbyUpdateNum = updateHotStandbyOldDiff
+	} else {
+		res.hotStandbyUpdateNum = 0 - updateHotStandbyNewDiff
+	}
+
+	res.hotStandbyOldRevCount = oldHotStandbyCount
+
 	return
 }
diff --git a/pkg/controller/cloneset/sync/cloneset_update.go b/pkg/controller/cloneset/sync/cloneset_update.go
index 031b55f6dd..86e1b99dd2 100644
--- a/pkg/controller/cloneset/sync/cloneset_update.go
+++ b/pkg/controller/cloneset/sync/cloneset_update.go
@@ -19,6 +19,7 @@ package sync
 import (
 	"context"
 	"fmt"
 	"sort"
 	"time"
+
+	"github.com/openkruise/kruise/pkg/util/hotstandby"
@@ -78,58 +79,78 @@ func (c *realControl) Update(cs *appsv1alpha1.CloneSet,
 	}
 
 	// 2. calculate update diff and the revision to update
-	diffRes := calculateDiffsWithExpectation(cs, pods, currentRevision.Name, updateRevision.Name, nil)
-	if diffRes.updateNum == 0 {
+	diffRes, _ := calculateDiffsWithExpectation(cs, pods, currentRevision.Name, updateRevision.Name, nil)
+	if diffRes.updateNum == 0 && diffRes.hotStandbyUpdateNum == 0 {
 		return nil
 	}
 
 	// 3. find all matched pods can update
-	targetRevision := updateRevision
-	if diffRes.updateNum < 0 {
-		targetRevision = currentRevision
-	}
-
-	var waitUpdateIndexes []int
-	for i, pod := range pods {
-		if coreControl.IsPodUpdatePaused(pod) {
-			continue
-		}
-
-		var waitUpdate, canUpdate bool
-		if diffRes.updateNum > 0 {
-			waitUpdate = !clonesetutils.EqualToRevisionHash("", pod, updateRevision.Name)
-		} else {
-			waitUpdate = clonesetutils.EqualToRevisionHash("", pod, updateRevision.Name)
-		}
-		if waitUpdate {
-			switch lifecycle.GetPodLifecycleState(pod) {
-			case appspub.LifecycleStatePreparingDelete:
-				klog.V(3).InfoS("CloneSet found pod in PreparingDelete state, so skipped updating it",
-					"cloneSet", klog.KObj(cs), "pod", klog.KObj(pod))
-			case appspub.LifecycleStateUpdated:
-				klog.V(3).InfoS("CloneSet found pod in Updated state but not in updated revision",
-					"cloneSet", klog.KObj(cs), "pod", klog.KObj(pod))
-				canUpdate = true
-			default:
-				if gracePeriod, _ := appspub.GetInPlaceUpdateGrace(pod); gracePeriod != "" {
-					klog.V(3).InfoS("CloneSet found pod still in grace period, so skipped updating it",
-						"cloneSet", klog.KObj(cs), "pod", klog.KObj(pod), "gracePeriod", gracePeriod)
-				} else {
-					canUpdate = true
-				}
-			}
-		}
-		if canUpdate {
-			waitUpdateIndexes = append(waitUpdateIndexes, i)
-		}
-	}
+	canUpdateIndexesFunc := func(pods []*v1.Pod) []int {
+		var waitUpdateIndexes []int
+		for i, pod := range pods {
+			if coreControl.IsPodUpdatePaused(pod) {
+				continue
+			}
+
+			var waitUpdate, canUpdate bool
+			if diffRes.updateNum > 0 || diffRes.hotStandbyUpdateNum > 0 {
+				waitUpdate = !clonesetutils.EqualToRevisionHash("", pod, updateRevision.Name)
+			} else {
+				waitUpdate = clonesetutils.EqualToRevisionHash("", pod, updateRevision.Name)
+			}
+			if waitUpdate {
+				switch lifecycle.GetPodLifecycleState(pod) {
+				case appspub.LifecycleStatePreparingDelete:
+					klog.V(3).InfoS("CloneSet found pod in PreparingDelete state, so skipped updating it",
+						"cloneSet", klog.KObj(cs), "pod", klog.KObj(pod))
+				case appspub.LifecycleStateUpdated:
+					klog.V(3).InfoS("CloneSet found pod in Updated state but not in updated revision",
+						"cloneSet", klog.KObj(cs), "pod", klog.KObj(pod))
+					canUpdate = true
+				default:
+					if gracePeriod, _ := appspub.GetInPlaceUpdateGrace(pod); gracePeriod != "" {
+						klog.V(3).InfoS("CloneSet found pod still in grace period, so skipped updating it",
+							"cloneSet", klog.KObj(cs), "pod", klog.KObj(pod), "gracePeriod", gracePeriod)
+					} else {
+						canUpdate = true
+					}
+				}
+			}
+			if canUpdate {
+				waitUpdateIndexes = append(waitUpdateIndexes, i)
+			}
+		}
+		return waitUpdateIndexes
+	}
+
+	targetRevision := updateRevision
+	if diffRes.updateNum < 0 {
+		targetRevision = currentRevision
+	}
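+	// Normal and hot-standby pods are updated through separate index lists: the
+	// unavailable budget in limitUpdateIndexes only throttles normal pods, while
+	// hot-standby updates are capped by hotStandbyUpdateNum.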
-	// 4. sort all pods waiting to update
-	waitUpdateIndexes = SortUpdateIndexes(coreControl, cs.Spec.UpdateStrategy, pods, waitUpdateIndexes)
+	normalPods := hotstandby.FilterOutNormalPods(pods)
+	hotStandbyPods := hotstandby.FilterOutHotStandbyPods(pods)
+	waitUpdateIndexes := canUpdateIndexesFunc(normalPods)
+	waitUpdateHotStandbyIndexes := canUpdateIndexesFunc(hotStandbyPods)
+
+	// 4. sort all normal pods waiting to update
+	waitUpdateIndexes = SortUpdateIndexes(coreControl, cs.Spec.UpdateStrategy, normalPods, waitUpdateIndexes)
 
-	// 5. limit max count of pods can update
-	waitUpdateIndexes = limitUpdateIndexes(coreControl, cs.Spec.MinReadySeconds, diffRes, waitUpdateIndexes, pods, targetRevision.Name)
+	// 5. limit max count of normal pods can update
+	waitUpdateIndexes = limitUpdateIndexes(coreControl, cs.Spec.MinReadySeconds, diffRes, waitUpdateIndexes, normalPods, targetRevision.Name)
 
-	// 6. update pods
+	// 6. add hot-standby pod indexes
+	waitUpdateHotStandbyIndexes = hotstandby.GetHotStandbyPodIndexes(util.IntAbs(diffRes.hotStandbyUpdateNum), waitUpdateHotStandbyIndexes)
+
+	// 7. update pods
+	var waitUpdatePods []*v1.Pod
+	for _, idx := range waitUpdateIndexes {
+		waitUpdatePods = append(waitUpdatePods, normalPods[idx])
+	}
+	for _, idx := range waitUpdateHotStandbyIndexes {
+		waitUpdatePods = append(waitUpdatePods, hotStandbyPods[idx])
+	}
+
-	for _, idx := range waitUpdateIndexes {
-		pod := pods[idx]
+	for _, pod := range waitUpdatePods {
 		// Determine the pub before updating the pod
@@ -383,3 +404,39 @@ func limitUpdateIndexes(coreControl clonesetcore.Control, minReadySeconds int32,
 	}
 	return waitUpdateIndexes
 }
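+
+// PatchHotStandbyPodToNormal converts a hot-standby pod back into a normal pod.
+// When an in-place update lifecycle hook is registered, the pod is moved to
+// PreparingUpdate together with the recovery labels; otherwise the labels are
+// patched onto the pod directly.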
+func (r *realControl) PatchHotStandbyPodToNormal(cs *appsv1alpha1.CloneSet, pod *v1.Pod) error {
+	if pod == nil {
+		return fmt.Errorf("bad request for updating a nil pod to normal")
+	}
+	if !hotstandby.IsHotStandbyPod(pod) {
+		return fmt.Errorf("bad request for updating a non-hot-standby pod to normal, pod: %s", pod.Name)
+	}
+
+	if pod.Labels == nil {
+		pod.Labels = make(map[string]string)
+	}
+
+	if cs.Spec.Lifecycle != nil && lifecycle.IsPodHooked(cs.Spec.Lifecycle.InPlaceUpdate, pod) {
+		labels := make(map[string]string)
+		labels[hotstandby.PodHotStandbyRecoveryKey] = hotstandby.True
+		labels[hotstandby.PodHotStandbyEnableKey] = hotstandby.False
+
+		if updated, gotPod, err := r.lifecycleControl.UpdatePodLifecycleWithHandlerAndLabels(pod, appspub.LifecycleStatePreparingUpdate, cs.Spec.Lifecycle.InPlaceUpdate, labels); err == nil && updated {
+			clonesetutils.ResourceVersionExpectations.Expect(gotPod)
+			klog.V(3).InfoS("CloneSet updated pod lifecycle to PreparingUpdate", "cloneSet", klog.KObj(cs), "pod", klog.KObj(pod))
+			r.recorder.Eventf(cs, v1.EventTypeNormal, "SuccessfulUpdate", "succeeded in converting pod from hot-standby to normal, pod: %s", pod.Name)
+			return nil
+		} else {
+			r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedUpdate", "failed to update pod from hot-standby to normal: %v, pod: %v", err, util.DumpJSON(pod))
+			return err
+		}
+	} else {
+		body := fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s","%s":"%s"}}}`,
+			hotstandby.PodHotStandbyRecoveryKey, hotstandby.True, hotstandby.PodHotStandbyEnableKey, hotstandby.False)
+		if err := r.Patch(context.TODO(), pod, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil {
+			r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedPatch", "failed to patch pod from hot-standby to normal: %v, pod: %v", err, util.DumpJSON(pod))
+			return err
+		}
+		r.recorder.Eventf(cs, v1.EventTypeNormal, "SuccessfulPatch", "succeeded in converting pod from hot-standby to normal, pod: %s", pod.Name)
+		return nil
+	}
+}
diff --git a/pkg/util/hotstandby/hot_standby_utils.go b/pkg/util/hotstandby/hot_standby_utils.go
new file mode 100644
index 0000000000..2f3e4feb75
--- /dev/null
+++ b/pkg/util/hotstandby/hot_standby_utils.go
@@ -0,0 +1,68 @@
+package hotstandby
+
+import (
+	"strconv"
+
+	v1 "k8s.io/api/core/v1"
+)
+
+const (
+	// PodHotStandbyEnableKey is the Pod label key that marks whether a Pod is a hot-standby Pod
+	PodHotStandbyEnableKey = "hotstandby.apps.kruise.io/hot-standby"
+
+	// PodHotStandbyRecoveryKey is the Pod label key that marks a hot-standby Pod planned to be recovered to normal
+	PodHotStandbyRecoveryKey = "hotstandby.apps.kruise.io/plan-to-recover"
+
+	True = "true"
+
+	False = "false"
+)
+
+// IsHotStandbyPod reports whether the Pod carries the hot-standby label.
+func IsHotStandbyPod(pod *v1.Pod) bool {
+	if len(pod.Labels) == 0 {
+		return false
+	}
+
+	value, _ := strconv.ParseBool(pod.Labels[PodHotStandbyEnableKey])
+	return value
+}
+
+// PutHotStandbyPodLabel marks the Pod as a hot-standby Pod.
+func PutHotStandbyPodLabel(pod *v1.Pod) {
+	if pod.Labels == nil {
+		pod.Labels = make(map[string]string)
+	}
+	pod.Labels[PodHotStandbyEnableKey] = True
+}
+
+// PutNormalPodLabel marks the Pod as a normal (non-hot-standby) Pod.
+func PutNormalPodLabel(pod *v1.Pod) {
+	if pod.Labels == nil {
+		pod.Labels = make(map[string]string)
+	}
+	pod.Labels[PodHotStandbyEnableKey] = False
+}
+
+// FilterOutNormalPods returns only the Pods without the hot-standby label.
+func FilterOutNormalPods(pods []*v1.Pod) (normal []*v1.Pod) {
+	for _, p := range pods {
+		if !IsHotStandbyPod(p) {
+			normal = append(normal, p)
+		}
+	}
+	return
+}
+
+// FilterOutHotStandbyPods returns only the Pods with the hot-standby label.
+func FilterOutHotStandbyPods(pods []*v1.Pod) (hotStandby []*v1.Pod) {
+	for _, p := range pods {
+		if IsHotStandbyPod(p) {
+			hotStandby = append(hotStandby, p)
+		}
+	}
+	return
+}
+
+// GetHotStandbyPodIndexes caps the wait-update indexes at updateNum entries.
+func GetHotStandbyPodIndexes(updateNum int, waitUpdateIndexes []int) []int {
+	if updateNum < len(waitUpdateIndexes) {
+		waitUpdateIndexes = waitUpdateIndexes[:updateNum]
+	}
+
+	return waitUpdateIndexes
+}
diff --git a/pkg/util/lifecycle/lifecycle_utils.go b/pkg/util/lifecycle/lifecycle_utils.go
index 7fecfa7de0..08ff8678da 100644
--- a/pkg/util/lifecycle/lifecycle_utils.go
+++ b/pkg/util/lifecycle/lifecycle_utils.go
@@ -43,6 +43,7 @@ const (
 type Interface interface {
 	UpdatePodLifecycle(pod *v1.Pod, state appspub.LifecycleStateType, markPodNotReady bool) (bool, *v1.Pod, error)
 	UpdatePodLifecycleWithHandler(pod *v1.Pod, state appspub.LifecycleStateType, inPlaceUpdateHandler *appspub.LifecycleHook) (bool, *v1.Pod, error)
+	UpdatePodLifecycleWithHandlerAndLabels(pod *v1.Pod, state appspub.LifecycleStateType, inPlaceUpdateHandler *appspub.LifecycleHook, labels map[string]string) (bool, *v1.Pod, error)
 }
 
 type realControl struct {
@@ -203,6 +204,68 @@ func (c *realControl) UpdatePodLifecycleWithHandler(pod *v1.Pod, state appspub.L
 	return true, gotPod, err
 }
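+
+// UpdatePodLifecycleWithHandlerAndLabels behaves like UpdatePodLifecycleWithHandler,
+// but additionally applies the given labels in the same patch or update call.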
+func (c *realControl) UpdatePodLifecycleWithHandlerAndLabels(pod *v1.Pod, state appspub.LifecycleStateType,
+	inPlaceUpdateHandler *appspub.LifecycleHook, labels map[string]string) (updated bool, gotPod *v1.Pod, err error) {
+	if inPlaceUpdateHandler == nil || pod == nil {
+		return false, pod, nil
+	}
+
+	if inPlaceUpdateHandler.MarkPodNotReady {
+		if err = c.executePodNotReadyPolicy(pod, state); err != nil {
+			return false, nil, err
+		}
+	}
+
+	if GetPodLifecycleState(pod) == state {
+		return false, pod, nil
+	}
+
+	pod = pod.DeepCopy()
+	if adp, ok := c.adp.(podadapter.AdapterWithPatch); ok {
+		var labelsHandler, finalizersHandler string
+		for k, v := range inPlaceUpdateHandler.LabelsHandler {
+			labelsHandler = fmt.Sprintf(`%s,"%s":"%s"`, labelsHandler, k, v)
+		}
+		for _, v := range inPlaceUpdateHandler.FinalizersHandler {
+			finalizersHandler = fmt.Sprintf(`%s,"%s"`, finalizersHandler, v)
+		}
+		finalizersHandler = fmt.Sprintf(`[%s]`, strings.TrimLeft(finalizersHandler, ","))
+
+		var labelsStr string
+		for k, v := range labels {
+			labelsStr = fmt.Sprintf(`%s,"%s":"%s"`, labelsStr, k, v)
+		}
+
+		body := fmt.Sprintf(
+			`{"metadata":{"labels":{"%s":"%s"%s%s},"annotations":{"%s":"%s"},"finalizers":%s}}`,
+			appspub.LifecycleStateKey,
+			string(state),
+			labelsHandler,
+			labelsStr,
+			appspub.LifecycleTimestampKey,
+			time.Now().Format(time.RFC3339),
+			finalizersHandler,
+		)
+		gotPod, err = adp.PatchPod(pod, client.RawPatch(types.StrategicMergePatchType, []byte(body)))
+	} else {
+		if pod.Labels == nil {
+			pod.Labels = make(map[string]string)
+		}
+		for k, v := range inPlaceUpdateHandler.LabelsHandler {
+			pod.Labels[k] = v
+		}
+		pod.Finalizers = append(pod.Finalizers, inPlaceUpdateHandler.FinalizersHandler...)
+
+		SetPodLifecycle(state)(pod)
+		for k, v := range labels {
+			pod.Labels[k] = v
+		}
+		gotPod, err = c.adp.UpdatePod(pod)
+	}
+
+	return true, gotPod, err
+}
+
 func IsPodHooked(hook *appspub.LifecycleHook, pod *v1.Pod) bool {
 	if hook == nil || pod == nil {
 		return false