Skip to content

Commit

Permalink
support specified-delete in asts
Browse files Browse the repository at this point in the history
Signed-off-by: Abner-1 <[email protected]>
  • Loading branch information
ABNER-1 committed Sep 13, 2024
1 parent be1a79e commit eb2e8d0
Show file tree
Hide file tree
Showing 4 changed files with 368 additions and 13 deletions.
76 changes: 64 additions & 12 deletions pkg/controller/statefulset/stateful_set_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
imagejobutilfunc "github.com/openkruise/kruise/pkg/util/imagejob/utilfunction"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
"github.com/openkruise/kruise/pkg/util/lifecycle"
"github.com/openkruise/kruise/pkg/util/specifieddelete"
)

// Realistic value for maximum in-flight requests when processing in parallel mode.
Expand Down Expand Up @@ -622,11 +623,17 @@ func (ssc *defaultStatefulSetControl) rollingUpdateStatefulsetPods(
}
}

// handle specified deleted pod under maxUnavailable constrain
// NOTE: specified deletion is not constraint by partition setting
specifiedDeletedPods, err := ssc.handleSpecifiedDeletedPods(set, status, currentRevision, updateRevision, replicas, maxUnavailable, unavailablePods)
if err != nil {
return status, err

Check warning on line 630 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L630

Added line #L630 was not covered by tests
}

