Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Regard the pod at preparing update state as update revision when scaling #1290

Merged
merged 1 commit into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pkg/controller/cloneset/sync/cloneset_scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/pkg/util/expectations"
"github.com/openkruise/kruise/pkg/util/lifecycle"
"github.com/openkruise/kruise/pkg/util/revision"
)

const (
Expand Down Expand Up @@ -68,8 +69,8 @@ func (r *realControl) Scale(
}

// 2. calculate scale numbers
diffRes := calculateDiffsWithExpectation(updateCS, pods, currentRevision, updateRevision)
updatedPods, notUpdatedPods := clonesetutils.SplitPodsByRevision(pods, updateRevision)
diffRes := calculateDiffsWithExpectation(updateCS, pods, currentRevision, updateRevision, revision.IsPodUpdate)
updatedPods, notUpdatedPods := clonesetutils.GroupUpdateAndNotUpdatePods(pods, updateRevision)

if diffRes.scaleUpNum > diffRes.scaleUpLimit {
r.recorder.Event(updateCS, v1.EventTypeWarning, "ScaleUpLimited", fmt.Sprintf("scaleUp is limited because of scaleStrategy.maxUnavailable, limit: %d", diffRes.scaleUpLimit))
Expand Down Expand Up @@ -107,7 +108,7 @@ func (r *realControl) Scale(

// 5. specified delete
if podsToDelete := util.DiffPods(podsSpecifiedToDelete, podsInPreDelete); len(podsToDelete) > 0 {
newPodsToDelete, oldPodsToDelete := clonesetutils.SplitPodsByRevision(podsToDelete, updateRevision)
newPodsToDelete, oldPodsToDelete := clonesetutils.GroupUpdateAndNotUpdatePods(podsToDelete, updateRevision)
klog.V(3).Infof("CloneSet %s try to delete pods specified. Delete ready limit: %d. New Pods: %v, old Pods: %v.",
controllerKey, diffRes.deleteReadyLimit, util.GetPodNames(newPodsToDelete).List(), util.GetPodNames(oldPodsToDelete).List())

Expand Down
26 changes: 20 additions & 6 deletions pkg/controller/cloneset/sync/cloneset_sync_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ func (e expectationDiffs) isEmpty() bool {
return reflect.DeepEqual(e, expectationDiffs{})
}

type IsPodUpdateFunc func(pod *v1.Pod, updateRevision string) bool

// This is the most important algorithm in cloneset-controller.
// It calculates the pod numbers to scaling and updating for current CloneSet.
func calculateDiffsWithExpectation(cs *appsv1alpha1.CloneSet, pods []*v1.Pod, currentRevision, updateRevision string) (res expectationDiffs) {
func calculateDiffsWithExpectation(cs *appsv1alpha1.CloneSet, pods []*v1.Pod, currentRevision, updateRevision string, isPodUpdate IsPodUpdateFunc) (res expectationDiffs) {
coreControl := clonesetcore.New(cs)
replicas := int(*cs.Spec.Replicas)
var partition, maxSurge, maxUnavailable, scaleMaxUnavailable int
Expand Down Expand Up @@ -117,18 +119,30 @@ func calculateDiffsWithExpectation(cs *appsv1alpha1.CloneSet, pods []*v1.Pod, cu
}
klog.V(1).Infof("Calculate diffs for CloneSet %s/%s, replicas=%d, partition=%d, maxSurge=%d, maxUnavailable=%d,"+
" allPods=%d, newRevisionPods=%d, newRevisionActivePods=%d, oldRevisionPods=%d, oldRevisionActivePods=%d,"+
" unavailableNewRevisionCount=%d, unavailableOldRevisionCount=%d,"+
" preDeletingNewRevisionCount=%d, preDeletingOldRevisionCount=%d, toDeleteNewRevisionCount=%d, toDeleteOldRevisionCount=%d."+
" unavailableNewRevisionCount=%d, unavailableOldRevisionCount=%d, preDeletingNewRevisionCount=%d, preDeletingOldRevisionCount=%d,"+
" toDeleteNewRevisionCount=%d, toDeleteOldRevisionCount=%d, enabledPreparingUpdateAsUpdate=%v, useDefaultIsPodUpdate=%v."+
" Result: %+v",
cs.Namespace, cs.Name, replicas, partition, maxSurge, maxUnavailable,
len(pods), newRevisionCount, newRevisionActiveCount, oldRevisionCount, oldRevisionActiveCount,
unavailableNewRevisionCount, unavailableOldRevisionCount,
preDeletingNewRevisionCount, preDeletingOldRevisionCount, toDeleteNewRevisionCount, toDeleteOldRevisionCount,
unavailableNewRevisionCount, unavailableOldRevisionCount, preDeletingNewRevisionCount, preDeletingOldRevisionCount,
toDeleteNewRevisionCount, toDeleteOldRevisionCount, utilfeature.DefaultFeatureGate.Enabled(features.PreparingUpdateAsUpdate), isPodUpdate == nil,
res)
}()

// If PreparingUpdateAsUpdate feature gate is enabled:
// - when scaling, we hope the preparing-update pods should be regarded as update-revision pods,
// the isPodUpdate parameter will be IsPodUpdate function in pkg/util/revision/revision.go file;
// - when updating, we hope the preparing-update pods should be regarded as current-revision pods,
// the isPodUpdate parameter will be EqualToRevisionHash function by default;
if isPodUpdate == nil {
isPodUpdate = func(pod *v1.Pod, updateRevision string) bool {
return clonesetutils.EqualToRevisionHash("", pod, updateRevision)
}
}

for _, p := range pods {
if clonesetutils.EqualToRevisionHash("", p, updateRevision) {
if isPodUpdate(p, updateRevision) {

newRevisionCount++

switch state := lifecycle.GetPodLifecycleState(p); state {
Expand Down
30 changes: 29 additions & 1 deletion pkg/controller/cloneset/sync/cloneset_sync_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/features"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/revision"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -42,6 +43,7 @@ func TestCalculateDiffsWithExpectation(t *testing.T) {
pods []*v1.Pod
revisionConsistent bool
disableFeatureGate bool
isPodUpdate IsPodUpdateFunc
expectResult expectationDiffs
}{
{
Expand Down Expand Up @@ -904,8 +906,34 @@ func TestCalculateDiffsWithExpectation(t *testing.T) {
},
expectResult: expectationDiffs{},
},
{
name: "[scalingWithPreparingUpdate=true] scaling up when a preparing pod is not updated, and expected-updated is 1",
set: createTestCloneSet(4, intstr.FromString("90%"), intstr.FromInt(1), intstr.FromInt(0)),
setLabels: map[string]string{appsv1alpha1.CloneSetScalingExcludePreparingDeleteKey: "true"},
pods: []*v1.Pod{
createTestPod(oldRevision, appspub.LifecycleStatePreparingUpdate, true, false),
createTestPod(oldRevision, appspub.LifecycleStateNormal, true, false),
createTestPod(oldRevision, appspub.LifecycleStateNormal, true, false),
},
isPodUpdate: revision.IsPodUpdate,
expectResult: expectationDiffs{scaleUpNum: 1, scaleUpLimit: 1, scaleUpNumOldRevision: 1},
},
{
name: "[scalingWithPreparingUpdate=true] scaling up when a preparing pod is not updated, and expected-updated is 2",
set: createTestCloneSet(4, intstr.FromInt(2), intstr.FromInt(1), intstr.FromInt(0)),
setLabels: map[string]string{appsv1alpha1.CloneSetScalingExcludePreparingDeleteKey: "true"},
pods: []*v1.Pod{
createTestPod(oldRevision, appspub.LifecycleStatePreparingUpdate, true, false),
createTestPod(oldRevision, appspub.LifecycleStateNormal, true, false),
createTestPod(oldRevision, appspub.LifecycleStateNormal, true, false),
},
isPodUpdate: revision.IsPodUpdate,
expectResult: expectationDiffs{scaleUpNum: 1, scaleUpLimit: 1},
},
}

defer utilfeature.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreparingUpdateAsUpdate, true)()

for i := range cases {
t.Run(cases[i].name, func(t *testing.T) {
if cases[i].disableFeatureGate {
Expand All @@ -923,7 +951,7 @@ func TestCalculateDiffsWithExpectation(t *testing.T) {
}
cases[i].set.Labels[key] = value
}
res := calculateDiffsWithExpectation(cases[i].set, cases[i].pods, current, newRevision)
res := calculateDiffsWithExpectation(cases[i].set, cases[i].pods, current, newRevision, cases[i].isPodUpdate)
if !reflect.DeepEqual(res, cases[i].expectResult) {
t.Errorf("got %#v, expect %#v", res, cases[i].expectResult)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/cloneset/sync/cloneset_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (c *realControl) Update(cs *appsv1alpha1.CloneSet,
}

// 2. calculate update diff and the revision to update
diffRes := calculateDiffsWithExpectation(cs, pods, currentRevision.Name, updateRevision.Name)
diffRes := calculateDiffsWithExpectation(cs, pods, currentRevision.Name, updateRevision.Name, nil)
if diffRes.updateNum == 0 {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/cloneset/sync/cloneset_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,7 @@ func TestCalculateUpdateCount(t *testing.T) {

replicas := int32(tc.totalReplicas)
cs := &appsv1alpha1.CloneSet{Spec: appsv1alpha1.CloneSetSpec{Replicas: &replicas, UpdateStrategy: tc.strategy}}
diffRes := calculateDiffsWithExpectation(cs, tc.pods, currentRevision, updateRevision)
diffRes := calculateDiffsWithExpectation(cs, tc.pods, currentRevision, updateRevision, nil)

var waitUpdateIndexes []int
var targetRevision string
Expand Down
12 changes: 12 additions & 0 deletions pkg/controller/cloneset/utils/cloneset_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/openkruise/kruise/pkg/util/expectations"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/requeueduration"
"github.com/openkruise/kruise/pkg/util/revision"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -142,6 +143,17 @@ func SplitPodsByRevision(pods []*v1.Pod, rev string) (matched, unmatched []*v1.P
return
}

func GroupUpdateAndNotUpdatePods(pods []*v1.Pod, updateRevision string) (update, notUpdate []*v1.Pod) {
veophi marked this conversation as resolved.
Show resolved Hide resolved
for _, p := range pods {
if revision.IsPodUpdate(p, updateRevision) {
update = append(update, p)
} else {
notUpdate = append(notUpdate, p)
}
}
return
}

// UpdateStorage insert volumes generated by cs.Spec.VolumeClaimTemplates into Pod.
func UpdateStorage(cs *appsv1alpha1.CloneSet, pod *v1.Pod) {
currentVolumes := pod.Spec.Volumes
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/statefulset/stateful_set_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
"github.com/openkruise/kruise/pkg/util/lifecycle"
"github.com/openkruise/kruise/pkg/util/revision"
)

var patchCodec = scheme.Codecs.LegacyCodec(appsv1beta1.SchemeGroupVersion)
Expand Down Expand Up @@ -486,7 +487,7 @@ func isCurrentRevisionNeeded(set *appsv1beta1.StatefulSet, updateRevision string
if pod == nil || i == ordinal {
continue
}
if getPodRevision(pod) != updateRevision {
if !revision.IsPodUpdate(pod, updateRevision) {
noUpdatedReplicas++
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/statefulset/stateful_update_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
v1 "k8s.io/api/core/v1"

appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
"github.com/openkruise/kruise/pkg/util/revision"
"github.com/openkruise/kruise/pkg/util/updatesort"
)

Expand Down Expand Up @@ -54,7 +55,7 @@ func sortPodsToUpdate(rollingUpdateStrategy *appsv1beta1.RollingUpdateStatefulSe
}
if isTerminating(replicas[target]) {
updatedIdxs = append(updatedIdxs, target)
} else if getPodRevision(replicas[target]) == updateRevision {
} else if revision.IsPodUpdate(replicas[target], updateRevision) {
updatedIdxs = append(updatedIdxs, target)
} else {
waitUpdateIdxs = append(waitUpdateIdxs, target)
Expand Down
32 changes: 32 additions & 0 deletions pkg/controller/statefulset/stateful_update_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
"github.com/openkruise/kruise/pkg/features"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
)

func TestSortPodsToUpdate(t *testing.T) {
Expand Down Expand Up @@ -125,8 +127,38 @@ func TestSortPodsToUpdate(t *testing.T) {
},
expected: []int{8, 7, 11, 9},
},
{
strategy: &appsv1beta1.RollingUpdateStatefulSetStrategy{
UnorderedUpdate: &appsv1beta1.UnorderedUpdateStrategy{PriorityStrategy: &appspub.UpdatePriorityStrategy{
WeightPriority: []appspub.UpdatePriorityWeightTerm{
{Weight: 20, MatchSelector: metav1.LabelSelector{MatchLabels: map[string]string{"k": "v1"}}},
{Weight: 10, MatchSelector: metav1.LabelSelector{MatchLabels: map[string]string{"k": "v2"}}},
},
}},
Partition: func() *int32 { var i int32 = 6; return &i }(),
},
updateRevision: "r1",
totalReplicas: 10,
replicas: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r0", appspub.LifecycleStateKey: string(appspub.LifecycleStatePreparingUpdate)}}},
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r0", appspub.LifecycleStateKey: string(appspub.LifecycleStatePreparingUpdate)}}},
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r0"}}},
nil,
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r0"}}},
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r0"}}},
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r0"}}},
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r1"}}},
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r1"}}},
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r0"}}},
nil,
{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r0"}}},
},
expected: []int{8, 7, 1, 0},
},
}

defer utilfeature.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreparingUpdateAsUpdate, true)()

for i, tc := range cases {
res := sortPodsToUpdate(tc.strategy, tc.updateRevision, tc.totalReplicas, tc.replicas)
if !reflect.DeepEqual(res, tc.expected) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/features/kruise_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ const (
// CloneSetEventHandlerOptimization enable optimization for cloneset-controller to reduce the
// queuing frequency cased by pod update.
CloneSetEventHandlerOptimization featuregate.Feature = "CloneSetEventHandlerOptimization"

// PreparingUpdateAsUpdate enable CloneSet/Advanced StatefulSet controller to regard preparing-update Pod
// as updated when calculating update/current revision during scaling.
PreparingUpdateAsUpdate featuregate.Feature = "PreparingUpdateAsUpdate"
veophi marked this conversation as resolved.
Show resolved Hide resolved
)

var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
Expand All @@ -124,6 +128,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
PodProbeMarkerGate: {Default: true, PreRelease: featuregate.Alpha},
PreDownloadImageForDaemonSetUpdate: {Default: false, PreRelease: featuregate.Alpha},
CloneSetEventHandlerOptimization: {Default: false, PreRelease: featuregate.Alpha},
PreparingUpdateAsUpdate: {Default: false, PreRelease: featuregate.Alpha},
}

func init() {
Expand Down
54 changes: 54 additions & 0 deletions pkg/util/revision/revision.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
Copyright 2023 The Kruise Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package revision

import (
"strings"

appspub "github.com/openkruise/kruise/apis/apps/pub"
"github.com/openkruise/kruise/pkg/features"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/lifecycle"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
)

// IsPodUpdate return true when:
// - Pod controller-revision-hash equals to updateRevision;
// - Pod at preparing update state if PreparingUpdateAsUpdate feature-gated is enabled.
func IsPodUpdate(pod *v1.Pod, updateRevision string) bool {
if utilfeature.DefaultFeatureGate.Enabled(features.PreparingUpdateAsUpdate) &&
lifecycle.GetPodLifecycleState(pod) == appspub.LifecycleStatePreparingUpdate {
return true
}
return equalToRevisionHash("", pod, updateRevision)
}

func equalToRevisionHash(s string, pod *v1.Pod, hash string) bool {
objHash := pod.GetLabels()[apps.ControllerRevisionHashLabelKey]
if objHash == hash {
return true
}
return getShortHash(hash) == getShortHash(objHash)
}

func getShortHash(hash string) string {
// This makes sure the real hash must be the last '-' substring of revision name
// vendor/k8s.io/kubernetes/pkg/controller/history/controller_history.go#82
list := strings.Split(hash, "-")
return list[len(list)-1]
}
Loading
Loading