Skip to content

Commit

Permalink
feature: support hot-standby pods
Browse files Browse the repository at this point in the history
  • Loading branch information
yenniechen committed Sep 27, 2024
1 parent 5def396 commit 435b1a2
Show file tree
Hide file tree
Showing 6 changed files with 468 additions and 72 deletions.
69 changes: 51 additions & 18 deletions pkg/controller/cloneset/cloneset_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cloneset
import (
"context"
"fmt"
"github.com/openkruise/kruise/pkg/util/hotstandby"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
clonesetcore "github.com/openkruise/kruise/pkg/controller/cloneset/core"
Expand Down Expand Up @@ -81,35 +82,67 @@ func (r *realStatusUpdater) inconsistentStatus(cs *appsv1alpha1.CloneSet, newSta
newStatus.ExpectedUpdatedReplicas != oldStatus.ExpectedUpdatedReplicas ||
newStatus.UpdateRevision != oldStatus.UpdateRevision ||
newStatus.CurrentRevision != oldStatus.CurrentRevision ||
newStatus.LabelSelector != oldStatus.LabelSelector
newStatus.LabelSelector != oldStatus.LabelSelector ||
newStatus.HotStandbyReplicas != oldStatus.HotStandbyReplicas ||
newStatus.HotStandbyReadyReplicas != oldStatus.HotStandbyReadyReplicas ||
newStatus.HotStandbyAvailableReplicas != oldStatus.HotStandbyAvailableReplicas ||
newStatus.HotStandbyUpdatedReadyReplicas != oldStatus.HotStandbyUpdatedReadyReplicas ||
newStatus.HotStandbyUpdatedReplicas != oldStatus.HotStandbyUpdatedReplicas ||
newStatus.HotStandbyUpdatedAvailableReplicas != oldStatus.HotStandbyUpdatedAvailableReplicas ||
newStatus.HotStandbyExpectedUpdatedReplicas != oldStatus.HotStandbyExpectedUpdatedReplicas
}

// calculateStatus fills newStatus with replica counters derived from pods.
// Pods for which hotstandby.IsHotStandbyPod reports true are tallied into the
// HotStandby* counters; every other pod is tallied into the regular counters.
// After the loop it may promote UpdateRevision to CurrentRevision and it sets
// ExpectedUpdatedReplicas and HotStandbyExpectedUpdatedReplicas from the spec.
//
// NOTE(review): this listing looks like a rendered diff without +/- markers,
// so pre-change lines appear next to their replacements (the unguarded
// counters right after the loop header and the first revision check below).
// Confirm against the repository before relying on the exact statement list.
func (r *realStatusUpdater) calculateStatus(cs *appsv1alpha1.CloneSet, newStatus *appsv1alpha1.CloneSetStatus, pods []*v1.Pod) {
coreControl := clonesetcore.New(cs)
for _, pod := range pods {
// NOTE(review): presumably the pre-change counters (removed diff lines);
// they repeat the guarded block that follows — confirm.
newStatus.Replicas++
if coreControl.IsPodUpdateReady(pod, 0) {
newStatus.ReadyReplicas++
}
if sync.IsPodAvailable(coreControl, pod, cs.Spec.MinReadySeconds) {
newStatus.AvailableReplicas++
}
if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) {
newStatus.UpdatedReplicas++
}
if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) && coreControl.IsPodUpdateReady(pod, 0) {
newStatus.UpdatedReadyReplicas++
}
if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) && sync.IsPodAvailable(coreControl, pod, cs.Spec.MinReadySeconds) {
newStatus.UpdatedAvailableReplicas++
// Pods that are not hot-standby feed the regular counters.
if !hotstandby.IsHotStandbyPod(pod) {
newStatus.Replicas++
if coreControl.IsPodUpdateReady(pod, 0) {
newStatus.ReadyReplicas++
}
if sync.IsPodAvailable(coreControl, pod, cs.Spec.MinReadySeconds) {
newStatus.AvailableReplicas++
}
if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) {
newStatus.UpdatedReplicas++
}
if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) && coreControl.IsPodUpdateReady(pod, 0) {
newStatus.UpdatedReadyReplicas++
}
if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) && sync.IsPodAvailable(coreControl, pod, cs.Spec.MinReadySeconds) {
newStatus.UpdatedAvailableReplicas++
}
} else {
// Hot-standby pods go through the same checks but are counted in the
// separate HotStandby* fields.
newStatus.HotStandbyReplicas++
if coreControl.IsPodUpdateReady(pod, 0) {
newStatus.HotStandbyReadyReplicas++
}
if sync.IsPodAvailable(coreControl, pod, cs.Spec.MinReadySeconds) {
newStatus.HotStandbyAvailableReplicas++
}
if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) {
newStatus.HotStandbyUpdatedReplicas++
}
if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) && coreControl.IsPodUpdateReady(pod, 0) {
newStatus.HotStandbyUpdatedReadyReplicas++
}
if clonesetutils.EqualToRevisionHash("", pod, newStatus.UpdateRevision) && sync.IsPodAvailable(coreControl, pod, cs.Spec.MinReadySeconds) {
newStatus.HotStandbyUpdatedAvailableReplicas++
}
}

}
// Consider the update revision as stable if revisions of all pods are consistent to it and have the expected number of replicas, no need to wait all of them ready
if newStatus.UpdatedReplicas == newStatus.Replicas && newStatus.Replicas == *cs.Spec.Replicas {
// Consider the update revision as stable if revisions of all normal pods and hot-standby pods are consistent to it.
// no need to wait all of them ready
// NOTE(review): cs.Spec.Replicas is dereferenced without a nil check here;
// presumably it is defaulted before this point — TODO confirm.
if newStatus.UpdatedReplicas == newStatus.Replicas && newStatus.Replicas == *cs.Spec.Replicas && newStatus.HotStandbyUpdatedReplicas == newStatus.HotStandbyReplicas {
newStatus.CurrentRevision = newStatus.UpdateRevision
}

// ExpectedUpdatedReplicas is spec.replicas minus the resolved partition; it
// is left untouched when CalculatePartitionReplicas returns an error.
if partition, err := util.CalculatePartitionReplicas(cs.Spec.UpdateStrategy.Partition, cs.Spec.Replicas); err == nil {
newStatus.ExpectedUpdatedReplicas = *cs.Spec.Replicas - int32(partition)
}

// The hot-standby expectation is copied straight from the spec value (no
// partition is subtracted) and is left untouched when the field is nil.
if cs.Spec.HotStandbyReplicas != nil {
newStatus.HotStandbyExpectedUpdatedReplicas = *cs.Spec.HotStandbyReplicas
}
}
140 changes: 129 additions & 11 deletions pkg/controller/cloneset/sync/cloneset_scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package sync
import (
"context"
"fmt"
"github.com/openkruise/kruise/pkg/util/hotstandby"
"k8s.io/utils/integer"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -68,33 +70,36 @@ func (r *realControl) Scale(
}

// 2. calculate scale numbers
diffRes := calculateDiffsWithExpectation(updateCS, pods, currentRevision, updateRevision, revision.IsPodUpdate)
diffRes, newAvailableHotStandbyPods := calculateDiffsWithExpectation(updateCS, pods, currentRevision, updateRevision, revision.IsPodUpdate)
updatedPods, notUpdatedPods := clonesetutils.GroupUpdateAndNotUpdatePods(pods, updateRevision)

if diffRes.scaleUpNum > diffRes.scaleUpLimit {
r.recorder.Event(updateCS, v1.EventTypeWarning, "ScaleUpLimited", fmt.Sprintf("scaleUp is limited because of scaleStrategy.maxUnavailable, limit: %d", diffRes.scaleUpLimit))
}

// 3. scale out
if diffRes.scaleUpNum > 0 {
if diffRes.scaleUpNum > 0 || diffRes.hotStandbyScaleUpNum > 0 {
// total number of this creation
expectedCreations := diffRes.scaleUpLimit
// lack number of current version
expectedCurrentCreations := diffRes.scaleUpNumOldRevision
// total number of hot-standby this creation
expectedHotStandbyCreations := diffRes.hotStandbyScaleUpNum

klog.V(3).InfoS("CloneSet began to scale out pods, including current revision",
"cloneSet", klog.KObj(updateCS), "expectedCreations", expectedCreations, "expectedCurrentCreations", expectedCurrentCreations)

// available instance-id come from free pvc
availableIDs := getOrGenAvailableIDs(expectedCreations, pods, pvcs)
availableIDs := getOrGenAvailableIDs(expectedCreations+expectedHotStandbyCreations, pods, pvcs)
// existing pvc names
existingPVCNames := sets.NewString()
for _, pvc := range pvcs {
existingPVCNames.Insert(pvc.Name)
}

return r.createPods(expectedCreations, expectedCurrentCreations,
currentCS, updateCS, currentRevision, updateRevision, availableIDs.List(), existingPVCNames)
currentCS, updateCS, currentRevision, updateRevision, availableIDs.List(), existingPVCNames,
expectedHotStandbyCreations, newAvailableHotStandbyPods)
}

// 4. try to delete pods already in pre-delete
Expand Down Expand Up @@ -137,7 +142,29 @@ func (r *realControl) Scale(
klog.V(3).InfoS("CloneSet began to scale in", "cloneSet", klog.KObj(updateCS), "scaleDownNum", diffRes.scaleDownNum,
"oldRevision", diffRes.scaleDownNumOldRevision, "deleteReadyLimit", diffRes.deleteReadyLimit)

podsPreparingToDelete := r.choosePodsToDelete(updateCS, diffRes.scaleDownNum, diffRes.scaleDownNumOldRevision, notUpdatedPods, updatedPods)
podsPreparingToDelete := r.choosePodsToDelete(updateCS, diffRes.scaleDownNum, diffRes.scaleDownNumOldRevision,
hotstandby.FilterOutNormalPods(notUpdatedPods), hotstandby.FilterOutNormalPods(updatedPods))
podsToDelete := make([]*v1.Pod, 0, len(podsPreparingToDelete))
for _, pod := range podsPreparingToDelete {
if !isPodReady(coreControl, pod) {
podsToDelete = append(podsToDelete, pod)
} else if diffRes.deleteReadyLimit > 0 {
podsToDelete = append(podsToDelete, pod)
diffRes.deleteReadyLimit--
}
}

return r.deletePods(updateCS, podsToDelete, pvcs)
}

// 7. scale in hot-standby pods
if diffRes.hotStandbyScaleDownNum > 0 {
klog.V(3).Infof("CloneSet %s begin to scale in %d hot-standby pods including %d (current rev), delete ready limit: %d",
klog.KObj(updateCS), diffRes.hotStandbyScaleDownNum, 0, 0)

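// addedBy: cyy@20230821 choose the pods to delete (the original note read "choose normal pods to delete", but this branch scales in hot-standby pods)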
podsPreparingToDelete := r.choosePodsToDelete(updateCS, diffRes.hotStandbyScaleDownNum, diffRes.hotStandbyOldRevCount,
hotstandby.FilterOutHotStandbyPods(notUpdatedPods), hotstandby.FilterOutHotStandbyPods(updatedPods))
podsToDelete := make([]*v1.Pod, 0, len(podsPreparingToDelete))
for _, pod := range podsPreparingToDelete {
if !isPodReady(coreControl, pod) {
Expand Down Expand Up @@ -191,13 +218,57 @@ func (r *realControl) createPods(
currentCS, updateCS *appsv1alpha1.CloneSet,
currentRevision, updateRevision string,
availableIDs []string, existingPVCNames sets.String,
hotStandbyCreations int,
newAvailableHotStandbyPods []*v1.Pod,
) (bool, error) {
// new all pods need to create
// recover hot-standby pods to normal pods
hotStandbyMaxRecoveryNum := len(newAvailableHotStandbyPods)
var realRecoveryNum int
if hotStandbyMaxRecoveryNum > 0 {
recoveryNum := integer.IntMin(expectedCreations, hotStandbyMaxRecoveryNum)
if recoveryNum > 0 {
for i := 0; i < hotStandbyMaxRecoveryNum; i++ {
if realRecoveryNum >= recoveryNum {
break
}

if err := r.PatchHotStandbyPodToNormal(updateCS, newAvailableHotStandbyPods[i]); err == nil {
realRecoveryNum++
}
}
}
}

coreControl := clonesetcore.New(updateCS)
newPods, err := coreControl.NewVersionedPods(currentCS, updateCS, currentRevision, updateRevision,
expectedCreations, expectedCurrentCreations, availableIDs)
if err != nil {
return false, err
var newPods []*v1.Pod

expectedNormalPodCreations := expectedCreations - realRecoveryNum
if expectedNormalPodCreations > 0 {
// new all normal pods need to create
var err error
newPods, err = coreControl.NewVersionedPods(currentCS, updateCS, currentRevision, updateRevision,
expectedNormalPodCreations, expectedCurrentCreations, availableIDs)
if err != nil {
return false, err
}
for _, p := range newPods {
hotstandby.PutNormalPodLabel(p)
newPods = append(newPods, p)
}
}

// new all hot-standby pods need to create
expectedHotStandbyPodCreations := hotStandbyCreations + realRecoveryNum
if expectedHotStandbyPodCreations > 0 {
newHotStandbyPods, err := coreControl.NewVersionedPods(currentCS, updateCS, currentRevision, updateRevision,
expectedHotStandbyPodCreations, 0, availableIDs)
if err != nil {
return false, err
}
for _, p := range newHotStandbyPods {
hotstandby.PutHotStandbyPodLabel(p)
newPods = append(newPods, p)
}
}

podsCreationChan := make(chan *v1.Pod, len(newPods))
Expand All @@ -208,7 +279,7 @@ func (r *realControl) createPods(

var created int64
successPodNames := sync.Map{}
_, err = clonesetutils.DoItSlowly(len(newPods), initialBatchSize, func() error {
_, err := clonesetutils.DoItSlowly(len(newPods), initialBatchSize, func() error {
pod := <-podsCreationChan

cs := updateCS
Expand Down Expand Up @@ -404,3 +475,50 @@ func (r *realControl) choosePodsToDelete(cs *appsv1alpha1.CloneSet, totalDiff in

return podsToDelete
}

// chooseHotStandbyPodsToDelete returns the pods to remove for a scale-in of
// totalDiff pods, of which currentRevDiff should come from notUpdatedPods.
//
// Both input lists are first passed through hotstandby.FilterOutNormalPods.
// Candidates are then taken from the filtered not-updated list first (up to
// currentRevDiff) and the remainder from the filtered updated list.
//
// NOTE(review): FilterOutNormalPods is defined elsewhere. The original inline
// note said this step selects normal pods for deletion, which does not match
// this function's name — confirm which set the helper actually returns.
func (r *realControl) chooseHotStandbyPodsToDelete(cs *appsv1alpha1.CloneSet, totalDiff int, currentRevDiff int, notUpdatedPods, updatedPods []*v1.Pod) []*v1.Pod {
	coreControl := clonesetcore.New(cs)

	// pick returns the first `want` entries of candidates. The list is only
	// ranked (in place) when a strict subset is requested.
	pick := func(candidates []*v1.Pod, want int) []*v1.Pod {
		switch {
		case want > len(candidates):
			klog.Warningf("Diff > len(pods) in chooseHotStandbyPodsToDelete func which is not expected.")
			return candidates
		case want == len(candidates):
			// Everything is going to be deleted, so ordering is irrelevant.
			return candidates[:want]
		}

		// Use the spread-constraint ranker when constraints exist, otherwise
		// fall back to the same-node ranker.
		var ranker clonesetutils.Ranker
		constraints := coreControl.GetPodSpreadConstraint()
		if len(constraints) > 0 {
			ranker = clonesetutils.NewSpreadConstraintsRanker(candidates, constraints, r.Client)
		} else {
			ranker = clonesetutils.NewSameNodeRanker(candidates)
		}

		isAvailable := func(pod *v1.Pod) bool {
			return IsPodAvailable(coreControl, pod, cs.Spec.MinReadySeconds)
		}
		sort.Sort(clonesetutils.ActivePodsWithRanks{
			Pods:          candidates,
			Ranker:        ranker,
			AvailableFunc: isAvailable,
		})
		return candidates[:want]
	}

	// addedBy: cyy@20230821 select normal pods for deletion
	staleCandidates := hotstandby.FilterOutNormalPods(notUpdatedPods)
	freshCandidates := hotstandby.FilterOutNormalPods(updatedPods)

	switch {
	case currentRevDiff >= totalDiff:
		// The whole quota can be served from the not-updated pods.
		return pick(staleCandidates, totalDiff)
	case currentRevDiff > 0:
		// Split the quota: not-updated pods first, updated pods for the rest.
		chosen := pick(staleCandidates, currentRevDiff)
		chosen = append(chosen, pick(freshCandidates, totalDiff-currentRevDiff)...)
		return chosen
	default:
		return pick(freshCandidates, totalDiff)
	}
}

// PreDelete hands podsToDelete to deletePods, passing nil where the scale
// path passes its PVC list, and reports only the resulting error; the boolean
// result of deletePods is discarded.
func (r *realControl) PreDelete(cs *appsv1alpha1.CloneSet, podsToDelete []*v1.Pod) error {
	if _, err := r.deletePods(cs, podsToDelete, nil); err != nil {
		return err
	}
	return nil
}
Loading

0 comments on commit 435b1a2

Please sign in to comment.