updateIndexes := sortPodsToUpdate(set.Spec.UpdateStrategy.RollingUpdate, updateRevision.Name, *set.Spec.Replicas, replicas)
klog.V(3).InfoS("Prepare to update pods indexes for StatefulSet", "statefulSet", klog.KObj(set), "podIndexes", updateIndexes)
// update pods in sequence
for _, target := range updateIndexes {

// the target is already up-to-date, go to next
if getPodRevision(replicas[target]) == updateRevision.Name {
continue
Expand Down Expand Up @@ -667,22 +674,26 @@ func (ssc *defaultStatefulSetControl) rollingUpdateStatefulsetPods(
}

// delete the Pod if it is not already terminating and does not match the update revision.
if !isTerminating(replicas[target]) {
if !specifiedDeletedPods.Has(replicas[target].Name) && !isTerminating(replicas[target]) {
// todo validate in-place for pub
inplacing, inplaceUpdateErr := ssc.inPlaceUpdatePod(set, replicas[target], updateRevision, revisions)
if inplaceUpdateErr != nil {
return status, inplaceUpdateErr
}
// if pod is inplacing or actual deleting, decrease revision
revisionNeedDecrease := inplacing
if !inplacing {
klog.V(2).InfoS("StatefulSet terminating Pod for update", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
if _, err := ssc.deletePod(set, replicas[target]); err != nil {
if _, actualDeleting, err := ssc.deletePod(set, replicas[target]); err != nil {
return status, err
} else {
revisionNeedDecrease = actualDeleting
}
}
// mark target as unavailable because it's updated
unavailablePods.Insert(replicas[target].Name)

if getPodRevision(replicas[target]) == currentRevision.Name {
if revisionNeedDecrease && getPodRevision(replicas[target]) == currentRevision.Name {
status.CurrentReplicas--
}
}
Expand All @@ -691,22 +702,63 @@ func (ssc *defaultStatefulSetControl) rollingUpdateStatefulsetPods(
return status, nil
}

func (ssc *defaultStatefulSetControl) deletePod(set *appsv1beta1.StatefulSet, pod *v1.Pod) (bool, error) {
func (ssc *defaultStatefulSetControl) handleSpecifiedDeletedPods(
set *appsv1beta1.StatefulSet,
status *appsv1beta1.StatefulSetStatus,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
replicas []*v1.Pod,
maxUnavailable int,
unavailablePods sets.String) (sets.String, error) {
specifiedDeletedPods := sets.NewString()
for target := len(replicas) - 1; target >= 0; target-- {
if replicas[target] == nil || !specifieddelete.IsSpecifiedDelete(replicas[target]) {
continue
}
// the unavailable pods count exceed the maxUnavailable and the target is available, so we can't process it,
// why skip here rather than return?
// case: pod 0 ready, pod1 unready, pod 2 unready, pod3 ready, pod4 ready
// when maxUnavailable = 3, pod4 with specified deleted will be deleted but pod3 can't
// pod 2 and pod 1 can be deleted because they were unavailable
if len(unavailablePods) >= maxUnavailable && !unavailablePods.Has(replicas[target].Name) {
klog.V(4).InfoS("StatefulSet was waiting for unavailable Pods to update, blocked pod",
"statefulSet", klog.KObj(set), "unavailablePods", unavailablePods.List(), "blockedPod", klog.KObj(replicas[target]))
continue
}

specifiedDeletedPods.Insert(replicas[target].Name)
if _, actualDeleting, err := ssc.deletePod(set, replicas[target]); err != nil {
return specifiedDeletedPods, err

Check warning on line 731 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L731

Added line #L731 was not covered by tests
} else if actualDeleting {
// if actual deleted, update revision count in status
if getPodRevision(replicas[target]) == currentRevision.Name {
status.CurrentReplicas--

Check warning on line 735 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L735

Added line #L735 was not covered by tests
} else if getPodRevision(replicas[target]) == updateRevision.Name {
status.UpdatedReplicas--

Check warning on line 737 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L737

Added line #L737 was not covered by tests
}
}
// mark target as unavailable because it's deleting or pre-deleting
unavailablePods.Insert(replicas[target].Name)
}
return specifiedDeletedPods, nil
}

func (ssc *defaultStatefulSetControl) deletePod(set *appsv1beta1.StatefulSet, pod *v1.Pod) (modified, actualDeleting bool, err error) {
if set.Spec.Lifecycle != nil && lifecycle.IsPodHooked(set.Spec.Lifecycle.PreDelete, pod) {
markPodNotReady := set.Spec.Lifecycle.PreDelete.MarkPodNotReady
if updated, _, err := ssc.lifecycleControl.UpdatePodLifecycle(pod, appspub.LifecycleStatePreparingDelete, markPodNotReady); err != nil {
return false, err
return false, false, err

Check warning on line 750 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L750

Added line #L750 was not covered by tests
} else if updated {
klog.V(3).InfoS("StatefulSet scaling update pod lifecycle to PreparingDelete", "statefulSet", klog.KObj(set), "pod", klog.KObj(pod))
return true, nil
return true, false, nil

Check warning on line 753 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L753

Added line #L753 was not covered by tests
}
return false, nil
return false, false, nil

Check warning on line 755 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L755

Added line #L755 was not covered by tests
}
if err := ssc.podControl.DeleteStatefulPod(set, pod); err != nil {
ssc.recorder.Eventf(set, v1.EventTypeWarning, "FailedDelete", "failed to delete pod %s: %v", pod.Name, err)
return false, err
return false, false, err
}
return true, nil
return true, true, nil
}

func (ssc *defaultStatefulSetControl) refreshPodState(set *appsv1beta1.StatefulSet, pod *v1.Pod, updateRevision string) (bool, time.Duration, error) {
Expand Down Expand Up @@ -992,7 +1044,7 @@ func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set
logger.V(2).Info("Pod of StatefulSet is terminating for scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))

modified, err := ssc.deletePod(set, condemned[i])
modified, _, err := ssc.deletePod(set, condemned[i])
if err != nil || (monotonic && modified) {
return true, err
}
Expand Down Expand Up @@ -1035,7 +1087,7 @@ func (ssc *defaultStatefulSetControl) processReplica(
// regardless of the exit code.
if isFailed(replicas[i]) || isSucceeded(replicas[i]) {
if replicas[i].DeletionTimestamp == nil {
if _, err := ssc.deletePod(set, replicas[i]); err != nil {
if _, _, err := ssc.deletePod(set, replicas[i]); err != nil {

Check warning on line 1090 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L1090

Added line #L1090 was not covered by tests
return true, false, err
}
}
Expand Down
164 changes: 164 additions & 0 deletions pkg/controller/statefulset/stateful_set_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
Expand All @@ -54,6 +55,7 @@ import (
utilpointer "k8s.io/utils/pointer"

appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
kruisefake "github.com/openkruise/kruise/pkg/client/clientset/versioned/fake"
Expand Down Expand Up @@ -2215,6 +2217,153 @@ func TestStatefulSetControlRollingUpdateBlockByMaxUnavailable(t *testing.T) {
}
}

func TestStatefulSetControlRollingUpdateWithSpecifiedDelete(t *testing.T) {
set := burst(newStatefulSet(6))
var partition int32 = 3
var maxUnavailable = intstr.FromInt(3)
set.Spec.UpdateStrategy = appsv1beta1.StatefulSetUpdateStrategy{
Type: apps.RollingUpdateStatefulSetStrategyType,
RollingUpdate: func() *appsv1beta1.RollingUpdateStatefulSetStrategy {
return &appsv1beta1.RollingUpdateStatefulSetStrategy{
Partition: &partition,
MaxUnavailable: &maxUnavailable,
PodUpdatePolicy: appsv1beta1.InPlaceIfPossiblePodUpdateStrategyType,
}
}(),
}

client := fake.NewSimpleClientset()
kruiseClient := kruisefake.NewSimpleClientset(set)
spc, _, ssc, stop := setupController(client, kruiseClient)
defer close(stop)
if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil {
t.Fatal(err)
}
set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatal(err)
}
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Fatal(err)
}

// set pod 0 to specified delete
originalPods, err := spc.setPodSpecifiedDelete(set, 0)
if err != nil {
t.Fatal(err)
}
sort.Sort(ascendingOrdinal(originalPods))

// start to update
set.Spec.Template.Spec.Containers[0].Image = "foo"

// first update pod 5 only because pod 0 is specified deleted
if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
t.Fatal(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}

// inplace update 5 and create 0
if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}
if len(pods) != 6 {
t.Fatalf("Expected create pods 5, got pods %v", pods)
}
sort.Sort(ascendingOrdinal(pods))
_, exist := pods[0].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
// pod 0 is old image and pod 5/4 is new image
assert.Equal(t, pods[5].Spec.Containers[0].Image, "foo")
assert.Equal(t, pods[4].Spec.Containers[0].Image, "foo")
assert.Equal(t, pods[0].Spec.Containers[0].Image, "nginx")

// set pod 1/2/5 to specified deleted and pod 0/4/5 to ready
spc.setPodSpecifiedDelete(set, 0)
spc.setPodSpecifiedDelete(set, 1)
spc.setPodSpecifiedDelete(set, 2)
for i := 0; i < 6; i++ {
spc.setPodRunning(set, i)
spc.setPodReady(set, i)
}
originalPods, _ = spc.setPodSpecifiedDelete(set, 5)
sort.Sort(ascendingOrdinal(originalPods))

// create new pod for 1/2/5, do not update 3
if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}

// create new pods 5 and inplace update 3
if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}
sort.Sort(ascendingOrdinal(pods))
if len(pods) != 6 {
t.Fatalf("Expected create pods 5, got pods %v", pods)
}

_, exist = pods[5].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
_, exist = pods[2].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
_, exist = pods[1].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
// pod 0 still undeleted
_, exist = pods[0].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, exist)
assert.Equal(t, pods[5].Spec.Containers[0].Image, "foo")
assert.Equal(t, pods[3].Spec.Containers[0].Image, "nginx")
assert.Equal(t, pods[2].Spec.Containers[0].Image, "nginx")
assert.Equal(t, pods[1].Spec.Containers[0].Image, "nginx")

// set pod 3 to specified deleted and all pod to ready => pod3 will be deleted and updated
for i := 0; i < 6; i++ {
spc.setPodRunning(set, i)
spc.setPodReady(set, i)
}
originalPods, _ = spc.setPodSpecifiedDelete(set, 3)
sort.Sort(ascendingOrdinal(originalPods))
// create new pod for 3, do not inplace-update 3
if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}

// create new pods 5 and inplace update 3
if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}
sort.Sort(ascendingOrdinal(pods))
if len(pods) != 6 {
t.Fatalf("Expected create pods 5, got pods %v", pods)
}
assert.Equal(t, pods[3].Spec.Containers[0].Image, "foo")
}

func TestStatefulSetControlInPlaceUpdate(t *testing.T) {
set := burst(newStatefulSet(3))
var partition int32 = 1
Expand Down Expand Up @@ -3119,6 +3268,21 @@ func (om *fakeObjectManager) setPodTerminated(set *appsv1beta1.StatefulSet, ordi
return om.podsLister.Pods(set.Namespace).List(selector)
}

func (om *fakeObjectManager) setPodSpecifiedDelete(set *appsv1beta1.StatefulSet, ordinal int) ([]*v1.Pod, error) {
pod := newStatefulSetPod(set, ordinal)
if pod.Labels == nil {
pod.Labels = make(map[string]string)
}
pod.Labels[appsv1alpha1.SpecifiedDeleteKey] = "true"
fakeResourceVersion(pod)
om.podsIndexer.Update(pod)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return nil, err
}
return om.podsLister.Pods(set.Namespace).List(selector)
}

var _ StatefulPodControlObjectManager = &fakeObjectManager{}

type fakeStatefulSetStatusUpdater struct {
Expand Down
Loading

0 comments on commit eb2e8d0

Please sign in to comment.